Diffstat (limited to 'src')
33 files changed, 956 insertions, 533 deletions
@@ -4,6 +4,7 @@ use crate::{ channel::app::Channels, event::{app::Events, broadcaster::Broadcaster as EventBroadcaster}, login::app::Logins, + message::app::Messages, token::{app::Tokens, broadcaster::Broadcaster as TokenBroadcaster}, }; @@ -35,6 +36,10 @@ impl App { Logins::new(&self.db) } + pub const fn messages(&self) -> Messages { + Messages::new(&self.db, &self.events) + } + pub const fn tokens(&self) -> Tokens { Tokens::new(&self.db, &self.tokens) } diff --git a/src/broadcast.rs b/src/broadcast.rs index 083a301..bedc263 100644 --- a/src/broadcast.rs +++ b/src/broadcast.rs @@ -32,7 +32,7 @@ where { // panic: if ``message.channel.id`` has not been previously registered, // and was not part of the initial set of channels. - pub fn broadcast(&self, message: &M) { + pub fn broadcast(&self, message: impl Into<M>) { let tx = self.sender(); // Per the Tokio docs, the returned error is only used to indicate that @@ -42,7 +42,7 @@ where // // The successful return value, which includes the number of active // receivers, also isn't that interesting to us. - let _ = tx.send(message.clone()); + let _ = tx.send(message.into()); } // panic: if ``channel`` has not been previously registered, and was not diff --git a/src/channel/app.rs b/src/channel/app.rs index b7e3a10..6ce826b 100644 --- a/src/channel/app.rs +++ b/src/channel/app.rs @@ -1,10 +1,11 @@ use chrono::TimeDelta; +use itertools::Itertools; use sqlx::sqlite::SqlitePool; use crate::{ channel::{repo::Provider as _, Channel}, clock::DateTime, - event::{broadcaster::Broadcaster, repo::Provider as _, types::ChannelEvent, Sequence}, + event::{broadcaster::Broadcaster, repo::Provider as _, Sequence}, }; pub struct Channels<'a> { @@ -27,10 +28,11 @@ impl<'a> Channels<'a> { .map_err(|err| CreateError::from_duplicate_name(err, name))?; tx.commit().await?; - self.events - .broadcast(&ChannelEvent::created(channel.clone())); + for event in channel.events() { + self.events.broadcast(event); + } - Ok(channel) + Ok(channel.snapshot()) } pub async fn all(&self, resume_point: Option<Sequence>) -> Result<Vec<Channel>, InternalError> { @@ -38,6 +40,16 @@ impl<'a> Channels<'a> { let channels = tx.channels().all(resume_point).await?; tx.commit().await?; + let channels = channels + .into_iter() + .filter_map(|channel| { + channel + .events() + .filter(Sequence::up_to(resume_point)) + .collect() + }) + .collect(); + Ok(channels) } @@ -51,14 +63,21 @@ impl<'a> Channels<'a> { let mut events = Vec::with_capacity(expired.len()); for channel in expired { let deleted = tx.sequence().next(relative_to).await?; - let event = tx.channels().delete(&channel, &deleted).await?; - events.push(event); + let channel = tx.channels().delete(&channel, &deleted).await?; + events.push( + channel + .events() + .filter(Sequence::start_from(deleted.sequence)), + ); } tx.commit().await?; - for event in events { - self.events.broadcast(&event); + for event in events + .into_iter() + .kmerge_by(|a, b| a.instant.sequence < b.instant.sequence) + { + self.events.broadcast(event); } Ok(()) diff --git a/src/channel/event.rs b/src/channel/event.rs new file mode 100644 index 0000000..9c54174 --- /dev/null +++ b/src/channel/event.rs @@ -0,0 +1,48 @@ +use super::Channel; +use crate::{ + channel, + event::{Instant, Sequenced}, +}; + +#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] +pub struct Event { + #[serde(flatten)] + pub instant: Instant, + #[serde(flatten)] + pub kind: Kind, +} + +impl Sequenced for Event { + fn instant(&self) -> Instant { + self.instant + } +} + +#[derive(Clone, 
Debug, Eq, PartialEq, serde::Serialize)] +#[serde(tag = "type", rename_all = "snake_case")] +pub enum Kind { + Created(Created), + Deleted(Deleted), +} + +#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] +pub struct Created { + pub channel: Channel, +} + +impl From<Created> for Kind { + fn from(event: Created) -> Self { + Self::Created(event) + } +} + +#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] +pub struct Deleted { + pub channel: channel::Id, +} + +impl From<Deleted> for Kind { + fn from(event: Deleted) -> Self { + Self::Deleted(event) + } +} diff --git a/src/channel/history.rs b/src/channel/history.rs new file mode 100644 index 0000000..3cc7d9d --- /dev/null +++ b/src/channel/history.rs @@ -0,0 +1,42 @@ +use super::{ + event::{Created, Deleted, Event}, + Channel, +}; +use crate::event::Instant; + +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct History { + pub channel: Channel, + pub created: Instant, + pub deleted: Option<Instant>, +} + +impl History { + fn created(&self) -> Event { + Event { + instant: self.created, + kind: Created { + channel: self.channel.clone(), + } + .into(), + } + } + + fn deleted(&self) -> Option<Event> { + self.deleted.map(|instant| Event { + instant, + kind: Deleted { + channel: self.channel.id.clone(), + } + .into(), + }) + } + + pub fn events(&self) -> impl Iterator<Item = Event> { + [self.created()].into_iter().chain(self.deleted()) + } + + pub fn snapshot(&self) -> Channel { + self.channel.clone() + } +} diff --git a/src/channel/mod.rs b/src/channel/mod.rs index 4baa7e3..eb8200b 100644 --- a/src/channel/mod.rs +++ b/src/channel/mod.rs @@ -1,16 +1,9 @@ -use crate::event::Instant; - pub mod app; +pub mod event; +mod history; mod id; pub mod repo; mod routes; +mod snapshot; -pub use self::{id::Id, routes::router}; - -#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] -pub struct Channel { - pub id: Id, - pub name: String, - #[serde(skip)] - pub created: Instant, -} +pub use self::{event::Event, history::History, id::Id, routes::router, snapshot::Channel}; diff --git a/src/channel/repo.rs b/src/channel/repo.rs index c000b56..8bb761b 100644 --- a/src/channel/repo.rs +++ b/src/channel/repo.rs @@ -1,9 +1,9 @@ use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction}; use crate::{ - channel::{Channel, Id}, + channel::{Channel, History, Id}, clock::DateTime, - event::{types, Instant, Sequence}, + event::{Instant, Sequence}, }; pub trait Provider { @@ -19,7 +19,7 @@ impl<'c> Provider for Transaction<'c, Sqlite> { pub struct Channels<'t>(&'t mut SqliteConnection); impl<'c> Channels<'c> { - pub async fn create(&mut self, name: &str, created: &Instant) -> Result<Channel, sqlx::Error> { + pub async fn create(&mut self, name: &str, created: &Instant) -> Result<History, sqlx::Error> { let id = Id::generate(); let channel = sqlx::query!( r#" @@ -37,13 +37,16 @@ impl<'c> Channels<'c> { created.at, created.sequence, ) - .map(|row| Channel { - id: row.id, - name: row.name, + .map(|row| History { + channel: Channel { + id: row.id, + name: row.name, + }, created: Instant { at: row.created_at, sequence: row.created_sequence, }, + deleted: None, }) .fetch_one(&mut *self.0) .await?; @@ -51,7 +54,7 @@ impl<'c> Channels<'c> { Ok(channel) } - pub async fn by_id(&mut self, channel: &Id) -> Result<Channel, sqlx::Error> { + pub async fn by_id(&mut self, channel: &Id) -> Result<History, sqlx::Error> { let channel = sqlx::query!( r#" select @@ -64,13 +67,16 @@ impl<'c> Channels<'c> { "#, channel, ) - .map(|row| Channel { - id: row.id, - name: row.name, + 
.map(|row| History { + channel: Channel { + id: row.id, + name: row.name, + }, created: Instant { at: row.created_at, sequence: row.created_sequence, }, + deleted: None, }) .fetch_one(&mut *self.0) .await?; @@ -81,7 +87,7 @@ impl<'c> Channels<'c> { pub async fn all( &mut self, resume_point: Option<Sequence>, - ) -> Result<Vec<Channel>, sqlx::Error> { + ) -> Result<Vec<History>, sqlx::Error> { let channels = sqlx::query!( r#" select @@ -95,13 +101,16 @@ impl<'c> Channels<'c> { "#, resume_point, ) - .map(|row| Channel { - id: row.id, - name: row.name, + .map(|row| History { + channel: Channel { + id: row.id, + name: row.name, + }, created: Instant { at: row.created_at, sequence: row.created_sequence, }, + deleted: None, }) .fetch_all(&mut *self.0) .await?; @@ -112,7 +121,7 @@ impl<'c> Channels<'c> { pub async fn replay( &mut self, resume_at: Option<Sequence>, - ) -> Result<Vec<Channel>, sqlx::Error> { + ) -> Result<Vec<History>, sqlx::Error> { let channels = sqlx::query!( r#" select @@ -125,13 +134,16 @@ impl<'c> Channels<'c> { "#, resume_at, ) - .map(|row| Channel { - id: row.id, - name: row.name, + .map(|row| History { + channel: Channel { + id: row.id, + name: row.name, + }, created: Instant { at: row.created_at, sequence: row.created_sequence, }, + deleted: None, }) .fetch_all(&mut *self.0) .await?; @@ -141,35 +153,43 @@ impl<'c> Channels<'c> { pub async fn delete( &mut self, - channel: &Channel, + channel: &Id, deleted: &Instant, - ) -> Result<types::ChannelEvent, sqlx::Error> { - let channel = channel.id.clone(); - sqlx::query_scalar!( + ) -> Result<History, sqlx::Error> { + let channel = sqlx::query!( r#" delete from channel where id = $1 - returning 1 as "row: i64" + returning + id as "id: Id", + name, + created_at as "created_at: DateTime", + created_sequence as "created_sequence: Sequence" "#, channel, ) + .map(|row| History { + channel: Channel { + id: row.id, + name: row.name, + }, + created: Instant { + at: row.created_at, + sequence: row.created_sequence, + }, + deleted: Some(*deleted), + }) .fetch_one(&mut *self.0) .await?; - Ok(types::ChannelEvent { - instant: *deleted, - data: types::DeletedEvent { channel }.into(), - }) + Ok(channel) } - pub async fn expired(&mut self, expired_at: &DateTime) -> Result<Vec<Channel>, sqlx::Error> { - let channels = sqlx::query!( + pub async fn expired(&mut self, expired_at: &DateTime) -> Result<Vec<Id>, sqlx::Error> { + let channels = sqlx::query_scalar!( r#" select - channel.id as "id: Id", - channel.name, - channel.created_at as "created_at: DateTime", - channel.created_sequence as "created_sequence: Sequence" + channel.id as "id: Id" from channel left join message where created_at < $1 @@ -177,14 +197,6 @@ impl<'c> Channels<'c> { "#, expired_at, ) - .map(|row| Channel { - id: row.id, - name: row.name, - created: Instant { - at: row.created_at, - sequence: row.created_sequence, - }, - }) .fetch_all(&mut *self.0) .await?; diff --git a/src/channel/routes.rs b/src/channel/routes.rs index 5d8b61e..5bb1ee9 100644 --- a/src/channel/routes.rs +++ b/src/channel/routes.rs @@ -13,8 +13,9 @@ use crate::{ channel::{self, Channel}, clock::RequestedAt, error::Internal, - event::{app::EventsError, Sequence}, + event::Sequence, login::Login, + message::app::Error as MessageError, }; #[cfg(test)] @@ -99,8 +100,8 @@ async fn on_send( login: Login, Json(request): Json<SendRequest>, ) -> Result<StatusCode, ErrorResponse> { - app.events() - .send(&login, &channel, &request.message, &sent_at) + app.messages() + .send(&channel, &login, &sent_at, &request.message) 
.await // Could impl `From` here, but it's more code and this is used once. .map_err(ErrorResponse)?; @@ -109,13 +110,13 @@ async fn on_send( } #[derive(Debug)] -struct ErrorResponse(EventsError); +struct ErrorResponse(MessageError); impl IntoResponse for ErrorResponse { fn into_response(self) -> Response { let Self(error) = self; match error { - not_found @ EventsError::ChannelNotFound(_) => { + not_found @ MessageError::ChannelNotFound(_) => { (StatusCode::NOT_FOUND, not_found.to_string()).into_response() } other => Internal::from(other).into_response(), diff --git a/src/channel/routes/test/on_create.rs b/src/channel/routes/test/on_create.rs index 9988932..5733c9e 100644 --- a/src/channel/routes/test/on_create.rs +++ b/src/channel/routes/test/on_create.rs @@ -3,7 +3,7 @@ use futures::stream::StreamExt as _; use crate::{ channel::{app, routes}, - event::types, + event, test::fixtures::{self, future::Immediately as _}, }; @@ -50,8 +50,8 @@ async fn new_channel() { .expect("creation event published"); assert!(matches!( - event.data, - types::ChannelEventData::Created(event) + event.kind, + event::Kind::ChannelCreated(event) if event.channel == response_channel )); } diff --git a/src/channel/routes/test/on_send.rs b/src/channel/routes/test/on_send.rs index 33ec3b7..1027b29 100644 --- a/src/channel/routes/test/on_send.rs +++ b/src/channel/routes/test/on_send.rs @@ -4,7 +4,8 @@ use futures::stream::StreamExt; use crate::{ channel, channel::routes, - event::{app, types}, + event, + message::app, test::fixtures::{self, future::Immediately as _}, }; @@ -54,10 +55,10 @@ async fn messages_in_order() { for ((sent_at, message), event) in requests.into_iter().zip(events) { assert_eq!(*sent_at, event.instant.at); assert!(matches!( - event.data, - types::ChannelEventData::Message(event_message) - if event_message.sender == sender - && event_message.message.body == message + event.kind, + event::Kind::MessageSent(event) + if event.message.sender == sender + && event.message.body == message )); } } @@ -90,6 +91,6 @@ async fn nonexistent_channel() { assert!(matches!( error, - app::EventsError::ChannelNotFound(error_channel) if channel == error_channel + app::Error::ChannelNotFound(error_channel) if channel == error_channel )); } diff --git a/src/channel/snapshot.rs b/src/channel/snapshot.rs new file mode 100644 index 0000000..6462f25 --- /dev/null +++ b/src/channel/snapshot.rs @@ -0,0 +1,38 @@ +use super::{ + event::{Created, Event, Kind}, + Id, +}; + +#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] +pub struct Channel { + pub id: Id, + pub name: String, +} + +impl Channel { + fn apply(state: Option<Self>, event: Event) -> Option<Self> { + match (state, event.kind) { + (None, Kind::Created(event)) => Some(event.into()), + (Some(channel), Kind::Deleted(event)) if channel.id == event.channel => None, + (state, event) => panic!("invalid channel event {event:#?} for state {state:#?}"), + } + } +} + +impl FromIterator<Event> for Option<Channel> { + fn from_iter<I: IntoIterator<Item = Event>>(events: I) -> Self { + events.into_iter().fold(None, Channel::apply) + } +} + +impl From<&Created> for Channel { + fn from(event: &Created) -> Self { + event.channel.clone() + } +} + +impl From<Created> for Channel { + fn from(event: Created) -> Self { + event.channel + } +} diff --git a/src/event/app.rs b/src/event/app.rs index 5e9e79a..e58bea9 100644 --- a/src/event/app.rs +++ b/src/event/app.rs @@ -1,22 +1,15 @@ -use chrono::TimeDelta; use futures::{ future, stream::{self, StreamExt as _}, Stream, }; +use 
itertools::Itertools as _; use sqlx::sqlite::SqlitePool; -use super::{ - broadcaster::Broadcaster, - repo::message::Provider as _, - types::{self, ChannelEvent}, -}; +use super::{broadcaster::Broadcaster, Event, Sequence, Sequenced}; use crate::{ channel::{self, repo::Provider as _}, - clock::DateTime, - db::NotFound as _, - event::{repo::Provider as _, Sequence}, - login::Login, + message::{self, repo::Provider as _}, }; pub struct Events<'a> { @@ -29,111 +22,52 @@ impl<'a> Events<'a> { Self { db, events } } - pub async fn send( - &self, - login: &Login, - channel: &channel::Id, - body: &str, - sent_at: &DateTime, - ) -> Result<types::ChannelEvent, EventsError> { - let mut tx = self.db.begin().await?; - let channel = tx - .channels() - .by_id(channel) - .await - .not_found(|| EventsError::ChannelNotFound(channel.clone()))?; - let sent = tx.sequence().next(sent_at).await?; - let event = tx - .message_events() - .create(login, &channel, &sent, body) - .await?; - tx.commit().await?; - - self.events.broadcast(&event); - Ok(event) - } - - pub async fn expire(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> { - // Somewhat arbitrarily, expire after 90 days. - let expire_at = relative_to.to_owned() - TimeDelta::days(90); - - let mut tx = self.db.begin().await?; - let expired = tx.message_events().expired(&expire_at).await?; - - let mut events = Vec::with_capacity(expired.len()); - for (channel, message) in expired { - let deleted = tx.sequence().next(relative_to).await?; - let event = tx - .message_events() - .delete(&channel, &message, &deleted) - .await?; - events.push(event); - } - - tx.commit().await?; - - for event in events { - self.events.broadcast(&event); - } - - Ok(()) - } - pub async fn subscribe( &self, resume_at: Option<Sequence>, - ) -> Result<impl Stream<Item = types::ChannelEvent> + std::fmt::Debug, sqlx::Error> { + ) -> Result<impl Stream<Item = Event> + std::fmt::Debug, sqlx::Error> { // Subscribe before retrieving, to catch messages broadcast while we're // querying the DB. We'll prune out duplicates later. let live_messages = self.events.subscribe(); let mut tx = self.db.begin().await?; - let channels = tx.channels().replay(resume_at).await?; + let channels = tx.channels().replay(resume_at).await?; let channel_events = channels - .into_iter() - .map(ChannelEvent::created) - .filter(move |event| { - resume_at.map_or(true, |resume_at| Sequence::from(event) > resume_at) - }); - - let message_events = tx.message_events().replay(resume_at).await?; - - let mut replay_events = channel_events - .into_iter() - .chain(message_events.into_iter()) + .iter() + .map(channel::History::events) + .kmerge_by(|a, b| a.instant.sequence < b.instant.sequence) + .filter(Sequence::after(resume_at)) + .map(Event::from); + + let messages = tx.messages().replay(resume_at).await?; + let message_events = messages + .iter() + .map(message::History::events) + .kmerge_by(|a, b| a.instant.sequence < b.instant.sequence) + .filter(Sequence::after(resume_at)) + .map(Event::from); + + let replay_events = channel_events + .merge_by(message_events, |a, b| { + a.instant.sequence < b.instant.sequence + }) .collect::<Vec<_>>(); - replay_events.sort_by_key(|event| Sequence::from(event)); - let resume_live_at = replay_events.last().map(Sequence::from); + let resume_live_at = replay_events.last().map(Sequenced::sequence); let replay = stream::iter(replay_events); - // no skip_expired or resume transforms for stored_messages, as it's - // constructed not to contain messages meeting either criterion. 
- // - // * skip_expired is redundant with the `tx.broadcasts().expire(…)` call; - // * resume is redundant with the resume_at argument to - // `tx.broadcasts().replay(…)`. let live_messages = live_messages // Filtering on the broadcast resume point filters out messages // before resume_at, and filters out messages duplicated from - // stored_messages. + // `replay_events`. .filter(Self::resume(resume_live_at)); Ok(replay.chain(live_messages)) } - fn resume( - resume_at: Option<Sequence>, - ) -> impl for<'m> FnMut(&'m types::ChannelEvent) -> future::Ready<bool> { - move |event| future::ready(resume_at < Some(Sequence::from(event))) + fn resume(resume_at: Option<Sequence>) -> impl for<'m> FnMut(&'m Event) -> future::Ready<bool> { + let filter = Sequence::after(resume_at); + move |event| future::ready(filter(event)) } } - -#[derive(Debug, thiserror::Error)] -pub enum EventsError { - #[error("channel {0} not found")] - ChannelNotFound(channel::Id), - #[error(transparent)] - DatabaseError(#[from] sqlx::Error), -} diff --git a/src/event/broadcaster.rs b/src/event/broadcaster.rs index 92f631f..de2513a 100644 --- a/src/event/broadcaster.rs +++ b/src/event/broadcaster.rs @@ -1,3 +1,3 @@ -use crate::{broadcast, event::types}; +use crate::broadcast; -pub type Broadcaster = broadcast::Broadcaster<types::ChannelEvent>; +pub type Broadcaster = broadcast::Broadcaster<super::Event>; diff --git a/src/event/mod.rs b/src/event/mod.rs index c982d3a..1503b77 100644 --- a/src/event/mod.rs +++ b/src/event/mod.rs @@ -1,18 +1,75 @@ +use crate::{channel, message}; + pub mod app; pub mod broadcaster; mod extract; pub mod repo; mod routes; mod sequence; -pub mod types; -use crate::clock::DateTime; +pub use self::{ + routes::router, + sequence::{Instant, Sequence, Sequenced}, +}; + +#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] +pub struct Event { + #[serde(flatten)] + pub instant: Instant, + #[serde(flatten)] + pub kind: Kind, +} + +impl Sequenced for Event { + fn instant(&self) -> Instant { + self.instant + } +} + +impl From<channel::Event> for Event { + fn from(event: channel::Event) -> Self { + Self { + instant: event.instant, + kind: event.kind.into(), + } + } +} + +impl From<message::Event> for Event { + fn from(event: message::Event) -> Self { + Self { + instant: event.instant, + kind: event.kind.into(), + } + } +} -pub use self::{routes::router, sequence::Sequence}; +#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] +#[serde(tag = "type", rename_all = "snake_case")] +pub enum Kind { + #[serde(rename = "created")] + ChannelCreated(channel::event::Created), + #[serde(rename = "message")] + MessageSent(message::event::Sent), + MessageDeleted(message::event::Deleted), + #[serde(rename = "deleted")] + ChannelDeleted(channel::event::Deleted), +} + +impl From<channel::event::Kind> for Kind { + fn from(kind: channel::event::Kind) -> Self { + match kind { + channel::event::Kind::Created(created) => Self::ChannelCreated(created), + channel::event::Kind::Deleted(deleted) => Self::ChannelDeleted(deleted), + } + } +} -#[derive(Clone, Copy, Debug, Eq, PartialEq, serde::Serialize)] -pub struct Instant { - pub at: DateTime, - #[serde(skip)] - pub sequence: Sequence, +impl From<message::event::Kind> for Kind { + fn from(kind: message::event::Kind) -> Self { + match kind { + message::event::Kind::Sent(created) => Self::MessageSent(created), + message::event::Kind::Deleted(deleted) => Self::MessageDeleted(deleted), + } + } } diff --git a/src/event/repo/sequence.rs b/src/event/repo.rs index 40d6a53..40d6a53 
100644 --- a/src/event/repo/sequence.rs +++ b/src/event/repo.rs diff --git a/src/event/repo/message.rs b/src/event/repo/message.rs deleted file mode 100644 index f29c8a4..0000000 --- a/src/event/repo/message.rs +++ /dev/null @@ -1,196 +0,0 @@ -use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction}; - -use crate::{ - channel::{self, Channel}, - clock::DateTime, - event::{types, Instant, Sequence}, - login::{self, Login}, - message::{self, Message}, -}; - -pub trait Provider { - fn message_events(&mut self) -> Events; -} - -impl<'c> Provider for Transaction<'c, Sqlite> { - fn message_events(&mut self) -> Events { - Events(self) - } -} - -pub struct Events<'t>(&'t mut SqliteConnection); - -impl<'c> Events<'c> { - pub async fn create( - &mut self, - sender: &Login, - channel: &Channel, - sent: &Instant, - body: &str, - ) -> Result<types::ChannelEvent, sqlx::Error> { - let id = message::Id::generate(); - - let message = sqlx::query!( - r#" - insert into message - (id, channel, sender, sent_at, sent_sequence, body) - values ($1, $2, $3, $4, $5, $6) - returning - id as "id: message::Id", - sender as "sender: login::Id", - sent_at as "sent_at: DateTime", - sent_sequence as "sent_sequence: Sequence", - body - "#, - id, - channel.id, - sender.id, - sent.at, - sent.sequence, - body, - ) - .map(|row| types::ChannelEvent { - instant: Instant { - at: row.sent_at, - sequence: row.sent_sequence, - }, - data: types::MessageEvent { - channel: channel.clone(), - sender: sender.clone(), - message: Message { - id: row.id, - body: row.body, - }, - } - .into(), - }) - .fetch_one(&mut *self.0) - .await?; - - Ok(message) - } - - pub async fn delete( - &mut self, - channel: &Channel, - message: &message::Id, - deleted: &Instant, - ) -> Result<types::ChannelEvent, sqlx::Error> { - sqlx::query_scalar!( - r#" - delete from message - where id = $1 - returning 1 as "row: i64" - "#, - message, - ) - .fetch_one(&mut *self.0) - .await?; - - Ok(types::ChannelEvent { - instant: Instant { - at: deleted.at, - sequence: deleted.sequence, - }, - data: types::MessageDeletedEvent { - channel: channel.clone(), - message: message.clone(), - } - .into(), - }) - } - - pub async fn expired( - &mut self, - expire_at: &DateTime, - ) -> Result<Vec<(Channel, message::Id)>, sqlx::Error> { - let messages = sqlx::query!( - r#" - select - channel.id as "channel_id: channel::Id", - channel.name as "channel_name", - channel.created_at as "channel_created_at: DateTime", - channel.created_sequence as "channel_created_sequence: Sequence", - message.id as "message: message::Id" - from message - join channel on message.channel = channel.id - join login as sender on message.sender = sender.id - where sent_at < $1 - "#, - expire_at, - ) - .map(|row| { - ( - Channel { - id: row.channel_id, - name: row.channel_name, - created: Instant { - at: row.channel_created_at, - sequence: row.channel_created_sequence, - }, - }, - row.message, - ) - }) - .fetch_all(&mut *self.0) - .await?; - - Ok(messages) - } - - pub async fn replay( - &mut self, - resume_at: Option<Sequence>, - ) -> Result<Vec<types::ChannelEvent>, sqlx::Error> { - let events = sqlx::query!( - r#" - select - message.id as "id: message::Id", - channel.id as "channel_id: channel::Id", - channel.name as "channel_name", - channel.created_at as "channel_created_at: DateTime", - channel.created_sequence as "channel_created_sequence: Sequence", - sender.id as "sender_id: login::Id", - sender.name as sender_name, - message.sent_at as "sent_at: DateTime", - message.sent_sequence as "sent_sequence: 
Sequence", - message.body - from message - join channel on message.channel = channel.id - join login as sender on message.sender = sender.id - where coalesce(message.sent_sequence > $1, true) - order by sent_sequence asc - "#, - resume_at, - ) - .map(|row| types::ChannelEvent { - instant: Instant { - at: row.sent_at, - sequence: row.sent_sequence, - }, - data: types::MessageEvent { - channel: Channel { - id: row.channel_id, - name: row.channel_name, - created: Instant { - at: row.channel_created_at, - sequence: row.channel_created_sequence, - }, - }, - sender: Login { - id: row.sender_id, - name: row.sender_name, - }, - message: Message { - id: row.id, - body: row.body, - }, - } - .into(), - }) - .fetch_all(&mut *self.0) - .await?; - - Ok(events) - } -} diff --git a/src/event/repo/mod.rs b/src/event/repo/mod.rs deleted file mode 100644 index cee840c..0000000 --- a/src/event/repo/mod.rs +++ /dev/null @@ -1,4 +0,0 @@ -pub mod message; -mod sequence; - -pub use self::sequence::Provider; diff --git a/src/event/routes.rs b/src/event/routes.rs index c87bfb2..5b9c7e3 100644 --- a/src/event/routes.rs +++ b/src/event/routes.rs @@ -10,11 +10,11 @@ use axum::{ use axum_extra::extract::Query; use futures::stream::{Stream, StreamExt as _}; -use super::{extract::LastEventId, types}; +use super::{extract::LastEventId, Event}; use crate::{ app::App, error::{Internal, Unauthorized}, - event::Sequence, + event::{Sequence, Sequenced as _}, token::{app::ValidateError, extract::Identity}, }; @@ -35,7 +35,7 @@ async fn events( identity: Identity, last_event_id: Option<LastEventId<Sequence>>, Query(query): Query<EventsQuery>, -) -> Result<Events<impl Stream<Item = types::ChannelEvent> + std::fmt::Debug>, EventsError> { +) -> Result<Events<impl Stream<Item = Event> + std::fmt::Debug>, EventsError> { let resume_at = last_event_id .map(LastEventId::into_inner) .or(query.resume_point); @@ -51,7 +51,7 @@ struct Events<S>(S); impl<S> IntoResponse for Events<S> where - S: Stream<Item = types::ChannelEvent> + Send + 'static, + S: Stream<Item = Event> + Send + 'static, { fn into_response(self) -> Response { let Self(stream) = self; @@ -62,11 +62,11 @@ where } } -impl TryFrom<types::ChannelEvent> for sse::Event { +impl TryFrom<Event> for sse::Event { type Error = serde_json::Error; - fn try_from(event: types::ChannelEvent) -> Result<Self, Self::Error> { - let id = serde_json::to_string(&Sequence::from(&event))?; + fn try_from(event: Event) -> Result<Self, Self::Error> { + let id = serde_json::to_string(&event.sequence())?; let data = serde_json::to_string_pretty(&event)?; let event = Self::default().id(id).data(data); diff --git a/src/event/routes/test.rs b/src/event/routes/test.rs index 68b55cc..ba9953e 100644 --- a/src/event/routes/test.rs +++ b/src/event/routes/test.rs @@ -6,7 +6,7 @@ use futures::{ }; use crate::{ - event::{routes, Sequence}, + event::{routes, Sequenced as _}, test::fixtures::{self, future::Immediately as _}, }; @@ -17,7 +17,7 @@ async fn includes_historical_message() { let app = fixtures::scratch_app().await; let sender = fixtures::login::create(&app).await; let channel = fixtures::channel::create(&app, &fixtures::now()).await; - let message = fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await; + let message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await; // Call the endpoint @@ -36,7 +36,7 @@ async fn includes_historical_message() { .await .expect("delivered stored message"); - assert_eq!(message, event); + 
assert!(fixtures::event::message_sent(&event, &message)); } #[tokio::test] @@ -58,7 +58,7 @@ async fn includes_live_message() { // Verify the semantics let sender = fixtures::login::create(&app).await; - let message = fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await; + let message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await; let event = events .filter(fixtures::filter::messages()) @@ -67,7 +67,7 @@ async fn includes_live_message() { .await .expect("delivered live message"); - assert_eq!(message, event); + assert!(fixtures::event::message_sent(&event, &message)); } #[tokio::test] @@ -87,7 +87,7 @@ async fn includes_multiple_channels() { let app = app.clone(); let sender = sender.clone(); let channel = channel.clone(); - async move { fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await } + async move { fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await } }) .collect::<Vec<_>>() .await; @@ -110,7 +110,9 @@ async fn includes_multiple_channels() { .await; for message in &messages { - assert!(events.iter().any(|event| { event == message })); + assert!(events + .iter() + .any(|event| fixtures::event::message_sent(event, message))); } } @@ -123,9 +125,9 @@ async fn sequential_messages() { let sender = fixtures::login::create(&app).await; let messages = vec![ - fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await, - fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await, - fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await, + fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, ]; // Call the endpoint @@ -138,7 +140,13 @@ async fn sequential_messages() { // Verify the structure of the response. 
- let mut events = events.filter(|event| future::ready(messages.contains(event))); + let mut events = events.filter(|event| { + future::ready( + messages + .iter() + .any(|message| fixtures::event::message_sent(event, message)), + ) + }); // Verify delivery in order for message in &messages { @@ -148,7 +156,7 @@ async fn sequential_messages() { .await .expect("undelivered messages remaining"); - assert_eq!(message, &event); + assert!(fixtures::event::message_sent(&event, message)); } } @@ -160,11 +168,11 @@ async fn resumes_from() { let channel = fixtures::channel::create(&app, &fixtures::now()).await; let sender = fixtures::login::create(&app).await; - let initial_message = fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await; + let initial_message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await; let later_messages = vec![ - fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await, - fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await, + fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, ]; // Call the endpoint @@ -190,9 +198,9 @@ async fn resumes_from() { .await .expect("delivered events"); - assert_eq!(initial_message, event); + assert!(fixtures::event::message_sent(&event, &initial_message)); - Sequence::from(&event) + event.sequence() }; // Resume after disconnect @@ -214,7 +222,9 @@ async fn resumes_from() { .await; for message in &later_messages { - assert!(events.iter().any(|event| event == message)); + assert!(events + .iter() + .any(|event| fixtures::event::message_sent(event, message))); } } @@ -249,8 +259,8 @@ async fn serial_resume() { let resume_at = { let initial_messages = [ - fixtures::message::send(&app, &sender, &channel_a, &fixtures::now()).await, - fixtures::message::send(&app, &sender, &channel_b, &fixtures::now()).await, + fixtures::message::send(&app, &channel_a, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &channel_b, &sender, &fixtures::now()).await, ]; // First subscription @@ -271,12 +281,14 @@ async fn serial_resume() { .await; for message in &initial_messages { - assert!(events.iter().any(|event| event == message)); + assert!(events + .iter() + .any(|event| fixtures::event::message_sent(event, message))); } let event = events.last().expect("this vec is non-empty"); - Sequence::from(event) + event.sequence() }; // Resume after disconnect @@ -285,8 +297,8 @@ async fn serial_resume() { // Note that channel_b does not appear here. The buggy behaviour // would be masked if channel_b happened to send a new message // into the resumed event stream. - fixtures::message::send(&app, &sender, &channel_a, &fixtures::now()).await, - fixtures::message::send(&app, &sender, &channel_a, &fixtures::now()).await, + fixtures::message::send(&app, &channel_a, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &channel_a, &sender, &fixtures::now()).await, ]; // Second subscription @@ -307,12 +319,14 @@ async fn serial_resume() { .await; for message in &resume_messages { - assert!(events.iter().any(|event| event == message)); + assert!(events + .iter() + .any(|event| fixtures::event::message_sent(event, message))); } let event = events.last().expect("this vec is non-empty"); - Sequence::from(event) + event.sequence() }; // Resume after disconnect a second time @@ -321,8 +335,8 @@ async fn serial_resume() { // problem. 
The resume point should before both of these messages, but // after _all_ prior messages. let final_messages = [ - fixtures::message::send(&app, &sender, &channel_a, &fixtures::now()).await, - fixtures::message::send(&app, &sender, &channel_b, &fixtures::now()).await, + fixtures::message::send(&app, &channel_a, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &channel_b, &sender, &fixtures::now()).await, ]; // Third subscription @@ -345,7 +359,9 @@ async fn serial_resume() { // This set of messages, in particular, _should not_ include any prior // messages from `initial_messages` or `resume_messages`. for message in &final_messages { - assert!(events.iter().any(|event| event == message)); + assert!(events + .iter() + .any(|event| fixtures::event::message_sent(event, message))); } }; } @@ -378,13 +394,17 @@ async fn terminates_on_token_expiry() { // These should not be delivered. let messages = [ - fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await, - fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await, - fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await, + fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, ]; assert!(events - .filter(|event| future::ready(messages.contains(event))) + .filter(|event| future::ready( + messages + .iter() + .any(|message| fixtures::event::message_sent(event, message)) + )) .next() .immediately() .await @@ -425,13 +445,17 @@ async fn terminates_on_logout() { // These should not be delivered. let messages = [ - fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await, - fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await, - fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await, + fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, ]; assert!(events - .filter(|event| future::ready(messages.contains(event))) + .filter(|event| future::ready( + messages + .iter() + .any(|message| fixtures::event::message_sent(event, message)) + )) .next() .immediately() .await diff --git a/src/event/sequence.rs b/src/event/sequence.rs index 9ebddd7..c566156 100644 --- a/src/event/sequence.rs +++ b/src/event/sequence.rs @@ -1,5 +1,20 @@ use std::fmt; +use crate::clock::DateTime; + +#[derive(Clone, Copy, Debug, Eq, PartialEq, serde::Serialize)] +pub struct Instant { + pub at: DateTime, + #[serde(skip)] + pub sequence: Sequence, +} + +impl From<Instant> for Sequence { + fn from(instant: Instant) -> Self { + instant.sequence + } +} + #[derive( Clone, Copy, @@ -22,3 +37,47 @@ impl fmt::Display for Sequence { value.fmt(f) } } + +impl Sequence { + pub fn up_to<E>(resume_point: Option<Self>) -> impl for<'e> Fn(&'e E) -> bool + where + E: Sequenced, + { + move |event| resume_point.map_or(true, |resume_point| event.sequence() <= resume_point) + } + + pub fn after<E>(resume_point: Option<Self>) -> impl for<'e> Fn(&'e E) -> bool + where + E: Sequenced, + { + move |event| resume_point < Some(event.sequence()) + } + + pub fn start_from<E>(resume_point: Self) -> impl for<'e> Fn(&'e E) -> bool + where + E: Sequenced, + { + move |event| resume_point <= event.sequence() + } +} + +pub trait Sequenced { + fn instant(&self) 
-> Instant; + + fn sequence(&self) -> Sequence { + self.instant().into() + } +} + +impl<E> Sequenced for &E +where + E: Sequenced, +{ + fn instant(&self) -> Instant { + (*self).instant() + } + + fn sequence(&self) -> Sequence { + (*self).sequence() + } +} diff --git a/src/event/types.rs b/src/event/types.rs deleted file mode 100644 index 2324dc1..0000000 --- a/src/event/types.rs +++ /dev/null @@ -1,85 +0,0 @@ -use crate::{ - channel::{self, Channel}, - event::{Instant, Sequence}, - login::Login, - message::{self, Message}, -}; - -#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] -pub struct ChannelEvent { - #[serde(flatten)] - pub instant: Instant, - #[serde(flatten)] - pub data: ChannelEventData, -} - -impl ChannelEvent { - pub fn created(channel: Channel) -> Self { - Self { - instant: channel.created, - data: CreatedEvent { channel }.into(), - } - } -} - -impl<'c> From<&'c ChannelEvent> for Sequence { - fn from(event: &'c ChannelEvent) -> Self { - event.instant.sequence - } -} - -#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] -#[serde(tag = "type", rename_all = "snake_case")] -pub enum ChannelEventData { - Created(CreatedEvent), - Message(MessageEvent), - MessageDeleted(MessageDeletedEvent), - Deleted(DeletedEvent), -} - -#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] -pub struct CreatedEvent { - pub channel: Channel, -} - -impl From<CreatedEvent> for ChannelEventData { - fn from(event: CreatedEvent) -> Self { - Self::Created(event) - } -} - -#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] -pub struct MessageEvent { - pub channel: Channel, - pub sender: Login, - pub message: Message, -} - -impl From<MessageEvent> for ChannelEventData { - fn from(event: MessageEvent) -> Self { - Self::Message(event) - } -} - -#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] -pub struct MessageDeletedEvent { - pub channel: Channel, - pub message: message::Id, -} - -impl From<MessageDeletedEvent> for ChannelEventData { - fn from(event: MessageDeletedEvent) -> Self { - Self::MessageDeleted(event) - } -} - -#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] -pub struct DeletedEvent { - pub channel: channel::Id, -} - -impl From<DeletedEvent> for ChannelEventData { - fn from(event: DeletedEvent) -> Self { - Self::Deleted(event) - } -} diff --git a/src/expire.rs b/src/expire.rs index a8eb8ad..e50bcb4 100644 --- a/src/expire.rs +++ b/src/expire.rs @@ -14,7 +14,7 @@ pub async fn middleware( next: Next, ) -> Result<Response, Internal> { app.tokens().expire(&expired_at).await?; - app.events().expire(&expired_at).await?; + app.messages().expire(&expired_at).await?; app.channels().expire(&expired_at).await?; Ok(next.run(req).await) } diff --git a/src/message/app.rs b/src/message/app.rs new file mode 100644 index 0000000..51f772e --- /dev/null +++ b/src/message/app.rs @@ -0,0 +1,88 @@ +use chrono::TimeDelta; +use itertools::Itertools; +use sqlx::sqlite::SqlitePool; + +use super::{repo::Provider as _, Message}; +use crate::{ + channel::{self, repo::Provider as _}, + clock::DateTime, + db::NotFound as _, + event::{broadcaster::Broadcaster, repo::Provider as _, Sequence}, + login::Login, +}; + +pub struct Messages<'a> { + db: &'a SqlitePool, + events: &'a Broadcaster, +} + +impl<'a> Messages<'a> { + pub const fn new(db: &'a SqlitePool, events: &'a Broadcaster) -> Self { + Self { db, events } + } + + pub async fn send( + &self, + channel: &channel::Id, + sender: &Login, + sent_at: &DateTime, + body: &str, + ) -> Result<Message, Error> { + let mut tx = 
self.db.begin().await?; + let channel = tx + .channels() + .by_id(channel) + .await + .not_found(|| Error::ChannelNotFound(channel.clone()))?; + let sent = tx.sequence().next(sent_at).await?; + let message = tx + .messages() + .create(&channel.snapshot(), sender, &sent, body) + .await?; + tx.commit().await?; + + for event in message.events() { + self.events.broadcast(event); + } + + Ok(message.snapshot()) + } + + pub async fn expire(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> { + // Somewhat arbitrarily, expire after 90 days. + let expire_at = relative_to.to_owned() - TimeDelta::days(90); + + let mut tx = self.db.begin().await?; + let expired = tx.messages().expired(&expire_at).await?; + + let mut events = Vec::with_capacity(expired.len()); + for (channel, message) in expired { + let deleted = tx.sequence().next(relative_to).await?; + let message = tx.messages().delete(&channel, &message, &deleted).await?; + events.push( + message + .events() + .filter(Sequence::start_from(deleted.sequence)), + ); + } + + tx.commit().await?; + + for event in events + .into_iter() + .kmerge_by(|a, b| a.instant.sequence < b.instant.sequence) + { + self.events.broadcast(event); + } + + Ok(()) + } +} + +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error("channel {0} not found")] + ChannelNotFound(channel::Id), + #[error(transparent)] + DatabaseError(#[from] sqlx::Error), +} diff --git a/src/message/event.rs b/src/message/event.rs new file mode 100644 index 0000000..bcc2238 --- /dev/null +++ b/src/message/event.rs @@ -0,0 +1,50 @@ +use super::{snapshot::Message, Id}; +use crate::{ + channel::Channel, + event::{Instant, Sequenced}, +}; + +#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] +pub struct Event { + #[serde(flatten)] + pub instant: Instant, + #[serde(flatten)] + pub kind: Kind, +} + +impl Sequenced for Event { + fn instant(&self) -> Instant { + self.instant + } +} + +#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] +#[serde(tag = "type", rename_all = "snake_case")] +pub enum Kind { + Sent(Sent), + Deleted(Deleted), +} + +#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] +pub struct Sent { + #[serde(flatten)] + pub message: Message, +} + +impl From<Sent> for Kind { + fn from(event: Sent) -> Self { + Self::Sent(event) + } +} + +#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] +pub struct Deleted { + pub channel: Channel, + pub message: Id, +} + +impl From<Deleted> for Kind { + fn from(event: Deleted) -> Self { + Self::Deleted(event) + } +} diff --git a/src/message/history.rs b/src/message/history.rs new file mode 100644 index 0000000..5aca47e --- /dev/null +++ b/src/message/history.rs @@ -0,0 +1,43 @@ +use super::{ + event::{Deleted, Event, Sent}, + Message, +}; +use crate::event::Instant; + +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct History { + pub message: Message, + pub sent: Instant, + pub deleted: Option<Instant>, +} + +impl History { + fn sent(&self) -> Event { + Event { + instant: self.sent, + kind: Sent { + message: self.message.clone(), + } + .into(), + } + } + + fn deleted(&self) -> Option<Event> { + self.deleted.map(|instant| Event { + instant, + kind: Deleted { + channel: self.message.channel.clone(), + message: self.message.id.clone(), + } + .into(), + }) + } + + pub fn events(&self) -> impl Iterator<Item = Event> { + [self.sent()].into_iter().chain(self.deleted()) + } + + pub fn snapshot(&self) -> Message { + self.message.clone() + } +} diff --git a/src/message/mod.rs b/src/message/mod.rs index 9a9bf14..52d56c1 100644 --- 
a/src/message/mod.rs +++ b/src/message/mod.rs @@ -1,9 +1,8 @@ +pub mod app; +pub mod event; +mod history; mod id; +pub mod repo; +mod snapshot; -pub use self::id::Id; - -#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] -pub struct Message { - pub id: Id, - pub body: String, -} +pub use self::{event::Event, history::History, id::Id, snapshot::Message}; diff --git a/src/message/repo.rs b/src/message/repo.rs new file mode 100644 index 0000000..3b2b8f7 --- /dev/null +++ b/src/message/repo.rs @@ -0,0 +1,214 @@ +use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction}; + +use super::{snapshot::Message, History, Id}; +use crate::{ + channel::{self, Channel}, + clock::DateTime, + event::{Instant, Sequence}, + login::{self, Login}, +}; + +pub trait Provider { + fn messages(&mut self) -> Messages; +} + +impl<'c> Provider for Transaction<'c, Sqlite> { + fn messages(&mut self) -> Messages { + Messages(self) + } +} + +pub struct Messages<'t>(&'t mut SqliteConnection); + +impl<'c> Messages<'c> { + pub async fn create( + &mut self, + channel: &Channel, + sender: &Login, + sent: &Instant, + body: &str, + ) -> Result<History, sqlx::Error> { + let id = Id::generate(); + + let message = sqlx::query!( + r#" + insert into message + (id, channel, sender, sent_at, sent_sequence, body) + values ($1, $2, $3, $4, $5, $6) + returning + id as "id: Id", + body + "#, + id, + channel.id, + sender.id, + sent.at, + sent.sequence, + body, + ) + .map(|row| History { + message: Message { + channel: channel.clone(), + sender: sender.clone(), + id: row.id, + body: row.body, + }, + sent: *sent, + deleted: None, + }) + .fetch_one(&mut *self.0) + .await?; + + Ok(message) + } + + async fn by_id(&mut self, channel: &Channel, message: &Id) -> Result<History, sqlx::Error> { + let message = sqlx::query!( + r#" + select + channel.id as "channel_id: channel::Id", + channel.name as "channel_name", + sender.id as "sender_id: login::Id", + sender.name as "sender_name", + message.id as "id: Id", + message.body, + sent_at as "sent_at: DateTime", + sent_sequence as "sent_sequence: Sequence" + from message + join channel on message.channel = channel.id + join login as sender on message.sender = sender.id + where message.id = $1 + and message.channel = $2 + "#, + message, + channel.id, + ) + .map(|row| History { + message: Message { + channel: Channel { + id: row.channel_id, + name: row.channel_name, + }, + sender: Login { + id: row.sender_id, + name: row.sender_name, + }, + id: row.id, + body: row.body, + }, + sent: Instant { + at: row.sent_at, + sequence: row.sent_sequence, + }, + deleted: None, + }) + .fetch_one(&mut *self.0) + .await?; + + Ok(message) + } + + pub async fn delete( + &mut self, + channel: &Channel, + message: &Id, + deleted: &Instant, + ) -> Result<History, sqlx::Error> { + let history = self.by_id(channel, message).await?; + + sqlx::query_scalar!( + r#" + delete from message + where + id = $1 + returning 1 as "deleted: i64" + "#, + history.message.id, + ) + .fetch_one(&mut *self.0) + .await?; + + Ok(History { + deleted: Some(*deleted), + ..history + }) + } + + pub async fn expired( + &mut self, + expire_at: &DateTime, + ) -> Result<Vec<(Channel, Id)>, sqlx::Error> { + let messages = sqlx::query!( + r#" + select + channel.id as "channel_id: channel::Id", + channel.name as "channel_name", + message.id as "message: Id" + from message + join channel on message.channel = channel.id + where sent_at < $1 + "#, + expire_at, + ) + .map(|row| { + ( + Channel { + id: row.channel_id, + name: row.channel_name, + }, + 
row.message, + ) + }) + .fetch_all(&mut *self.0) + .await?; + + Ok(messages) + } + + pub async fn replay( + &mut self, + resume_at: Option<Sequence>, + ) -> Result<Vec<History>, sqlx::Error> { + let messages = sqlx::query!( + r#" + select + channel.id as "channel_id: channel::Id", + channel.name as "channel_name", + sender.id as "sender_id: login::Id", + sender.name as "sender_name", + message.id as "id: Id", + message.body, + sent_at as "sent_at: DateTime", + sent_sequence as "sent_sequence: Sequence" + from message + join channel on message.channel = channel.id + join login as sender on message.sender = sender.id + where coalesce(message.sent_sequence > $1, true) + "#, + resume_at, + ) + .map(|row| History { + message: Message { + channel: Channel { + id: row.channel_id, + name: row.channel_name, + }, + sender: Login { + id: row.sender_id, + name: row.sender_name, + }, + id: row.id, + body: row.body, + }, + sent: Instant { + at: row.sent_at, + sequence: row.sent_sequence, + }, + deleted: None, + }) + .fetch_all(&mut *self.0) + .await?; + + Ok(messages) + } +} diff --git a/src/message/snapshot.rs b/src/message/snapshot.rs new file mode 100644 index 0000000..3adccbe --- /dev/null +++ b/src/message/snapshot.rs @@ -0,0 +1,74 @@ +use super::{ + event::{Event, Kind, Sent}, + Id, +}; +use crate::{channel::Channel, login::Login}; + +#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] +#[serde(into = "self::serialize::Message")] +pub struct Message { + pub channel: Channel, + pub sender: Login, + pub id: Id, + pub body: String, +} + +mod serialize { + use crate::{channel::Channel, login::Login, message::Id}; + + #[derive(serde::Serialize)] + pub struct Message { + channel: Channel, + sender: Login, + #[allow(clippy::struct_field_names)] + // Deliberately redundant with the module path; this produces a specific serialization. 
+ message: MessageData, + } + + #[derive(serde::Serialize)] + pub struct MessageData { + id: Id, + body: String, + } + + impl From<super::Message> for Message { + fn from(message: super::Message) -> Self { + Self { + channel: message.channel, + sender: message.sender, + message: MessageData { + id: message.id, + body: message.body, + }, + } + } + } +} + +impl Message { + fn apply(state: Option<Self>, event: Event) -> Option<Self> { + match (state, event.kind) { + (None, Kind::Sent(event)) => Some(event.into()), + (Some(message), Kind::Deleted(event)) if message.id == event.message => None, + (state, event) => panic!("invalid message event {event:#?} for state {state:#?}"), + } + } +} + +impl FromIterator<Event> for Option<Message> { + fn from_iter<I: IntoIterator<Item = Event>>(events: I) -> Self { + events.into_iter().fold(None, Message::apply) + } +} + +impl From<&Sent> for Message { + fn from(event: &Sent) -> Self { + event.message.clone() + } +} + +impl From<Sent> for Message { + fn from(event: Sent) -> Self { + event.message + } +} diff --git a/src/test/fixtures/event.rs b/src/test/fixtures/event.rs new file mode 100644 index 0000000..09f0490 --- /dev/null +++ b/src/test/fixtures/event.rs @@ -0,0 +1,11 @@ +use crate::{ + event::{Event, Kind}, + message::Message, +}; + +pub fn message_sent(event: &Event, message: &Message) -> bool { + matches!( + &event.kind, + Kind::MessageSent(event) if message == &event.into() + ) +} diff --git a/src/test/fixtures/filter.rs b/src/test/fixtures/filter.rs index d1939a5..6e62aea 100644 --- a/src/test/fixtures/filter.rs +++ b/src/test/fixtures/filter.rs @@ -1,11 +1,11 @@ use futures::future; -use crate::event::types; +use crate::event::{Event, Kind}; -pub fn messages() -> impl FnMut(&types::ChannelEvent) -> future::Ready<bool> { - |event| future::ready(matches!(event.data, types::ChannelEventData::Message(_))) +pub fn messages() -> impl FnMut(&Event) -> future::Ready<bool> { + |event| future::ready(matches!(event.kind, Kind::MessageSent(_))) } -pub fn created() -> impl FnMut(&types::ChannelEvent) -> future::Ready<bool> { - |event| future::ready(matches!(event.data, types::ChannelEventData::Created(_))) +pub fn created() -> impl FnMut(&Event) -> future::Ready<bool> { + |event| future::ready(matches!(event.kind, Kind::ChannelCreated(_))) } diff --git a/src/test/fixtures/message.rs b/src/test/fixtures/message.rs index fd50887..381b10b 100644 --- a/src/test/fixtures/message.rs +++ b/src/test/fixtures/message.rs @@ -1,17 +1,12 @@ use faker_rand::lorem::Paragraphs; -use crate::{app::App, channel::Channel, clock::RequestedAt, event::types, login::Login}; +use crate::{app::App, channel::Channel, clock::RequestedAt, login::Login, message::Message}; -pub async fn send( - app: &App, - login: &Login, - channel: &Channel, - sent_at: &RequestedAt, -) -> types::ChannelEvent { +pub async fn send(app: &App, channel: &Channel, login: &Login, sent_at: &RequestedAt) -> Message { let body = propose(); - app.events() - .send(login, &channel.id, &body, sent_at) + app.messages() + .send(&channel.id, login, sent_at, &body) .await .expect("should succeed if the channel exists") } diff --git a/src/test/fixtures/mod.rs b/src/test/fixtures/mod.rs index 76467ab..c5efa9b 100644 --- a/src/test/fixtures/mod.rs +++ b/src/test/fixtures/mod.rs @@ -3,6 +3,7 @@ use chrono::{TimeDelta, Utc}; use crate::{app::App, clock::RequestedAt, db}; pub mod channel; +pub mod event; pub mod filter; pub mod future; pub mod identity; diff --git a/src/token/app.rs b/src/token/app.rs index 030ec69..5c4fcd5 
100644 --- a/src/token/app.rs +++ b/src/token/app.rs @@ -127,7 +127,7 @@ impl<'a> Tokens<'a> { tx.commit().await?; for event in tokens.into_iter().map(event::TokenRevoked::from) { - self.tokens.broadcast(&event); + self.tokens.broadcast(event); } Ok(()) @@ -139,7 +139,7 @@ impl<'a> Tokens<'a> { tx.commit().await?; self.tokens - .broadcast(&event::TokenRevoked::from(token.clone())); + .broadcast(event::TokenRevoked::from(token.clone())); Ok(()) }
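Overall, this change replaces the old event::types::ChannelEvent payloads with per-entity History rows that expand into create/delete events (History::events()) and fold back into a current snapshot (FromIterator<Event> for Option<Channel> and Option<Message>, via an apply transition). Below is a standalone sketch of that fold using simplified stand-in types, not the crate's own Channel, Event, or Instant definitions; the real events also carry an Instant and deletion is keyed by the entity id.

// Standalone illustration of the fold used by the new `channel::snapshot` and
// `message::snapshot` modules. Types here are simplified stand-ins.

#[derive(Clone, Debug, PartialEq)]
struct Channel {
    id: u64,
    name: String,
}

#[derive(Clone, Debug)]
enum Kind {
    Created(Channel),
    Deleted(u64),
}

// Mirrors the shape of `Channel::apply`: a `Created` event establishes the
// snapshot, a matching `Deleted` event clears it, anything else is a logic error.
fn apply(state: Option<Channel>, kind: Kind) -> Option<Channel> {
    match (state, kind) {
        (None, Kind::Created(channel)) => Some(channel),
        (Some(channel), Kind::Deleted(id)) if channel.id == id => None,
        (state, kind) => panic!("invalid channel event {kind:?} for state {state:?}"),
    }
}

fn main() {
    let history = vec![
        Kind::Created(Channel { id: 1, name: "general".into() }),
        Kind::Deleted(1),
    ];

    // Replaying the full history yields `None`: the channel was deleted.
    let snapshot = history.into_iter().fold(None, apply);
    assert_eq!(snapshot, None);
}

As in the snapshot modules added by this diff, an out-of-order transition panics rather than silently producing a bogus snapshot.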
