diff options
| author | Kit La Touche <kit@transneptune.net> | 2024-10-03 23:30:42 -0400 |
|---|---|---|
| committer | Kit La Touche <kit@transneptune.net> | 2024-10-03 23:30:42 -0400 |
| commit | d50b1b56c011c03c7d8a95242af404b727e91a80 (patch) | |
| tree | efe3408f6a8ef669981826d1a29d16a24b460d89 /src/message | |
| parent | 30c13478d61065a512f5bc8824fecbf2ee6afc81 (diff) | |
| parent | 7f12fd41c2941a55a6437f24e4f780104a718790 (diff) | |
Merge branch 'main' into feature-frontend
Diffstat (limited to 'src/message')
| -rw-r--r-- | src/message/app.rs | 115 | ||||
| -rw-r--r-- | src/message/event.rs | 71 | ||||
| -rw-r--r-- | src/message/history.rs | 41 | ||||
| -rw-r--r-- | src/message/id.rs | 27 | ||||
| -rw-r--r-- | src/message/mod.rs | 9 | ||||
| -rw-r--r-- | src/message/repo.rs | 247 | ||||
| -rw-r--r-- | src/message/routes.rs | 46 | ||||
| -rw-r--r-- | src/message/snapshot.rs | 76 |
8 files changed, 632 insertions, 0 deletions
diff --git a/src/message/app.rs b/src/message/app.rs new file mode 100644 index 0000000..33ea8ad --- /dev/null +++ b/src/message/app.rs @@ -0,0 +1,115 @@ +use chrono::TimeDelta; +use itertools::Itertools; +use sqlx::sqlite::SqlitePool; + +use super::{repo::Provider as _, Id, Message}; +use crate::{ + channel::{self, repo::Provider as _}, + clock::DateTime, + db::NotFound as _, + event::{broadcaster::Broadcaster, repo::Provider as _, Event, Sequence}, + login::Login, +}; + +pub struct Messages<'a> { + db: &'a SqlitePool, + events: &'a Broadcaster, +} + +impl<'a> Messages<'a> { + pub const fn new(db: &'a SqlitePool, events: &'a Broadcaster) -> Self { + Self { db, events } + } + + pub async fn send( + &self, + channel: &channel::Id, + sender: &Login, + sent_at: &DateTime, + body: &str, + ) -> Result<Message, SendError> { + let mut tx = self.db.begin().await?; + let channel = tx + .channels() + .by_id(channel) + .await + .not_found(|| SendError::ChannelNotFound(channel.clone()))?; + let sent = tx.sequence().next(sent_at).await?; + let message = tx + .messages() + .create(&channel.snapshot(), sender, &sent, body) + .await?; + tx.commit().await?; + + self.events + .broadcast(message.events().map(Event::from).collect::<Vec<_>>()); + + Ok(message.snapshot()) + } + + pub async fn delete(&self, message: &Id, deleted_at: &DateTime) -> Result<(), DeleteError> { + let mut tx = self.db.begin().await?; + let deleted = tx.sequence().next(deleted_at).await?; + let message = tx.messages().delete(message, &deleted).await?; + tx.commit().await?; + + self.events.broadcast( + message + .events() + .filter(Sequence::start_from(deleted.sequence)) + .map(Event::from) + .collect::<Vec<_>>(), + ); + + Ok(()) + } + + pub async fn expire(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> { + // Somewhat arbitrarily, expire after 90 days. + let expire_at = relative_to.to_owned() - TimeDelta::days(90); + + let mut tx = self.db.begin().await?; + + let expired = tx.messages().expired(&expire_at).await?; + let mut events = Vec::with_capacity(expired.len()); + for message in expired { + let deleted = tx.sequence().next(relative_to).await?; + let message = tx.messages().delete(&message, &deleted).await?; + events.push( + message + .events() + .filter(Sequence::start_from(deleted.sequence)), + ); + } + + tx.commit().await?; + + self.events.broadcast( + events + .into_iter() + .kmerge_by(Sequence::merge) + .map(Event::from) + .collect::<Vec<_>>(), + ); + + Ok(()) + } +} + +#[derive(Debug, thiserror::Error)] +pub enum SendError { + #[error("channel {0} not found")] + ChannelNotFound(channel::Id), + #[error(transparent)] + DatabaseError(#[from] sqlx::Error), +} + +#[derive(Debug, thiserror::Error)] +pub enum DeleteError { + #[error("channel {0} not found")] + ChannelNotFound(channel::Id), + #[error("message {0} not found")] + NotFound(Id), + #[error(transparent)] + DatabaseError(#[from] sqlx::Error), +} diff --git a/src/message/event.rs b/src/message/event.rs new file mode 100644 index 0000000..66db9b0 --- /dev/null +++ b/src/message/event.rs @@ -0,0 +1,71 @@ +use super::{snapshot::Message, Id}; +use crate::{ + channel::Channel, + event::{Instant, Sequenced}, +}; + +#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] +pub struct Event { + #[serde(flatten)] + pub kind: Kind, +} + +impl Sequenced for Event { + fn instant(&self) -> Instant { + self.kind.instant() + } +} + +#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] +#[serde(tag = "type", rename_all = "snake_case")] +pub enum Kind { + Sent(Sent), + Deleted(Deleted), +} + +impl Sequenced for Kind { + fn instant(&self) -> Instant { + match self { + Self::Sent(sent) => sent.instant(), + Self::Deleted(deleted) => deleted.instant(), + } + } +} + +#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] +pub struct Sent { + #[serde(flatten)] + pub message: Message, +} + +impl Sequenced for Sent { + fn instant(&self) -> Instant { + self.message.sent + } +} + +impl From<Sent> for Kind { + fn from(event: Sent) -> Self { + Self::Sent(event) + } +} + +#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] +pub struct Deleted { + #[serde(flatten)] + pub instant: Instant, + pub channel: Channel, + pub message: Id, +} + +impl Sequenced for Deleted { + fn instant(&self) -> Instant { + self.instant + } +} + +impl From<Deleted> for Kind { + fn from(event: Deleted) -> Self { + Self::Deleted(event) + } +} diff --git a/src/message/history.rs b/src/message/history.rs new file mode 100644 index 0000000..89fc6b1 --- /dev/null +++ b/src/message/history.rs @@ -0,0 +1,41 @@ +use super::{ + event::{Deleted, Event, Sent}, + Message, +}; +use crate::event::Instant; + +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct History { + pub message: Message, + pub deleted: Option<Instant>, +} + +impl History { + fn sent(&self) -> Event { + Event { + kind: Sent { + message: self.message.clone(), + } + .into(), + } + } + + fn deleted(&self) -> Option<Event> { + self.deleted.map(|instant| Event { + kind: Deleted { + instant, + channel: self.message.channel.clone(), + message: self.message.id.clone(), + } + .into(), + }) + } + + pub fn events(&self) -> impl Iterator<Item = Event> { + [self.sent()].into_iter().chain(self.deleted()) + } + + pub fn snapshot(&self) -> Message { + self.message.clone() + } +} diff --git a/src/message/id.rs b/src/message/id.rs new file mode 100644 index 0000000..385b103 --- /dev/null +++ b/src/message/id.rs @@ -0,0 +1,27 @@ +use std::fmt; + +use crate::id::Id as BaseId; + +// Stable identifier for a [Message]. Prefixed with `M`. +#[derive(Clone, Debug, Eq, Hash, PartialEq, sqlx::Type, serde::Deserialize, serde::Serialize)] +#[sqlx(transparent)] +#[serde(transparent)] +pub struct Id(BaseId); + +impl From<BaseId> for Id { + fn from(id: BaseId) -> Self { + Self(id) + } +} + +impl Id { + pub fn generate() -> Self { + BaseId::generate("M") + } +} + +impl fmt::Display for Id { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.0.fmt(f) + } +} diff --git a/src/message/mod.rs b/src/message/mod.rs new file mode 100644 index 0000000..a8f51ab --- /dev/null +++ b/src/message/mod.rs @@ -0,0 +1,9 @@ +pub mod app; +pub mod event; +mod history; +mod id; +pub mod repo; +mod routes; +mod snapshot; + +pub use self::{event::Event, history::History, id::Id, routes::router, snapshot::Message}; diff --git a/src/message/repo.rs b/src/message/repo.rs new file mode 100644 index 0000000..fc835c8 --- /dev/null +++ b/src/message/repo.rs @@ -0,0 +1,247 @@ +use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction}; + +use super::{snapshot::Message, History, Id}; +use crate::{ + channel::{self, Channel}, + clock::DateTime, + event::{Instant, Sequence}, + login::{self, Login}, +}; + +pub trait Provider { + fn messages(&mut self) -> Messages; +} + +impl<'c> Provider for Transaction<'c, Sqlite> { + fn messages(&mut self) -> Messages { + Messages(self) + } +} + +pub struct Messages<'t>(&'t mut SqliteConnection); + +impl<'c> Messages<'c> { + pub async fn create( + &mut self, + channel: &Channel, + sender: &Login, + sent: &Instant, + body: &str, + ) -> Result<History, sqlx::Error> { + let id = Id::generate(); + + let message = sqlx::query!( + r#" + insert into message + (id, channel, sender, sent_at, sent_sequence, body) + values ($1, $2, $3, $4, $5, $6) + returning + id as "id: Id", + body + "#, + id, + channel.id, + sender.id, + sent.at, + sent.sequence, + body, + ) + .map(|row| History { + message: Message { + sent: *sent, + channel: channel.clone(), + sender: sender.clone(), + id: row.id, + body: row.body, + }, + deleted: None, + }) + .fetch_one(&mut *self.0) + .await?; + + Ok(message) + } + + pub async fn in_channel( + &mut self, + channel: &Channel, + resume_at: Option<Sequence>, + ) -> Result<Vec<History>, sqlx::Error> { + let messages = sqlx::query!( + r#" + select + channel.id as "channel_id: channel::Id", + channel.name as "channel_name", + sender.id as "sender_id: login::Id", + sender.name as "sender_name", + message.id as "id: Id", + message.body, + sent_at as "sent_at: DateTime", + sent_sequence as "sent_sequence: Sequence" + from message + join channel on message.channel = channel.id + join login as sender on message.sender = sender.id + where channel.id = $1 + and coalesce(message.sent_sequence <= $2, true) + order by message.sent_sequence + "#, + channel.id, + resume_at, + ) + .map(|row| History { + message: Message { + sent: Instant { + at: row.sent_at, + sequence: row.sent_sequence, + }, + channel: Channel { + id: row.channel_id, + name: row.channel_name, + }, + sender: Login { + id: row.sender_id, + name: row.sender_name, + }, + id: row.id, + body: row.body, + }, + deleted: None, + }) + .fetch_all(&mut *self.0) + .await?; + + Ok(messages) + } + + async fn by_id(&mut self, message: &Id) -> Result<History, sqlx::Error> { + let message = sqlx::query!( + r#" + select + channel.id as "channel_id: channel::Id", + channel.name as "channel_name", + sender.id as "sender_id: login::Id", + sender.name as "sender_name", + message.id as "id: Id", + message.body, + sent_at as "sent_at: DateTime", + sent_sequence as "sent_sequence: Sequence" + from message + join channel on message.channel = channel.id + join login as sender on message.sender = sender.id + where message.id = $1 + "#, + message, + ) + .map(|row| History { + message: Message { + sent: Instant { + at: row.sent_at, + sequence: row.sent_sequence, + }, + channel: Channel { + id: row.channel_id, + name: row.channel_name, + }, + sender: Login { + id: row.sender_id, + name: row.sender_name, + }, + id: row.id, + body: row.body, + }, + deleted: None, + }) + .fetch_one(&mut *self.0) + .await?; + + Ok(message) + } + + pub async fn delete( + &mut self, + message: &Id, + deleted: &Instant, + ) -> Result<History, sqlx::Error> { + let history = self.by_id(message).await?; + + sqlx::query_scalar!( + r#" + delete from message + where + id = $1 + returning 1 as "deleted: i64" + "#, + history.message.id, + ) + .fetch_one(&mut *self.0) + .await?; + + Ok(History { + deleted: Some(*deleted), + ..history + }) + } + + pub async fn expired(&mut self, expire_at: &DateTime) -> Result<Vec<Id>, sqlx::Error> { + let messages = sqlx::query_scalar!( + r#" + select + id as "message: Id" + from message + where sent_at < $1 + "#, + expire_at, + ) + .fetch_all(&mut *self.0) + .await?; + + Ok(messages) + } + + pub async fn replay( + &mut self, + resume_at: Option<Sequence>, + ) -> Result<Vec<History>, sqlx::Error> { + let messages = sqlx::query!( + r#" + select + channel.id as "channel_id: channel::Id", + channel.name as "channel_name", + sender.id as "sender_id: login::Id", + sender.name as "sender_name", + message.id as "id: Id", + message.body, + sent_at as "sent_at: DateTime", + sent_sequence as "sent_sequence: Sequence" + from message + join channel on message.channel = channel.id + join login as sender on message.sender = sender.id + where coalesce(message.sent_sequence > $1, true) + "#, + resume_at, + ) + .map(|row| History { + message: Message { + sent: Instant { + at: row.sent_at, + sequence: row.sent_sequence, + }, + channel: Channel { + id: row.channel_id, + name: row.channel_name, + }, + sender: Login { + id: row.sender_id, + name: row.sender_name, + }, + id: row.id, + body: row.body, + }, + deleted: None, + }) + .fetch_all(&mut *self.0) + .await?; + + Ok(messages) + } +} diff --git a/src/message/routes.rs b/src/message/routes.rs new file mode 100644 index 0000000..29fe3d7 --- /dev/null +++ b/src/message/routes.rs @@ -0,0 +1,46 @@ +use axum::{ + extract::{Path, State}, + http::StatusCode, + response::{IntoResponse, Response}, + routing::delete, + Router, +}; + +use crate::{ + app::App, + clock::RequestedAt, + error::Internal, + login::Login, + message::{self, app::DeleteError}, +}; + +pub fn router() -> Router<App> { + Router::new().route("/api/messages/:message", delete(on_delete)) +} + +async fn on_delete( + State(app): State<App>, + Path(message): Path<message::Id>, + RequestedAt(deleted_at): RequestedAt, + _: Login, +) -> Result<StatusCode, ErrorResponse> { + app.messages().delete(&message, &deleted_at).await?; + + Ok(StatusCode::ACCEPTED) +} + +#[derive(Debug, thiserror::Error)] +#[error(transparent)] +struct ErrorResponse(#[from] DeleteError); + +impl IntoResponse for ErrorResponse { + fn into_response(self) -> Response { + let Self(error) = self; + match error { + not_found @ (DeleteError::ChannelNotFound(_) | DeleteError::NotFound(_)) => { + (StatusCode::NOT_FOUND, not_found.to_string()).into_response() + } + other => Internal::from(other).into_response(), + } + } +} diff --git a/src/message/snapshot.rs b/src/message/snapshot.rs new file mode 100644 index 0000000..522c1aa --- /dev/null +++ b/src/message/snapshot.rs @@ -0,0 +1,76 @@ +use super::{ + event::{Event, Kind, Sent}, + Id, +}; +use crate::{channel::Channel, event::Instant, login::Login}; + +#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] +#[serde(into = "self::serialize::Message")] +pub struct Message { + #[serde(skip)] + pub sent: Instant, + pub channel: Channel, + pub sender: Login, + pub id: Id, + pub body: String, +} + +mod serialize { + use crate::{channel::Channel, login::Login, message::Id}; + + #[derive(serde::Serialize)] + pub struct Message { + channel: Channel, + sender: Login, + #[allow(clippy::struct_field_names)] + // Deliberately redundant with the module path; this produces a specific serialization. + message: MessageData, + } + + #[derive(serde::Serialize)] + pub struct MessageData { + id: Id, + body: String, + } + + impl From<super::Message> for Message { + fn from(message: super::Message) -> Self { + Self { + channel: message.channel, + sender: message.sender, + message: MessageData { + id: message.id, + body: message.body, + }, + } + } + } +} + +impl Message { + fn apply(state: Option<Self>, event: Event) -> Option<Self> { + match (state, event.kind) { + (None, Kind::Sent(event)) => Some(event.into()), + (Some(message), Kind::Deleted(event)) if message.id == event.message => None, + (state, event) => panic!("invalid message event {event:#?} for state {state:#?}"), + } + } +} + +impl FromIterator<Event> for Option<Message> { + fn from_iter<I: IntoIterator<Item = Event>>(events: I) -> Self { + events.into_iter().fold(None, Message::apply) + } +} + +impl From<&Sent> for Message { + fn from(event: &Sent) -> Self { + event.message.clone() + } +} + +impl From<Sent> for Message { + fn from(event: Sent) -> Self { + event.message + } +} |
