diff options
Diffstat (limited to 'src/message/repo.rs')
| -rw-r--r-- | src/message/repo.rs | 148 |
1 files changed, 73 insertions, 75 deletions
diff --git a/src/message/repo.rs b/src/message/repo.rs index 83bf0d5..4f66bdc 100644 --- a/src/message/repo.rs +++ b/src/message/repo.rs @@ -1,11 +1,14 @@ use sqlx::{SqliteConnection, Transaction, sqlite::Sqlite}; -use super::{Body, History, Id, snapshot::Message}; +use super::{ + Body, Event, History, Id, Message, + event::{Deleted, Sent}, +}; use crate::{ clock::DateTime, - conversation::{self, Conversation}, + conversation, event::{Instant, Sequence}, - user::{self, User}, + user, }; pub trait Provider { @@ -21,50 +24,84 @@ impl Provider for Transaction<'_, Sqlite> { pub struct Messages<'t>(&'t mut SqliteConnection); impl Messages<'_> { - pub async fn create( + pub async fn record_events( &mut self, - conversation: &Conversation, - sender: &User, - sent: &Instant, - body: &Body, - ) -> Result<History, sqlx::Error> { - let id = Id::generate(); + events: impl IntoIterator<Item = Event>, + ) -> Result<(), sqlx::Error> { + for event in events { + self.record_event(&event).await?; + } + Ok(()) + } - let message = sqlx::query!( + pub async fn record_event(&mut self, event: &Event) -> Result<(), sqlx::Error> { + match event { + Event::Sent(sent) => self.record_sent(sent).await, + Event::Deleted(deleted) => self.record_deleted(deleted).await, + } + } + + async fn record_sent(&mut self, sent: &Sent) -> Result<(), sqlx::Error> { + let Message { + id, + conversation, + sender, + body, + sent, + deleted: _, + } = &sent.message; + + sqlx::query!( r#" insert into message - (id, conversation, sender, sent_at, sent_sequence, body, last_sequence) - values ($1, $2, $3, $4, $5, $6, $7) - returning - id as "id: Id", - conversation as "conversation: conversation::Id", - sender as "sender: user::Id", - sent_at as "sent_at: DateTime", - sent_sequence as "sent_sequence: Sequence", - body as "body: Body" + (id, conversation, sender, body, sent_at, sent_sequence, last_sequence) + values ($1, $2, $3, $4, $5, $6, $6) "#, id, - conversation.id, - sender.id, - sent.at, - sent.sequence, + conversation, + sender, body, + sent.at, sent.sequence, ) - .map(|row| History { - message: Message { - sent: Instant::new(row.sent_at, row.sent_sequence), - conversation: row.conversation, - sender: row.sender, - id: row.id, - body: row.body.unwrap_or_default(), - deleted: None, - }, - }) - .fetch_one(&mut *self.0) + .execute(&mut *self.0) .await?; - Ok(message) + Ok(()) + } + + async fn record_deleted(&mut self, deleted: &Deleted) -> Result<(), sqlx::Error> { + let Deleted { instant, id } = deleted; + + sqlx::query!( + r#" + insert into message_deleted (id, deleted_at, deleted_sequence) + values ($1, $2, $3) + "#, + id, + instant.at, + instant.sequence, + ) + .execute(&mut *self.0) + .await?; + + // Small social responsibility hack here: when a message is deleted, its body is + // retconned to have been the empty string. Someone reading the event stream + // afterwards, or looking at messages in the conversation, cannot retrieve the + // "deleted" message by ignoring the deletion event. + sqlx::query!( + r#" + update message + set body = '', last_sequence = max(last_sequence, $1) + where id = $2 + "#, + instant.sequence, + id, + ) + .execute(&mut *self.0) + .await?; + + Ok(()) } pub async fn live( @@ -178,45 +215,6 @@ impl Messages<'_> { Ok(message) } - pub async fn delete( - &mut self, - message: &Message, - deleted: &Instant, - ) -> Result<History, sqlx::Error> { - sqlx::query!( - r#" - insert into message_deleted (id, deleted_at, deleted_sequence) - values ($1, $2, $3) - "#, - message.id, - deleted.at, - deleted.sequence, - ) - .execute(&mut *self.0) - .await?; - - // Small social responsibility hack here: when a message is deleted, its body is - // retconned to have been the empty string. Someone reading the event stream - // afterwards, or looking at messages in the conversation, cannot retrieve the - // "deleted" message by ignoring the deletion event. - sqlx::query!( - r#" - update message - set body = '', last_sequence = max(last_sequence, $1) - where id = $2 - returning id as "id: Id" - "#, - deleted.sequence, - message.id, - ) - .fetch_one(&mut *self.0) - .await?; - - let message = self.by_id(&message.id).await?; - - Ok(message) - } - pub async fn purge(&mut self, purge_at: &DateTime) -> Result<(), sqlx::Error> { let messages = sqlx::query_scalar!( r#" |
