diff options
| author | Owen Jacobson <owen@grimoire.ca> | 2024-10-03 20:17:07 -0400 |
|---|---|---|
| committer | Owen Jacobson <owen@grimoire.ca> | 2024-10-03 20:17:07 -0400 |
| commit | 0a5599c60d20ccc2223779eeba5dc91a95ea0fe5 (patch) | |
| tree | f7ce69ad18768ff53d8fa37d8eb9c6c575633f9e /src/channel | |
| parent | ec804134c33aedb001c426c5f42f43f53c47848f (diff) | |
Add endpoints for deleting channels and messages.
It is deliberate that the expire() functions do not use them. To avoid races, the transactions must be committed before events get sent, in both cases, which makes them structurally pretty different.
Diffstat (limited to 'src/channel')
| -rw-r--r-- | src/channel/app.rs | 72 | ||||
| -rw-r--r-- | src/channel/routes.rs | 61 | ||||
| -rw-r--r-- | src/channel/routes/test/on_send.rs | 6 |
3 files changed, 107 insertions, 32 deletions
diff --git a/src/channel/app.rs b/src/channel/app.rs index 6ce826b..24be2ff 100644 --- a/src/channel/app.rs +++ b/src/channel/app.rs @@ -2,10 +2,12 @@ use chrono::TimeDelta; use itertools::Itertools; use sqlx::sqlite::SqlitePool; +use super::{repo::Provider as _, Channel, Id}; use crate::{ - channel::{repo::Provider as _, Channel}, clock::DateTime, - event::{broadcaster::Broadcaster, repo::Provider as _, Sequence}, + db::NotFound, + event::{broadcaster::Broadcaster, repo::Provider as _, Event, Sequence, Sequenced}, + message::repo::Provider as _, }; pub struct Channels<'a> { @@ -28,9 +30,8 @@ impl<'a> Channels<'a> { .map_err(|err| CreateError::from_duplicate_name(err, name))?; tx.commit().await?; - for event in channel.events() { - self.events.broadcast(event); - } + self.events + .broadcast(channel.events().map(Event::from).collect::<Vec<_>>()); Ok(channel.snapshot()) } @@ -53,6 +54,46 @@ impl<'a> Channels<'a> { Ok(channels) } + pub async fn delete(&self, channel: &Id, deleted_at: &DateTime) -> Result<(), DeleteError> { + let mut tx = self.db.begin().await?; + + let channel = tx + .channels() + .by_id(channel) + .await + .not_found(|| DeleteError::NotFound(channel.clone()))? + .snapshot(); + + let mut events = Vec::new(); + + let messages = tx.messages().in_channel(&channel).await?; + for message in messages { + let deleted = tx.sequence().next(deleted_at).await?; + let message = tx.messages().delete(&message, &deleted).await?; + events.extend( + message + .events() + .filter(Sequence::start_from(deleted.sequence)) + .map(Event::from), + ); + } + + let deleted = tx.sequence().next(deleted_at).await?; + let channel = tx.channels().delete(&channel.id, &deleted).await?; + events.extend( + channel + .events() + .filter(Sequence::start_from(deleted.sequence)) + .map(Event::from), + ); + + tx.commit().await?; + + self.events.broadcast(events); + + Ok(()) + } + pub async fn expire(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> { // Somewhat arbitrarily, expire after 90 days. let expire_at = relative_to.to_owned() - TimeDelta::days(90); @@ -73,12 +114,13 @@ impl<'a> Channels<'a> { tx.commit().await?; - for event in events - .into_iter() - .kmerge_by(|a, b| a.instant.sequence < b.instant.sequence) - { - self.events.broadcast(event); - } + self.events.broadcast( + events + .into_iter() + .kmerge_by(|a, b| a.sequence() < b.sequence()) + .map(Event::from) + .collect::<Vec<_>>(), + ); Ok(()) } @@ -92,6 +134,14 @@ pub enum CreateError { DatabaseError(#[from] sqlx::Error), } +#[derive(Debug, thiserror::Error)] +pub enum DeleteError { + #[error("channel {0} not found")] + NotFound(Id), + #[error(transparent)] + DatabaseError(#[from] sqlx::Error), +} + impl CreateError { fn from_duplicate_name(error: sqlx::Error, name: &str) -> Self { if let Some(error) = error.as_database_error() { diff --git a/src/channel/routes.rs b/src/channel/routes.rs index 5bb1ee9..bce634e 100644 --- a/src/channel/routes.rs +++ b/src/channel/routes.rs @@ -2,20 +2,18 @@ use axum::{ extract::{Json, Path, State}, http::StatusCode, response::{IntoResponse, Response}, - routing::{get, post}, + routing::{delete, get, post}, Router, }; use axum_extra::extract::Query; -use super::app; +use super::{ + app::{self, DeleteError}, + Channel, Id, +}; use crate::{ - app::App, - channel::{self, Channel}, - clock::RequestedAt, - error::Internal, - event::Sequence, - login::Login, - message::app::Error as MessageError, + app::App, clock::RequestedAt, error::Internal, event::Sequence, login::Login, + message::app::SendError, }; #[cfg(test)] @@ -26,6 +24,7 @@ pub fn router() -> Router<App> { .route("/api/channels", get(list)) .route("/api/channels", post(on_create)) .route("/api/channels/:channel", post(on_send)) + .route("/api/channels/:channel", delete(on_delete)) } #[derive(Default, serde::Deserialize)] @@ -95,28 +94,54 @@ struct SendRequest { async fn on_send( State(app): State<App>, - Path(channel): Path<channel::Id>, + Path(channel): Path<Id>, RequestedAt(sent_at): RequestedAt, login: Login, Json(request): Json<SendRequest>, -) -> Result<StatusCode, ErrorResponse> { +) -> Result<StatusCode, SendErrorResponse> { app.messages() .send(&channel, &login, &sent_at, &request.message) - .await - // Could impl `From` here, but it's more code and this is used once. - .map_err(ErrorResponse)?; + .await?; Ok(StatusCode::ACCEPTED) } -#[derive(Debug)] -struct ErrorResponse(MessageError); +#[derive(Debug, thiserror::Error)] +#[error(transparent)] +struct SendErrorResponse(#[from] SendError); + +impl IntoResponse for SendErrorResponse { + fn into_response(self) -> Response { + let Self(error) = self; + match error { + not_found @ SendError::ChannelNotFound(_) => { + (StatusCode::NOT_FOUND, not_found.to_string()).into_response() + } + other => Internal::from(other).into_response(), + } + } +} + +async fn on_delete( + State(app): State<App>, + Path(channel): Path<Id>, + RequestedAt(deleted_at): RequestedAt, + _: Login, +) -> Result<StatusCode, DeleteErrorResponse> { + app.channels().delete(&channel, &deleted_at).await?; + + Ok(StatusCode::ACCEPTED) +} + +#[derive(Debug, thiserror::Error)] +#[error(transparent)] +struct DeleteErrorResponse(#[from] DeleteError); -impl IntoResponse for ErrorResponse { +impl IntoResponse for DeleteErrorResponse { fn into_response(self) -> Response { let Self(error) = self; match error { - not_found @ MessageError::ChannelNotFound(_) => { + not_found @ DeleteError::NotFound(_) => { (StatusCode::NOT_FOUND, not_found.to_string()).into_response() } other => Internal::from(other).into_response(), diff --git a/src/channel/routes/test/on_send.rs b/src/channel/routes/test/on_send.rs index 1027b29..3297093 100644 --- a/src/channel/routes/test/on_send.rs +++ b/src/channel/routes/test/on_send.rs @@ -5,7 +5,7 @@ use crate::{ channel, channel::routes, event, - message::app, + message::app::SendError, test::fixtures::{self, future::Immediately as _}, }; @@ -77,7 +77,7 @@ async fn nonexistent_channel() { let request = routes::SendRequest { message: fixtures::message::propose(), }; - let routes::ErrorResponse(error) = routes::on_send( + let routes::SendErrorResponse(error) = routes::on_send( State(app), Path(channel.clone()), sent_at, @@ -91,6 +91,6 @@ async fn nonexistent_channel() { assert!(matches!( error, - app::Error::ChannelNotFound(error_channel) if channel == error_channel + SendError::ChannelNotFound(error_channel) if channel == error_channel )); } |
