From d171a258ad2119e39cb715f8800031fff16967dc Mon Sep 17 00:00:00 2001 From: Owen Jacobson Date: Tue, 1 Oct 2024 22:43:18 -0400 Subject: Provide a resume point to bridge clients from state snapshots to the event sequence. --- docs/api.md | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) (limited to 'docs') diff --git a/docs/api.md b/docs/api.md index e18c6d5..5adf28d 100644 --- a/docs/api.md +++ b/docs/api.md @@ -23,7 +23,8 @@ Returns information needed to boot the client. Also the recommended way to check "login": { "name": "example username", "id": "L1234abcd", - } + }, + "resume_point": "1312", } ``` @@ -80,6 +81,10 @@ Channels are the containers for conversations. The API supports listing channels Lists channels. +#### Query parameters + +This endpoint accepts an optional `resume_point` query parameter. If provided, the value must be the value obtained from the `/api/boot` method. This parameter will restrict the returned list to channels as they existed at a fixed point in time, with any later changes only appearing in the event stream. + #### On success Responds with a list of channel objects, one per channel: @@ -152,9 +157,13 @@ Subscribes to events. This endpoint returns an `application/event-stream` respon The returned stream may terminate, to limit the number of outstanding messages held by the server. Clients can and should repeat the request, using the `Last-Event-Id` header to resume from where they left off. Events will be replayed from that point, and the stream will resume. +#### Query parameters + +This endpoint accepts an optional `resume_point` query parameter. If provided, the value must be the value obtained from the `/api/boot` method. This parameter start the returned stream immediately after the `resume_point`. + #### Request headers -This endpoint accepts an optional `Last-Event-Id` header for resuming an interrupted stream. If this header is provided, it must be set to the `id` field sent with the last event the client has processed. When `Last-Event-Id` is sent, the response will resume immediately after the corresponding event. If this header is omitted, then the stream will start from the beginning. +This endpoint accepts an optional `Last-Event-Id` header for resuming an interrupted stream. If this header is provided, it must be set to the `id` field sent with the last event the client has processed. When `Last-Event-Id` is sent, the response will resume immediately after the corresponding event. This header takes precedence over the `resume_point` query parameter; if neither is provided, then event playback starts at the beginning of time (_you have been warned_). If you're using a browser's `EventSource` API, this is handled for you automatically. -- cgit v1.2.3 From 0a5599c60d20ccc2223779eeba5dc91a95ea0fe5 Mon Sep 17 00:00:00 2001 From: Owen Jacobson Date: Thu, 3 Oct 2024 20:17:07 -0400 Subject: Add endpoints for deleting channels and messages. It is deliberate that the expire() functions do not use them. To avoid races, the transactions must be committed before events get sent, in both cases, which makes them structurally pretty different. --- ...4a808afb115f6e47d545dfbeb18f9c54e6eb15eb80.json | 20 ++++++ ...0b1a8433a8ca334f1d666b104823e3fb0c08efb2cc.json | 32 ---------- ...9cedc6bee1750d28a6176980ed7040b8a3301fc7e5.json | 62 +++++++++++++++++++ ...99e837106c799e84015425286b79f42e4001d8a4c7.json | 62 ------------------- ...ad2d2dec42949522f182a61bfb249f13ee78564179.json | 20 ++++++ docs/api.md | 28 +++++++++ src/channel/app.rs | 72 ++++++++++++++++++---- src/channel/routes.rs | 61 ++++++++++++------ src/channel/routes/test/on_send.rs | 6 +- src/cli.rs | 13 ++-- src/event/app.rs | 1 + src/event/broadcaster.rs | 2 +- src/message/app.rs | 61 +++++++++++++----- src/message/mod.rs | 3 +- src/message/repo.rs | 46 +++++++------- src/message/routes.rs | 46 ++++++++++++++ 16 files changed, 363 insertions(+), 172 deletions(-) create mode 100644 .sqlx/query-46403b84bfc79a53aec36b4a808afb115f6e47d545dfbeb18f9c54e6eb15eb80.json delete mode 100644 .sqlx/query-4d4dce1b034f4a540f49490b1a8433a8ca334f1d666b104823e3fb0c08efb2cc.json create mode 100644 .sqlx/query-6fc4be85527af518da17c49cedc6bee1750d28a6176980ed7040b8a3301fc7e5.json delete mode 100644 .sqlx/query-e93702ad922c7ce802499e99e837106c799e84015425286b79f42e4001d8a4c7.json create mode 100644 .sqlx/query-f3a338b9e4a65856decd79ad2d2dec42949522f182a61bfb249f13ee78564179.json create mode 100644 src/message/routes.rs (limited to 'docs') diff --git a/.sqlx/query-46403b84bfc79a53aec36b4a808afb115f6e47d545dfbeb18f9c54e6eb15eb80.json b/.sqlx/query-46403b84bfc79a53aec36b4a808afb115f6e47d545dfbeb18f9c54e6eb15eb80.json new file mode 100644 index 0000000..ee0f235 --- /dev/null +++ b/.sqlx/query-46403b84bfc79a53aec36b4a808afb115f6e47d545dfbeb18f9c54e6eb15eb80.json @@ -0,0 +1,20 @@ +{ + "db_name": "SQLite", + "query": "\n select\n message.id as \"id: Id\"\n from message\n join channel on message.channel = channel.id\n where channel.id = $1\n order by message.sent_sequence\n ", + "describe": { + "columns": [ + { + "name": "id: Id", + "ordinal": 0, + "type_info": "Text" + } + ], + "parameters": { + "Right": 1 + }, + "nullable": [ + false + ] + }, + "hash": "46403b84bfc79a53aec36b4a808afb115f6e47d545dfbeb18f9c54e6eb15eb80" +} diff --git a/.sqlx/query-4d4dce1b034f4a540f49490b1a8433a8ca334f1d666b104823e3fb0c08efb2cc.json b/.sqlx/query-4d4dce1b034f4a540f49490b1a8433a8ca334f1d666b104823e3fb0c08efb2cc.json deleted file mode 100644 index fb5f94b..0000000 --- a/.sqlx/query-4d4dce1b034f4a540f49490b1a8433a8ca334f1d666b104823e3fb0c08efb2cc.json +++ /dev/null @@ -1,32 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n select\n channel.id as \"channel_id: channel::Id\",\n channel.name as \"channel_name\",\n message.id as \"message: Id\"\n from message\n join channel on message.channel = channel.id\n where sent_at < $1\n ", - "describe": { - "columns": [ - { - "name": "channel_id: channel::Id", - "ordinal": 0, - "type_info": "Text" - }, - { - "name": "channel_name", - "ordinal": 1, - "type_info": "Text" - }, - { - "name": "message: Id", - "ordinal": 2, - "type_info": "Text" - } - ], - "parameters": { - "Right": 1 - }, - "nullable": [ - false, - false, - false - ] - }, - "hash": "4d4dce1b034f4a540f49490b1a8433a8ca334f1d666b104823e3fb0c08efb2cc" -} diff --git a/.sqlx/query-6fc4be85527af518da17c49cedc6bee1750d28a6176980ed7040b8a3301fc7e5.json b/.sqlx/query-6fc4be85527af518da17c49cedc6bee1750d28a6176980ed7040b8a3301fc7e5.json new file mode 100644 index 0000000..257e1f6 --- /dev/null +++ b/.sqlx/query-6fc4be85527af518da17c49cedc6bee1750d28a6176980ed7040b8a3301fc7e5.json @@ -0,0 +1,62 @@ +{ + "db_name": "SQLite", + "query": "\n select\n channel.id as \"channel_id: channel::Id\",\n channel.name as \"channel_name\",\n sender.id as \"sender_id: login::Id\",\n sender.name as \"sender_name\",\n message.id as \"id: Id\",\n message.body,\n sent_at as \"sent_at: DateTime\",\n sent_sequence as \"sent_sequence: Sequence\"\n from message\n join channel on message.channel = channel.id\n join login as sender on message.sender = sender.id\n where message.id = $1\n ", + "describe": { + "columns": [ + { + "name": "channel_id: channel::Id", + "ordinal": 0, + "type_info": "Text" + }, + { + "name": "channel_name", + "ordinal": 1, + "type_info": "Text" + }, + { + "name": "sender_id: login::Id", + "ordinal": 2, + "type_info": "Text" + }, + { + "name": "sender_name", + "ordinal": 3, + "type_info": "Text" + }, + { + "name": "id: Id", + "ordinal": 4, + "type_info": "Text" + }, + { + "name": "body", + "ordinal": 5, + "type_info": "Text" + }, + { + "name": "sent_at: DateTime", + "ordinal": 6, + "type_info": "Text" + }, + { + "name": "sent_sequence: Sequence", + "ordinal": 7, + "type_info": "Integer" + } + ], + "parameters": { + "Right": 1 + }, + "nullable": [ + false, + false, + false, + false, + false, + false, + false, + false + ] + }, + "hash": "6fc4be85527af518da17c49cedc6bee1750d28a6176980ed7040b8a3301fc7e5" +} diff --git a/.sqlx/query-e93702ad922c7ce802499e99e837106c799e84015425286b79f42e4001d8a4c7.json b/.sqlx/query-e93702ad922c7ce802499e99e837106c799e84015425286b79f42e4001d8a4c7.json deleted file mode 100644 index 288a657..0000000 --- a/.sqlx/query-e93702ad922c7ce802499e99e837106c799e84015425286b79f42e4001d8a4c7.json +++ /dev/null @@ -1,62 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n select\n channel.id as \"channel_id: channel::Id\",\n channel.name as \"channel_name\",\n sender.id as \"sender_id: login::Id\",\n sender.name as \"sender_name\",\n message.id as \"id: Id\",\n message.body,\n sent_at as \"sent_at: DateTime\",\n sent_sequence as \"sent_sequence: Sequence\"\n from message\n join channel on message.channel = channel.id\n join login as sender on message.sender = sender.id\n where message.id = $1\n and message.channel = $2\n ", - "describe": { - "columns": [ - { - "name": "channel_id: channel::Id", - "ordinal": 0, - "type_info": "Text" - }, - { - "name": "channel_name", - "ordinal": 1, - "type_info": "Text" - }, - { - "name": "sender_id: login::Id", - "ordinal": 2, - "type_info": "Text" - }, - { - "name": "sender_name", - "ordinal": 3, - "type_info": "Text" - }, - { - "name": "id: Id", - "ordinal": 4, - "type_info": "Text" - }, - { - "name": "body", - "ordinal": 5, - "type_info": "Text" - }, - { - "name": "sent_at: DateTime", - "ordinal": 6, - "type_info": "Text" - }, - { - "name": "sent_sequence: Sequence", - "ordinal": 7, - "type_info": "Integer" - } - ], - "parameters": { - "Right": 2 - }, - "nullable": [ - false, - false, - false, - false, - false, - false, - false, - false - ] - }, - "hash": "e93702ad922c7ce802499e99e837106c799e84015425286b79f42e4001d8a4c7" -} diff --git a/.sqlx/query-f3a338b9e4a65856decd79ad2d2dec42949522f182a61bfb249f13ee78564179.json b/.sqlx/query-f3a338b9e4a65856decd79ad2d2dec42949522f182a61bfb249f13ee78564179.json new file mode 100644 index 0000000..92a64a3 --- /dev/null +++ b/.sqlx/query-f3a338b9e4a65856decd79ad2d2dec42949522f182a61bfb249f13ee78564179.json @@ -0,0 +1,20 @@ +{ + "db_name": "SQLite", + "query": "\n select\n id as \"message: Id\"\n from message\n where sent_at < $1\n ", + "describe": { + "columns": [ + { + "name": "message: Id", + "ordinal": 0, + "type_info": "Text" + } + ], + "parameters": { + "Right": 1 + }, + "nullable": [ + false + ] + }, + "hash": "f3a338b9e4a65856decd79ad2d2dec42949522f182a61bfb249f13ee78564179" +} diff --git a/docs/api.md b/docs/api.md index 5adf28d..ef211bc 100644 --- a/docs/api.md +++ b/docs/api.md @@ -151,6 +151,34 @@ Once the message is accepted, this will return a 202 Accepted response. The mess If the channel ID is not valid, this will return a 404 Not Found response. +### `DELETE /api/channels/:channel` + +Deletes a channel (and all messages in it). + +The `:channel` placeholder must be a channel ID, as returned by `GET /api/channels` or `POST /api/channels`. + +#### On success + +This will return a 202 Accepted response on success, and delete the channel. + +#### Invalid channel ID + +If the channel ID is not valid, this will return a 404 Not Found response. + +### `DELETE /api/messages/:message` + +Deletes a message. + +The `:message` placeholder must be a message ID, as returned from the event stream or from a list of messages. + +#### On success + +This will return a 202 Accepted response on success, and delete the message. + +#### Invalid message ID + +If the message ID is not valid, this will return a 404 Not Found response. + ### `GET /api/events` Subscribes to events. This endpoint returns an `application/event-stream` response, and is intended for use with the `EventSource` browser API. Events will be delivered on this stream as they occur, and the request will remain open to deliver events. diff --git a/src/channel/app.rs b/src/channel/app.rs index 6ce826b..24be2ff 100644 --- a/src/channel/app.rs +++ b/src/channel/app.rs @@ -2,10 +2,12 @@ use chrono::TimeDelta; use itertools::Itertools; use sqlx::sqlite::SqlitePool; +use super::{repo::Provider as _, Channel, Id}; use crate::{ - channel::{repo::Provider as _, Channel}, clock::DateTime, - event::{broadcaster::Broadcaster, repo::Provider as _, Sequence}, + db::NotFound, + event::{broadcaster::Broadcaster, repo::Provider as _, Event, Sequence, Sequenced}, + message::repo::Provider as _, }; pub struct Channels<'a> { @@ -28,9 +30,8 @@ impl<'a> Channels<'a> { .map_err(|err| CreateError::from_duplicate_name(err, name))?; tx.commit().await?; - for event in channel.events() { - self.events.broadcast(event); - } + self.events + .broadcast(channel.events().map(Event::from).collect::>()); Ok(channel.snapshot()) } @@ -53,6 +54,46 @@ impl<'a> Channels<'a> { Ok(channels) } + pub async fn delete(&self, channel: &Id, deleted_at: &DateTime) -> Result<(), DeleteError> { + let mut tx = self.db.begin().await?; + + let channel = tx + .channels() + .by_id(channel) + .await + .not_found(|| DeleteError::NotFound(channel.clone()))? + .snapshot(); + + let mut events = Vec::new(); + + let messages = tx.messages().in_channel(&channel).await?; + for message in messages { + let deleted = tx.sequence().next(deleted_at).await?; + let message = tx.messages().delete(&message, &deleted).await?; + events.extend( + message + .events() + .filter(Sequence::start_from(deleted.sequence)) + .map(Event::from), + ); + } + + let deleted = tx.sequence().next(deleted_at).await?; + let channel = tx.channels().delete(&channel.id, &deleted).await?; + events.extend( + channel + .events() + .filter(Sequence::start_from(deleted.sequence)) + .map(Event::from), + ); + + tx.commit().await?; + + self.events.broadcast(events); + + Ok(()) + } + pub async fn expire(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> { // Somewhat arbitrarily, expire after 90 days. let expire_at = relative_to.to_owned() - TimeDelta::days(90); @@ -73,12 +114,13 @@ impl<'a> Channels<'a> { tx.commit().await?; - for event in events - .into_iter() - .kmerge_by(|a, b| a.instant.sequence < b.instant.sequence) - { - self.events.broadcast(event); - } + self.events.broadcast( + events + .into_iter() + .kmerge_by(|a, b| a.sequence() < b.sequence()) + .map(Event::from) + .collect::>(), + ); Ok(()) } @@ -92,6 +134,14 @@ pub enum CreateError { DatabaseError(#[from] sqlx::Error), } +#[derive(Debug, thiserror::Error)] +pub enum DeleteError { + #[error("channel {0} not found")] + NotFound(Id), + #[error(transparent)] + DatabaseError(#[from] sqlx::Error), +} + impl CreateError { fn from_duplicate_name(error: sqlx::Error, name: &str) -> Self { if let Some(error) = error.as_database_error() { diff --git a/src/channel/routes.rs b/src/channel/routes.rs index 5bb1ee9..bce634e 100644 --- a/src/channel/routes.rs +++ b/src/channel/routes.rs @@ -2,20 +2,18 @@ use axum::{ extract::{Json, Path, State}, http::StatusCode, response::{IntoResponse, Response}, - routing::{get, post}, + routing::{delete, get, post}, Router, }; use axum_extra::extract::Query; -use super::app; +use super::{ + app::{self, DeleteError}, + Channel, Id, +}; use crate::{ - app::App, - channel::{self, Channel}, - clock::RequestedAt, - error::Internal, - event::Sequence, - login::Login, - message::app::Error as MessageError, + app::App, clock::RequestedAt, error::Internal, event::Sequence, login::Login, + message::app::SendError, }; #[cfg(test)] @@ -26,6 +24,7 @@ pub fn router() -> Router { .route("/api/channels", get(list)) .route("/api/channels", post(on_create)) .route("/api/channels/:channel", post(on_send)) + .route("/api/channels/:channel", delete(on_delete)) } #[derive(Default, serde::Deserialize)] @@ -95,28 +94,54 @@ struct SendRequest { async fn on_send( State(app): State, - Path(channel): Path, + Path(channel): Path, RequestedAt(sent_at): RequestedAt, login: Login, Json(request): Json, -) -> Result { +) -> Result { app.messages() .send(&channel, &login, &sent_at, &request.message) - .await - // Could impl `From` here, but it's more code and this is used once. - .map_err(ErrorResponse)?; + .await?; Ok(StatusCode::ACCEPTED) } -#[derive(Debug)] -struct ErrorResponse(MessageError); +#[derive(Debug, thiserror::Error)] +#[error(transparent)] +struct SendErrorResponse(#[from] SendError); + +impl IntoResponse for SendErrorResponse { + fn into_response(self) -> Response { + let Self(error) = self; + match error { + not_found @ SendError::ChannelNotFound(_) => { + (StatusCode::NOT_FOUND, not_found.to_string()).into_response() + } + other => Internal::from(other).into_response(), + } + } +} + +async fn on_delete( + State(app): State, + Path(channel): Path, + RequestedAt(deleted_at): RequestedAt, + _: Login, +) -> Result { + app.channels().delete(&channel, &deleted_at).await?; + + Ok(StatusCode::ACCEPTED) +} + +#[derive(Debug, thiserror::Error)] +#[error(transparent)] +struct DeleteErrorResponse(#[from] DeleteError); -impl IntoResponse for ErrorResponse { +impl IntoResponse for DeleteErrorResponse { fn into_response(self) -> Response { let Self(error) = self; match error { - not_found @ MessageError::ChannelNotFound(_) => { + not_found @ DeleteError::NotFound(_) => { (StatusCode::NOT_FOUND, not_found.to_string()).into_response() } other => Internal::from(other).into_response(), diff --git a/src/channel/routes/test/on_send.rs b/src/channel/routes/test/on_send.rs index 1027b29..3297093 100644 --- a/src/channel/routes/test/on_send.rs +++ b/src/channel/routes/test/on_send.rs @@ -5,7 +5,7 @@ use crate::{ channel, channel::routes, event, - message::app, + message::app::SendError, test::fixtures::{self, future::Immediately as _}, }; @@ -77,7 +77,7 @@ async fn nonexistent_channel() { let request = routes::SendRequest { message: fixtures::message::propose(), }; - let routes::ErrorResponse(error) = routes::on_send( + let routes::SendErrorResponse(error) = routes::on_send( State(app), Path(channel.clone()), sent_at, @@ -91,6 +91,6 @@ async fn nonexistent_channel() { assert!(matches!( error, - app::Error::ChannelNotFound(error_channel) if channel == error_channel + SendError::ChannelNotFound(error_channel) if channel == error_channel )); } diff --git a/src/cli.rs b/src/cli.rs index 893fae2..2d9f512 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -10,7 +10,7 @@ use clap::Parser; use sqlx::sqlite::SqlitePool; use tokio::net; -use crate::{app::App, channel, clock, db, event, expire, login}; +use crate::{app::App, channel, clock, db, event, expire, login, message}; /// Command-line entry point for running the `hi` server. /// @@ -105,9 +105,14 @@ impl Args { } fn routers() -> Router { - [channel::router(), event::router(), login::router()] - .into_iter() - .fold(Router::default(), Router::merge) + [ + channel::router(), + event::router(), + login::router(), + message::router(), + ] + .into_iter() + .fold(Router::default(), Router::merge) } fn started_msg(listener: &net::TcpListener) -> io::Result { diff --git a/src/event/app.rs b/src/event/app.rs index e58bea9..32f0a97 100644 --- a/src/event/app.rs +++ b/src/event/app.rs @@ -61,6 +61,7 @@ impl<'a> Events<'a> { // Filtering on the broadcast resume point filters out messages // before resume_at, and filters out messages duplicated from // `replay_events`. + .flat_map(stream::iter) .filter(Self::resume(resume_live_at)); Ok(replay.chain(live_messages)) diff --git a/src/event/broadcaster.rs b/src/event/broadcaster.rs index de2513a..3c4efac 100644 --- a/src/event/broadcaster.rs +++ b/src/event/broadcaster.rs @@ -1,3 +1,3 @@ use crate::broadcast; -pub type Broadcaster = broadcast::Broadcaster; +pub type Broadcaster = broadcast::Broadcaster>; diff --git a/src/message/app.rs b/src/message/app.rs index 51f772e..1d34c14 100644 --- a/src/message/app.rs +++ b/src/message/app.rs @@ -2,12 +2,12 @@ use chrono::TimeDelta; use itertools::Itertools; use sqlx::sqlite::SqlitePool; -use super::{repo::Provider as _, Message}; +use super::{repo::Provider as _, Id, Message}; use crate::{ channel::{self, repo::Provider as _}, clock::DateTime, db::NotFound as _, - event::{broadcaster::Broadcaster, repo::Provider as _, Sequence}, + event::{broadcaster::Broadcaster, repo::Provider as _, Event, Sequence}, login::Login, }; @@ -27,13 +27,13 @@ impl<'a> Messages<'a> { sender: &Login, sent_at: &DateTime, body: &str, - ) -> Result { + ) -> Result { let mut tx = self.db.begin().await?; let channel = tx .channels() .by_id(channel) .await - .not_found(|| Error::ChannelNotFound(channel.clone()))?; + .not_found(|| SendError::ChannelNotFound(channel.clone()))?; let sent = tx.sequence().next(sent_at).await?; let message = tx .messages() @@ -41,24 +41,40 @@ impl<'a> Messages<'a> { .await?; tx.commit().await?; - for event in message.events() { - self.events.broadcast(event); - } + self.events + .broadcast(message.events().map(Event::from).collect::>()); Ok(message.snapshot()) } + pub async fn delete(&self, message: &Id, deleted_at: &DateTime) -> Result<(), DeleteError> { + let mut tx = self.db.begin().await?; + let deleted = tx.sequence().next(deleted_at).await?; + let message = tx.messages().delete(message, &deleted).await?; + tx.commit().await?; + + self.events.broadcast( + message + .events() + .filter(Sequence::start_from(deleted.sequence)) + .map(Event::from) + .collect::>(), + ); + + Ok(()) + } + pub async fn expire(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> { // Somewhat arbitrarily, expire after 90 days. let expire_at = relative_to.to_owned() - TimeDelta::days(90); let mut tx = self.db.begin().await?; - let expired = tx.messages().expired(&expire_at).await?; + let expired = tx.messages().expired(&expire_at).await?; let mut events = Vec::with_capacity(expired.len()); - for (channel, message) in expired { + for message in expired { let deleted = tx.sequence().next(relative_to).await?; - let message = tx.messages().delete(&channel, &message, &deleted).await?; + let message = tx.messages().delete(&message, &deleted).await?; events.push( message .events() @@ -68,21 +84,32 @@ impl<'a> Messages<'a> { tx.commit().await?; - for event in events - .into_iter() - .kmerge_by(|a, b| a.instant.sequence < b.instant.sequence) - { - self.events.broadcast(event); - } + self.events.broadcast( + events + .into_iter() + .kmerge_by(|a, b| a.instant.sequence < b.instant.sequence) + .map(Event::from) + .collect::>(), + ); Ok(()) } } #[derive(Debug, thiserror::Error)] -pub enum Error { +pub enum SendError { + #[error("channel {0} not found")] + ChannelNotFound(channel::Id), + #[error(transparent)] + DatabaseError(#[from] sqlx::Error), +} + +#[derive(Debug, thiserror::Error)] +pub enum DeleteError { #[error("channel {0} not found")] ChannelNotFound(channel::Id), + #[error("message {0} not found")] + NotFound(Id), #[error(transparent)] DatabaseError(#[from] sqlx::Error), } diff --git a/src/message/mod.rs b/src/message/mod.rs index 52d56c1..a8f51ab 100644 --- a/src/message/mod.rs +++ b/src/message/mod.rs @@ -3,6 +3,7 @@ pub mod event; mod history; mod id; pub mod repo; +mod routes; mod snapshot; -pub use self::{event::Event, history::History, id::Id, snapshot::Message}; +pub use self::{event::Event, history::History, id::Id, routes::router, snapshot::Message}; diff --git a/src/message/repo.rs b/src/message/repo.rs index 3b2b8f7..ae41736 100644 --- a/src/message/repo.rs +++ b/src/message/repo.rs @@ -62,7 +62,25 @@ impl<'c> Messages<'c> { Ok(message) } - async fn by_id(&mut self, channel: &Channel, message: &Id) -> Result { + pub async fn in_channel(&mut self, channel: &Channel) -> Result, sqlx::Error> { + let messages = sqlx::query_scalar!( + r#" + select + message.id as "id: Id" + from message + join channel on message.channel = channel.id + where channel.id = $1 + order by message.sent_sequence + "#, + channel.id, + ) + .fetch_all(&mut *self.0) + .await?; + + Ok(messages) + } + + async fn by_id(&mut self, message: &Id) -> Result { let message = sqlx::query!( r#" select @@ -78,10 +96,8 @@ impl<'c> Messages<'c> { join channel on message.channel = channel.id join login as sender on message.sender = sender.id where message.id = $1 - and message.channel = $2 "#, message, - channel.id, ) .map(|row| History { message: Message { @@ -110,11 +126,10 @@ impl<'c> Messages<'c> { pub async fn delete( &mut self, - channel: &Channel, message: &Id, deleted: &Instant, ) -> Result { - let history = self.by_id(channel, message).await?; + let history = self.by_id(message).await?; sqlx::query_scalar!( r#" @@ -134,31 +149,16 @@ impl<'c> Messages<'c> { }) } - pub async fn expired( - &mut self, - expire_at: &DateTime, - ) -> Result, sqlx::Error> { - let messages = sqlx::query!( + pub async fn expired(&mut self, expire_at: &DateTime) -> Result, sqlx::Error> { + let messages = sqlx::query_scalar!( r#" select - channel.id as "channel_id: channel::Id", - channel.name as "channel_name", - message.id as "message: Id" + id as "message: Id" from message - join channel on message.channel = channel.id where sent_at < $1 "#, expire_at, ) - .map(|row| { - ( - Channel { - id: row.channel_id, - name: row.channel_name, - }, - row.message, - ) - }) .fetch_all(&mut *self.0) .await?; diff --git a/src/message/routes.rs b/src/message/routes.rs new file mode 100644 index 0000000..29fe3d7 --- /dev/null +++ b/src/message/routes.rs @@ -0,0 +1,46 @@ +use axum::{ + extract::{Path, State}, + http::StatusCode, + response::{IntoResponse, Response}, + routing::delete, + Router, +}; + +use crate::{ + app::App, + clock::RequestedAt, + error::Internal, + login::Login, + message::{self, app::DeleteError}, +}; + +pub fn router() -> Router { + Router::new().route("/api/messages/:message", delete(on_delete)) +} + +async fn on_delete( + State(app): State, + Path(message): Path, + RequestedAt(deleted_at): RequestedAt, + _: Login, +) -> Result { + app.messages().delete(&message, &deleted_at).await?; + + Ok(StatusCode::ACCEPTED) +} + +#[derive(Debug, thiserror::Error)] +#[error(transparent)] +struct ErrorResponse(#[from] DeleteError); + +impl IntoResponse for ErrorResponse { + fn into_response(self) -> Response { + let Self(error) = self; + match error { + not_found @ (DeleteError::ChannelNotFound(_) | DeleteError::NotFound(_)) => { + (StatusCode::NOT_FOUND, not_found.to_string()).into_response() + } + other => Internal::from(other).into_response(), + } + } +} -- cgit v1.2.3 From 617172576b95bbb935a75f98a98787da5a4e9a9d Mon Sep 17 00:00:00 2001 From: Owen Jacobson Date: Thu, 3 Oct 2024 20:44:07 -0400 Subject: List messages per channel. --- ...4a808afb115f6e47d545dfbeb18f9c54e6eb15eb80.json | 20 ------ ...4a2137be57b3a45fe38a675262ceaaebb3d346a9ca.json | 62 ++++++++++++++++++ docs/api.md | 30 +++++++++ src/channel/app.rs | 44 +++++++++++-- src/channel/repo.rs | 7 +- src/channel/routes.rs | 76 ++++++++++++++++++---- src/event/app.rs | 8 +-- src/event/mod.rs | 2 +- src/event/sequence.rs | 7 ++ src/message/app.rs | 4 +- src/message/event.rs | 27 +++++++- src/message/history.rs | 4 +- src/message/repo.rs | 57 ++++++++++++---- src/message/snapshot.rs | 4 +- 14 files changed, 281 insertions(+), 71 deletions(-) delete mode 100644 .sqlx/query-46403b84bfc79a53aec36b4a808afb115f6e47d545dfbeb18f9c54e6eb15eb80.json create mode 100644 .sqlx/query-9606853f2ea9f776f7e4384a2137be57b3a45fe38a675262ceaaebb3d346a9ca.json (limited to 'docs') diff --git a/.sqlx/query-46403b84bfc79a53aec36b4a808afb115f6e47d545dfbeb18f9c54e6eb15eb80.json b/.sqlx/query-46403b84bfc79a53aec36b4a808afb115f6e47d545dfbeb18f9c54e6eb15eb80.json deleted file mode 100644 index ee0f235..0000000 --- a/.sqlx/query-46403b84bfc79a53aec36b4a808afb115f6e47d545dfbeb18f9c54e6eb15eb80.json +++ /dev/null @@ -1,20 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n select\n message.id as \"id: Id\"\n from message\n join channel on message.channel = channel.id\n where channel.id = $1\n order by message.sent_sequence\n ", - "describe": { - "columns": [ - { - "name": "id: Id", - "ordinal": 0, - "type_info": "Text" - } - ], - "parameters": { - "Right": 1 - }, - "nullable": [ - false - ] - }, - "hash": "46403b84bfc79a53aec36b4a808afb115f6e47d545dfbeb18f9c54e6eb15eb80" -} diff --git a/.sqlx/query-9606853f2ea9f776f7e4384a2137be57b3a45fe38a675262ceaaebb3d346a9ca.json b/.sqlx/query-9606853f2ea9f776f7e4384a2137be57b3a45fe38a675262ceaaebb3d346a9ca.json new file mode 100644 index 0000000..82246ac --- /dev/null +++ b/.sqlx/query-9606853f2ea9f776f7e4384a2137be57b3a45fe38a675262ceaaebb3d346a9ca.json @@ -0,0 +1,62 @@ +{ + "db_name": "SQLite", + "query": "\n select\n channel.id as \"channel_id: channel::Id\",\n channel.name as \"channel_name\",\n sender.id as \"sender_id: login::Id\",\n sender.name as \"sender_name\",\n message.id as \"id: Id\",\n message.body,\n sent_at as \"sent_at: DateTime\",\n sent_sequence as \"sent_sequence: Sequence\"\n from message\n join channel on message.channel = channel.id\n join login as sender on message.sender = sender.id\n where channel.id = $1\n and coalesce(message.sent_sequence <= $2, true)\n order by message.sent_sequence\n ", + "describe": { + "columns": [ + { + "name": "channel_id: channel::Id", + "ordinal": 0, + "type_info": "Text" + }, + { + "name": "channel_name", + "ordinal": 1, + "type_info": "Text" + }, + { + "name": "sender_id: login::Id", + "ordinal": 2, + "type_info": "Text" + }, + { + "name": "sender_name", + "ordinal": 3, + "type_info": "Text" + }, + { + "name": "id: Id", + "ordinal": 4, + "type_info": "Text" + }, + { + "name": "body", + "ordinal": 5, + "type_info": "Text" + }, + { + "name": "sent_at: DateTime", + "ordinal": 6, + "type_info": "Text" + }, + { + "name": "sent_sequence: Sequence", + "ordinal": 7, + "type_info": "Integer" + } + ], + "parameters": { + "Right": 2 + }, + "nullable": [ + false, + false, + false, + false, + false, + false, + false, + false + ] + }, + "hash": "9606853f2ea9f776f7e4384a2137be57b3a45fe38a675262ceaaebb3d346a9ca" +} diff --git a/docs/api.md b/docs/api.md index ef211bc..e8c8c8c 100644 --- a/docs/api.md +++ b/docs/api.md @@ -127,6 +127,36 @@ Channel names must be unique. If a channel with the same name already exists, th The API delivers events to clients to update them on other clients' actions and messages. While there is no specific delivery deadline, messages are delivered as soon as possible on a best-effort basis, and the event system allows clients to replay events or resume interrupted streams, to allow recovery if a message is lost. +### `GET /api/channels/:channel/messages` + +Retrieves historical messages in a channel. + +The `:channel` placeholder must be a channel ID, as returned by `GET /api/channels` or `POST /api/channels`. + +#### Query parameters + +This endpoint accepts an optional `resume_point` query parameter. If provided, the value must be the value obtained from the `/api/boot` method. This parameter will restrict the returned list to messages as they existed at a fixed point in time, with any later changes only appearing in the event stream. + +#### On success + +Responds with a list of message objects, one per message: + +```json +[ + { + "at": "2024-09-27T23:19:10.208147Z", + "sender": { + "id": "L1234abcd", + "name": "example username" + }, + "message": { + "id": "M1312acab", + "body": "beep" + } + } +] +``` + ### `POST /api/channels/:channel` Sends a chat message to a channel. It will be relayed to clients subscribed to the channel's events, and recorded for replay. diff --git a/src/channel/app.rs b/src/channel/app.rs index 24be2ff..b3bfbee 100644 --- a/src/channel/app.rs +++ b/src/channel/app.rs @@ -7,7 +7,7 @@ use crate::{ clock::DateTime, db::NotFound, event::{broadcaster::Broadcaster, repo::Provider as _, Event, Sequence, Sequenced}, - message::repo::Provider as _, + message::{repo::Provider as _, Message}, }; pub struct Channels<'a> { @@ -54,22 +54,52 @@ impl<'a> Channels<'a> { Ok(channels) } - pub async fn delete(&self, channel: &Id, deleted_at: &DateTime) -> Result<(), DeleteError> { + pub async fn messages( + &self, + channel: &Id, + resume_point: Option, + ) -> Result, Error> { + let mut tx = self.db.begin().await?; + let channel = tx + .channels() + .by_id(channel) + .await + .not_found(|| Error::NotFound(channel.clone()))? + .snapshot(); + + let messages = tx + .messages() + .in_channel(&channel, resume_point) + .await? + .into_iter() + .filter_map(|message| { + message + .events() + .filter(Sequence::up_to(resume_point)) + .collect() + }) + .collect(); + + Ok(messages) + } + + pub async fn delete(&self, channel: &Id, deleted_at: &DateTime) -> Result<(), Error> { let mut tx = self.db.begin().await?; let channel = tx .channels() .by_id(channel) .await - .not_found(|| DeleteError::NotFound(channel.clone()))? + .not_found(|| Error::NotFound(channel.clone()))? .snapshot(); let mut events = Vec::new(); - let messages = tx.messages().in_channel(&channel).await?; + let messages = tx.messages().in_channel(&channel, None).await?; for message in messages { + let message = message.snapshot(); let deleted = tx.sequence().next(deleted_at).await?; - let message = tx.messages().delete(&message, &deleted).await?; + let message = tx.messages().delete(&message.id, &deleted).await?; events.extend( message .events() @@ -117,7 +147,7 @@ impl<'a> Channels<'a> { self.events.broadcast( events .into_iter() - .kmerge_by(|a, b| a.sequence() < b.sequence()) + .kmerge_by(Sequence::merge) .map(Event::from) .collect::>(), ); @@ -135,7 +165,7 @@ pub enum CreateError { } #[derive(Debug, thiserror::Error)] -pub enum DeleteError { +pub enum Error { #[error("channel {0} not found")] NotFound(Id), #[error(transparent)] diff --git a/src/channel/repo.rs b/src/channel/repo.rs index 8bb761b..2b48436 100644 --- a/src/channel/repo.rs +++ b/src/channel/repo.rs @@ -84,10 +84,7 @@ impl<'c> Channels<'c> { Ok(channel) } - pub async fn all( - &mut self, - resume_point: Option, - ) -> Result, sqlx::Error> { + pub async fn all(&mut self, resume_at: Option) -> Result, sqlx::Error> { let channels = sqlx::query!( r#" select @@ -99,7 +96,7 @@ impl<'c> Channels<'c> { where coalesce(created_sequence <= $1, true) order by channel.name "#, - resume_point, + resume_at, ) .map(|row| History { channel: Channel { diff --git a/src/channel/routes.rs b/src/channel/routes.rs index bce634e..23c0602 100644 --- a/src/channel/routes.rs +++ b/src/channel/routes.rs @@ -7,13 +7,14 @@ use axum::{ }; use axum_extra::extract::Query; -use super::{ - app::{self, DeleteError}, - Channel, Id, -}; +use super::{app, Channel, Id}; use crate::{ - app::App, clock::RequestedAt, error::Internal, event::Sequence, login::Login, - message::app::SendError, + app::App, + clock::RequestedAt, + error::Internal, + event::{Instant, Sequence}, + login::Login, + message::{self, app::SendError}, }; #[cfg(test)] @@ -25,17 +26,18 @@ pub fn router() -> Router { .route("/api/channels", post(on_create)) .route("/api/channels/:channel", post(on_send)) .route("/api/channels/:channel", delete(on_delete)) + .route("/api/channels/:channel/messages", get(messages)) } #[derive(Default, serde::Deserialize)] -struct ListQuery { +struct ResumeQuery { resume_point: Option, } async fn list( State(app): State, _: Login, - Query(query): Query, + Query(query): Query, ) -> Result { let channels = app.channels().all(query.resume_point).await?; let response = Channels(channels); @@ -127,7 +129,7 @@ async fn on_delete( Path(channel): Path, RequestedAt(deleted_at): RequestedAt, _: Login, -) -> Result { +) -> Result { app.channels().delete(&channel, &deleted_at).await?; Ok(StatusCode::ACCEPTED) @@ -135,16 +137,66 @@ async fn on_delete( #[derive(Debug, thiserror::Error)] #[error(transparent)] -struct DeleteErrorResponse(#[from] DeleteError); +struct ErrorResponse(#[from] app::Error); -impl IntoResponse for DeleteErrorResponse { +impl IntoResponse for ErrorResponse { fn into_response(self) -> Response { let Self(error) = self; match error { - not_found @ DeleteError::NotFound(_) => { + not_found @ app::Error::NotFound(_) => { (StatusCode::NOT_FOUND, not_found.to_string()).into_response() } other => Internal::from(other).into_response(), } } } + +async fn messages( + State(app): State, + Path(channel): Path, + _: Login, + Query(query): Query, +) -> Result { + let messages = app + .channels() + .messages(&channel, query.resume_point) + .await?; + let response = Messages( + messages + .into_iter() + .map(|message| MessageView { + sent: message.sent, + sender: message.sender, + message: MessageInner { + id: message.id, + body: message.body, + }, + }) + .collect(), + ); + + Ok(response) +} + +struct Messages(Vec); + +#[derive(serde::Serialize)] +struct MessageView { + #[serde(flatten)] + sent: Instant, + sender: Login, + message: MessageInner, +} + +#[derive(serde::Serialize)] +struct MessageInner { + id: message::Id, + body: String, +} + +impl IntoResponse for Messages { + fn into_response(self) -> Response { + let Self(messages) = self; + Json(messages).into_response() + } +} diff --git a/src/event/app.rs b/src/event/app.rs index 32f0a97..d664ec7 100644 --- a/src/event/app.rs +++ b/src/event/app.rs @@ -36,7 +36,7 @@ impl<'a> Events<'a> { let channel_events = channels .iter() .map(channel::History::events) - .kmerge_by(|a, b| a.instant.sequence < b.instant.sequence) + .kmerge_by(Sequence::merge) .filter(Sequence::after(resume_at)) .map(Event::from); @@ -44,14 +44,12 @@ impl<'a> Events<'a> { let message_events = messages .iter() .map(message::History::events) - .kmerge_by(|a, b| a.instant.sequence < b.instant.sequence) + .kmerge_by(Sequence::merge) .filter(Sequence::after(resume_at)) .map(Event::from); let replay_events = channel_events - .merge_by(message_events, |a, b| { - a.instant.sequence < b.instant.sequence - }) + .merge_by(message_events, Sequence::merge) .collect::>(); let resume_live_at = replay_events.last().map(Sequenced::sequence); diff --git a/src/event/mod.rs b/src/event/mod.rs index 1503b77..1349fe6 100644 --- a/src/event/mod.rs +++ b/src/event/mod.rs @@ -38,7 +38,7 @@ impl From for Event { impl From for Event { fn from(event: message::Event) -> Self { Self { - instant: event.instant, + instant: event.instant(), kind: event.kind.into(), } } diff --git a/src/event/sequence.rs b/src/event/sequence.rs index c566156..fbe3711 100644 --- a/src/event/sequence.rs +++ b/src/event/sequence.rs @@ -59,6 +59,13 @@ impl Sequence { { move |event| resume_point <= event.sequence() } + + pub fn merge(a: &E, b: &E) -> bool + where + E: Sequenced, + { + a.sequence() < b.sequence() + } } pub trait Sequenced { diff --git a/src/message/app.rs b/src/message/app.rs index 1d34c14..fd6a334 100644 --- a/src/message/app.rs +++ b/src/message/app.rs @@ -7,7 +7,7 @@ use crate::{ channel::{self, repo::Provider as _}, clock::DateTime, db::NotFound as _, - event::{broadcaster::Broadcaster, repo::Provider as _, Event, Sequence}, + event::{broadcaster::Broadcaster, repo::Provider as _, Event, Sequence, Sequenced}, login::Login, }; @@ -87,7 +87,7 @@ impl<'a> Messages<'a> { self.events.broadcast( events .into_iter() - .kmerge_by(|a, b| a.instant.sequence < b.instant.sequence) + .kmerge_by(Sequence::merge) .map(Event::from) .collect::>(), ); diff --git a/src/message/event.rs b/src/message/event.rs index bcc2238..66db9b0 100644 --- a/src/message/event.rs +++ b/src/message/event.rs @@ -6,15 +6,13 @@ use crate::{ #[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] pub struct Event { - #[serde(flatten)] - pub instant: Instant, #[serde(flatten)] pub kind: Kind, } impl Sequenced for Event { fn instant(&self) -> Instant { - self.instant + self.kind.instant() } } @@ -25,12 +23,27 @@ pub enum Kind { Deleted(Deleted), } +impl Sequenced for Kind { + fn instant(&self) -> Instant { + match self { + Self::Sent(sent) => sent.instant(), + Self::Deleted(deleted) => deleted.instant(), + } + } +} + #[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] pub struct Sent { #[serde(flatten)] pub message: Message, } +impl Sequenced for Sent { + fn instant(&self) -> Instant { + self.message.sent + } +} + impl From for Kind { fn from(event: Sent) -> Self { Self::Sent(event) @@ -39,10 +52,18 @@ impl From for Kind { #[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] pub struct Deleted { + #[serde(flatten)] + pub instant: Instant, pub channel: Channel, pub message: Id, } +impl Sequenced for Deleted { + fn instant(&self) -> Instant { + self.instant + } +} + impl From for Kind { fn from(event: Deleted) -> Self { Self::Deleted(event) diff --git a/src/message/history.rs b/src/message/history.rs index 5aca47e..89fc6b1 100644 --- a/src/message/history.rs +++ b/src/message/history.rs @@ -7,14 +7,12 @@ use crate::event::Instant; #[derive(Clone, Debug, Eq, PartialEq)] pub struct History { pub message: Message, - pub sent: Instant, pub deleted: Option, } impl History { fn sent(&self) -> Event { Event { - instant: self.sent, kind: Sent { message: self.message.clone(), } @@ -24,8 +22,8 @@ impl History { fn deleted(&self) -> Option { self.deleted.map(|instant| Event { - instant, kind: Deleted { + instant, channel: self.message.channel.clone(), message: self.message.id.clone(), } diff --git a/src/message/repo.rs b/src/message/repo.rs index ae41736..fc835c8 100644 --- a/src/message/repo.rs +++ b/src/message/repo.rs @@ -48,12 +48,12 @@ impl<'c> Messages<'c> { ) .map(|row| History { message: Message { + sent: *sent, channel: channel.clone(), sender: sender.clone(), id: row.id, body: row.body, }, - sent: *sent, deleted: None, }) .fetch_one(&mut *self.0) @@ -62,18 +62,51 @@ impl<'c> Messages<'c> { Ok(message) } - pub async fn in_channel(&mut self, channel: &Channel) -> Result, sqlx::Error> { - let messages = sqlx::query_scalar!( + pub async fn in_channel( + &mut self, + channel: &Channel, + resume_at: Option, + ) -> Result, sqlx::Error> { + let messages = sqlx::query!( r#" select - message.id as "id: Id" + channel.id as "channel_id: channel::Id", + channel.name as "channel_name", + sender.id as "sender_id: login::Id", + sender.name as "sender_name", + message.id as "id: Id", + message.body, + sent_at as "sent_at: DateTime", + sent_sequence as "sent_sequence: Sequence" from message join channel on message.channel = channel.id + join login as sender on message.sender = sender.id where channel.id = $1 + and coalesce(message.sent_sequence <= $2, true) order by message.sent_sequence "#, channel.id, + resume_at, ) + .map(|row| History { + message: Message { + sent: Instant { + at: row.sent_at, + sequence: row.sent_sequence, + }, + channel: Channel { + id: row.channel_id, + name: row.channel_name, + }, + sender: Login { + id: row.sender_id, + name: row.sender_name, + }, + id: row.id, + body: row.body, + }, + deleted: None, + }) .fetch_all(&mut *self.0) .await?; @@ -101,6 +134,10 @@ impl<'c> Messages<'c> { ) .map(|row| History { message: Message { + sent: Instant { + at: row.sent_at, + sequence: row.sent_sequence, + }, channel: Channel { id: row.channel_id, name: row.channel_name, @@ -112,10 +149,6 @@ impl<'c> Messages<'c> { id: row.id, body: row.body, }, - sent: Instant { - at: row.sent_at, - sequence: row.sent_sequence, - }, deleted: None, }) .fetch_one(&mut *self.0) @@ -189,6 +222,10 @@ impl<'c> Messages<'c> { ) .map(|row| History { message: Message { + sent: Instant { + at: row.sent_at, + sequence: row.sent_sequence, + }, channel: Channel { id: row.channel_id, name: row.channel_name, @@ -200,10 +237,6 @@ impl<'c> Messages<'c> { id: row.id, body: row.body, }, - sent: Instant { - at: row.sent_at, - sequence: row.sent_sequence, - }, deleted: None, }) .fetch_all(&mut *self.0) diff --git a/src/message/snapshot.rs b/src/message/snapshot.rs index 3adccbe..522c1aa 100644 --- a/src/message/snapshot.rs +++ b/src/message/snapshot.rs @@ -2,11 +2,13 @@ use super::{ event::{Event, Kind, Sent}, Id, }; -use crate::{channel::Channel, login::Login}; +use crate::{channel::Channel, event::Instant, login::Login}; #[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] #[serde(into = "self::serialize::Message")] pub struct Message { + #[serde(skip)] + pub sent: Instant, pub channel: Channel, pub sender: Login, pub id: Id, -- cgit v1.2.3