diff options
| -rw-r--r-- | .sqlx/query-46403b84bfc79a53aec36b4a808afb115f6e47d545dfbeb18f9c54e6eb15eb80.json | 20 | ||||
| -rw-r--r-- | .sqlx/query-9606853f2ea9f776f7e4384a2137be57b3a45fe38a675262ceaaebb3d346a9ca.json | 62 | ||||
| -rw-r--r-- | docs/api.md | 30 | ||||
| -rw-r--r-- | src/channel/app.rs | 44 | ||||
| -rw-r--r-- | src/channel/repo.rs | 7 | ||||
| -rw-r--r-- | src/channel/routes.rs | 76 | ||||
| -rw-r--r-- | src/event/app.rs | 8 | ||||
| -rw-r--r-- | src/event/mod.rs | 2 | ||||
| -rw-r--r-- | src/event/sequence.rs | 7 | ||||
| -rw-r--r-- | src/message/app.rs | 4 | ||||
| -rw-r--r-- | src/message/event.rs | 27 | ||||
| -rw-r--r-- | src/message/history.rs | 4 | ||||
| -rw-r--r-- | src/message/repo.rs | 57 | ||||
| -rw-r--r-- | src/message/snapshot.rs | 4 |
14 files changed, 281 insertions, 71 deletions
diff --git a/.sqlx/query-46403b84bfc79a53aec36b4a808afb115f6e47d545dfbeb18f9c54e6eb15eb80.json b/.sqlx/query-46403b84bfc79a53aec36b4a808afb115f6e47d545dfbeb18f9c54e6eb15eb80.json deleted file mode 100644 index ee0f235..0000000 --- a/.sqlx/query-46403b84bfc79a53aec36b4a808afb115f6e47d545dfbeb18f9c54e6eb15eb80.json +++ /dev/null @@ -1,20 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n select\n message.id as \"id: Id\"\n from message\n join channel on message.channel = channel.id\n where channel.id = $1\n order by message.sent_sequence\n ", - "describe": { - "columns": [ - { - "name": "id: Id", - "ordinal": 0, - "type_info": "Text" - } - ], - "parameters": { - "Right": 1 - }, - "nullable": [ - false - ] - }, - "hash": "46403b84bfc79a53aec36b4a808afb115f6e47d545dfbeb18f9c54e6eb15eb80" -} diff --git a/.sqlx/query-9606853f2ea9f776f7e4384a2137be57b3a45fe38a675262ceaaebb3d346a9ca.json b/.sqlx/query-9606853f2ea9f776f7e4384a2137be57b3a45fe38a675262ceaaebb3d346a9ca.json new file mode 100644 index 0000000..82246ac --- /dev/null +++ b/.sqlx/query-9606853f2ea9f776f7e4384a2137be57b3a45fe38a675262ceaaebb3d346a9ca.json @@ -0,0 +1,62 @@ +{ + "db_name": "SQLite", + "query": "\n select\n channel.id as \"channel_id: channel::Id\",\n channel.name as \"channel_name\",\n sender.id as \"sender_id: login::Id\",\n sender.name as \"sender_name\",\n message.id as \"id: Id\",\n message.body,\n sent_at as \"sent_at: DateTime\",\n sent_sequence as \"sent_sequence: Sequence\"\n from message\n join channel on message.channel = channel.id\n join login as sender on message.sender = sender.id\n where channel.id = $1\n and coalesce(message.sent_sequence <= $2, true)\n order by message.sent_sequence\n ", + "describe": { + "columns": [ + { + "name": "channel_id: channel::Id", + "ordinal": 0, + "type_info": "Text" + }, + { + "name": "channel_name", + "ordinal": 1, + "type_info": "Text" + }, + { + "name": "sender_id: login::Id", + "ordinal": 2, + "type_info": "Text" + }, + { + "name": "sender_name", + "ordinal": 3, + "type_info": "Text" + }, + { + "name": "id: Id", + "ordinal": 4, + "type_info": "Text" + }, + { + "name": "body", + "ordinal": 5, + "type_info": "Text" + }, + { + "name": "sent_at: DateTime", + "ordinal": 6, + "type_info": "Text" + }, + { + "name": "sent_sequence: Sequence", + "ordinal": 7, + "type_info": "Integer" + } + ], + "parameters": { + "Right": 2 + }, + "nullable": [ + false, + false, + false, + false, + false, + false, + false, + false + ] + }, + "hash": "9606853f2ea9f776f7e4384a2137be57b3a45fe38a675262ceaaebb3d346a9ca" +} diff --git a/docs/api.md b/docs/api.md index ef211bc..e8c8c8c 100644 --- a/docs/api.md +++ b/docs/api.md @@ -127,6 +127,36 @@ Channel names must be unique. If a channel with the same name already exists, th The API delivers events to clients to update them on other clients' actions and messages. While there is no specific delivery deadline, messages are delivered as soon as possible on a best-effort basis, and the event system allows clients to replay events or resume interrupted streams, to allow recovery if a message is lost. +### `GET /api/channels/:channel/messages` + +Retrieves historical messages in a channel. + +The `:channel` placeholder must be a channel ID, as returned by `GET /api/channels` or `POST /api/channels`. + +#### Query parameters + +This endpoint accepts an optional `resume_point` query parameter. If provided, the value must be the value obtained from the `/api/boot` method. This parameter will restrict the returned list to messages as they existed at a fixed point in time, with any later changes only appearing in the event stream. + +#### On success + +Responds with a list of message objects, one per message: + +```json +[ + { + "at": "2024-09-27T23:19:10.208147Z", + "sender": { + "id": "L1234abcd", + "name": "example username" + }, + "message": { + "id": "M1312acab", + "body": "beep" + } + } +] +``` + ### `POST /api/channels/:channel` Sends a chat message to a channel. It will be relayed to clients subscribed to the channel's events, and recorded for replay. diff --git a/src/channel/app.rs b/src/channel/app.rs index 24be2ff..b3bfbee 100644 --- a/src/channel/app.rs +++ b/src/channel/app.rs @@ -7,7 +7,7 @@ use crate::{ clock::DateTime, db::NotFound, event::{broadcaster::Broadcaster, repo::Provider as _, Event, Sequence, Sequenced}, - message::repo::Provider as _, + message::{repo::Provider as _, Message}, }; pub struct Channels<'a> { @@ -54,22 +54,52 @@ impl<'a> Channels<'a> { Ok(channels) } - pub async fn delete(&self, channel: &Id, deleted_at: &DateTime) -> Result<(), DeleteError> { + pub async fn messages( + &self, + channel: &Id, + resume_point: Option<Sequence>, + ) -> Result<Vec<Message>, Error> { + let mut tx = self.db.begin().await?; + let channel = tx + .channels() + .by_id(channel) + .await + .not_found(|| Error::NotFound(channel.clone()))? + .snapshot(); + + let messages = tx + .messages() + .in_channel(&channel, resume_point) + .await? + .into_iter() + .filter_map(|message| { + message + .events() + .filter(Sequence::up_to(resume_point)) + .collect() + }) + .collect(); + + Ok(messages) + } + + pub async fn delete(&self, channel: &Id, deleted_at: &DateTime) -> Result<(), Error> { let mut tx = self.db.begin().await?; let channel = tx .channels() .by_id(channel) .await - .not_found(|| DeleteError::NotFound(channel.clone()))? + .not_found(|| Error::NotFound(channel.clone()))? .snapshot(); let mut events = Vec::new(); - let messages = tx.messages().in_channel(&channel).await?; + let messages = tx.messages().in_channel(&channel, None).await?; for message in messages { + let message = message.snapshot(); let deleted = tx.sequence().next(deleted_at).await?; - let message = tx.messages().delete(&message, &deleted).await?; + let message = tx.messages().delete(&message.id, &deleted).await?; events.extend( message .events() @@ -117,7 +147,7 @@ impl<'a> Channels<'a> { self.events.broadcast( events .into_iter() - .kmerge_by(|a, b| a.sequence() < b.sequence()) + .kmerge_by(Sequence::merge) .map(Event::from) .collect::<Vec<_>>(), ); @@ -135,7 +165,7 @@ pub enum CreateError { } #[derive(Debug, thiserror::Error)] -pub enum DeleteError { +pub enum Error { #[error("channel {0} not found")] NotFound(Id), #[error(transparent)] diff --git a/src/channel/repo.rs b/src/channel/repo.rs index 8bb761b..2b48436 100644 --- a/src/channel/repo.rs +++ b/src/channel/repo.rs @@ -84,10 +84,7 @@ impl<'c> Channels<'c> { Ok(channel) } - pub async fn all( - &mut self, - resume_point: Option<Sequence>, - ) -> Result<Vec<History>, sqlx::Error> { + pub async fn all(&mut self, resume_at: Option<Sequence>) -> Result<Vec<History>, sqlx::Error> { let channels = sqlx::query!( r#" select @@ -99,7 +96,7 @@ impl<'c> Channels<'c> { where coalesce(created_sequence <= $1, true) order by channel.name "#, - resume_point, + resume_at, ) .map(|row| History { channel: Channel { diff --git a/src/channel/routes.rs b/src/channel/routes.rs index bce634e..23c0602 100644 --- a/src/channel/routes.rs +++ b/src/channel/routes.rs @@ -7,13 +7,14 @@ use axum::{ }; use axum_extra::extract::Query; -use super::{ - app::{self, DeleteError}, - Channel, Id, -}; +use super::{app, Channel, Id}; use crate::{ - app::App, clock::RequestedAt, error::Internal, event::Sequence, login::Login, - message::app::SendError, + app::App, + clock::RequestedAt, + error::Internal, + event::{Instant, Sequence}, + login::Login, + message::{self, app::SendError}, }; #[cfg(test)] @@ -25,17 +26,18 @@ pub fn router() -> Router<App> { .route("/api/channels", post(on_create)) .route("/api/channels/:channel", post(on_send)) .route("/api/channels/:channel", delete(on_delete)) + .route("/api/channels/:channel/messages", get(messages)) } #[derive(Default, serde::Deserialize)] -struct ListQuery { +struct ResumeQuery { resume_point: Option<Sequence>, } async fn list( State(app): State<App>, _: Login, - Query(query): Query<ListQuery>, + Query(query): Query<ResumeQuery>, ) -> Result<Channels, Internal> { let channels = app.channels().all(query.resume_point).await?; let response = Channels(channels); @@ -127,7 +129,7 @@ async fn on_delete( Path(channel): Path<Id>, RequestedAt(deleted_at): RequestedAt, _: Login, -) -> Result<StatusCode, DeleteErrorResponse> { +) -> Result<StatusCode, ErrorResponse> { app.channels().delete(&channel, &deleted_at).await?; Ok(StatusCode::ACCEPTED) @@ -135,16 +137,66 @@ async fn on_delete( #[derive(Debug, thiserror::Error)] #[error(transparent)] -struct DeleteErrorResponse(#[from] DeleteError); +struct ErrorResponse(#[from] app::Error); -impl IntoResponse for DeleteErrorResponse { +impl IntoResponse for ErrorResponse { fn into_response(self) -> Response { let Self(error) = self; match error { - not_found @ DeleteError::NotFound(_) => { + not_found @ app::Error::NotFound(_) => { (StatusCode::NOT_FOUND, not_found.to_string()).into_response() } other => Internal::from(other).into_response(), } } } + +async fn messages( + State(app): State<App>, + Path(channel): Path<Id>, + _: Login, + Query(query): Query<ResumeQuery>, +) -> Result<Messages, ErrorResponse> { + let messages = app + .channels() + .messages(&channel, query.resume_point) + .await?; + let response = Messages( + messages + .into_iter() + .map(|message| MessageView { + sent: message.sent, + sender: message.sender, + message: MessageInner { + id: message.id, + body: message.body, + }, + }) + .collect(), + ); + + Ok(response) +} + +struct Messages(Vec<MessageView>); + +#[derive(serde::Serialize)] +struct MessageView { + #[serde(flatten)] + sent: Instant, + sender: Login, + message: MessageInner, +} + +#[derive(serde::Serialize)] +struct MessageInner { + id: message::Id, + body: String, +} + +impl IntoResponse for Messages { + fn into_response(self) -> Response { + let Self(messages) = self; + Json(messages).into_response() + } +} diff --git a/src/event/app.rs b/src/event/app.rs index 32f0a97..d664ec7 100644 --- a/src/event/app.rs +++ b/src/event/app.rs @@ -36,7 +36,7 @@ impl<'a> Events<'a> { let channel_events = channels .iter() .map(channel::History::events) - .kmerge_by(|a, b| a.instant.sequence < b.instant.sequence) + .kmerge_by(Sequence::merge) .filter(Sequence::after(resume_at)) .map(Event::from); @@ -44,14 +44,12 @@ impl<'a> Events<'a> { let message_events = messages .iter() .map(message::History::events) - .kmerge_by(|a, b| a.instant.sequence < b.instant.sequence) + .kmerge_by(Sequence::merge) .filter(Sequence::after(resume_at)) .map(Event::from); let replay_events = channel_events - .merge_by(message_events, |a, b| { - a.instant.sequence < b.instant.sequence - }) + .merge_by(message_events, Sequence::merge) .collect::<Vec<_>>(); let resume_live_at = replay_events.last().map(Sequenced::sequence); diff --git a/src/event/mod.rs b/src/event/mod.rs index 1503b77..1349fe6 100644 --- a/src/event/mod.rs +++ b/src/event/mod.rs @@ -38,7 +38,7 @@ impl From<channel::Event> for Event { impl From<message::Event> for Event { fn from(event: message::Event) -> Self { Self { - instant: event.instant, + instant: event.instant(), kind: event.kind.into(), } } diff --git a/src/event/sequence.rs b/src/event/sequence.rs index c566156..fbe3711 100644 --- a/src/event/sequence.rs +++ b/src/event/sequence.rs @@ -59,6 +59,13 @@ impl Sequence { { move |event| resume_point <= event.sequence() } + + pub fn merge<E>(a: &E, b: &E) -> bool + where + E: Sequenced, + { + a.sequence() < b.sequence() + } } pub trait Sequenced { diff --git a/src/message/app.rs b/src/message/app.rs index 1d34c14..fd6a334 100644 --- a/src/message/app.rs +++ b/src/message/app.rs @@ -7,7 +7,7 @@ use crate::{ channel::{self, repo::Provider as _}, clock::DateTime, db::NotFound as _, - event::{broadcaster::Broadcaster, repo::Provider as _, Event, Sequence}, + event::{broadcaster::Broadcaster, repo::Provider as _, Event, Sequence, Sequenced}, login::Login, }; @@ -87,7 +87,7 @@ impl<'a> Messages<'a> { self.events.broadcast( events .into_iter() - .kmerge_by(|a, b| a.instant.sequence < b.instant.sequence) + .kmerge_by(Sequence::merge) .map(Event::from) .collect::<Vec<_>>(), ); diff --git a/src/message/event.rs b/src/message/event.rs index bcc2238..66db9b0 100644 --- a/src/message/event.rs +++ b/src/message/event.rs @@ -7,14 +7,12 @@ use crate::{ #[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] pub struct Event { #[serde(flatten)] - pub instant: Instant, - #[serde(flatten)] pub kind: Kind, } impl Sequenced for Event { fn instant(&self) -> Instant { - self.instant + self.kind.instant() } } @@ -25,12 +23,27 @@ pub enum Kind { Deleted(Deleted), } +impl Sequenced for Kind { + fn instant(&self) -> Instant { + match self { + Self::Sent(sent) => sent.instant(), + Self::Deleted(deleted) => deleted.instant(), + } + } +} + #[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] pub struct Sent { #[serde(flatten)] pub message: Message, } +impl Sequenced for Sent { + fn instant(&self) -> Instant { + self.message.sent + } +} + impl From<Sent> for Kind { fn from(event: Sent) -> Self { Self::Sent(event) @@ -39,10 +52,18 @@ impl From<Sent> for Kind { #[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] pub struct Deleted { + #[serde(flatten)] + pub instant: Instant, pub channel: Channel, pub message: Id, } +impl Sequenced for Deleted { + fn instant(&self) -> Instant { + self.instant + } +} + impl From<Deleted> for Kind { fn from(event: Deleted) -> Self { Self::Deleted(event) diff --git a/src/message/history.rs b/src/message/history.rs index 5aca47e..89fc6b1 100644 --- a/src/message/history.rs +++ b/src/message/history.rs @@ -7,14 +7,12 @@ use crate::event::Instant; #[derive(Clone, Debug, Eq, PartialEq)] pub struct History { pub message: Message, - pub sent: Instant, pub deleted: Option<Instant>, } impl History { fn sent(&self) -> Event { Event { - instant: self.sent, kind: Sent { message: self.message.clone(), } @@ -24,8 +22,8 @@ impl History { fn deleted(&self) -> Option<Event> { self.deleted.map(|instant| Event { - instant, kind: Deleted { + instant, channel: self.message.channel.clone(), message: self.message.id.clone(), } diff --git a/src/message/repo.rs b/src/message/repo.rs index ae41736..fc835c8 100644 --- a/src/message/repo.rs +++ b/src/message/repo.rs @@ -48,12 +48,12 @@ impl<'c> Messages<'c> { ) .map(|row| History { message: Message { + sent: *sent, channel: channel.clone(), sender: sender.clone(), id: row.id, body: row.body, }, - sent: *sent, deleted: None, }) .fetch_one(&mut *self.0) @@ -62,18 +62,51 @@ impl<'c> Messages<'c> { Ok(message) } - pub async fn in_channel(&mut self, channel: &Channel) -> Result<Vec<Id>, sqlx::Error> { - let messages = sqlx::query_scalar!( + pub async fn in_channel( + &mut self, + channel: &Channel, + resume_at: Option<Sequence>, + ) -> Result<Vec<History>, sqlx::Error> { + let messages = sqlx::query!( r#" select - message.id as "id: Id" + channel.id as "channel_id: channel::Id", + channel.name as "channel_name", + sender.id as "sender_id: login::Id", + sender.name as "sender_name", + message.id as "id: Id", + message.body, + sent_at as "sent_at: DateTime", + sent_sequence as "sent_sequence: Sequence" from message join channel on message.channel = channel.id + join login as sender on message.sender = sender.id where channel.id = $1 + and coalesce(message.sent_sequence <= $2, true) order by message.sent_sequence "#, channel.id, + resume_at, ) + .map(|row| History { + message: Message { + sent: Instant { + at: row.sent_at, + sequence: row.sent_sequence, + }, + channel: Channel { + id: row.channel_id, + name: row.channel_name, + }, + sender: Login { + id: row.sender_id, + name: row.sender_name, + }, + id: row.id, + body: row.body, + }, + deleted: None, + }) .fetch_all(&mut *self.0) .await?; @@ -101,6 +134,10 @@ impl<'c> Messages<'c> { ) .map(|row| History { message: Message { + sent: Instant { + at: row.sent_at, + sequence: row.sent_sequence, + }, channel: Channel { id: row.channel_id, name: row.channel_name, @@ -112,10 +149,6 @@ impl<'c> Messages<'c> { id: row.id, body: row.body, }, - sent: Instant { - at: row.sent_at, - sequence: row.sent_sequence, - }, deleted: None, }) .fetch_one(&mut *self.0) @@ -189,6 +222,10 @@ impl<'c> Messages<'c> { ) .map(|row| History { message: Message { + sent: Instant { + at: row.sent_at, + sequence: row.sent_sequence, + }, channel: Channel { id: row.channel_id, name: row.channel_name, @@ -200,10 +237,6 @@ impl<'c> Messages<'c> { id: row.id, body: row.body, }, - sent: Instant { - at: row.sent_at, - sequence: row.sent_sequence, - }, deleted: None, }) .fetch_all(&mut *self.0) diff --git a/src/message/snapshot.rs b/src/message/snapshot.rs index 3adccbe..522c1aa 100644 --- a/src/message/snapshot.rs +++ b/src/message/snapshot.rs @@ -2,11 +2,13 @@ use super::{ event::{Event, Kind, Sent}, Id, }; -use crate::{channel::Channel, login::Login}; +use crate::{channel::Channel, event::Instant, login::Login}; #[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] #[serde(into = "self::serialize::Message")] pub struct Message { + #[serde(skip)] + pub sent: Instant, pub channel: Channel, pub sender: Login, pub id: Id, |
