diff options
| -rw-r--r-- | docs/api.md | 70 | ||||
| -rw-r--r-- | hi-ui/src/apiServer.js | 11 | ||||
| -rw-r--r-- | hi-ui/src/lib/ChannelList.svelte | 20 | ||||
| -rw-r--r-- | hi-ui/src/lib/CreateChannelForm.svelte | 3 | ||||
| -rw-r--r-- | hi-ui/src/routes/+page.svelte | 25 | ||||
| -rw-r--r-- | hi-ui/src/store/messages.js | 6 | ||||
| -rw-r--r-- | src/channel/routes.rs | 89 | ||||
| -rw-r--r-- | src/channel/routes/test/list.rs | 65 | ||||
| -rw-r--r-- | src/channel/routes/test/mod.rs | 1 | ||||
| -rw-r--r-- | src/login/routes.rs | 65 | ||||
| -rw-r--r-- | src/message/app.rs | 27 |
11 files changed, 146 insertions, 236 deletions
diff --git a/docs/api.md b/docs/api.md index e8c8c8c..91485f3 100644 --- a/docs/api.md +++ b/docs/api.md @@ -25,6 +25,25 @@ Returns information needed to boot the client. Also the recommended way to check "id": "L1234abcd", }, "resume_point": "1312", + "channels": [ + { + "name": "nonsense and such", + "id": "C1234abcd", + "messages": [ + { + "at": "2024-09-27T23:19:10.208147Z", + "sender": { + "id": "L1234abcd", + "name": "example username" + }, + "message": { + "id": "M1312acab", + "body": "beep" + } + } + ] + } + ] } ``` @@ -77,27 +96,6 @@ This endpoint returns a 204 No Content response on success, with a `Set-Cookie` Channels are the containers for conversations. The API supports listing channels, creating new channels, and send messages to an existing channel. -### `GET /api/channels` - -Lists channels. - -#### Query parameters - -This endpoint accepts an optional `resume_point` query parameter. If provided, the value must be the value obtained from the `/api/boot` method. This parameter will restrict the returned list to channels as they existed at a fixed point in time, with any later changes only appearing in the event stream. - -#### On success - -Responds with a list of channel objects, one per channel: - -```json -[ - { - "name": "nonsense and such", - "id": "C1234abcd", - } -] -``` - ### `POST /api/channels` Creates a channel. @@ -127,36 +125,6 @@ Channel names must be unique. If a channel with the same name already exists, th The API delivers events to clients to update them on other clients' actions and messages. While there is no specific delivery deadline, messages are delivered as soon as possible on a best-effort basis, and the event system allows clients to replay events or resume interrupted streams, to allow recovery if a message is lost. -### `GET /api/channels/:channel/messages` - -Retrieves historical messages in a channel. - -The `:channel` placeholder must be a channel ID, as returned by `GET /api/channels` or `POST /api/channels`. - -#### Query parameters - -This endpoint accepts an optional `resume_point` query parameter. If provided, the value must be the value obtained from the `/api/boot` method. This parameter will restrict the returned list to messages as they existed at a fixed point in time, with any later changes only appearing in the event stream. - -#### On success - -Responds with a list of message objects, one per message: - -```json -[ - { - "at": "2024-09-27T23:19:10.208147Z", - "sender": { - "id": "L1234abcd", - "name": "example username" - }, - "message": { - "id": "M1312acab", - "body": "beep" - } - } -] -``` - ### `POST /api/channels/:channel` Sends a chat message to a channel. It will be relayed to clients subscribed to the channel's events, and recorded for replay. diff --git a/hi-ui/src/apiServer.js b/hi-ui/src/apiServer.js index 6273576..f4a89a4 100644 --- a/hi-ui/src/apiServer.js +++ b/hi-ui/src/apiServer.js @@ -21,10 +21,6 @@ export async function logOut() { return apiServer.post('/auth/logout', {}); } -export async function listChannels() { - return apiServer.get('/channels'); -} - export async function createChannel(name) { return apiServer.post('/channels', { name }); } @@ -37,8 +33,10 @@ export async function deleteMessage(messageId) { // TODO } -export function subscribeToEvents() { - const evtSource = new EventSource("/api/events"); +export function subscribeToEvents(resume_point) { + const eventsUrl = new URL('/api/events', window.location); + eventsUrl.searchParams.append('resume_point', resume_point); + const evtSource = new EventSource(eventsUrl.toString()); // TODO: this should process all incoming events and store them. // TODO: eventually we'll need to handle expiring old info, so as not to use // infinite browser memory. @@ -58,6 +56,7 @@ export function subscribeToEvents() { switch (data.type) { case 'created': + channelsList.update((value) => value.addChannel(data.channel)) break; case 'message': messages.update((value) => value.addMessage(data)); diff --git a/hi-ui/src/lib/ChannelList.svelte b/hi-ui/src/lib/ChannelList.svelte index 5577d94..ba48e5d 100644 --- a/hi-ui/src/lib/ChannelList.svelte +++ b/hi-ui/src/lib/ChannelList.svelte @@ -1,32 +1,18 @@ <script> - import { onMount } from 'svelte'; - - import { listChannels } from '../apiServer'; import { channelsList } from '../store'; import Channel from './Channel.svelte'; let channels; - let loading = true; channelsList.subscribe((value) => { channels = value.channels; }); - - onMount(async () => { - let channels = await listChannels(); - channelsList.update((value) => value.setChannels(channels.data)); - loading = false; - }); </script> <ul class="select-none"> - {#if loading} - <li><em>loading channels…</em></li> - {:else} - {#each channels as channel} - <Channel {...channel} /> - {/each} - {/if} + {#each channels as channel} + <Channel {...channel} /> + {/each} </ul> <style> diff --git a/hi-ui/src/lib/CreateChannelForm.svelte b/hi-ui/src/lib/CreateChannelForm.svelte index aa415fd..c08430b 100644 --- a/hi-ui/src/lib/CreateChannelForm.svelte +++ b/hi-ui/src/lib/CreateChannelForm.svelte @@ -1,8 +1,6 @@ <script> import { createChannel } from '../apiServer'; - import { channelsList } from '../store'; - let name = ''; let disabled = false; @@ -10,7 +8,6 @@ disabled = true; const response = await createChannel(name); if (200 <= response.status && response.status < 300) { - channelsList.update((value) => value.addChannel(response.data)); name = ''; } disabled = false; diff --git a/hi-ui/src/routes/+page.svelte b/hi-ui/src/routes/+page.svelte index 66b4f8d..1a61b3e 100644 --- a/hi-ui/src/routes/+page.svelte +++ b/hi-ui/src/routes/+page.svelte @@ -2,7 +2,7 @@ import { onMount } from 'svelte'; import { boot, subscribeToEvents } from '../apiServer'; - import { currentUser } from '../store'; + import { currentUser, channelsList, messages } from '../store'; import ActiveChannel from '../lib/ActiveChannel.svelte'; import ChannelList from '../lib/ChannelList.svelte'; @@ -18,16 +18,29 @@ user = value; }); + function onBooted(boot) { + currentUser.update(() => ({ + id: boot.login.id, + username: boot.login.name, + })); + let channels = boot.channels.map((channel) => ({ + id: channel.id, + name: channel.name, + })); + channelsList.update((value) => value.setChannels(channels)); + let bootMessages = boot.channels.map((channel) => [channel.id, channel.messages]); + for (let [channel, channelMessages] of bootMessages) { + messages.update((value) => value.addMessages(channel, channelMessages)); + } + } + onMount(async () => { try { let response = await boot(); switch (response.status) { case 200: - currentUser.update(() => ({ - username: response.data.login.name, - id: response.data.login.id, - })); - subscribeToEvents(); + onBooted(response.data); + subscribeToEvents(response.data.resume_point); break; case 401: currentUser.update(() => null); diff --git a/hi-ui/src/store/messages.js b/hi-ui/src/store/messages.js index d1f19d3..560b9e1 100644 --- a/hi-ui/src/store/messages.js +++ b/hi-ui/src/store/messages.js @@ -17,6 +17,12 @@ export class Messages { return this; } + addMessages(channel, payloads) { + this.updateChannel(channel, (messages) => [...messages, ...payloads]); + return this; + } + + deleteMessage(channel, message) { let messages = this.messages(channel).filter((msg) => msg.message.id != message); this.channels[channel] = messages; diff --git a/src/channel/routes.rs b/src/channel/routes.rs index 23c0602..5d67af8 100644 --- a/src/channel/routes.rs +++ b/src/channel/routes.rs @@ -2,56 +2,21 @@ use axum::{ extract::{Json, Path, State}, http::StatusCode, response::{IntoResponse, Response}, - routing::{delete, get, post}, + routing::{delete, post}, Router, }; -use axum_extra::extract::Query; use super::{app, Channel, Id}; -use crate::{ - app::App, - clock::RequestedAt, - error::Internal, - event::{Instant, Sequence}, - login::Login, - message::{self, app::SendError}, -}; +use crate::{app::App, clock::RequestedAt, error::Internal, login::Login, message::app::SendError}; #[cfg(test)] mod test; pub fn router() -> Router<App> { Router::new() - .route("/api/channels", get(list)) .route("/api/channels", post(on_create)) .route("/api/channels/:channel", post(on_send)) .route("/api/channels/:channel", delete(on_delete)) - .route("/api/channels/:channel/messages", get(messages)) -} - -#[derive(Default, serde::Deserialize)] -struct ResumeQuery { - resume_point: Option<Sequence>, -} - -async fn list( - State(app): State<App>, - _: Login, - Query(query): Query<ResumeQuery>, -) -> Result<Channels, Internal> { - let channels = app.channels().all(query.resume_point).await?; - let response = Channels(channels); - - Ok(response) -} - -struct Channels(Vec<Channel>); - -impl IntoResponse for Channels { - fn into_response(self) -> Response { - let Self(channels) = self; - Json(channels).into_response() - } } #[derive(Clone, serde::Deserialize)] @@ -150,53 +115,3 @@ impl IntoResponse for ErrorResponse { } } } - -async fn messages( - State(app): State<App>, - Path(channel): Path<Id>, - _: Login, - Query(query): Query<ResumeQuery>, -) -> Result<Messages, ErrorResponse> { - let messages = app - .channels() - .messages(&channel, query.resume_point) - .await?; - let response = Messages( - messages - .into_iter() - .map(|message| MessageView { - sent: message.sent, - sender: message.sender, - message: MessageInner { - id: message.id, - body: message.body, - }, - }) - .collect(), - ); - - Ok(response) -} - -struct Messages(Vec<MessageView>); - -#[derive(serde::Serialize)] -struct MessageView { - #[serde(flatten)] - sent: Instant, - sender: Login, - message: MessageInner, -} - -#[derive(serde::Serialize)] -struct MessageInner { - id: message::Id, - body: String, -} - -impl IntoResponse for Messages { - fn into_response(self) -> Response { - let Self(messages) = self; - Json(messages).into_response() - } -} diff --git a/src/channel/routes/test/list.rs b/src/channel/routes/test/list.rs deleted file mode 100644 index f15a53c..0000000 --- a/src/channel/routes/test/list.rs +++ /dev/null @@ -1,65 +0,0 @@ -use axum::extract::State; -use axum_extra::extract::Query; - -use crate::{channel::routes, test::fixtures}; - -#[tokio::test] -async fn empty_list() { - // Set up the environment - - let app = fixtures::scratch_app().await; - let viewer = fixtures::login::create(&app).await; - - // Call the endpoint - - let routes::Channels(channels) = routes::list(State(app), viewer, Query::default()) - .await - .expect("always succeeds"); - - // Verify the semantics - - assert!(channels.is_empty()); -} - -#[tokio::test] -async fn one_channel() { - // Set up the environment - - let app = fixtures::scratch_app().await; - let viewer = fixtures::login::create(&app).await; - let channel = fixtures::channel::create(&app, &fixtures::now()).await; - - // Call the endpoint - - let routes::Channels(channels) = routes::list(State(app), viewer, Query::default()) - .await - .expect("always succeeds"); - - // Verify the semantics - - assert!(channels.contains(&channel)); -} - -#[tokio::test] -async fn multiple_channels() { - // Set up the environment - - let app = fixtures::scratch_app().await; - let viewer = fixtures::login::create(&app).await; - let channels = vec![ - fixtures::channel::create(&app, &fixtures::now()).await, - fixtures::channel::create(&app, &fixtures::now()).await, - ]; - - // Call the endpoint - - let routes::Channels(response_channels) = routes::list(State(app), viewer, Query::default()) - .await - .expect("always succeeds"); - - // Verify the semantics - - assert!(channels - .into_iter() - .all(|channel| response_channels.contains(&channel))); -} diff --git a/src/channel/routes/test/mod.rs b/src/channel/routes/test/mod.rs index ab663eb..3e5aa17 100644 --- a/src/channel/routes/test/mod.rs +++ b/src/channel/routes/test/mod.rs @@ -1,3 +1,2 @@ -mod list; mod on_create; mod on_send; diff --git a/src/login/routes.rs b/src/login/routes.rs index 0874cc3..b0e3fee 100644 --- a/src/login/routes.rs +++ b/src/login/routes.rs @@ -5,12 +5,16 @@ use axum::{ routing::{get, post}, Router, }; +use futures::stream::{self, StreamExt as _, TryStreamExt as _}; use crate::{ app::App, + channel::Channel, clock::RequestedAt, error::{Internal, Unauthorized}, + event::Instant, login::{Login, Password}, + message::{self, Message}, token::{app, extract::IdentityToken}, }; @@ -26,9 +30,21 @@ pub fn router() -> Router<App> { async fn boot(State(app): State<App>, login: Login) -> Result<Boot, Internal> { let resume_point = app.logins().boot_point().await?; + let channels = app.channels().all(resume_point.into()).await?; + let channels = stream::iter(channels) + .then(|channel| async { + app.messages() + .in_channel(&channel.id, resume_point.into()) + .await + .map(|messages| BootChannel::new(channel, messages)) + }) + .try_collect() + .await?; + Ok(Boot { login, resume_point: resume_point.to_string(), + channels, }) } @@ -36,6 +52,55 @@ async fn boot(State(app): State<App>, login: Login) -> Result<Boot, Internal> { struct Boot { login: Login, resume_point: String, + channels: Vec<BootChannel>, +} + +#[derive(serde::Serialize)] +struct BootChannel { + #[serde(flatten)] + channel: Channel, + messages: Vec<BootMessage>, +} + +impl BootChannel { + fn new(channel: Channel, messages: impl IntoIterator<Item = Message>) -> Self { + Self { + channel, + messages: messages.into_iter().map(BootMessage::from).collect(), + } + } +} + +#[derive(serde::Serialize)] +struct BootMessage { + #[serde(flatten)] + sent: Instant, + sender: Login, + message: BootMessageBody, +} + +impl From<Message> for BootMessage { + fn from(message: Message) -> Self { + let Message { + sent, + channel: _, + sender, + id, + body, + } = message; + + Self { + sent, + sender, + message: BootMessageBody { id, body }, + } + } +} + +#[derive(serde::Serialize)] +struct BootMessageBody { + id: message::Id, + body: String, } impl IntoResponse for Boot { diff --git a/src/message/app.rs b/src/message/app.rs index 385c92e..1e50a65 100644 --- a/src/message/app.rs +++ b/src/message/app.rs @@ -44,6 +44,33 @@ impl<'a> Messages<'a> { Ok(message.as_sent()) } + pub async fn in_channel( + &self, + channel: &channel::Id, + resume_point: Option<Sequence>, + ) -> Result<Vec<Message>, DeleteError> { + let mut tx = self.db.begin().await?; + let channel = tx + .channels() + .by_id(channel) + .await + .not_found(|| DeleteError::ChannelNotFound(channel.clone()))?; + let messages = tx.messages().in_channel(&channel, resume_point).await?; + tx.commit().await?; + + let messages = messages + .into_iter() + .filter_map(|message| { + message + .events() + .filter(Sequence::up_to(resume_point)) + .collect() + }) + .collect(); + + Ok(messages) + } + pub async fn delete(&self, message: &Id, deleted_at: &DateTime) -> Result<(), DeleteError> { let mut tx = self.db.begin().await?; let deleted = tx.sequence().next(deleted_at).await?; |
