diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/channel/app.rs | 72 | ||||
| -rw-r--r-- | src/channel/routes.rs | 61 | ||||
| -rw-r--r-- | src/channel/routes/test/on_send.rs | 6 | ||||
| -rw-r--r-- | src/cli.rs | 13 | ||||
| -rw-r--r-- | src/event/app.rs | 1 | ||||
| -rw-r--r-- | src/event/broadcaster.rs | 2 | ||||
| -rw-r--r-- | src/message/app.rs | 61 | ||||
| -rw-r--r-- | src/message/mod.rs | 3 | ||||
| -rw-r--r-- | src/message/repo.rs | 46 | ||||
| -rw-r--r-- | src/message/routes.rs | 46 |
10 files changed, 233 insertions, 78 deletions
diff --git a/src/channel/app.rs b/src/channel/app.rs index 6ce826b..24be2ff 100644 --- a/src/channel/app.rs +++ b/src/channel/app.rs @@ -2,10 +2,12 @@ use chrono::TimeDelta; use itertools::Itertools; use sqlx::sqlite::SqlitePool; +use super::{repo::Provider as _, Channel, Id}; use crate::{ - channel::{repo::Provider as _, Channel}, clock::DateTime, - event::{broadcaster::Broadcaster, repo::Provider as _, Sequence}, + db::NotFound, + event::{broadcaster::Broadcaster, repo::Provider as _, Event, Sequence, Sequenced}, + message::repo::Provider as _, }; pub struct Channels<'a> { @@ -28,9 +30,8 @@ impl<'a> Channels<'a> { .map_err(|err| CreateError::from_duplicate_name(err, name))?; tx.commit().await?; - for event in channel.events() { - self.events.broadcast(event); - } + self.events + .broadcast(channel.events().map(Event::from).collect::<Vec<_>>()); Ok(channel.snapshot()) } @@ -53,6 +54,46 @@ impl<'a> Channels<'a> { Ok(channels) } + pub async fn delete(&self, channel: &Id, deleted_at: &DateTime) -> Result<(), DeleteError> { + let mut tx = self.db.begin().await?; + + let channel = tx + .channels() + .by_id(channel) + .await + .not_found(|| DeleteError::NotFound(channel.clone()))? + .snapshot(); + + let mut events = Vec::new(); + + let messages = tx.messages().in_channel(&channel).await?; + for message in messages { + let deleted = tx.sequence().next(deleted_at).await?; + let message = tx.messages().delete(&message, &deleted).await?; + events.extend( + message + .events() + .filter(Sequence::start_from(deleted.sequence)) + .map(Event::from), + ); + } + + let deleted = tx.sequence().next(deleted_at).await?; + let channel = tx.channels().delete(&channel.id, &deleted).await?; + events.extend( + channel + .events() + .filter(Sequence::start_from(deleted.sequence)) + .map(Event::from), + ); + + tx.commit().await?; + + self.events.broadcast(events); + + Ok(()) + } + pub async fn expire(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> { // Somewhat arbitrarily, expire after 90 days. let expire_at = relative_to.to_owned() - TimeDelta::days(90); @@ -73,12 +114,13 @@ impl<'a> Channels<'a> { tx.commit().await?; - for event in events - .into_iter() - .kmerge_by(|a, b| a.instant.sequence < b.instant.sequence) - { - self.events.broadcast(event); - } + self.events.broadcast( + events + .into_iter() + .kmerge_by(|a, b| a.sequence() < b.sequence()) + .map(Event::from) + .collect::<Vec<_>>(), + ); Ok(()) } @@ -92,6 +134,14 @@ pub enum CreateError { DatabaseError(#[from] sqlx::Error), } +#[derive(Debug, thiserror::Error)] +pub enum DeleteError { + #[error("channel {0} not found")] + NotFound(Id), + #[error(transparent)] + DatabaseError(#[from] sqlx::Error), +} + impl CreateError { fn from_duplicate_name(error: sqlx::Error, name: &str) -> Self { if let Some(error) = error.as_database_error() { diff --git a/src/channel/routes.rs b/src/channel/routes.rs index 5bb1ee9..bce634e 100644 --- a/src/channel/routes.rs +++ b/src/channel/routes.rs @@ -2,20 +2,18 @@ use axum::{ extract::{Json, Path, State}, http::StatusCode, response::{IntoResponse, Response}, - routing::{get, post}, + routing::{delete, get, post}, Router, }; use axum_extra::extract::Query; -use super::app; +use super::{ + app::{self, DeleteError}, + Channel, Id, +}; use crate::{ - app::App, - channel::{self, Channel}, - clock::RequestedAt, - error::Internal, - event::Sequence, - login::Login, - message::app::Error as MessageError, + app::App, clock::RequestedAt, error::Internal, event::Sequence, login::Login, + message::app::SendError, }; #[cfg(test)] @@ -26,6 +24,7 @@ pub fn router() -> Router<App> { .route("/api/channels", get(list)) .route("/api/channels", post(on_create)) .route("/api/channels/:channel", post(on_send)) + .route("/api/channels/:channel", delete(on_delete)) } #[derive(Default, serde::Deserialize)] @@ -95,28 +94,54 @@ struct SendRequest { async fn on_send( State(app): State<App>, - Path(channel): Path<channel::Id>, + Path(channel): Path<Id>, RequestedAt(sent_at): RequestedAt, login: Login, Json(request): Json<SendRequest>, -) -> Result<StatusCode, ErrorResponse> { +) -> Result<StatusCode, SendErrorResponse> { app.messages() .send(&channel, &login, &sent_at, &request.message) - .await - // Could impl `From` here, but it's more code and this is used once. - .map_err(ErrorResponse)?; + .await?; Ok(StatusCode::ACCEPTED) } -#[derive(Debug)] -struct ErrorResponse(MessageError); +#[derive(Debug, thiserror::Error)] +#[error(transparent)] +struct SendErrorResponse(#[from] SendError); + +impl IntoResponse for SendErrorResponse { + fn into_response(self) -> Response { + let Self(error) = self; + match error { + not_found @ SendError::ChannelNotFound(_) => { + (StatusCode::NOT_FOUND, not_found.to_string()).into_response() + } + other => Internal::from(other).into_response(), + } + } +} + +async fn on_delete( + State(app): State<App>, + Path(channel): Path<Id>, + RequestedAt(deleted_at): RequestedAt, + _: Login, +) -> Result<StatusCode, DeleteErrorResponse> { + app.channels().delete(&channel, &deleted_at).await?; + + Ok(StatusCode::ACCEPTED) +} + +#[derive(Debug, thiserror::Error)] +#[error(transparent)] +struct DeleteErrorResponse(#[from] DeleteError); -impl IntoResponse for ErrorResponse { +impl IntoResponse for DeleteErrorResponse { fn into_response(self) -> Response { let Self(error) = self; match error { - not_found @ MessageError::ChannelNotFound(_) => { + not_found @ DeleteError::NotFound(_) => { (StatusCode::NOT_FOUND, not_found.to_string()).into_response() } other => Internal::from(other).into_response(), diff --git a/src/channel/routes/test/on_send.rs b/src/channel/routes/test/on_send.rs index 1027b29..3297093 100644 --- a/src/channel/routes/test/on_send.rs +++ b/src/channel/routes/test/on_send.rs @@ -5,7 +5,7 @@ use crate::{ channel, channel::routes, event, - message::app, + message::app::SendError, test::fixtures::{self, future::Immediately as _}, }; @@ -77,7 +77,7 @@ async fn nonexistent_channel() { let request = routes::SendRequest { message: fixtures::message::propose(), }; - let routes::ErrorResponse(error) = routes::on_send( + let routes::SendErrorResponse(error) = routes::on_send( State(app), Path(channel.clone()), sent_at, @@ -91,6 +91,6 @@ async fn nonexistent_channel() { assert!(matches!( error, - app::Error::ChannelNotFound(error_channel) if channel == error_channel + SendError::ChannelNotFound(error_channel) if channel == error_channel )); } @@ -10,7 +10,7 @@ use clap::Parser; use sqlx::sqlite::SqlitePool; use tokio::net; -use crate::{app::App, channel, clock, db, event, expire, login}; +use crate::{app::App, channel, clock, db, event, expire, login, message}; /// Command-line entry point for running the `hi` server. /// @@ -105,9 +105,14 @@ impl Args { } fn routers() -> Router<App> { - [channel::router(), event::router(), login::router()] - .into_iter() - .fold(Router::default(), Router::merge) + [ + channel::router(), + event::router(), + login::router(), + message::router(), + ] + .into_iter() + .fold(Router::default(), Router::merge) } fn started_msg(listener: &net::TcpListener) -> io::Result<String> { diff --git a/src/event/app.rs b/src/event/app.rs index e58bea9..32f0a97 100644 --- a/src/event/app.rs +++ b/src/event/app.rs @@ -61,6 +61,7 @@ impl<'a> Events<'a> { // Filtering on the broadcast resume point filters out messages // before resume_at, and filters out messages duplicated from // `replay_events`. + .flat_map(stream::iter) .filter(Self::resume(resume_live_at)); Ok(replay.chain(live_messages)) diff --git a/src/event/broadcaster.rs b/src/event/broadcaster.rs index de2513a..3c4efac 100644 --- a/src/event/broadcaster.rs +++ b/src/event/broadcaster.rs @@ -1,3 +1,3 @@ use crate::broadcast; -pub type Broadcaster = broadcast::Broadcaster<super::Event>; +pub type Broadcaster = broadcast::Broadcaster<Vec<super::Event>>; diff --git a/src/message/app.rs b/src/message/app.rs index 51f772e..1d34c14 100644 --- a/src/message/app.rs +++ b/src/message/app.rs @@ -2,12 +2,12 @@ use chrono::TimeDelta; use itertools::Itertools; use sqlx::sqlite::SqlitePool; -use super::{repo::Provider as _, Message}; +use super::{repo::Provider as _, Id, Message}; use crate::{ channel::{self, repo::Provider as _}, clock::DateTime, db::NotFound as _, - event::{broadcaster::Broadcaster, repo::Provider as _, Sequence}, + event::{broadcaster::Broadcaster, repo::Provider as _, Event, Sequence}, login::Login, }; @@ -27,13 +27,13 @@ impl<'a> Messages<'a> { sender: &Login, sent_at: &DateTime, body: &str, - ) -> Result<Message, Error> { + ) -> Result<Message, SendError> { let mut tx = self.db.begin().await?; let channel = tx .channels() .by_id(channel) .await - .not_found(|| Error::ChannelNotFound(channel.clone()))?; + .not_found(|| SendError::ChannelNotFound(channel.clone()))?; let sent = tx.sequence().next(sent_at).await?; let message = tx .messages() @@ -41,24 +41,40 @@ impl<'a> Messages<'a> { .await?; tx.commit().await?; - for event in message.events() { - self.events.broadcast(event); - } + self.events + .broadcast(message.events().map(Event::from).collect::<Vec<_>>()); Ok(message.snapshot()) } + pub async fn delete(&self, message: &Id, deleted_at: &DateTime) -> Result<(), DeleteError> { + let mut tx = self.db.begin().await?; + let deleted = tx.sequence().next(deleted_at).await?; + let message = tx.messages().delete(message, &deleted).await?; + tx.commit().await?; + + self.events.broadcast( + message + .events() + .filter(Sequence::start_from(deleted.sequence)) + .map(Event::from) + .collect::<Vec<_>>(), + ); + + Ok(()) + } + pub async fn expire(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> { // Somewhat arbitrarily, expire after 90 days. let expire_at = relative_to.to_owned() - TimeDelta::days(90); let mut tx = self.db.begin().await?; - let expired = tx.messages().expired(&expire_at).await?; + let expired = tx.messages().expired(&expire_at).await?; let mut events = Vec::with_capacity(expired.len()); - for (channel, message) in expired { + for message in expired { let deleted = tx.sequence().next(relative_to).await?; - let message = tx.messages().delete(&channel, &message, &deleted).await?; + let message = tx.messages().delete(&message, &deleted).await?; events.push( message .events() @@ -68,21 +84,32 @@ impl<'a> Messages<'a> { tx.commit().await?; - for event in events - .into_iter() - .kmerge_by(|a, b| a.instant.sequence < b.instant.sequence) - { - self.events.broadcast(event); - } + self.events.broadcast( + events + .into_iter() + .kmerge_by(|a, b| a.instant.sequence < b.instant.sequence) + .map(Event::from) + .collect::<Vec<_>>(), + ); Ok(()) } } #[derive(Debug, thiserror::Error)] -pub enum Error { +pub enum SendError { + #[error("channel {0} not found")] + ChannelNotFound(channel::Id), + #[error(transparent)] + DatabaseError(#[from] sqlx::Error), +} + +#[derive(Debug, thiserror::Error)] +pub enum DeleteError { #[error("channel {0} not found")] ChannelNotFound(channel::Id), + #[error("message {0} not found")] + NotFound(Id), #[error(transparent)] DatabaseError(#[from] sqlx::Error), } diff --git a/src/message/mod.rs b/src/message/mod.rs index 52d56c1..a8f51ab 100644 --- a/src/message/mod.rs +++ b/src/message/mod.rs @@ -3,6 +3,7 @@ pub mod event; mod history; mod id; pub mod repo; +mod routes; mod snapshot; -pub use self::{event::Event, history::History, id::Id, snapshot::Message}; +pub use self::{event::Event, history::History, id::Id, routes::router, snapshot::Message}; diff --git a/src/message/repo.rs b/src/message/repo.rs index 3b2b8f7..ae41736 100644 --- a/src/message/repo.rs +++ b/src/message/repo.rs @@ -62,7 +62,25 @@ impl<'c> Messages<'c> { Ok(message) } - async fn by_id(&mut self, channel: &Channel, message: &Id) -> Result<History, sqlx::Error> { + pub async fn in_channel(&mut self, channel: &Channel) -> Result<Vec<Id>, sqlx::Error> { + let messages = sqlx::query_scalar!( + r#" + select + message.id as "id: Id" + from message + join channel on message.channel = channel.id + where channel.id = $1 + order by message.sent_sequence + "#, + channel.id, + ) + .fetch_all(&mut *self.0) + .await?; + + Ok(messages) + } + + async fn by_id(&mut self, message: &Id) -> Result<History, sqlx::Error> { let message = sqlx::query!( r#" select @@ -78,10 +96,8 @@ impl<'c> Messages<'c> { join channel on message.channel = channel.id join login as sender on message.sender = sender.id where message.id = $1 - and message.channel = $2 "#, message, - channel.id, ) .map(|row| History { message: Message { @@ -110,11 +126,10 @@ impl<'c> Messages<'c> { pub async fn delete( &mut self, - channel: &Channel, message: &Id, deleted: &Instant, ) -> Result<History, sqlx::Error> { - let history = self.by_id(channel, message).await?; + let history = self.by_id(message).await?; sqlx::query_scalar!( r#" @@ -134,31 +149,16 @@ impl<'c> Messages<'c> { }) } - pub async fn expired( - &mut self, - expire_at: &DateTime, - ) -> Result<Vec<(Channel, Id)>, sqlx::Error> { - let messages = sqlx::query!( + pub async fn expired(&mut self, expire_at: &DateTime) -> Result<Vec<Id>, sqlx::Error> { + let messages = sqlx::query_scalar!( r#" select - channel.id as "channel_id: channel::Id", - channel.name as "channel_name", - message.id as "message: Id" + id as "message: Id" from message - join channel on message.channel = channel.id where sent_at < $1 "#, expire_at, ) - .map(|row| { - ( - Channel { - id: row.channel_id, - name: row.channel_name, - }, - row.message, - ) - }) .fetch_all(&mut *self.0) .await?; diff --git a/src/message/routes.rs b/src/message/routes.rs new file mode 100644 index 0000000..29fe3d7 --- /dev/null +++ b/src/message/routes.rs @@ -0,0 +1,46 @@ +use axum::{ + extract::{Path, State}, + http::StatusCode, + response::{IntoResponse, Response}, + routing::delete, + Router, +}; + +use crate::{ + app::App, + clock::RequestedAt, + error::Internal, + login::Login, + message::{self, app::DeleteError}, +}; + +pub fn router() -> Router<App> { + Router::new().route("/api/messages/:message", delete(on_delete)) +} + +async fn on_delete( + State(app): State<App>, + Path(message): Path<message::Id>, + RequestedAt(deleted_at): RequestedAt, + _: Login, +) -> Result<StatusCode, ErrorResponse> { + app.messages().delete(&message, &deleted_at).await?; + + Ok(StatusCode::ACCEPTED) +} + +#[derive(Debug, thiserror::Error)] +#[error(transparent)] +struct ErrorResponse(#[from] DeleteError); + +impl IntoResponse for ErrorResponse { + fn into_response(self) -> Response { + let Self(error) = self; + match error { + not_found @ (DeleteError::ChannelNotFound(_) | DeleteError::NotFound(_)) => { + (StatusCode::NOT_FOUND, not_found.to_string()).into_response() + } + other => Internal::from(other).into_response(), + } + } +} |
