diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/broadcast.rs | 43 | ||||
| -rw-r--r-- | src/conversation/app.rs | 87 | ||||
| -rw-r--r-- | src/conversation/history.rs | 44 | ||||
| -rw-r--r-- | src/conversation/repo.rs | 138 | ||||
| -rw-r--r-- | src/event/sequence.rs | 12 | ||||
| -rw-r--r-- | src/login/app.rs | 7 | ||||
| -rw-r--r-- | src/message/app.rs | 83 | ||||
| -rw-r--r-- | src/message/handlers/delete/mod.rs | 7 | ||||
| -rw-r--r-- | src/message/handlers/delete/test.rs | 18 | ||||
| -rw-r--r-- | src/message/history.rs | 70 | ||||
| -rw-r--r-- | src/message/repo.rs | 148 | ||||
| -rw-r--r-- | src/token/app.rs | 7 | ||||
| -rw-r--r-- | src/user/create.rs | 39 | ||||
| -rw-r--r-- | src/user/history.rs | 22 | ||||
| -rw-r--r-- | src/user/repo.rs | 50 |
15 files changed, 458 insertions, 317 deletions
diff --git a/src/broadcast.rs b/src/broadcast.rs index 6e1f04d..dae7641 100644 --- a/src/broadcast.rs +++ b/src/broadcast.rs @@ -1,16 +1,11 @@ -use std::sync::{Arc, Mutex}; - use futures::{Stream, future, stream::StreamExt as _}; use tokio::sync::broadcast::{Sender, channel}; use tokio_stream::wrappers::{BroadcastStream, errors::BroadcastStreamRecvError}; -// Clones will share the same sender. +// Clones will share the same channel. #[derive(Clone)] pub struct Broadcaster<M> { - // The use of std::sync::Mutex, and not tokio::sync::Mutex, follows Tokio's - // own advice: <https://tokio.rs/tokio/tutorial/shared-state>. Methods that - // lock it must be sync. - senders: Arc<Mutex<Sender<M>>>, + sender: Sender<M>, } impl<M> Default for Broadcaster<M> @@ -20,9 +15,7 @@ where fn default() -> Self { let sender = Self::make_sender(); - Self { - senders: Arc::new(Mutex::new(sender)), - } + Self { sender } } } @@ -30,9 +23,7 @@ impl<M> Broadcaster<M> where M: Clone + Send + std::fmt::Debug + 'static, { - pub fn broadcast(&self, message: impl Into<M>) { - let tx = self.sender(); - + pub fn broadcast(&self, message: M) { // Per the Tokio docs, the returned error is only used to indicate that // there are no receivers. In this use case, that's fine; a lack of // listening consumers (chat clients) when a message is sent isn't an @@ -40,11 +31,29 @@ where // // The successful return value, which includes the number of active // receivers, also isn't that interesting to us. - let _ = tx.send(message.into()); + let _ = self.sender.send(message); + } + + // If `M` is a type that can be obtained from an iterator, such as a `Vec`, and if `I` is an + // iterable of items that can be collected into `M`, then this will construct an `M` from the + // passed event iterator, converting each element as it goes. This emits one message (as `M`), + // containing whatever we collect out of `messages`. + // + // This is mostly meant for handling synchronized entity events, which tend to be generated as + // iterables of domain-specific event types, like `user::Event`, but broadcast as `Vec<event::Event>` + // for consumption by outside clients. + pub fn broadcast_from<I, E>(&self, messages: I) + where + I: IntoIterator, + M: FromIterator<E>, + E: From<I::Item>, + { + let message = messages.into_iter().map(Into::into).collect(); + self.broadcast(message); } pub fn subscribe(&self) -> impl Stream<Item = M> + std::fmt::Debug + use<M> { - let rx = self.sender().subscribe(); + let rx = self.sender.subscribe(); BroadcastStream::from(rx).scan((), |(), r| { // The following could technically be `r.ok()`, and is exactly @@ -65,10 +74,6 @@ where }) } - fn sender(&self) -> Sender<M> { - self.senders.lock().unwrap().clone() - } - fn make_sender() -> Sender<M> { // Queue depth of 16 chosen entirely arbitrarily. Don't read too much // into it. diff --git a/src/conversation/app.rs b/src/conversation/app.rs index 81ccdcf..26886af 100644 --- a/src/conversation/app.rs +++ b/src/conversation/app.rs @@ -3,15 +3,15 @@ use itertools::Itertools; use sqlx::sqlite::SqlitePool; use super::{ - Conversation, Id, + Conversation, History, Id, history, repo::{LoadError, Provider as _}, validate, }; use crate::{ clock::DateTime, db::{Duplicate as _, NotFound as _}, - event::{Broadcaster, Event, Sequence, repo::Provider as _}, - message::{self, repo::Provider as _}, + event::{Broadcaster, Sequence, repo::Provider as _}, + message::repo::Provider as _, name::{self, Name}, }; @@ -36,15 +36,20 @@ impl<'a> Conversations<'a> { let mut tx = self.db.begin().await?; let created = tx.sequence().next(created_at).await?; - let conversation = tx - .conversations() - .create(name, &created) + let conversation = History::begin(name, created); + + // This filter technically includes every event in the history, but it's easier to follow if + // the various event-manipulating app methods are consistent, and it's harmless to have an + // always-satisfied filter. + let events = conversation.events().filter(Sequence::start_from(created)); + tx.conversations() + .record_events(events.clone()) .await .duplicate(|| CreateError::DuplicateName(name.clone()))?; + tx.commit().await?; - self.events - .broadcast(conversation.events().map(Event::from).collect::<Vec<_>>()); + self.events.broadcast_from(events); Ok(conversation.as_created()) } @@ -78,33 +83,28 @@ impl<'a> Conversations<'a> { .by_id(conversation) .await .not_found(|| DeleteError::NotFound(conversation.clone()))?; - conversation - .as_snapshot() - .ok_or_else(|| DeleteError::Deleted(conversation.id().clone()))?; - - let mut events = Vec::new(); let messages = tx.messages().live(&conversation).await?; + let deleted_at = tx.sequence().next(deleted_at).await?; + let has_messages = messages .iter() - .map(message::History::as_snapshot) + .map(|message| message.as_of(deleted_at)) .any(|message| message.is_some()); if has_messages { return Err(DeleteError::NotEmpty(conversation.id().clone())); } - let deleted = tx.sequence().next(deleted_at).await?; - let conversation = tx.conversations().delete(&conversation, &deleted).await?; - events.extend( - conversation - .events() - .filter(Sequence::start_from(deleted.sequence)) - .map(Event::from), - ); + let conversation = conversation.delete(deleted_at)?; + + let events = conversation + .events() + .filter(Sequence::start_from(deleted_at)); + tx.conversations().record_events(events.clone()).await?; tx.commit().await?; - self.events.broadcast(events); + self.events.broadcast_from(events); Ok(()) } @@ -120,23 +120,20 @@ impl<'a> Conversations<'a> { let mut events = Vec::with_capacity(expired.len()); for conversation in expired { let deleted = tx.sequence().next(relative_to).await?; - let conversation = tx.conversations().delete(&conversation, &deleted).await?; - events.push( - conversation - .events() - .filter(Sequence::start_from(deleted.sequence)), - ); + let conversation = conversation.delete(deleted)?; + + let conversation_events = conversation.events().filter(Sequence::start_from(deleted)); + tx.conversations() + .record_events(conversation_events.clone()) + .await?; + + events.push(conversation_events); } tx.commit().await?; - self.events.broadcast( - events - .into_iter() - .kmerge_by(Sequence::merge) - .map(Event::from) - .collect::<Vec<_>>(), - ); + self.events + .broadcast_from(events.into_iter().kmerge_by(Sequence::merge)); Ok(()) } @@ -218,8 +215,18 @@ impl From<LoadError> for DeleteError { } } +impl From<history::DeleteError> for DeleteError { + fn from(error: history::DeleteError) -> Self { + match error { + history::DeleteError::Deleted(conversation) => Self::Deleted(conversation.id().clone()), + } + } +} + #[derive(Debug, thiserror::Error)] pub enum ExpireError { + #[error("tried to expire already-deleted conversation: {0}")] + Deleted(Id), #[error(transparent)] Database(#[from] sqlx::Error), #[error(transparent)] @@ -234,3 +241,11 @@ impl From<LoadError> for ExpireError { } } } + +impl From<history::DeleteError> for ExpireError { + fn from(error: history::DeleteError) -> Self { + match error { + history::DeleteError::Deleted(conversation) => Self::Deleted(conversation.id().clone()), + } + } +} diff --git a/src/conversation/history.rs b/src/conversation/history.rs index 746a1b0..5cba9ca 100644 --- a/src/conversation/history.rs +++ b/src/conversation/history.rs @@ -4,13 +4,49 @@ use super::{ Conversation, Id, event::{Created, Deleted, Event}, }; -use crate::event::Sequence; +use crate::{ + event::{Instant, Sequence}, + name::Name, +}; #[derive(Clone, Debug, Eq, PartialEq)] pub struct History { pub conversation: Conversation, } +// Lifecycle interface +impl History { + pub fn begin(name: &Name, created: Instant) -> Self { + Self { + conversation: Conversation { + id: Id::generate(), + name: name.clone(), + created, + deleted: None, + }, + } + } + + pub fn delete(self, deleted: Instant) -> Result<Self, DeleteError> { + if self.conversation.deleted.is_none() { + Ok(Self { + conversation: Conversation { + deleted: Some(deleted), + ..self.conversation + }, + }) + } else { + Err(DeleteError::Deleted(self)) + } + } +} + +#[derive(Debug, thiserror::Error)] +pub enum DeleteError { + #[error("conversation {} already deleted", .0.conversation.id)] + Deleted(History), +} + // State interface impl History { pub fn id(&self) -> &Id { @@ -30,9 +66,7 @@ impl History { where S: Into<Sequence>, { - self.events() - .filter(Sequence::up_to(sequence.into())) - .collect() + self.events().filter(Sequence::up_to(sequence)).collect() } // Snapshot of this conversation as of all events recorded in this history. @@ -43,7 +77,7 @@ impl History { // Event factories impl History { - pub fn events(&self) -> impl Iterator<Item = Event> + use<> { + pub fn events(&self) -> impl Iterator<Item = Event> + Clone + use<> { [self.created()] .into_iter() .merge_by(self.deleted(), Sequence::merge) diff --git a/src/conversation/repo.rs b/src/conversation/repo.rs index 7e38b62..cb66bf8 100644 --- a/src/conversation/repo.rs +++ b/src/conversation/repo.rs @@ -1,9 +1,12 @@ use futures::stream::{StreamExt as _, TryStreamExt as _}; use sqlx::{SqliteConnection, Transaction, sqlite::Sqlite}; +use super::{ + Conversation, Event, History, Id, + event::{Created, Deleted}, +}; use crate::{ clock::DateTime, - conversation::{Conversation, History, Id}, db::NotFound, event::{Instant, Sequence}, name::{self, Name}, @@ -22,22 +25,41 @@ impl Provider for Transaction<'_, Sqlite> { pub struct Conversations<'t>(&'t mut SqliteConnection); impl Conversations<'_> { - pub async fn create(&mut self, name: &Name, created: &Instant) -> Result<History, sqlx::Error> { - let id = Id::generate(); - let name = name.clone(); + pub async fn record_events( + &mut self, + events: impl IntoIterator<Item = Event>, + ) -> Result<(), sqlx::Error> { + for event in events { + self.record_event(&event).await?; + } + Ok(()) + } + + pub async fn record_event(&mut self, event: &Event) -> Result<(), sqlx::Error> { + match event { + Event::Created(created) => self.record_created(created).await, + Event::Deleted(deleted) => self.record_deleted(deleted).await, + } + } + + async fn record_created(&mut self, created: &Created) -> Result<(), sqlx::Error> { + let Conversation { + id, + created, + name, + deleted: _, + } = &created.conversation; let display_name = name.display(); let canonical_name = name.canonical(); - let created = *created; sqlx::query!( r#" insert into conversation (id, created_at, created_sequence, last_sequence) - values ($1, $2, $3, $4) + values ($1, $2, $3, $3) "#, id, created.at, created.sequence, - created.sequence, ) .execute(&mut *self.0) .await?; @@ -54,16 +76,50 @@ impl Conversations<'_> { .execute(&mut *self.0) .await?; - let conversation = History { - conversation: Conversation { - created, - id, - name: name.clone(), - deleted: None, - }, - }; + Ok(()) + } - Ok(conversation) + async fn record_deleted(&mut self, deleted: &Deleted) -> Result<(), sqlx::Error> { + let Deleted { instant, id } = deleted; + sqlx::query!( + r#" + update conversation + set last_sequence = max(last_sequence, $1) + where id = $2 + "#, + instant.sequence, + id, + ) + .execute(&mut *self.0) + .await?; + + sqlx::query!( + r#" + insert into conversation_deleted (id, deleted_at, deleted_sequence) + values ($1, $2, $3) + "#, + id, + instant.at, + instant.sequence, + ) + .execute(&mut *self.0) + .await?; + + // Small social responsibility hack here: when a conversation is deleted, its + // name is retconned to have been the empty string. Someone reading the event + // stream afterwards, or looking at conversations via the API, cannot retrieve + // the "deleted" conversation's information by ignoring the deletion event. + sqlx::query!( + r#" + delete from conversation_name + where id = $1 + "#, + id, + ) + .execute(&mut *self.0) + .await?; + + Ok(()) } pub async fn by_id(&mut self, conversation: &Id) -> Result<History, LoadError> { @@ -179,56 +235,6 @@ impl Conversations<'_> { Ok(conversations) } - pub async fn delete( - &mut self, - conversation: &History, - deleted: &Instant, - ) -> Result<History, LoadError> { - let id = conversation.id(); - sqlx::query!( - r#" - update conversation - set last_sequence = max(last_sequence, $1) - where id = $2 - returning id as "id: Id" - "#, - deleted.sequence, - id, - ) - .fetch_one(&mut *self.0) - .await?; - - sqlx::query!( - r#" - insert into conversation_deleted (id, deleted_at, deleted_sequence) - values ($1, $2, $3) - "#, - id, - deleted.at, - deleted.sequence, - ) - .execute(&mut *self.0) - .await?; - - // Small social responsibility hack here: when a conversation is deleted, its - // name is retconned to have been the empty string. Someone reading the event - // stream afterwards, or looking at conversations via the API, cannot retrieve - // the "deleted" conversation's information by ignoring the deletion event. - sqlx::query!( - r#" - delete from conversation_name - where id = $1 - "#, - id, - ) - .execute(&mut *self.0) - .await?; - - let conversation = self.by_id(id).await?; - - Ok(conversation) - } - pub async fn purge(&mut self, purge_at: &DateTime) -> Result<(), sqlx::Error> { let conversations = sqlx::query_scalar!( r#" diff --git a/src/event/sequence.rs b/src/event/sequence.rs index 77281c2..9a0ea5d 100644 --- a/src/event/sequence.rs +++ b/src/event/sequence.rs @@ -50,24 +50,30 @@ impl fmt::Display for Sequence { } impl Sequence { - pub fn up_to<E>(resume_point: Sequence) -> impl for<'e> Fn(&'e E) -> bool + pub fn up_to<P, E>(resume_point: P) -> impl for<'e> Fn(&'e E) -> bool + Clone where + P: Into<Self>, E: Sequenced, { + let resume_point = resume_point.into(); move |event| event.sequence() <= resume_point } - pub fn after<E>(resume_point: Sequence) -> impl for<'e> Fn(&'e E) -> bool + pub fn after<P, E>(resume_point: P) -> impl for<'e> Fn(&'e E) -> bool + Clone where + P: Into<Self>, E: Sequenced, { + let resume_point = resume_point.into(); move |event| resume_point < event.sequence() } - pub fn start_from<E>(resume_point: Self) -> impl for<'e> Fn(&'e E) -> bool + pub fn start_from<P, E>(resume_point: P) -> impl for<'e> Fn(&'e E) -> bool + Clone where + P: Into<Self>, E: Sequenced, { + let resume_point = resume_point.into(); move |event| resume_point <= event.sequence() } diff --git a/src/login/app.rs b/src/login/app.rs index 77d4ac3..e471000 100644 --- a/src/login/app.rs +++ b/src/login/app.rs @@ -80,9 +80,10 @@ impl<'a> Logins<'a> { tx.tokens().create(&token, &secret).await?; tx.commit().await?; - for event in revoked.into_iter().map(TokenEvent::Revoked) { - self.token_events.broadcast(event); - } + revoked + .into_iter() + .map(TokenEvent::Revoked) + .for_each(|event| self.token_events.broadcast(event)); Ok(secret) } else { diff --git a/src/message/app.rs b/src/message/app.rs index 9100224..647152e 100644 --- a/src/message/app.rs +++ b/src/message/app.rs @@ -2,12 +2,12 @@ use chrono::TimeDelta; use itertools::Itertools; use sqlx::sqlite::SqlitePool; -use super::{Body, Id, Message, repo::Provider as _}; +use super::{Body, History, Id, Message, history, repo::Provider as _}; use crate::{ clock::DateTime, conversation::{self, repo::Provider as _}, db::NotFound as _, - event::{Broadcaster, Event, Sequence, repo::Provider as _}, + event::{Broadcaster, Sequence, repo::Provider as _}, login::Login, name, user::{self, repo::Provider as _}, @@ -52,14 +52,17 @@ impl<'a> Messages<'a> { let sent = tx.sequence().next(sent_at).await?; let conversation = conversation.as_of(sent).ok_or_else(conversation_deleted)?; let sender = sender.as_of(sent).ok_or_else(sender_deleted)?; - let message = tx - .messages() - .create(&conversation, &sender, &sent, body) - .await?; + let message = History::begin(&conversation, &sender, body, sent); + + // This filter technically includes every event in the history, but it's easier to follow if + // the various event-manipulating app methods are consistent, and it's harmless to have an + // always-satisfied filter. + let events = message.events().filter(Sequence::start_from(sent)); + tx.messages().record_events(events.clone()).await?; + tx.commit().await?; - self.events - .broadcast(message.events().map(Event::from).collect::<Vec<_>>()); + self.events.broadcast_from(events); Ok(message.as_sent()) } @@ -71,38 +74,24 @@ impl<'a> Messages<'a> { deleted_at: &DateTime, ) -> Result<(), DeleteError> { let message_not_found = || DeleteError::MessageNotFound(message.clone()); - let message_deleted = || DeleteError::Deleted(message.clone()); - let deleter_not_found = || DeleteError::UserNotFound(deleted_by.id.clone().into()); - let deleter_deleted = || DeleteError::UserDeleted(deleted_by.id.clone().into()); let not_sender = || DeleteError::NotSender(deleted_by.id.clone().into()); let mut tx = self.db.begin().await?; + let message = tx .messages() .by_id(message) .await .not_found(message_not_found)?; - let deleted_by = tx - .users() - .by_login(deleted_by) - .await - .not_found(deleter_not_found)?; - - let deleted = tx.sequence().next(deleted_at).await?; - let message = message.as_of(deleted).ok_or_else(message_deleted)?; - let deleted_by = deleted_by.as_of(deleted).ok_or_else(deleter_deleted)?; + if message.sender() == &deleted_by.id { + let deleted_at = tx.sequence().next(deleted_at).await?; + let message = message.delete(deleted_at)?; - if message.sender == deleted_by.id { - let message = tx.messages().delete(&message, &deleted).await?; + let events = message.events().filter(Sequence::start_from(deleted_at)); + tx.messages().record_events(events.clone()).await?; tx.commit().await?; - self.events.broadcast( - message - .events() - .filter(Sequence::start_from(deleted.sequence)) - .map(Event::from) - .collect::<Vec<_>>(), - ); + self.events.broadcast_from(events); Ok(()) } else { @@ -120,25 +109,23 @@ impl<'a> Messages<'a> { let mut events = Vec::with_capacity(expired.len()); for message in expired { let deleted = tx.sequence().next(relative_to).await?; - if let Some(message) = message.as_of(deleted) { - let message = tx.messages().delete(&message, &deleted).await?; - events.push( - message + match message.delete(deleted) { + Ok(message) => { + let message_events = message .events() - .filter(Sequence::start_from(deleted.sequence)), - ); + .filter(Sequence::start_from(deleted.sequence)); + tx.messages().record_events(message_events.clone()).await?; + + events.push(message_events); + } + Err(history::DeleteError::Deleted(_)) => {} } } tx.commit().await?; - self.events.broadcast( - events - .into_iter() - .kmerge_by(Sequence::merge) - .map(Event::from) - .collect::<Vec<_>>(), - ); + self.events + .broadcast_from(events.into_iter().kmerge_by(Sequence::merge)); Ok(()) } @@ -195,10 +182,6 @@ impl From<user::repo::LoadError> for SendError { pub enum DeleteError { #[error("message {0} not found")] MessageNotFound(Id), - #[error("user {0} not found")] - UserNotFound(user::Id), - #[error("user {0} deleted")] - UserDeleted(user::Id), #[error("user {0} not the message's sender")] NotSender(user::Id), #[error("message {0} deleted")] @@ -218,3 +201,11 @@ impl From<user::repo::LoadError> for DeleteError { } } } + +impl From<history::DeleteError> for DeleteError { + fn from(error: history::DeleteError) -> Self { + match error { + history::DeleteError::Deleted(message) => Self::Deleted(message.id().clone()), + } + } +} diff --git a/src/message/handlers/delete/mod.rs b/src/message/handlers/delete/mod.rs index 606f502..3e9a212 100644 --- a/src/message/handlers/delete/mod.rs +++ b/src/message/handlers/delete/mod.rs @@ -51,10 +51,9 @@ impl IntoResponse for Error { DeleteError::MessageNotFound(_) | DeleteError::Deleted(_) => { NotFound(error).into_response() } - DeleteError::UserNotFound(_) - | DeleteError::UserDeleted(_) - | DeleteError::Database(_) - | DeleteError::Name(_) => Internal::from(error).into_response(), + DeleteError::Database(_) | DeleteError::Name(_) => { + Internal::from(error).into_response() + } } } } diff --git a/src/message/handlers/delete/test.rs b/src/message/handlers/delete/test.rs index d0e1794..05d9344 100644 --- a/src/message/handlers/delete/test.rs +++ b/src/message/handlers/delete/test.rs @@ -70,23 +70,23 @@ pub async fn delete_deleted() { // Set up the environment let app = fixtures::scratch_app().await; - let sender = fixtures::user::create(&app, &fixtures::now()).await; + let sender = fixtures::identity::create(&app, &fixtures::now()).await; let conversation = fixtures::conversation::create(&app, &fixtures::now()).await; - let message = fixtures::message::send(&app, &conversation, &sender, &fixtures::now()).await; + let message = + fixtures::message::send(&app, &conversation, &sender.login, &fixtures::now()).await; app.messages() - .delete(&sender, &message.id, &fixtures::now()) + .delete(&sender.login, &message.id, &fixtures::now()) .await .expect("deleting a recently-sent message succeeds"); // Send the request - let deleter = fixtures::identity::create(&app, &fixtures::now()).await; let super::Error(error) = super::handler( State(app.clone()), Path(message.id.clone()), fixtures::now(), - deleter, + sender, ) .await .expect_err("deleting a deleted message fails"); @@ -101,9 +101,10 @@ pub async fn delete_expired() { // Set up the environment let app = fixtures::scratch_app().await; - let sender = fixtures::user::create(&app, &fixtures::ancient()).await; + let sender = fixtures::identity::create(&app, &fixtures::ancient()).await; let conversation = fixtures::conversation::create(&app, &fixtures::ancient()).await; - let message = fixtures::message::send(&app, &conversation, &sender, &fixtures::ancient()).await; + let message = + fixtures::message::send(&app, &conversation, &sender.login, &fixtures::ancient()).await; app.messages() .expire(&fixtures::now()) @@ -112,12 +113,11 @@ pub async fn delete_expired() { // Send the request - let deleter = fixtures::identity::create(&app, &fixtures::now()).await; let super::Error(error) = super::handler( State(app.clone()), Path(message.id.clone()), fixtures::now(), - deleter, + sender, ) .await .expect_err("deleting an expired message fails"); diff --git a/src/message/history.rs b/src/message/history.rs index 2abdf2c..92cecc9 100644 --- a/src/message/history.rs +++ b/src/message/history.rs @@ -1,18 +1,67 @@ use itertools::Itertools as _; use super::{ - Message, + Body, Id, Message, event::{Deleted, Event, Sent}, }; -use crate::event::Sequence; +use crate::{ + conversation::Conversation, + event::{Instant, Sequence}, + user::{self, User}, +}; #[derive(Clone, Debug, Eq, PartialEq)] pub struct History { pub message: Message, } +// Lifecycle interface +impl History { + pub fn begin(conversation: &Conversation, sender: &User, body: &Body, sent: Instant) -> Self { + Self { + message: Message { + id: Id::generate(), + conversation: conversation.id.clone(), + sender: sender.id.clone(), + body: body.clone(), + sent, + deleted: None, + }, + } + } + + pub fn delete(self, deleted: Instant) -> Result<Self, DeleteError> { + if self.message.deleted.is_none() { + Ok(Self { + message: Message { + deleted: Some(deleted), + ..self.message + }, + }) + } else { + Err(DeleteError::Deleted(self.into())) + } + } +} + +#[derive(Debug, thiserror::Error)] +pub enum DeleteError { + #[error("message {} already deleted", .0.message.id)] + // Payload is boxed here to avoid copying an entire `History` around in any errors this error + // gets chained into. See <https://rust-lang.github.io/rust-clippy/master/index.html#result_large_err>. + Deleted(Box<History>), +} + // State interface impl History { + pub fn id(&self) -> &Id { + &self.message.id + } + + pub fn sender(&self) -> &user::Id { + &self.message.sender + } + // Snapshot of this message as it was when sent. (Note to the future: it's okay // if this returns a redacted or modified version of the message. If we // implement message editing by redacting the original body, then this should @@ -30,15 +79,16 @@ impl History { .filter(Sequence::up_to(sequence.into())) .collect() } - - // Snapshot of this message as of all events recorded in this history. - pub fn as_snapshot(&self) -> Option<Message> { - self.events().collect() - } } // Events interface impl History { + pub fn events(&self) -> impl Iterator<Item = Event> + Clone + use<> { + [self.sent()] + .into_iter() + .merge_by(self.deleted(), Sequence::merge) + } + fn sent(&self) -> Event { Sent { message: self.message.clone(), @@ -55,10 +105,4 @@ impl History { .into() }) } - - pub fn events(&self) -> impl Iterator<Item = Event> + use<> { - [self.sent()] - .into_iter() - .merge_by(self.deleted(), Sequence::merge) - } } diff --git a/src/message/repo.rs b/src/message/repo.rs index 83bf0d5..4f66bdc 100644 --- a/src/message/repo.rs +++ b/src/message/repo.rs @@ -1,11 +1,14 @@ use sqlx::{SqliteConnection, Transaction, sqlite::Sqlite}; -use super::{Body, History, Id, snapshot::Message}; +use super::{ + Body, Event, History, Id, Message, + event::{Deleted, Sent}, +}; use crate::{ clock::DateTime, - conversation::{self, Conversation}, + conversation, event::{Instant, Sequence}, - user::{self, User}, + user, }; pub trait Provider { @@ -21,50 +24,84 @@ impl Provider for Transaction<'_, Sqlite> { pub struct Messages<'t>(&'t mut SqliteConnection); impl Messages<'_> { - pub async fn create( + pub async fn record_events( &mut self, - conversation: &Conversation, - sender: &User, - sent: &Instant, - body: &Body, - ) -> Result<History, sqlx::Error> { - let id = Id::generate(); + events: impl IntoIterator<Item = Event>, + ) -> Result<(), sqlx::Error> { + for event in events { + self.record_event(&event).await?; + } + Ok(()) + } - let message = sqlx::query!( + pub async fn record_event(&mut self, event: &Event) -> Result<(), sqlx::Error> { + match event { + Event::Sent(sent) => self.record_sent(sent).await, + Event::Deleted(deleted) => self.record_deleted(deleted).await, + } + } + + async fn record_sent(&mut self, sent: &Sent) -> Result<(), sqlx::Error> { + let Message { + id, + conversation, + sender, + body, + sent, + deleted: _, + } = &sent.message; + + sqlx::query!( r#" insert into message - (id, conversation, sender, sent_at, sent_sequence, body, last_sequence) - values ($1, $2, $3, $4, $5, $6, $7) - returning - id as "id: Id", - conversation as "conversation: conversation::Id", - sender as "sender: user::Id", - sent_at as "sent_at: DateTime", - sent_sequence as "sent_sequence: Sequence", - body as "body: Body" + (id, conversation, sender, body, sent_at, sent_sequence, last_sequence) + values ($1, $2, $3, $4, $5, $6, $6) "#, id, - conversation.id, - sender.id, - sent.at, - sent.sequence, + conversation, + sender, body, + sent.at, sent.sequence, ) - .map(|row| History { - message: Message { - sent: Instant::new(row.sent_at, row.sent_sequence), - conversation: row.conversation, - sender: row.sender, - id: row.id, - body: row.body.unwrap_or_default(), - deleted: None, - }, - }) - .fetch_one(&mut *self.0) + .execute(&mut *self.0) .await?; - Ok(message) + Ok(()) + } + + async fn record_deleted(&mut self, deleted: &Deleted) -> Result<(), sqlx::Error> { + let Deleted { instant, id } = deleted; + + sqlx::query!( + r#" + insert into message_deleted (id, deleted_at, deleted_sequence) + values ($1, $2, $3) + "#, + id, + instant.at, + instant.sequence, + ) + .execute(&mut *self.0) + .await?; + + // Small social responsibility hack here: when a message is deleted, its body is + // retconned to have been the empty string. Someone reading the event stream + // afterwards, or looking at messages in the conversation, cannot retrieve the + // "deleted" message by ignoring the deletion event. + sqlx::query!( + r#" + update message + set body = '', last_sequence = max(last_sequence, $1) + where id = $2 + "#, + instant.sequence, + id, + ) + .execute(&mut *self.0) + .await?; + + Ok(()) } pub async fn live( @@ -178,45 +215,6 @@ impl Messages<'_> { Ok(message) } - pub async fn delete( - &mut self, - message: &Message, - deleted: &Instant, - ) -> Result<History, sqlx::Error> { - sqlx::query!( - r#" - insert into message_deleted (id, deleted_at, deleted_sequence) - values ($1, $2, $3) - "#, - message.id, - deleted.at, - deleted.sequence, - ) - .execute(&mut *self.0) - .await?; - - // Small social responsibility hack here: when a message is deleted, its body is - // retconned to have been the empty string. Someone reading the event stream - // afterwards, or looking at messages in the conversation, cannot retrieve the - // "deleted" message by ignoring the deletion event. - sqlx::query!( - r#" - update message - set body = '', last_sequence = max(last_sequence, $1) - where id = $2 - returning id as "id: Id" - "#, - deleted.sequence, - message.id, - ) - .fetch_one(&mut *self.0) - .await?; - - let message = self.by_id(&message.id).await?; - - Ok(message) - } - pub async fn purge(&mut self, purge_at: &DateTime) -> Result<(), sqlx::Error> { let messages = sqlx::query_scalar!( r#" diff --git a/src/token/app.rs b/src/token/app.rs index fb5d712..1d68f32 100644 --- a/src/token/app.rs +++ b/src/token/app.rs @@ -102,9 +102,10 @@ impl<'a> Tokens<'a> { let tokens = tx.tokens().expire(&expire_at).await?; tx.commit().await?; - for event in tokens.into_iter().map(TokenEvent::Revoked) { - self.token_events.broadcast(event); - } + tokens + .into_iter() + .map(TokenEvent::Revoked) + .for_each(|event| self.token_events.broadcast(event)); Ok(()) } diff --git a/src/user/create.rs b/src/user/create.rs index 5c060c9..d6656e5 100644 --- a/src/user/create.rs +++ b/src/user/create.rs @@ -3,7 +3,7 @@ use sqlx::{Transaction, sqlite::Sqlite}; use super::{History, repo::Provider as _, validate}; use crate::{ clock::DateTime, - event::{Broadcaster, Event, repo::Provider as _}, + event::{Broadcaster, Event, Sequence, repo::Provider as _}, login::{self, Login, repo::Provider as _}, name::Name, password::{Password, StoredHash}, @@ -54,7 +54,10 @@ pub struct Validated<'a> { } impl Validated<'_> { - pub async fn store(self, tx: &mut Transaction<'_, Sqlite>) -> Result<Stored, sqlx::Error> { + pub async fn store( + self, + tx: &mut Transaction<'_, Sqlite>, + ) -> Result<Stored<impl IntoIterator<Item = Event> + use<>>, sqlx::Error> { let Self { name, password, @@ -63,28 +66,40 @@ impl Validated<'_> { let login = Login { id: login::Id::generate(), - name: name.to_owned(), + name: name.clone(), }; + tx.logins().create(&login, &password).await?; let created = tx.sequence().next(created_at).await?; - tx.logins().create(&login, &password).await?; - let user = tx.users().create(&login, &created).await?; + let user = History::begin(&login, created); + + let events = user.events().filter(Sequence::start_from(created)); + tx.users().record_events(events.clone()).await?; - Ok(Stored { user, login }) + Ok(Stored { + events: events.map(Event::from), + login, + }) } } #[must_use = "dropping a user creation attempt is likely a mistake"] -pub struct Stored { - user: History, +pub struct Stored<E> { + events: E, login: Login, } -impl Stored { - pub fn publish(self, broadcaster: &Broadcaster) { - let Self { user, login: _ } = self; +impl<E> Stored<E> +where + E: IntoIterator<Item = Event>, +{ + pub fn publish(self, events: &Broadcaster) { + let Self { + events: user_events, + login: _, + } = self; - broadcaster.broadcast(user.events().map(Event::from).collect::<Vec<_>>()); + events.broadcast_from(user_events); } pub fn login(&self) -> &Login { diff --git a/src/user/history.rs b/src/user/history.rs index f58e9c7..7c06a2d 100644 --- a/src/user/history.rs +++ b/src/user/history.rs @@ -2,7 +2,10 @@ use super::{ User, event::{Created, Event}, }; -use crate::event::{Instant, Sequence}; +use crate::{ + event::{Instant, Sequence}, + login::Login, +}; #[derive(Clone, Debug, Eq, PartialEq)] pub struct History { @@ -10,6 +13,21 @@ pub struct History { pub created: Instant, } +// Lifecycle interface +impl History { + pub fn begin(login: &Login, created: Instant) -> Self { + let Login { id, name } = login.clone(); + + Self { + user: User { + id: id.into(), + name, + }, + created, + } + } +} + // State interface impl History { pub fn as_of<S>(&self, sequence: S) -> Option<User> @@ -32,7 +50,7 @@ impl History { .into() } - pub fn events(&self) -> impl Iterator<Item = Event> + use<> { + pub fn events(&self) -> impl Iterator<Item = Event> + Clone + use<> { [self.created()].into_iter() } } diff --git a/src/user/repo.rs b/src/user/repo.rs index aaf3b73..292d72e 100644 --- a/src/user/repo.rs +++ b/src/user/repo.rs @@ -1,13 +1,13 @@ use futures::stream::{StreamExt as _, TryStreamExt as _}; use sqlx::{SqliteConnection, Transaction, sqlite::Sqlite}; +use super::{Event, History, Id, User, event::Created}; use crate::{ clock::DateTime, db::NotFound, event::{Instant, Sequence}, login::Login, name::{self, Name}, - user::{History, Id, User}, }; pub trait Provider { @@ -23,30 +23,39 @@ impl Provider for Transaction<'_, Sqlite> { pub struct Users<'t>(&'t mut SqliteConnection); impl Users<'_> { - pub async fn create( + pub async fn record_events( &mut self, - login: &Login, - created: &Instant, - ) -> Result<History, sqlx::Error> { + events: impl IntoIterator<Item = Event>, + ) -> Result<(), sqlx::Error> { + for event in events { + self.record_event(&event).await?; + } + Ok(()) + } + + pub async fn record_event(&mut self, event: &Event) -> Result<(), sqlx::Error> { + match event { + Event::Created(created) => self.record_created(created).await, + } + } + + async fn record_created(&mut self, created: &Created) -> Result<(), sqlx::Error> { + let Created { user, instant } = created; + sqlx::query!( r#" - insert into user (id, created_sequence, created_at) + insert + into user (id, created_at, created_sequence) values ($1, $2, $3) "#, - login.id, - created.sequence, - created.at, + user.id, + instant.at, + instant.sequence, ) .execute(&mut *self.0) .await?; - Ok(History { - user: User { - id: login.id.clone().into(), - name: login.name.clone(), - }, - created: *created, - }) + Ok(()) } pub async fn by_login(&mut self, login: &Login) -> Result<History, LoadError> { @@ -86,12 +95,11 @@ impl Users<'_> { id as "id: Id", login.display_name as "display_name: String", login.canonical_name as "canonical_name: String", - user.created_sequence as "created_sequence: Sequence", - user.created_at as "created_at: DateTime" + user.created_at as "created_at: DateTime", + user.created_sequence as "created_sequence: Sequence" from user join login using (id) where user.created_sequence <= $1 - order by canonical_name "#, resume_at, ) @@ -119,8 +127,8 @@ impl Users<'_> { id as "id: Id", login.display_name as "display_name: String", login.canonical_name as "canonical_name: String", - user.created_sequence as "created_sequence: Sequence", - user.created_at as "created_at: DateTime" + user.created_at as "created_at: DateTime", + user.created_sequence as "created_sequence: Sequence" from user join login using (id) where user.created_sequence > $1 |
