diff options
| author | Owen Jacobson <owen@grimoire.ca> | 2024-10-05 23:00:58 -0400 |
|---|---|---|
| committer | Owen Jacobson <owen@grimoire.ca> | 2024-10-05 23:00:58 -0400 |
| commit | 05de3c7b211727039b3912311aa4bab6787a7457 (patch) | |
| tree | 08a3860b68391514390f42872ccc1cb4c6e6afd2 | |
| parent | bc514e0ea5f0a553f15ab8275961907877181520 (diff) | |
| parent | 6a10fcaf64938da52b326ea80013d9f30ed62a6c (diff) | |
Merge branch 'wip/boot'
35 files changed, 388 insertions, 392 deletions
diff --git a/docs/api.md b/docs/api.md index e8c8c8c..91485f3 100644 --- a/docs/api.md +++ b/docs/api.md @@ -25,6 +25,25 @@ Returns information needed to boot the client. Also the recommended way to check "id": "L1234abcd", }, "resume_point": "1312", + "channels": [ + { + "name": "nonsense and such", + "id": "C1234abcd", + "messages": [ + { + "at": "2024-09-27T23:19:10.208147Z", + "sender": { + "id": "L1234abcd", + "name": "example username" + }, + "message": { + "id": "M1312acab", + "body": "beep" + } + } + ] + } + ] } ``` @@ -77,27 +96,6 @@ This endpoint returns a 204 No Content response on success, with a `Set-Cookie` Channels are the containers for conversations. The API supports listing channels, creating new channels, and send messages to an existing channel. -### `GET /api/channels` - -Lists channels. - -#### Query parameters - -This endpoint accepts an optional `resume_point` query parameter. If provided, the value must be the value obtained from the `/api/boot` method. This parameter will restrict the returned list to channels as they existed at a fixed point in time, with any later changes only appearing in the event stream. - -#### On success - -Responds with a list of channel objects, one per channel: - -```json -[ - { - "name": "nonsense and such", - "id": "C1234abcd", - } -] -``` - ### `POST /api/channels` Creates a channel. @@ -127,36 +125,6 @@ Channel names must be unique. If a channel with the same name already exists, th The API delivers events to clients to update them on other clients' actions and messages. While there is no specific delivery deadline, messages are delivered as soon as possible on a best-effort basis, and the event system allows clients to replay events or resume interrupted streams, to allow recovery if a message is lost. -### `GET /api/channels/:channel/messages` - -Retrieves historical messages in a channel. - -The `:channel` placeholder must be a channel ID, as returned by `GET /api/channels` or `POST /api/channels`. - -#### Query parameters - -This endpoint accepts an optional `resume_point` query parameter. If provided, the value must be the value obtained from the `/api/boot` method. This parameter will restrict the returned list to messages as they existed at a fixed point in time, with any later changes only appearing in the event stream. - -#### On success - -Responds with a list of message objects, one per message: - -```json -[ - { - "at": "2024-09-27T23:19:10.208147Z", - "sender": { - "id": "L1234abcd", - "name": "example username" - }, - "message": { - "id": "M1312acab", - "body": "beep" - } - } -] -``` - ### `POST /api/channels/:channel` Sends a chat message to a channel. It will be relayed to clients subscribed to the channel's events, and recorded for replay. diff --git a/hi-ui/src/apiServer.js b/hi-ui/src/apiServer.js index 5e521de..f4a89a4 100644 --- a/hi-ui/src/apiServer.js +++ b/hi-ui/src/apiServer.js @@ -1,5 +1,5 @@ import axios from 'axios'; -import { activeChannel, channelsList, events } from './store'; +import { activeChannel, channelsList, messages } from './store'; export const apiServer = axios.create({ baseURL: '/api/', @@ -21,10 +21,6 @@ export async function logOut() { return apiServer.post('/auth/logout', {}); } -export async function listChannels() { - return apiServer.get('/channels'); -} - export async function createChannel(name) { return apiServer.post('/channels', { name }); } @@ -37,9 +33,10 @@ export async function deleteMessage(messageId) { // TODO } -export function subscribeToEvents() { - const evtSource = new EventSource("/api/events"); - events.update(() => []); +export function subscribeToEvents(resume_point) { + const eventsUrl = new URL('/api/events', window.location); + eventsUrl.searchParams.append('resume_point', resume_point); + const evtSource = new EventSource(eventsUrl.toString()); // TODO: this should process all incoming events and store them. // TODO: eventually we'll need to handle expiring old info, so as not to use // infinite browser memory. @@ -59,41 +56,18 @@ export function subscribeToEvents() { switch (data.type) { case 'created': + channelsList.update((value) => value.addChannel(data.channel)) break; case 'message': - events.update((value) => { - const eventList = [...value, data]; - eventList.sort((a, b) => a.at - b.at); - return eventList; - }); + messages.update((value) => value.addMessage(data)); break; case 'message_deleted': - events.update((value) => { - const eventList = value.map((el) => { - if (el.message?.id === data.message) { - el.message.body = '«…»'; - return el - } else { - return el; - } - }); - return eventList; - }); + messages.update((value) => value.deleteMessage(data.channel.id, data.message)); break; case 'deleted': - activeChannel.update((value) => { - if (value?.id === data.channel) { - return null; - } - return value; - }); - channelsList.update((value) => { - const channelIndex = value.map((e) => e.id).indexOf(data.channel); - if (channelIndex !== -1) { - value.splice(channelIndex, 1); - } - return value; - }); + activeChannel.update((value) => value.deleteChannel(data.channel)); + channelsList.update((value) => value.deleteChannel(data.channel)); + messages.update((value) => value.deleteChannel(data.channel)); break; default: break; diff --git a/hi-ui/src/lib/ActiveChannel.svelte b/hi-ui/src/lib/ActiveChannel.svelte index 84f9119..d2d92fb 100644 --- a/hi-ui/src/lib/ActiveChannel.svelte +++ b/hi-ui/src/lib/ActiveChannel.svelte @@ -1,15 +1,9 @@ <script> - import { activeChannel, events } from '../store'; + import { activeChannel, messages } from '../store'; import Message from './Message.svelte'; let container; - $: messages = $events.filter( - (ev) => ( - ev.type === 'message' - && $activeChannel !== null - && ev.channel.id === $activeChannel.id - ) - ); + $: messageList = $activeChannel.isSet() ? $messages.inChannel($activeChannel.get()) : []; // TODO: eventually, store scroll height/last unread in channel? scroll there? @@ -19,7 +13,7 @@ </script> <div class="container" bind:this={container}> - {#each messages as message} + {#each messageList as message} <div use:scroll> <Message {...message} /> </div> diff --git a/hi-ui/src/lib/Channel.svelte b/hi-ui/src/lib/Channel.svelte index 7826c46..ad07594 100644 --- a/hi-ui/src/lib/Channel.svelte +++ b/hi-ui/src/lib/Channel.svelte @@ -6,11 +6,11 @@ let active = false; activeChannel.subscribe((value) => { - active = value ? value.id == id : false; + active = value.is(id); }); function activate() { - activeChannel.update(() => ({ id, name })); + activeChannel.update((value) => value.set(id)); } </script> diff --git a/hi-ui/src/lib/ChannelList.svelte b/hi-ui/src/lib/ChannelList.svelte index 9f88e24..ba48e5d 100644 --- a/hi-ui/src/lib/ChannelList.svelte +++ b/hi-ui/src/lib/ChannelList.svelte @@ -1,32 +1,18 @@ <script> - import { onMount } from 'svelte'; - - import { listChannels } from '../apiServer'; import { channelsList } from '../store'; import Channel from './Channel.svelte'; let channels; - let loading = true; channelsList.subscribe((value) => { - channels = value; - }); - - onMount(async () => { - let channels = await listChannels(); - channelsList.update(() => channels.data); - loading = false; + channels = value.channels; }); </script> <ul class="select-none"> - {#if loading} - <li><em>loading channels…</em></li> - {:else} - {#each channels as channel} - <Channel {...channel} /> - {/each} - {/if} + {#each channels as channel} + <Channel {...channel} /> + {/each} </ul> <style> diff --git a/hi-ui/src/lib/CreateChannelForm.svelte b/hi-ui/src/lib/CreateChannelForm.svelte index 70dc13d..c08430b 100644 --- a/hi-ui/src/lib/CreateChannelForm.svelte +++ b/hi-ui/src/lib/CreateChannelForm.svelte @@ -1,8 +1,6 @@ <script> import { createChannel } from '../apiServer'; - import { channelsList } from '../store'; - let name = ''; let disabled = false; @@ -10,7 +8,6 @@ disabled = true; const response = await createChannel(name); if (200 <= response.status && response.status < 300) { - channelsList.update((value) => [...value, response.data]); name = ''; } disabled = false; diff --git a/hi-ui/src/lib/MessageInput.svelte b/hi-ui/src/lib/MessageInput.svelte index 9a8475c..b899221 100644 --- a/hi-ui/src/lib/MessageInput.svelte +++ b/hi-ui/src/lib/MessageInput.svelte @@ -8,12 +8,12 @@ let self; let input; - $: disabled = $activeChannel == null; + $: disabled = !$activeChannel.isSet(); async function handleSubmit(event) { disabled = true; // TODO try/catch: - await postToChannel($activeChannel?.id, input); + await postToChannel($activeChannel.get(), input); input = ''; disabled = false; await tick(); diff --git a/hi-ui/src/routes/+page.svelte b/hi-ui/src/routes/+page.svelte index 66b4f8d..1a61b3e 100644 --- a/hi-ui/src/routes/+page.svelte +++ b/hi-ui/src/routes/+page.svelte @@ -2,7 +2,7 @@ import { onMount } from 'svelte'; import { boot, subscribeToEvents } from '../apiServer'; - import { currentUser } from '../store'; + import { currentUser, channelsList, messages } from '../store'; import ActiveChannel from '../lib/ActiveChannel.svelte'; import ChannelList from '../lib/ChannelList.svelte'; @@ -18,16 +18,29 @@ user = value; }); + function onBooted(boot) { + currentUser.update(() => ({ + id: boot.login.id, + username: boot.login.name, + })); + let channels = boot.channels.map((channel) => ({ + id: channel.id, + name: channel.name, + })); + channelsList.update((value) => value.setChannels(channels)); + let bootMessages = boot.channels.map((channel) => [channel.id, channel.messages]); + for (let [channel, channelMessages] of bootMessages) { + messages.update((value) => value.addMessages(channel, channelMessages)); + } + } + onMount(async () => { try { let response = await boot(); switch (response.status) { case 200: - currentUser.update(() => ({ - username: response.data.login.name, - id: response.data.login.id, - })); - subscribeToEvents(); + onBooted(response.data); + subscribeToEvents(response.data.resume_point); break; case 401: currentUser.update(() => null); diff --git a/hi-ui/src/store.js b/hi-ui/src/store.js index a9d9421..4e6b4f1 100644 --- a/hi-ui/src/store.js +++ b/hi-ui/src/store.js @@ -1,6 +1,8 @@ import { writable } from 'svelte/store'; +import { ActiveChannel, Channels } from './store/channels'; +import { Messages } from './store/messages'; export const currentUser = writable(null); -export const activeChannel = writable(null); -export const channelsList = writable([]); -export const events = writable([]); +export const activeChannel = writable(new ActiveChannel()); +export const channelsList = writable(new Channels()); +export const messages = writable(new Messages()); diff --git a/hi-ui/src/store/channels.js b/hi-ui/src/store/channels.js new file mode 100644 index 0000000..20702cc --- /dev/null +++ b/hi-ui/src/store/channels.js @@ -0,0 +1,71 @@ +export class Channels { + constructor() { + this.channels = []; + } + + setChannels(channels) { + this.channels = [...channels]; + this.sort(); + return this; + } + + addChannel(channel) { + this.channels = [...this.channels, channel]; + this.sort(); + return this; + } + + deleteChannel(id) { + const channelIndex = this.channels.map((e) => e.id).indexOf(id); + if (channelIndex !== -1) { + this.channels.splice(channelIndex, 1); + } + return this; + } + + sort() { + this.channels.sort((a, b) => { + if (a.name < b.name) { + return -1; + } else if (a.name > b.name) { + return 1; + } + return 0; + }); + } +} + +export class ActiveChannel { + constructor() { + this.channel = null; + } + + isSet() { + return this.channel !== null; + } + + get() { + return this.channel; + } + + is(id) { + return this.channel === id; + } + + set(id) { + this.channel = id; + return this; + } + + deleteChannel(id) { + if (this.is(id)) { + return this.clear(); + } + return this; + } + + clear() { + this.channel = null; + return this; + } +} diff --git a/hi-ui/src/store/messages.js b/hi-ui/src/store/messages.js new file mode 100644 index 0000000..560b9e1 --- /dev/null +++ b/hi-ui/src/store/messages.js @@ -0,0 +1,41 @@ +export class Messages { + constructor() { + this.channels = {}; + } + + inChannel(channel) { + return this.channels[channel] || []; + } + + addMessage(message) { + let { + channel, + ...payload + } = message; + let channel_id = channel.id; + this.updateChannel(channel_id, (messages) => [...messages, payload]); + return this; + } + + addMessages(channel, payloads) { + this.updateChannel(channel, (messages) => [...messages, ...payloads]); + return this; + } + + + deleteMessage(channel, message) { + let messages = this.messages(channel).filter((msg) => msg.message.id != message); + this.channels[channel] = messages; + } + + deleteChannel(id) { + delete this.channels[id]; + return this; + } + + updateChannel(channel, callback) { + let messages = callback(this.inChannel(channel)); + messages.sort((a, b) => a.at - b.at); + this.channels[channel] = messages; + } +} @@ -1,13 +1,16 @@ use sqlx::sqlite::SqlitePool; use crate::{ + boot::app::Boot, channel::app::Channels, event::{app::Events, broadcaster::Broadcaster as EventBroadcaster}, - login::app::Logins, message::app::Messages, token::{app::Tokens, broadcaster::Broadcaster as TokenBroadcaster}, }; +#[cfg(test)] +use crate::login::app::Logins; + #[derive(Clone)] pub struct App { db: SqlitePool, @@ -24,6 +27,10 @@ impl App { } impl App { + pub const fn boot(&self) -> Boot { + Boot::new(&self.db) + } + pub const fn channels(&self) -> Channels { Channels::new(&self.db, &self.events) } @@ -32,6 +39,7 @@ impl App { Events::new(&self.db, &self.events) } + #[cfg(test)] pub const fn logins(&self) -> Logins { Logins::new(&self.db) } diff --git a/src/boot/app.rs b/src/boot/app.rs new file mode 100644 index 0000000..fc84b3a --- /dev/null +++ b/src/boot/app.rs @@ -0,0 +1,54 @@ +use sqlx::sqlite::SqlitePool; + +use super::{Channel, Snapshot}; +use crate::{ + channel::repo::Provider as _, event::repo::Provider as _, message::repo::Provider as _, +}; + +pub struct Boot<'a> { + db: &'a SqlitePool, +} + +impl<'a> Boot<'a> { + pub const fn new(db: &'a SqlitePool) -> Self { + Self { db } + } + + pub async fn snapshot(&self) -> Result<Snapshot, sqlx::Error> { + let mut tx = self.db.begin().await?; + let resume_point = tx.sequence().current().await?; + let channels = tx.channels().all(resume_point.into()).await?; + + let channels = { + let mut snapshots = Vec::with_capacity(channels.len()); + + let channels = channels.into_iter().filter_map(|channel| { + channel + .as_of(resume_point) + .map(|snapshot| (channel, snapshot)) + }); + + for (channel, snapshot) in channels { + let messages = tx + .messages() + .in_channel(&channel, resume_point.into()) + .await?; + + let messages = messages + .into_iter() + .filter_map(|message| message.as_of(resume_point)); + + snapshots.push(Channel::new(snapshot, messages)); + } + + snapshots + }; + + tx.commit().await?; + + Ok(Snapshot { + resume_point, + channels, + }) + } +} diff --git a/src/boot/mod.rs b/src/boot/mod.rs new file mode 100644 index 0000000..bd0da0a --- /dev/null +++ b/src/boot/mod.rs @@ -0,0 +1,74 @@ +pub mod app; +mod routes; + +use crate::{ + channel, + event::{Instant, Sequence}, + login::Login, + message, +}; + +pub use self::routes::router; + +#[derive(serde::Serialize)] +pub struct Snapshot { + pub resume_point: Sequence, + pub channels: Vec<Channel>, +} + +#[derive(serde::Serialize)] +pub struct Channel { + pub id: channel::Id, + pub name: String, + pub messages: Vec<Message>, +} + +impl Channel { + fn new( + channel: channel::Channel, + messages: impl IntoIterator<Item = message::Message>, + ) -> Self { + // The declarations are like this to guarantee that we aren't omitting any important fields from the corresponding types. + let channel::Channel { id, name } = channel; + + Self { + id, + name, + messages: messages.into_iter().map(Message::from).collect(), + } + } +} + +#[derive(serde::Serialize)] +pub struct Message { + #[serde(flatten)] + pub sent: Instant, + pub sender: Login, + // Named this way for serialization reasons + #[allow(clippy::struct_field_names)] + pub message: Body, +} + +impl From<message::Message> for Message { + fn from(message: message::Message) -> Self { + let message::Message { + sent, + channel: _, + sender, + id, + body, + } = message; + + Self { + sent, + sender, + message: Body { id, body }, + } + } +} + +#[derive(serde::Serialize)] +pub struct Body { + id: message::Id, + body: String, +} diff --git a/src/boot/routes.rs b/src/boot/routes.rs new file mode 100644 index 0000000..80f70bd --- /dev/null +++ b/src/boot/routes.rs @@ -0,0 +1,27 @@ +use axum::{ + extract::{Json, State}, + routing::get, + Router, +}; + +use super::Snapshot; +use crate::{app::App, error::Internal, login::Login}; + +#[cfg(test)] +mod test; + +pub fn router() -> Router<App> { + Router::new().route("/api/boot", get(boot)) +} + +async fn boot(State(app): State<App>, login: Login) -> Result<Json<Boot>, Internal> { + let snapshot = app.boot().snapshot().await?; + Ok(Boot { login, snapshot }.into()) +} + +#[derive(serde::Serialize)] +struct Boot { + login: Login, + #[serde(flatten)] + snapshot: Snapshot, +} diff --git a/src/login/routes/test/boot.rs b/src/boot/routes/test.rs index 9655354..5f2ba6f 100644 --- a/src/login/routes/test/boot.rs +++ b/src/boot/routes/test.rs @@ -1,12 +1,12 @@ -use axum::extract::State; +use axum::extract::{Json, State}; -use crate::{login::routes, test::fixtures}; +use crate::{boot::routes, test::fixtures}; #[tokio::test] async fn returns_identity() { let app = fixtures::scratch_app().await; let login = fixtures::login::fictitious(); - let response = routes::boot(State(app), login.clone()) + let Json(response) = routes::boot(State(app), login.clone()) .await .expect("boot always succeeds"); diff --git a/src/channel/app.rs b/src/channel/app.rs index 1b2cc48..a9a9e84 100644 --- a/src/channel/app.rs +++ b/src/channel/app.rs @@ -7,7 +7,7 @@ use crate::{ clock::DateTime, db::NotFound, event::{broadcaster::Broadcaster, repo::Provider as _, Event, Sequence}, - message::{repo::Provider as _, Message}, + message::repo::Provider as _, }; pub struct Channels<'a> { @@ -36,52 +36,6 @@ impl<'a> Channels<'a> { Ok(channel.as_created()) } - pub async fn all(&self, resume_point: Option<Sequence>) -> Result<Vec<Channel>, InternalError> { - let mut tx = self.db.begin().await?; - let channels = tx.channels().all(resume_point).await?; - tx.commit().await?; - - let channels = channels - .into_iter() - .filter_map(|channel| { - channel - .events() - .filter(Sequence::up_to(resume_point)) - .collect() - }) - .collect(); - - Ok(channels) - } - - pub async fn messages( - &self, - channel: &Id, - resume_point: Option<Sequence>, - ) -> Result<Vec<Message>, Error> { - let mut tx = self.db.begin().await?; - let channel = tx - .channels() - .by_id(channel) - .await - .not_found(|| Error::NotFound(channel.clone()))?; - - let messages = tx - .messages() - .in_channel(&channel, resume_point) - .await? - .into_iter() - .filter_map(|message| { - message - .events() - .filter(Sequence::up_to(resume_point)) - .collect() - }) - .collect(); - - Ok(messages) - } - pub async fn delete(&self, channel: &Id, deleted_at: &DateTime) -> Result<(), Error> { let mut tx = self.db.begin().await?; diff --git a/src/channel/history.rs b/src/channel/history.rs index bd45d8d..383fb7b 100644 --- a/src/channel/history.rs +++ b/src/channel/history.rs @@ -2,7 +2,7 @@ use super::{ event::{Created, Deleted, Event}, Channel, Id, }; -use crate::event::Instant; +use crate::event::{Instant, ResumePoint, Sequence}; #[derive(Clone, Debug, Eq, PartialEq)] pub struct History { @@ -25,6 +25,12 @@ impl History { pub fn as_created(&self) -> Channel { self.channel.clone() } + + pub fn as_of(&self, resume_point: impl Into<ResumePoint>) -> Option<Channel> { + self.events() + .filter(Sequence::up_to(resume_point.into())) + .collect() + } } // Event factories diff --git a/src/channel/repo.rs b/src/channel/repo.rs index 2b48436..2f57581 100644 --- a/src/channel/repo.rs +++ b/src/channel/repo.rs @@ -3,7 +3,7 @@ use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction}; use crate::{ channel::{Channel, History, Id}, clock::DateTime, - event::{Instant, Sequence}, + event::{Instant, ResumePoint, Sequence}, }; pub trait Provider { @@ -84,7 +84,7 @@ impl<'c> Channels<'c> { Ok(channel) } - pub async fn all(&mut self, resume_at: Option<Sequence>) -> Result<Vec<History>, sqlx::Error> { + pub async fn all(&mut self, resume_at: ResumePoint) -> Result<Vec<History>, sqlx::Error> { let channels = sqlx::query!( r#" select diff --git a/src/channel/routes.rs b/src/channel/routes.rs index 23c0602..5d67af8 100644 --- a/src/channel/routes.rs +++ b/src/channel/routes.rs @@ -2,56 +2,21 @@ use axum::{ extract::{Json, Path, State}, http::StatusCode, response::{IntoResponse, Response}, - routing::{delete, get, post}, + routing::{delete, post}, Router, }; -use axum_extra::extract::Query; use super::{app, Channel, Id}; -use crate::{ - app::App, - clock::RequestedAt, - error::Internal, - event::{Instant, Sequence}, - login::Login, - message::{self, app::SendError}, -}; +use crate::{app::App, clock::RequestedAt, error::Internal, login::Login, message::app::SendError}; #[cfg(test)] mod test; pub fn router() -> Router<App> { Router::new() - .route("/api/channels", get(list)) .route("/api/channels", post(on_create)) .route("/api/channels/:channel", post(on_send)) .route("/api/channels/:channel", delete(on_delete)) - .route("/api/channels/:channel/messages", get(messages)) -} - -#[derive(Default, serde::Deserialize)] -struct ResumeQuery { - resume_point: Option<Sequence>, -} - -async fn list( - State(app): State<App>, - _: Login, - Query(query): Query<ResumeQuery>, -) -> Result<Channels, Internal> { - let channels = app.channels().all(query.resume_point).await?; - let response = Channels(channels); - - Ok(response) -} - -struct Channels(Vec<Channel>); - -impl IntoResponse for Channels { - fn into_response(self) -> Response { - let Self(channels) = self; - Json(channels).into_response() - } } #[derive(Clone, serde::Deserialize)] @@ -150,53 +115,3 @@ impl IntoResponse for ErrorResponse { } } } - -async fn messages( - State(app): State<App>, - Path(channel): Path<Id>, - _: Login, - Query(query): Query<ResumeQuery>, -) -> Result<Messages, ErrorResponse> { - let messages = app - .channels() - .messages(&channel, query.resume_point) - .await?; - let response = Messages( - messages - .into_iter() - .map(|message| MessageView { - sent: message.sent, - sender: message.sender, - message: MessageInner { - id: message.id, - body: message.body, - }, - }) - .collect(), - ); - - Ok(response) -} - -struct Messages(Vec<MessageView>); - -#[derive(serde::Serialize)] -struct MessageView { - #[serde(flatten)] - sent: Instant, - sender: Login, - message: MessageInner, -} - -#[derive(serde::Serialize)] -struct MessageInner { - id: message::Id, - body: String, -} - -impl IntoResponse for Messages { - fn into_response(self) -> Response { - let Self(messages) = self; - Json(messages).into_response() - } -} diff --git a/src/channel/routes/test/list.rs b/src/channel/routes/test/list.rs deleted file mode 100644 index f15a53c..0000000 --- a/src/channel/routes/test/list.rs +++ /dev/null @@ -1,65 +0,0 @@ -use axum::extract::State; -use axum_extra::extract::Query; - -use crate::{channel::routes, test::fixtures}; - -#[tokio::test] -async fn empty_list() { - // Set up the environment - - let app = fixtures::scratch_app().await; - let viewer = fixtures::login::create(&app).await; - - // Call the endpoint - - let routes::Channels(channels) = routes::list(State(app), viewer, Query::default()) - .await - .expect("always succeeds"); - - // Verify the semantics - - assert!(channels.is_empty()); -} - -#[tokio::test] -async fn one_channel() { - // Set up the environment - - let app = fixtures::scratch_app().await; - let viewer = fixtures::login::create(&app).await; - let channel = fixtures::channel::create(&app, &fixtures::now()).await; - - // Call the endpoint - - let routes::Channels(channels) = routes::list(State(app), viewer, Query::default()) - .await - .expect("always succeeds"); - - // Verify the semantics - - assert!(channels.contains(&channel)); -} - -#[tokio::test] -async fn multiple_channels() { - // Set up the environment - - let app = fixtures::scratch_app().await; - let viewer = fixtures::login::create(&app).await; - let channels = vec![ - fixtures::channel::create(&app, &fixtures::now()).await, - fixtures::channel::create(&app, &fixtures::now()).await, - ]; - - // Call the endpoint - - let routes::Channels(response_channels) = routes::list(State(app), viewer, Query::default()) - .await - .expect("always succeeds"); - - // Verify the semantics - - assert!(channels - .into_iter() - .all(|channel| response_channels.contains(&channel))); -} diff --git a/src/channel/routes/test/mod.rs b/src/channel/routes/test/mod.rs index ab663eb..3e5aa17 100644 --- a/src/channel/routes/test/mod.rs +++ b/src/channel/routes/test/mod.rs @@ -1,3 +1,2 @@ -mod list; mod on_create; mod on_send; diff --git a/src/channel/routes/test/on_create.rs b/src/channel/routes/test/on_create.rs index 5733c9e..ed49017 100644 --- a/src/channel/routes/test/on_create.rs +++ b/src/channel/routes/test/on_create.rs @@ -33,8 +33,11 @@ async fn new_channel() { // Verify the semantics - let channels = app.channels().all(None).await.expect("always succeeds"); - assert!(channels.contains(&response_channel)); + let snapshot = app.boot().snapshot().await.expect("boot always succeeds"); + assert!(snapshot + .channels + .iter() + .any(|channel| channel.name == response_channel.name && channel.id == response_channel.id)); let mut events = app .events() @@ -10,7 +10,7 @@ use clap::Parser; use sqlx::sqlite::SqlitePool; use tokio::net; -use crate::{app::App, channel, clock, db, event, expire, login, message, ui}; +use crate::{app::App, boot, channel, clock, db, event, expire, login, message, ui}; /// Command-line entry point for running the `hi` server. /// @@ -110,6 +110,7 @@ impl Args { fn routers() -> Router<App> { [ + boot::router(), channel::router(), event::router(), login::router(), diff --git a/src/event/app.rs b/src/event/app.rs index d664ec7..141037d 100644 --- a/src/event/app.rs +++ b/src/event/app.rs @@ -6,7 +6,7 @@ use futures::{ use itertools::Itertools as _; use sqlx::sqlite::SqlitePool; -use super::{broadcaster::Broadcaster, Event, Sequence, Sequenced}; +use super::{broadcaster::Broadcaster, Event, ResumePoint, Sequence, Sequenced}; use crate::{ channel::{self, repo::Provider as _}, message::{self, repo::Provider as _}, @@ -24,8 +24,9 @@ impl<'a> Events<'a> { pub async fn subscribe( &self, - resume_at: Option<Sequence>, + resume_at: impl Into<ResumePoint>, ) -> Result<impl Stream<Item = Event> + std::fmt::Debug, sqlx::Error> { + let resume_at = resume_at.into(); // Subscribe before retrieving, to catch messages broadcast while we're // querying the DB. We'll prune out duplicates later. let live_messages = self.events.subscribe(); @@ -65,7 +66,7 @@ impl<'a> Events<'a> { Ok(replay.chain(live_messages)) } - fn resume(resume_at: Option<Sequence>) -> impl for<'m> FnMut(&'m Event) -> future::Ready<bool> { + fn resume(resume_at: ResumePoint) -> impl for<'m> FnMut(&'m Event) -> future::Ready<bool> { let filter = Sequence::after(resume_at); move |event| future::ready(filter(event)) } diff --git a/src/event/mod.rs b/src/event/mod.rs index 1349fe6..e748d66 100644 --- a/src/event/mod.rs +++ b/src/event/mod.rs @@ -12,6 +12,8 @@ pub use self::{ sequence::{Instant, Sequence, Sequenced}, }; +pub type ResumePoint = Option<Sequence>; + #[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] pub struct Event { #[serde(flatten)] diff --git a/src/event/routes.rs b/src/event/routes.rs index 5b9c7e3..de6d248 100644 --- a/src/event/routes.rs +++ b/src/event/routes.rs @@ -14,7 +14,7 @@ use super::{extract::LastEventId, Event}; use crate::{ app::App, error::{Internal, Unauthorized}, - event::{Sequence, Sequenced as _}, + event::{ResumePoint, Sequence, Sequenced as _}, token::{app::ValidateError, extract::Identity}, }; @@ -27,7 +27,7 @@ pub fn router() -> Router<App> { #[derive(Default, serde::Deserialize)] struct EventsQuery { - resume_point: Option<Sequence>, + resume_point: ResumePoint, } async fn events( diff --git a/src/event/sequence.rs b/src/event/sequence.rs index fbe3711..ceb5bcb 100644 --- a/src/event/sequence.rs +++ b/src/event/sequence.rs @@ -1,5 +1,6 @@ use std::fmt; +use super::ResumePoint; use crate::clock::DateTime; #[derive(Clone, Copy, Debug, Eq, PartialEq, serde::Serialize)] @@ -39,14 +40,14 @@ impl fmt::Display for Sequence { } impl Sequence { - pub fn up_to<E>(resume_point: Option<Self>) -> impl for<'e> Fn(&'e E) -> bool + pub fn up_to<E>(resume_point: ResumePoint) -> impl for<'e> Fn(&'e E) -> bool where E: Sequenced, { move |event| resume_point.map_or(true, |resume_point| event.sequence() <= resume_point) } - pub fn after<E>(resume_point: Option<Self>) -> impl for<'e> Fn(&'e E) -> bool + pub fn after<E>(resume_point: ResumePoint) -> impl for<'e> Fn(&'e E) -> bool where E: Sequenced, { @@ -3,6 +3,7 @@ #![warn(clippy::pedantic)] mod app; +mod boot; mod broadcast; mod channel; pub mod cli; diff --git a/src/login/app.rs b/src/login/app.rs index 15adb31..4f60b89 100644 --- a/src/login/app.rs +++ b/src/login/app.rs @@ -1,8 +1,5 @@ use sqlx::sqlite::SqlitePool; -use crate::event::{repo::Provider as _, Sequence}; - -#[cfg(test)] use super::{repo::Provider as _, Login, Password}; pub struct Logins<'a> { @@ -14,15 +11,6 @@ impl<'a> Logins<'a> { Self { db } } - pub async fn boot_point(&self) -> Result<Sequence, sqlx::Error> { - let mut tx = self.db.begin().await?; - let sequence = tx.sequence().current().await?; - tx.commit().await?; - - Ok(sequence) - } - - #[cfg(test)] pub async fn create(&self, name: &str, password: &Password) -> Result<Login, CreateError> { let password_hash = password.hash()?; @@ -34,7 +22,6 @@ impl<'a> Logins<'a> { } } -#[cfg(test)] #[derive(Debug, thiserror::Error)] #[error(transparent)] pub enum CreateError { diff --git a/src/login/mod.rs b/src/login/mod.rs index 65e3ada..f272f80 100644 --- a/src/login/mod.rs +++ b/src/login/mod.rs @@ -1,3 +1,4 @@ +#[cfg(test)] pub mod app; pub mod extract; mod id; diff --git a/src/login/routes.rs b/src/login/routes.rs index 0874cc3..6579ae6 100644 --- a/src/login/routes.rs +++ b/src/login/routes.rs @@ -2,7 +2,7 @@ use axum::{ extract::{Json, State}, http::StatusCode, response::{IntoResponse, Response}, - routing::{get, post}, + routing::post, Router, }; @@ -10,7 +10,7 @@ use crate::{ app::App, clock::RequestedAt, error::{Internal, Unauthorized}, - login::{Login, Password}, + login::Password, token::{app, extract::IdentityToken}, }; @@ -19,31 +19,10 @@ mod test; pub fn router() -> Router<App> { Router::new() - .route("/api/boot", get(boot)) .route("/api/auth/login", post(on_login)) .route("/api/auth/logout", post(on_logout)) } -async fn boot(State(app): State<App>, login: Login) -> Result<Boot, Internal> { - let resume_point = app.logins().boot_point().await?; - Ok(Boot { - login, - resume_point: resume_point.to_string(), - }) -} - -#[derive(serde::Serialize)] -struct Boot { - login: Login, - resume_point: String, -} - -impl IntoResponse for Boot { - fn into_response(self) -> Response { - Json(self).into_response() - } -} - #[derive(serde::Deserialize)] struct LoginRequest { name: String, diff --git a/src/login/routes/test/mod.rs b/src/login/routes/test/mod.rs index 7693755..90522c4 100644 --- a/src/login/routes/test/mod.rs +++ b/src/login/routes/test/mod.rs @@ -1,3 +1,2 @@ -mod boot; mod login; mod logout; diff --git a/src/message/history.rs b/src/message/history.rs index c44d954..f267f4c 100644 --- a/src/message/history.rs +++ b/src/message/history.rs @@ -2,7 +2,7 @@ use super::{ event::{Deleted, Event, Sent}, Id, Message, }; -use crate::event::Instant; +use crate::event::{Instant, ResumePoint, Sequence}; #[derive(Clone, Debug, Eq, PartialEq)] pub struct History { @@ -24,6 +24,12 @@ impl History { pub fn as_sent(&self) -> Message { self.message.clone() } + + pub fn as_of(&self, resume_point: impl Into<ResumePoint>) -> Option<Message> { + self.events() + .filter(Sequence::up_to(resume_point.into())) + .collect() + } } // Events interface diff --git a/src/message/repo.rs b/src/message/repo.rs index 2ca409d..5b199a7 100644 --- a/src/message/repo.rs +++ b/src/message/repo.rs @@ -4,7 +4,7 @@ use super::{snapshot::Message, History, Id}; use crate::{ channel::{self, Channel}, clock::DateTime, - event::{Instant, Sequence}, + event::{Instant, ResumePoint, Sequence}, login::{self, Login}, }; @@ -69,7 +69,7 @@ impl<'c> Messages<'c> { pub async fn in_channel( &mut self, channel: &channel::History, - resume_at: Option<Sequence>, + resume_at: ResumePoint, ) -> Result<Vec<History>, sqlx::Error> { let channel_id = channel.id(); let messages = sqlx::query!( @@ -203,10 +203,7 @@ impl<'c> Messages<'c> { Ok(messages) } - pub async fn replay( - &mut self, - resume_at: Option<Sequence>, - ) -> Result<Vec<History>, sqlx::Error> { + pub async fn replay(&mut self, resume_at: ResumePoint) -> Result<Vec<History>, sqlx::Error> { let messages = sqlx::query!( r#" select |
