diff options
| author | Kit La Touche <kit@transneptune.net> | 2024-10-23 21:56:31 -0400 |
|---|---|---|
| committer | Kit La Touche <kit@transneptune.net> | 2024-10-23 21:56:31 -0400 |
| commit | 1f769855df2d9cf2bca883a0475670f227e3678b (patch) | |
| tree | 6c94d9c868eb022588a07245df978478034ac5dd /src/message | |
| parent | 8f360dd9cc45bb14431238ccc5e3d137c020fa7b (diff) | |
| parent | 461814e5174cef1be3e07b4e4069314e9bcbedd6 (diff) | |
Merge branch 'main' into wip/mobile
Diffstat (limited to 'src/message')
| -rw-r--r-- | src/message/app.rs | 47 | ||||
| -rw-r--r-- | src/message/body.rs | 30 | ||||
| -rw-r--r-- | src/message/history.rs | 5 | ||||
| -rw-r--r-- | src/message/mod.rs | 5 | ||||
| -rw-r--r-- | src/message/repo.rs | 234 | ||||
| -rw-r--r-- | src/message/routes.rs | 46 | ||||
| -rw-r--r-- | src/message/routes/message/mod.rs | 46 | ||||
| -rw-r--r-- | src/message/routes/message/test.rs | 160 | ||||
| -rw-r--r-- | src/message/routes/mod.rs | 9 | ||||
| -rw-r--r-- | src/message/snapshot.rs | 8 |
10 files changed, 447 insertions, 143 deletions
diff --git a/src/message/app.rs b/src/message/app.rs index 3385af2..eed6ba4 100644 --- a/src/message/app.rs +++ b/src/message/app.rs @@ -2,13 +2,14 @@ use chrono::TimeDelta; use itertools::Itertools; use sqlx::sqlite::SqlitePool; -use super::{repo::Provider as _, Id, Message}; +use super::{repo::Provider as _, Body, Id, Message}; use crate::{ channel::{self, repo::Provider as _}, clock::DateTime, db::NotFound as _, event::{repo::Provider as _, Broadcaster, Event, Sequence}, login::Login, + name, }; pub struct Messages<'a> { @@ -26,7 +27,7 @@ impl<'a> Messages<'a> { channel: &channel::Id, sender: &Login, sent_at: &DateTime, - body: &str, + body: &Body, ) -> Result<Message, SendError> { let mut tx = self.db.begin().await?; let channel = tx @@ -46,8 +47,17 @@ impl<'a> Messages<'a> { pub async fn delete(&self, message: &Id, deleted_at: &DateTime) -> Result<(), DeleteError> { let mut tx = self.db.begin().await?; + let message = tx + .messages() + .by_id(message) + .await + .not_found(|| DeleteError::NotFound(message.clone()))?; + message + .as_snapshot() + .ok_or_else(|| DeleteError::Deleted(message.id().clone()))?; + let deleted = tx.sequence().next(deleted_at).await?; - let message = tx.messages().delete(message, &deleted).await?; + let message = tx.messages().delete(&message, &deleted).await?; tx.commit().await?; self.events.broadcast( @@ -91,6 +101,17 @@ impl<'a> Messages<'a> { Ok(()) } + + pub async fn purge(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> { + // Somewhat arbitrarily, purge after 6 hours. + let purge_at = relative_to.to_owned() - TimeDelta::hours(6); + + let mut tx = self.db.begin().await?; + tx.messages().purge(&purge_at).await?; + tx.commit().await?; + + Ok(()) + } } #[derive(Debug, thiserror::Error)] @@ -98,15 +119,27 @@ pub enum SendError { #[error("channel {0} not found")] ChannelNotFound(channel::Id), #[error(transparent)] - DatabaseError(#[from] sqlx::Error), + Database(#[from] sqlx::Error), + #[error(transparent)] + Name(#[from] name::Error), +} + +impl From<channel::repo::LoadError> for SendError { + fn from(error: channel::repo::LoadError) -> Self { + use channel::repo::LoadError; + match error { + LoadError::Database(error) => error.into(), + LoadError::Name(error) => error.into(), + } + } } #[derive(Debug, thiserror::Error)] pub enum DeleteError { - #[error("channel {0} not found")] - ChannelNotFound(channel::Id), #[error("message {0} not found")] NotFound(Id), + #[error("message {0} deleted")] + Deleted(Id), #[error(transparent)] - DatabaseError(#[from] sqlx::Error), + Database(#[from] sqlx::Error), } diff --git a/src/message/body.rs b/src/message/body.rs new file mode 100644 index 0000000..6dd224c --- /dev/null +++ b/src/message/body.rs @@ -0,0 +1,30 @@ +use std::fmt; + +use crate::normalize::nfc; + +#[derive( + Clone, Debug, Default, Eq, PartialEq, serde::Deserialize, serde::Serialize, sqlx::Type, +)] +#[serde(transparent)] +#[sqlx(transparent)] +pub struct Body(nfc::String); + +impl fmt::Display for Body { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let Self(body) = self; + body.fmt(f) + } +} + +impl From<String> for Body { + fn from(body: String) -> Self { + Self(body.into()) + } +} + +impl From<Body> for String { + fn from(body: Body) -> Self { + let Body(body) = body; + body.into() + } +} diff --git a/src/message/history.rs b/src/message/history.rs index 09e69b7..0424d0d 100644 --- a/src/message/history.rs +++ b/src/message/history.rs @@ -30,6 +30,11 @@ impl History { .filter(Sequence::up_to(resume_point.into())) .collect() } + + // Snapshot of this message as of all events recorded in this history. + pub fn as_snapshot(&self) -> Option<Message> { + self.events().collect() + } } // Events interface diff --git a/src/message/mod.rs b/src/message/mod.rs index a8f51ab..c2687bc 100644 --- a/src/message/mod.rs +++ b/src/message/mod.rs @@ -1,4 +1,5 @@ pub mod app; +mod body; pub mod event; mod history; mod id; @@ -6,4 +7,6 @@ pub mod repo; mod routes; mod snapshot; -pub use self::{event::Event, history::History, id::Id, routes::router, snapshot::Message}; +pub use self::{ + body::Body, event::Event, history::History, id::Id, routes::router, snapshot::Message, +}; diff --git a/src/message/repo.rs b/src/message/repo.rs index 71c6d10..c8ceceb 100644 --- a/src/message/repo.rs +++ b/src/message/repo.rs @@ -1,6 +1,6 @@ use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction}; -use super::{snapshot::Message, History, Id}; +use super::{snapshot::Message, Body, History, Id}; use crate::{ channel, clock::DateTime, @@ -26,24 +26,24 @@ impl<'c> Messages<'c> { channel: &channel::History, sender: &Login, sent: &Instant, - body: &str, + body: &Body, ) -> Result<History, sqlx::Error> { let id = Id::generate(); let channel_id = channel.id(); let message = sqlx::query!( r#" - insert into message - (id, channel, sender, sent_at, sent_sequence, body) - values ($1, $2, $3, $4, $5, $6) - returning - id as "id: Id", + insert into message + (id, channel, sender, sent_at, sent_sequence, body) + values ($1, $2, $3, $4, $5, $6) + returning + id as "id: Id", channel as "channel: channel::Id", sender as "sender: login::Id", sent_at as "sent_at: DateTime", sent_sequence as "sent_sequence: Sequence", - body - "#, + body as "body: Body" + "#, id, channel_id, sender.id, @@ -53,14 +53,12 @@ impl<'c> Messages<'c> { ) .map(|row| History { message: Message { - sent: Instant { - at: row.sent_at, - sequence: row.sent_sequence, - }, + sent: Instant::new(row.sent_at, row.sent_sequence), channel: row.channel, sender: row.sender, id: row.id, - body: row.body, + body: row.body.unwrap_or_default(), + deleted_at: None, }, deleted: None, }) @@ -70,41 +68,37 @@ impl<'c> Messages<'c> { Ok(message) } - pub async fn in_channel( - &mut self, - channel: &channel::History, - resume_at: ResumePoint, - ) -> Result<Vec<History>, sqlx::Error> { + pub async fn live(&mut self, channel: &channel::History) -> Result<Vec<History>, sqlx::Error> { let channel_id = channel.id(); let messages = sqlx::query!( r#" select - channel as "channel: channel::Id", - sender as "sender: login::Id", + message.channel as "channel: channel::Id", + message.sender as "sender: login::Id", id as "id: Id", - body, - sent_at as "sent_at: DateTime", - sent_sequence as "sent_sequence: Sequence" + message.body as "body: Body", + message.sent_at as "sent_at: DateTime", + message.sent_sequence as "sent_sequence: Sequence", + deleted.deleted_at as "deleted_at?: DateTime", + deleted.deleted_sequence as "deleted_sequence?: Sequence" from message - where channel = $1 - and coalesce(sent_sequence <= $2, true) - order by sent_sequence + left join message_deleted as deleted + using (id) + where message.channel = $1 + and deleted.id is null "#, channel_id, - resume_at, ) .map(|row| History { message: Message { - sent: Instant { - at: row.sent_at, - sequence: row.sent_sequence, - }, + sent: Instant::new(row.sent_at, row.sent_sequence), channel: row.channel, sender: row.sender, id: row.id, - body: row.body, + body: row.body.unwrap_or_default(), + deleted_at: row.deleted_at, }, - deleted: None, + deleted: Instant::optional(row.deleted_at, row.deleted_sequence), }) .fetch_all(&mut *self.0) .await?; @@ -116,30 +110,32 @@ impl<'c> Messages<'c> { let messages = sqlx::query!( r#" select - channel as "channel: channel::Id", - sender as "sender: login::Id", + message.channel as "channel: channel::Id", + message.sender as "sender: login::Id", id as "id: Id", - body, - sent_at as "sent_at: DateTime", - sent_sequence as "sent_sequence: Sequence" + message.body as "body: Body", + message.sent_at as "sent_at: DateTime", + message.sent_sequence as "sent_sequence: Sequence", + deleted.deleted_at as "deleted_at: DateTime", + deleted.deleted_sequence as "deleted_sequence: Sequence" from message - where coalesce(sent_sequence <= $2, true) - order by sent_sequence + left join message_deleted as deleted + using (id) + where coalesce(message.sent_sequence <= $2, true) + order by message.sent_sequence "#, resume_at, ) .map(|row| History { message: Message { - sent: Instant { - at: row.sent_at, - sequence: row.sent_sequence, - }, + sent: Instant::new(row.sent_at, row.sent_sequence), channel: row.channel, sender: row.sender, id: row.id, - body: row.body, + body: row.body.unwrap_or_default(), + deleted_at: row.deleted_at, }, - deleted: None, + deleted: Instant::optional(row.deleted_at, row.deleted_sequence), }) .fetch_all(&mut *self.0) .await?; @@ -147,33 +143,35 @@ impl<'c> Messages<'c> { Ok(messages) } - async fn by_id(&mut self, message: &Id) -> Result<History, sqlx::Error> { + pub async fn by_id(&mut self, message: &Id) -> Result<History, sqlx::Error> { let message = sqlx::query!( r#" select - channel as "channel: channel::Id", - sender as "sender: login::Id", + message.channel as "channel: channel::Id", + message.sender as "sender: login::Id", id as "id: Id", - body, - sent_at as "sent_at: DateTime", - sent_sequence as "sent_sequence: Sequence" + message.body as "body: Body", + message.sent_at as "sent_at: DateTime", + message.sent_sequence as "sent_sequence: Sequence", + deleted.deleted_at as "deleted_at?: DateTime", + deleted.deleted_sequence as "deleted_sequence?: Sequence" from message + left join message_deleted as deleted + using (id) where id = $1 "#, message, ) .map(|row| History { message: Message { - sent: Instant { - at: row.sent_at, - sequence: row.sent_sequence, - }, + sent: Instant::new(row.sent_at, row.sent_sequence), channel: row.channel, sender: row.sender, id: row.id, - body: row.body, + body: row.body.unwrap_or_default(), + deleted_at: row.deleted_at, }, - deleted: None, + deleted: Instant::optional(row.deleted_at, row.deleted_sequence), }) .fetch_one(&mut *self.0) .await?; @@ -183,39 +181,101 @@ impl<'c> Messages<'c> { pub async fn delete( &mut self, - message: &Id, + message: &History, deleted: &Instant, ) -> Result<History, sqlx::Error> { - let history = self.by_id(message).await?; + let id = message.id(); - sqlx::query_scalar!( + sqlx::query!( r#" - delete from message - where - id = $1 - returning 1 as "deleted: i64" + insert into message_deleted (id, deleted_at, deleted_sequence) + values ($1, $2, $3) "#, - history.message.id, + id, + deleted.at, + deleted.sequence, ) - .fetch_one(&mut *self.0) + .execute(&mut *self.0) .await?; - Ok(History { - deleted: Some(*deleted), - ..history - }) + // Small social responsibility hack here: when a message is deleted, its body is + // retconned to have been the empty string. Someone reading the event stream + // afterwards, or looking at messages in the channel, cannot retrieve the + // "deleted" message by ignoring the deletion event. + sqlx::query!( + r#" + update message + set body = '' + where id = $1 + "#, + id, + ) + .execute(&mut *self.0) + .await?; + + let message = self.by_id(id).await?; + + Ok(message) } - pub async fn expired(&mut self, expire_at: &DateTime) -> Result<Vec<Id>, sqlx::Error> { + pub async fn purge(&mut self, purge_at: &DateTime) -> Result<(), sqlx::Error> { let messages = sqlx::query_scalar!( r#" + delete from message_deleted + where deleted_at < $1 + returning id as "id: Id" + "#, + purge_at, + ) + .fetch_all(&mut *self.0) + .await?; + + for message in messages { + sqlx::query!( + r#" + delete from message + where id = $1 + "#, + message, + ) + .execute(&mut *self.0) + .await?; + } + + Ok(()) + } + + pub async fn expired(&mut self, expire_at: &DateTime) -> Result<Vec<History>, sqlx::Error> { + let messages = sqlx::query!( + r#" select - id as "message: Id" + id as "id: Id", + message.channel as "channel: channel::Id", + message.sender as "sender: login::Id", + message.sent_at as "sent_at: DateTime", + message.sent_sequence as "sent_sequence: Sequence", + message.body as "body: Body", + deleted.deleted_at as "deleted_at?: DateTime", + deleted.deleted_sequence as "deleted_sequence?: Sequence" from message - where sent_at < $1 + left join message_deleted as deleted + using (id) + where message.sent_at < $1 + and deleted.id is null "#, expire_at, ) + .map(|row| History { + message: Message { + sent: Instant::new(row.sent_at, row.sent_sequence), + id: row.id, + channel: row.channel, + sender: row.sender, + body: row.body.unwrap_or_default(), + deleted_at: row.deleted_at, + }, + deleted: Instant::optional(row.deleted_at, row.deleted_sequence), + }) .fetch_all(&mut *self.0) .await?; @@ -226,29 +286,31 @@ impl<'c> Messages<'c> { let messages = sqlx::query!( r#" select - channel as "channel: channel::Id", - sender as "sender: login::Id", id as "id: Id", - body, - sent_at as "sent_at: DateTime", - sent_sequence as "sent_sequence: Sequence" + message.channel as "channel: channel::Id", + message.sender as "sender: login::Id", + message.sent_at as "sent_at: DateTime", + message.sent_sequence as "sent_sequence: Sequence", + message.body as "body: Body", + deleted.deleted_at as "deleted_at: DateTime", + deleted.deleted_sequence as "deleted_sequence: Sequence" from message + left join message_deleted as deleted + using (id) where coalesce(message.sent_sequence > $1, true) "#, resume_at, ) .map(|row| History { message: Message { - sent: Instant { - at: row.sent_at, - sequence: row.sent_sequence, - }, + sent: Instant::new(row.sent_at, row.sent_sequence), channel: row.channel, sender: row.sender, id: row.id, - body: row.body, + body: row.body.unwrap_or_default(), + deleted_at: row.deleted_at, }, - deleted: None, + deleted: Instant::optional(row.deleted_at, row.deleted_sequence), }) .fetch_all(&mut *self.0) .await?; diff --git a/src/message/routes.rs b/src/message/routes.rs deleted file mode 100644 index e21c674..0000000 --- a/src/message/routes.rs +++ /dev/null @@ -1,46 +0,0 @@ -use axum::{ - extract::{Path, State}, - http::StatusCode, - response::{IntoResponse, Response}, - routing::delete, - Router, -}; - -use crate::{ - app::App, - clock::RequestedAt, - error::{Internal, NotFound}, - login::Login, - message::{self, app::DeleteError}, -}; - -pub fn router() -> Router<App> { - Router::new().route("/api/messages/:message", delete(on_delete)) -} - -async fn on_delete( - State(app): State<App>, - Path(message): Path<message::Id>, - RequestedAt(deleted_at): RequestedAt, - _: Login, -) -> Result<StatusCode, ErrorResponse> { - app.messages().delete(&message, &deleted_at).await?; - - Ok(StatusCode::ACCEPTED) -} - -#[derive(Debug, thiserror::Error)] -#[error(transparent)] -struct ErrorResponse(#[from] DeleteError); - -impl IntoResponse for ErrorResponse { - fn into_response(self) -> Response { - let Self(error) = self; - match error { - not_found @ (DeleteError::ChannelNotFound(_) | DeleteError::NotFound(_)) => { - NotFound(not_found).into_response() - } - other => Internal::from(other).into_response(), - } - } -} diff --git a/src/message/routes/message/mod.rs b/src/message/routes/message/mod.rs new file mode 100644 index 0000000..545ad26 --- /dev/null +++ b/src/message/routes/message/mod.rs @@ -0,0 +1,46 @@ +#[cfg(test)] +mod test; + +pub mod delete { + use axum::{ + extract::{Path, State}, + http::StatusCode, + response::{IntoResponse, Response}, + }; + + use crate::{ + app::App, + clock::RequestedAt, + error::{Internal, NotFound}, + message::{self, app::DeleteError}, + token::extract::Identity, + }; + + pub async fn handler( + State(app): State<App>, + Path(message): Path<message::Id>, + RequestedAt(deleted_at): RequestedAt, + _: Identity, + ) -> Result<StatusCode, Error> { + app.messages().delete(&message, &deleted_at).await?; + + Ok(StatusCode::ACCEPTED) + } + + #[derive(Debug, thiserror::Error)] + #[error(transparent)] + pub struct Error(#[from] pub DeleteError); + + impl IntoResponse for Error { + fn into_response(self) -> Response { + let Self(error) = self; + #[allow(clippy::match_wildcard_for_single_variants)] + match error { + DeleteError::NotFound(_) | DeleteError::Deleted(_) => { + NotFound(error).into_response() + } + other => Internal::from(other).into_response(), + } + } + } +} diff --git a/src/message/routes/message/test.rs b/src/message/routes/message/test.rs new file mode 100644 index 0000000..2016fb8 --- /dev/null +++ b/src/message/routes/message/test.rs @@ -0,0 +1,160 @@ +use axum::{ + extract::{Path, State}, + http::StatusCode, +}; + +use super::delete; +use crate::{message::app, test::fixtures}; + +#[tokio::test] +pub async fn delete_message() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let sender = fixtures::login::create(&app, &fixtures::now()).await; + let channel = fixtures::channel::create(&app, &fixtures::now()).await; + let message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await; + + // Send the request + + let deleter = fixtures::identity::create(&app, &fixtures::now()).await; + let response = delete::handler( + State(app.clone()), + Path(message.id.clone()), + fixtures::now(), + deleter, + ) + .await + .expect("deleting a valid message succeeds"); + + // Verify the response + + assert_eq!(response, StatusCode::ACCEPTED); + + // Verify the semantics + + let snapshot = app.boot().snapshot().await.expect("boot always succeeds"); + assert!(!snapshot.messages.contains(&message)); +} + +#[tokio::test] +pub async fn delete_invalid_message_id() { + // Set up the environment + + let app = fixtures::scratch_app().await; + + // Send the request + + let deleter = fixtures::identity::create(&app, &fixtures::now()).await; + let message = fixtures::message::fictitious(); + let delete::Error(error) = delete::handler( + State(app.clone()), + Path(message.clone()), + fixtures::now(), + deleter, + ) + .await + .expect_err("deleting a nonexistent message fails"); + + // Verify the response + + assert!(matches!(error, app::DeleteError::NotFound(id) if id == message)); +} + +#[tokio::test] +pub async fn delete_deleted() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let sender = fixtures::login::create(&app, &fixtures::now()).await; + let channel = fixtures::channel::create(&app, &fixtures::now()).await; + let message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await; + + app.messages() + .delete(&message.id, &fixtures::now()) + .await + .expect("deleting a recently-sent message succeeds"); + + // Send the request + + let deleter = fixtures::identity::create(&app, &fixtures::now()).await; + let delete::Error(error) = delete::handler( + State(app.clone()), + Path(message.id.clone()), + fixtures::now(), + deleter, + ) + .await + .expect_err("deleting a deleted message fails"); + + // Verify the response + + assert!(matches!(error, app::DeleteError::Deleted(id) if id == message.id)); +} + +#[tokio::test] +pub async fn delete_expired() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let sender = fixtures::login::create(&app, &fixtures::ancient()).await; + let channel = fixtures::channel::create(&app, &fixtures::ancient()).await; + let message = fixtures::message::send(&app, &channel, &sender, &fixtures::ancient()).await; + + app.messages() + .expire(&fixtures::now()) + .await + .expect("expiring messages always succeeds"); + + // Send the request + + let deleter = fixtures::identity::create(&app, &fixtures::now()).await; + let delete::Error(error) = delete::handler( + State(app.clone()), + Path(message.id.clone()), + fixtures::now(), + deleter, + ) + .await + .expect_err("deleting an expired message fails"); + + // Verify the response + + assert!(matches!(error, app::DeleteError::Deleted(id) if id == message.id)); +} + +#[tokio::test] +pub async fn delete_purged() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let sender = fixtures::login::create(&app, &fixtures::ancient()).await; + let channel = fixtures::channel::create(&app, &fixtures::ancient()).await; + let message = fixtures::message::send(&app, &channel, &sender, &fixtures::ancient()).await; + + app.messages() + .expire(&fixtures::old()) + .await + .expect("expiring messages always succeeds"); + + app.messages() + .purge(&fixtures::now()) + .await + .expect("purging messages always succeeds"); + + // Send the request + + let deleter = fixtures::identity::create(&app, &fixtures::now()).await; + let delete::Error(error) = delete::handler( + State(app.clone()), + Path(message.id.clone()), + fixtures::now(), + deleter, + ) + .await + .expect_err("deleting a purged message fails"); + + // Verify the response + + assert!(matches!(error, app::DeleteError::NotFound(id) if id == message.id)); +} diff --git a/src/message/routes/mod.rs b/src/message/routes/mod.rs new file mode 100644 index 0000000..dfe8628 --- /dev/null +++ b/src/message/routes/mod.rs @@ -0,0 +1,9 @@ +use axum::{routing::delete, Router}; + +use crate::app::App; + +mod message; + +pub fn router() -> Router<App> { + Router::new().route("/api/messages/:message", delete(message::delete::handler)) +} diff --git a/src/message/snapshot.rs b/src/message/snapshot.rs index 0eb37bb..53b7176 100644 --- a/src/message/snapshot.rs +++ b/src/message/snapshot.rs @@ -1,8 +1,8 @@ use super::{ event::{Event, Sent}, - Id, + Body, Id, }; -use crate::{channel, event::Instant, login}; +use crate::{channel, clock::DateTime, event::Instant, login}; #[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] pub struct Message { @@ -11,7 +11,9 @@ pub struct Message { pub channel: channel::Id, pub sender: login::Id, pub id: Id, - pub body: String, + pub body: Body, + #[serde(skip_serializing_if = "Option::is_none")] + pub deleted_at: Option<DateTime>, } impl Message { |
