diff options
| -rw-r--r-- | .sqlx/query-46403b84bfc79a53aec36b4a808afb115f6e47d545dfbeb18f9c54e6eb15eb80.json | 20 | ||||
| -rw-r--r-- | .sqlx/query-4d4dce1b034f4a540f49490b1a8433a8ca334f1d666b104823e3fb0c08efb2cc.json | 32 | ||||
| -rw-r--r-- | .sqlx/query-6fc4be85527af518da17c49cedc6bee1750d28a6176980ed7040b8a3301fc7e5.json (renamed from .sqlx/query-e93702ad922c7ce802499e99e837106c799e84015425286b79f42e4001d8a4c7.json) | 6 | ||||
| -rw-r--r-- | .sqlx/query-f3a338b9e4a65856decd79ad2d2dec42949522f182a61bfb249f13ee78564179.json | 20 | ||||
| -rw-r--r-- | docs/api.md | 28 | ||||
| -rw-r--r-- | src/channel/app.rs | 72 | ||||
| -rw-r--r-- | src/channel/routes.rs | 61 | ||||
| -rw-r--r-- | src/channel/routes/test/on_send.rs | 6 | ||||
| -rw-r--r-- | src/cli.rs | 13 | ||||
| -rw-r--r-- | src/event/app.rs | 1 | ||||
| -rw-r--r-- | src/event/broadcaster.rs | 2 | ||||
| -rw-r--r-- | src/message/app.rs | 61 | ||||
| -rw-r--r-- | src/message/mod.rs | 3 | ||||
| -rw-r--r-- | src/message/repo.rs | 46 | ||||
| -rw-r--r-- | src/message/routes.rs | 46 |
15 files changed, 304 insertions, 113 deletions
diff --git a/.sqlx/query-46403b84bfc79a53aec36b4a808afb115f6e47d545dfbeb18f9c54e6eb15eb80.json b/.sqlx/query-46403b84bfc79a53aec36b4a808afb115f6e47d545dfbeb18f9c54e6eb15eb80.json new file mode 100644 index 0000000..ee0f235 --- /dev/null +++ b/.sqlx/query-46403b84bfc79a53aec36b4a808afb115f6e47d545dfbeb18f9c54e6eb15eb80.json @@ -0,0 +1,20 @@ +{ + "db_name": "SQLite", + "query": "\n select\n message.id as \"id: Id\"\n from message\n join channel on message.channel = channel.id\n where channel.id = $1\n order by message.sent_sequence\n ", + "describe": { + "columns": [ + { + "name": "id: Id", + "ordinal": 0, + "type_info": "Text" + } + ], + "parameters": { + "Right": 1 + }, + "nullable": [ + false + ] + }, + "hash": "46403b84bfc79a53aec36b4a808afb115f6e47d545dfbeb18f9c54e6eb15eb80" +} diff --git a/.sqlx/query-4d4dce1b034f4a540f49490b1a8433a8ca334f1d666b104823e3fb0c08efb2cc.json b/.sqlx/query-4d4dce1b034f4a540f49490b1a8433a8ca334f1d666b104823e3fb0c08efb2cc.json deleted file mode 100644 index fb5f94b..0000000 --- a/.sqlx/query-4d4dce1b034f4a540f49490b1a8433a8ca334f1d666b104823e3fb0c08efb2cc.json +++ /dev/null @@ -1,32 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n select\n channel.id as \"channel_id: channel::Id\",\n channel.name as \"channel_name\",\n message.id as \"message: Id\"\n from message\n join channel on message.channel = channel.id\n where sent_at < $1\n ", - "describe": { - "columns": [ - { - "name": "channel_id: channel::Id", - "ordinal": 0, - "type_info": "Text" - }, - { - "name": "channel_name", - "ordinal": 1, - "type_info": "Text" - }, - { - "name": "message: Id", - "ordinal": 2, - "type_info": "Text" - } - ], - "parameters": { - "Right": 1 - }, - "nullable": [ - false, - false, - false - ] - }, - "hash": "4d4dce1b034f4a540f49490b1a8433a8ca334f1d666b104823e3fb0c08efb2cc" -} diff --git a/.sqlx/query-e93702ad922c7ce802499e99e837106c799e84015425286b79f42e4001d8a4c7.json b/.sqlx/query-6fc4be85527af518da17c49cedc6bee1750d28a6176980ed7040b8a3301fc7e5.json index 288a657..257e1f6 100644 --- a/.sqlx/query-e93702ad922c7ce802499e99e837106c799e84015425286b79f42e4001d8a4c7.json +++ b/.sqlx/query-6fc4be85527af518da17c49cedc6bee1750d28a6176980ed7040b8a3301fc7e5.json @@ -1,6 +1,6 @@ { "db_name": "SQLite", - "query": "\n select\n channel.id as \"channel_id: channel::Id\",\n channel.name as \"channel_name\",\n sender.id as \"sender_id: login::Id\",\n sender.name as \"sender_name\",\n message.id as \"id: Id\",\n message.body,\n sent_at as \"sent_at: DateTime\",\n sent_sequence as \"sent_sequence: Sequence\"\n from message\n join channel on message.channel = channel.id\n join login as sender on message.sender = sender.id\n where message.id = $1\n and message.channel = $2\n ", + "query": "\n select\n channel.id as \"channel_id: channel::Id\",\n channel.name as \"channel_name\",\n sender.id as \"sender_id: login::Id\",\n sender.name as \"sender_name\",\n message.id as \"id: Id\",\n message.body,\n sent_at as \"sent_at: DateTime\",\n sent_sequence as \"sent_sequence: Sequence\"\n from message\n join channel on message.channel = channel.id\n join login as sender on message.sender = sender.id\n where message.id = $1\n ", "describe": { "columns": [ { @@ -45,7 +45,7 @@ } ], "parameters": { - "Right": 2 + "Right": 1 }, "nullable": [ false, @@ -58,5 +58,5 @@ false ] }, - "hash": "e93702ad922c7ce802499e99e837106c799e84015425286b79f42e4001d8a4c7" + "hash": "6fc4be85527af518da17c49cedc6bee1750d28a6176980ed7040b8a3301fc7e5" } diff --git a/.sqlx/query-f3a338b9e4a65856decd79ad2d2dec42949522f182a61bfb249f13ee78564179.json b/.sqlx/query-f3a338b9e4a65856decd79ad2d2dec42949522f182a61bfb249f13ee78564179.json new file mode 100644 index 0000000..92a64a3 --- /dev/null +++ b/.sqlx/query-f3a338b9e4a65856decd79ad2d2dec42949522f182a61bfb249f13ee78564179.json @@ -0,0 +1,20 @@ +{ + "db_name": "SQLite", + "query": "\n select\n id as \"message: Id\"\n from message\n where sent_at < $1\n ", + "describe": { + "columns": [ + { + "name": "message: Id", + "ordinal": 0, + "type_info": "Text" + } + ], + "parameters": { + "Right": 1 + }, + "nullable": [ + false + ] + }, + "hash": "f3a338b9e4a65856decd79ad2d2dec42949522f182a61bfb249f13ee78564179" +} diff --git a/docs/api.md b/docs/api.md index 5adf28d..ef211bc 100644 --- a/docs/api.md +++ b/docs/api.md @@ -151,6 +151,34 @@ Once the message is accepted, this will return a 202 Accepted response. The mess If the channel ID is not valid, this will return a 404 Not Found response. +### `DELETE /api/channels/:channel` + +Deletes a channel (and all messages in it). + +The `:channel` placeholder must be a channel ID, as returned by `GET /api/channels` or `POST /api/channels`. + +#### On success + +This will return a 202 Accepted response on success, and delete the channel. + +#### Invalid channel ID + +If the channel ID is not valid, this will return a 404 Not Found response. + +### `DELETE /api/messages/:message` + +Deletes a message. + +The `:message` placeholder must be a message ID, as returned from the event stream or from a list of messages. + +#### On success + +This will return a 202 Accepted response on success, and delete the message. + +#### Invalid message ID + +If the message ID is not valid, this will return a 404 Not Found response. + ### `GET /api/events` Subscribes to events. This endpoint returns an `application/event-stream` response, and is intended for use with the `EventSource` browser API. Events will be delivered on this stream as they occur, and the request will remain open to deliver events. diff --git a/src/channel/app.rs b/src/channel/app.rs index 6ce826b..24be2ff 100644 --- a/src/channel/app.rs +++ b/src/channel/app.rs @@ -2,10 +2,12 @@ use chrono::TimeDelta; use itertools::Itertools; use sqlx::sqlite::SqlitePool; +use super::{repo::Provider as _, Channel, Id}; use crate::{ - channel::{repo::Provider as _, Channel}, clock::DateTime, - event::{broadcaster::Broadcaster, repo::Provider as _, Sequence}, + db::NotFound, + event::{broadcaster::Broadcaster, repo::Provider as _, Event, Sequence, Sequenced}, + message::repo::Provider as _, }; pub struct Channels<'a> { @@ -28,9 +30,8 @@ impl<'a> Channels<'a> { .map_err(|err| CreateError::from_duplicate_name(err, name))?; tx.commit().await?; - for event in channel.events() { - self.events.broadcast(event); - } + self.events + .broadcast(channel.events().map(Event::from).collect::<Vec<_>>()); Ok(channel.snapshot()) } @@ -53,6 +54,46 @@ impl<'a> Channels<'a> { Ok(channels) } + pub async fn delete(&self, channel: &Id, deleted_at: &DateTime) -> Result<(), DeleteError> { + let mut tx = self.db.begin().await?; + + let channel = tx + .channels() + .by_id(channel) + .await + .not_found(|| DeleteError::NotFound(channel.clone()))? + .snapshot(); + + let mut events = Vec::new(); + + let messages = tx.messages().in_channel(&channel).await?; + for message in messages { + let deleted = tx.sequence().next(deleted_at).await?; + let message = tx.messages().delete(&message, &deleted).await?; + events.extend( + message + .events() + .filter(Sequence::start_from(deleted.sequence)) + .map(Event::from), + ); + } + + let deleted = tx.sequence().next(deleted_at).await?; + let channel = tx.channels().delete(&channel.id, &deleted).await?; + events.extend( + channel + .events() + .filter(Sequence::start_from(deleted.sequence)) + .map(Event::from), + ); + + tx.commit().await?; + + self.events.broadcast(events); + + Ok(()) + } + pub async fn expire(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> { // Somewhat arbitrarily, expire after 90 days. let expire_at = relative_to.to_owned() - TimeDelta::days(90); @@ -73,12 +114,13 @@ impl<'a> Channels<'a> { tx.commit().await?; - for event in events - .into_iter() - .kmerge_by(|a, b| a.instant.sequence < b.instant.sequence) - { - self.events.broadcast(event); - } + self.events.broadcast( + events + .into_iter() + .kmerge_by(|a, b| a.sequence() < b.sequence()) + .map(Event::from) + .collect::<Vec<_>>(), + ); Ok(()) } @@ -92,6 +134,14 @@ pub enum CreateError { DatabaseError(#[from] sqlx::Error), } +#[derive(Debug, thiserror::Error)] +pub enum DeleteError { + #[error("channel {0} not found")] + NotFound(Id), + #[error(transparent)] + DatabaseError(#[from] sqlx::Error), +} + impl CreateError { fn from_duplicate_name(error: sqlx::Error, name: &str) -> Self { if let Some(error) = error.as_database_error() { diff --git a/src/channel/routes.rs b/src/channel/routes.rs index 5bb1ee9..bce634e 100644 --- a/src/channel/routes.rs +++ b/src/channel/routes.rs @@ -2,20 +2,18 @@ use axum::{ extract::{Json, Path, State}, http::StatusCode, response::{IntoResponse, Response}, - routing::{get, post}, + routing::{delete, get, post}, Router, }; use axum_extra::extract::Query; -use super::app; +use super::{ + app::{self, DeleteError}, + Channel, Id, +}; use crate::{ - app::App, - channel::{self, Channel}, - clock::RequestedAt, - error::Internal, - event::Sequence, - login::Login, - message::app::Error as MessageError, + app::App, clock::RequestedAt, error::Internal, event::Sequence, login::Login, + message::app::SendError, }; #[cfg(test)] @@ -26,6 +24,7 @@ pub fn router() -> Router<App> { .route("/api/channels", get(list)) .route("/api/channels", post(on_create)) .route("/api/channels/:channel", post(on_send)) + .route("/api/channels/:channel", delete(on_delete)) } #[derive(Default, serde::Deserialize)] @@ -95,28 +94,54 @@ struct SendRequest { async fn on_send( State(app): State<App>, - Path(channel): Path<channel::Id>, + Path(channel): Path<Id>, RequestedAt(sent_at): RequestedAt, login: Login, Json(request): Json<SendRequest>, -) -> Result<StatusCode, ErrorResponse> { +) -> Result<StatusCode, SendErrorResponse> { app.messages() .send(&channel, &login, &sent_at, &request.message) - .await - // Could impl `From` here, but it's more code and this is used once. - .map_err(ErrorResponse)?; + .await?; Ok(StatusCode::ACCEPTED) } -#[derive(Debug)] -struct ErrorResponse(MessageError); +#[derive(Debug, thiserror::Error)] +#[error(transparent)] +struct SendErrorResponse(#[from] SendError); + +impl IntoResponse for SendErrorResponse { + fn into_response(self) -> Response { + let Self(error) = self; + match error { + not_found @ SendError::ChannelNotFound(_) => { + (StatusCode::NOT_FOUND, not_found.to_string()).into_response() + } + other => Internal::from(other).into_response(), + } + } +} + +async fn on_delete( + State(app): State<App>, + Path(channel): Path<Id>, + RequestedAt(deleted_at): RequestedAt, + _: Login, +) -> Result<StatusCode, DeleteErrorResponse> { + app.channels().delete(&channel, &deleted_at).await?; + + Ok(StatusCode::ACCEPTED) +} + +#[derive(Debug, thiserror::Error)] +#[error(transparent)] +struct DeleteErrorResponse(#[from] DeleteError); -impl IntoResponse for ErrorResponse { +impl IntoResponse for DeleteErrorResponse { fn into_response(self) -> Response { let Self(error) = self; match error { - not_found @ MessageError::ChannelNotFound(_) => { + not_found @ DeleteError::NotFound(_) => { (StatusCode::NOT_FOUND, not_found.to_string()).into_response() } other => Internal::from(other).into_response(), diff --git a/src/channel/routes/test/on_send.rs b/src/channel/routes/test/on_send.rs index 1027b29..3297093 100644 --- a/src/channel/routes/test/on_send.rs +++ b/src/channel/routes/test/on_send.rs @@ -5,7 +5,7 @@ use crate::{ channel, channel::routes, event, - message::app, + message::app::SendError, test::fixtures::{self, future::Immediately as _}, }; @@ -77,7 +77,7 @@ async fn nonexistent_channel() { let request = routes::SendRequest { message: fixtures::message::propose(), }; - let routes::ErrorResponse(error) = routes::on_send( + let routes::SendErrorResponse(error) = routes::on_send( State(app), Path(channel.clone()), sent_at, @@ -91,6 +91,6 @@ async fn nonexistent_channel() { assert!(matches!( error, - app::Error::ChannelNotFound(error_channel) if channel == error_channel + SendError::ChannelNotFound(error_channel) if channel == error_channel )); } @@ -10,7 +10,7 @@ use clap::Parser; use sqlx::sqlite::SqlitePool; use tokio::net; -use crate::{app::App, channel, clock, db, event, expire, login}; +use crate::{app::App, channel, clock, db, event, expire, login, message}; /// Command-line entry point for running the `hi` server. /// @@ -105,9 +105,14 @@ impl Args { } fn routers() -> Router<App> { - [channel::router(), event::router(), login::router()] - .into_iter() - .fold(Router::default(), Router::merge) + [ + channel::router(), + event::router(), + login::router(), + message::router(), + ] + .into_iter() + .fold(Router::default(), Router::merge) } fn started_msg(listener: &net::TcpListener) -> io::Result<String> { diff --git a/src/event/app.rs b/src/event/app.rs index e58bea9..32f0a97 100644 --- a/src/event/app.rs +++ b/src/event/app.rs @@ -61,6 +61,7 @@ impl<'a> Events<'a> { // Filtering on the broadcast resume point filters out messages // before resume_at, and filters out messages duplicated from // `replay_events`. + .flat_map(stream::iter) .filter(Self::resume(resume_live_at)); Ok(replay.chain(live_messages)) diff --git a/src/event/broadcaster.rs b/src/event/broadcaster.rs index de2513a..3c4efac 100644 --- a/src/event/broadcaster.rs +++ b/src/event/broadcaster.rs @@ -1,3 +1,3 @@ use crate::broadcast; -pub type Broadcaster = broadcast::Broadcaster<super::Event>; +pub type Broadcaster = broadcast::Broadcaster<Vec<super::Event>>; diff --git a/src/message/app.rs b/src/message/app.rs index 51f772e..1d34c14 100644 --- a/src/message/app.rs +++ b/src/message/app.rs @@ -2,12 +2,12 @@ use chrono::TimeDelta; use itertools::Itertools; use sqlx::sqlite::SqlitePool; -use super::{repo::Provider as _, Message}; +use super::{repo::Provider as _, Id, Message}; use crate::{ channel::{self, repo::Provider as _}, clock::DateTime, db::NotFound as _, - event::{broadcaster::Broadcaster, repo::Provider as _, Sequence}, + event::{broadcaster::Broadcaster, repo::Provider as _, Event, Sequence}, login::Login, }; @@ -27,13 +27,13 @@ impl<'a> Messages<'a> { sender: &Login, sent_at: &DateTime, body: &str, - ) -> Result<Message, Error> { + ) -> Result<Message, SendError> { let mut tx = self.db.begin().await?; let channel = tx .channels() .by_id(channel) .await - .not_found(|| Error::ChannelNotFound(channel.clone()))?; + .not_found(|| SendError::ChannelNotFound(channel.clone()))?; let sent = tx.sequence().next(sent_at).await?; let message = tx .messages() @@ -41,24 +41,40 @@ impl<'a> Messages<'a> { .await?; tx.commit().await?; - for event in message.events() { - self.events.broadcast(event); - } + self.events + .broadcast(message.events().map(Event::from).collect::<Vec<_>>()); Ok(message.snapshot()) } + pub async fn delete(&self, message: &Id, deleted_at: &DateTime) -> Result<(), DeleteError> { + let mut tx = self.db.begin().await?; + let deleted = tx.sequence().next(deleted_at).await?; + let message = tx.messages().delete(message, &deleted).await?; + tx.commit().await?; + + self.events.broadcast( + message + .events() + .filter(Sequence::start_from(deleted.sequence)) + .map(Event::from) + .collect::<Vec<_>>(), + ); + + Ok(()) + } + pub async fn expire(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> { // Somewhat arbitrarily, expire after 90 days. let expire_at = relative_to.to_owned() - TimeDelta::days(90); let mut tx = self.db.begin().await?; - let expired = tx.messages().expired(&expire_at).await?; + let expired = tx.messages().expired(&expire_at).await?; let mut events = Vec::with_capacity(expired.len()); - for (channel, message) in expired { + for message in expired { let deleted = tx.sequence().next(relative_to).await?; - let message = tx.messages().delete(&channel, &message, &deleted).await?; + let message = tx.messages().delete(&message, &deleted).await?; events.push( message .events() @@ -68,21 +84,32 @@ impl<'a> Messages<'a> { tx.commit().await?; - for event in events - .into_iter() - .kmerge_by(|a, b| a.instant.sequence < b.instant.sequence) - { - self.events.broadcast(event); - } + self.events.broadcast( + events + .into_iter() + .kmerge_by(|a, b| a.instant.sequence < b.instant.sequence) + .map(Event::from) + .collect::<Vec<_>>(), + ); Ok(()) } } #[derive(Debug, thiserror::Error)] -pub enum Error { +pub enum SendError { + #[error("channel {0} not found")] + ChannelNotFound(channel::Id), + #[error(transparent)] + DatabaseError(#[from] sqlx::Error), +} + +#[derive(Debug, thiserror::Error)] +pub enum DeleteError { #[error("channel {0} not found")] ChannelNotFound(channel::Id), + #[error("message {0} not found")] + NotFound(Id), #[error(transparent)] DatabaseError(#[from] sqlx::Error), } diff --git a/src/message/mod.rs b/src/message/mod.rs index 52d56c1..a8f51ab 100644 --- a/src/message/mod.rs +++ b/src/message/mod.rs @@ -3,6 +3,7 @@ pub mod event; mod history; mod id; pub mod repo; +mod routes; mod snapshot; -pub use self::{event::Event, history::History, id::Id, snapshot::Message}; +pub use self::{event::Event, history::History, id::Id, routes::router, snapshot::Message}; diff --git a/src/message/repo.rs b/src/message/repo.rs index 3b2b8f7..ae41736 100644 --- a/src/message/repo.rs +++ b/src/message/repo.rs @@ -62,7 +62,25 @@ impl<'c> Messages<'c> { Ok(message) } - async fn by_id(&mut self, channel: &Channel, message: &Id) -> Result<History, sqlx::Error> { + pub async fn in_channel(&mut self, channel: &Channel) -> Result<Vec<Id>, sqlx::Error> { + let messages = sqlx::query_scalar!( + r#" + select + message.id as "id: Id" + from message + join channel on message.channel = channel.id + where channel.id = $1 + order by message.sent_sequence + "#, + channel.id, + ) + .fetch_all(&mut *self.0) + .await?; + + Ok(messages) + } + + async fn by_id(&mut self, message: &Id) -> Result<History, sqlx::Error> { let message = sqlx::query!( r#" select @@ -78,10 +96,8 @@ impl<'c> Messages<'c> { join channel on message.channel = channel.id join login as sender on message.sender = sender.id where message.id = $1 - and message.channel = $2 "#, message, - channel.id, ) .map(|row| History { message: Message { @@ -110,11 +126,10 @@ impl<'c> Messages<'c> { pub async fn delete( &mut self, - channel: &Channel, message: &Id, deleted: &Instant, ) -> Result<History, sqlx::Error> { - let history = self.by_id(channel, message).await?; + let history = self.by_id(message).await?; sqlx::query_scalar!( r#" @@ -134,31 +149,16 @@ impl<'c> Messages<'c> { }) } - pub async fn expired( - &mut self, - expire_at: &DateTime, - ) -> Result<Vec<(Channel, Id)>, sqlx::Error> { - let messages = sqlx::query!( + pub async fn expired(&mut self, expire_at: &DateTime) -> Result<Vec<Id>, sqlx::Error> { + let messages = sqlx::query_scalar!( r#" select - channel.id as "channel_id: channel::Id", - channel.name as "channel_name", - message.id as "message: Id" + id as "message: Id" from message - join channel on message.channel = channel.id where sent_at < $1 "#, expire_at, ) - .map(|row| { - ( - Channel { - id: row.channel_id, - name: row.channel_name, - }, - row.message, - ) - }) .fetch_all(&mut *self.0) .await?; diff --git a/src/message/routes.rs b/src/message/routes.rs new file mode 100644 index 0000000..29fe3d7 --- /dev/null +++ b/src/message/routes.rs @@ -0,0 +1,46 @@ +use axum::{ + extract::{Path, State}, + http::StatusCode, + response::{IntoResponse, Response}, + routing::delete, + Router, +}; + +use crate::{ + app::App, + clock::RequestedAt, + error::Internal, + login::Login, + message::{self, app::DeleteError}, +}; + +pub fn router() -> Router<App> { + Router::new().route("/api/messages/:message", delete(on_delete)) +} + +async fn on_delete( + State(app): State<App>, + Path(message): Path<message::Id>, + RequestedAt(deleted_at): RequestedAt, + _: Login, +) -> Result<StatusCode, ErrorResponse> { + app.messages().delete(&message, &deleted_at).await?; + + Ok(StatusCode::ACCEPTED) +} + +#[derive(Debug, thiserror::Error)] +#[error(transparent)] +struct ErrorResponse(#[from] DeleteError); + +impl IntoResponse for ErrorResponse { + fn into_response(self) -> Response { + let Self(error) = self; + match error { + not_found @ (DeleteError::ChannelNotFound(_) | DeleteError::NotFound(_)) => { + (StatusCode::NOT_FOUND, not_found.to_string()).into_response() + } + other => Internal::from(other).into_response(), + } + } +} |
