summaryrefslogtreecommitdiff
path: root/src/message/repo.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/message/repo.rs')
-rw-r--r--src/message/repo.rs148
1 files changed, 73 insertions, 75 deletions
diff --git a/src/message/repo.rs b/src/message/repo.rs
index 83bf0d5..4f66bdc 100644
--- a/src/message/repo.rs
+++ b/src/message/repo.rs
@@ -1,11 +1,14 @@
use sqlx::{SqliteConnection, Transaction, sqlite::Sqlite};
-use super::{Body, History, Id, snapshot::Message};
+use super::{
+ Body, Event, History, Id, Message,
+ event::{Deleted, Sent},
+};
use crate::{
clock::DateTime,
- conversation::{self, Conversation},
+ conversation,
event::{Instant, Sequence},
- user::{self, User},
+ user,
};
pub trait Provider {
@@ -21,50 +24,84 @@ impl Provider for Transaction<'_, Sqlite> {
pub struct Messages<'t>(&'t mut SqliteConnection);
impl Messages<'_> {
- pub async fn create(
+ pub async fn record_events(
&mut self,
- conversation: &Conversation,
- sender: &User,
- sent: &Instant,
- body: &Body,
- ) -> Result<History, sqlx::Error> {
- let id = Id::generate();
+ events: impl IntoIterator<Item = Event>,
+ ) -> Result<(), sqlx::Error> {
+ for event in events {
+ self.record_event(&event).await?;
+ }
+ Ok(())
+ }
- let message = sqlx::query!(
+ pub async fn record_event(&mut self, event: &Event) -> Result<(), sqlx::Error> {
+ match event {
+ Event::Sent(sent) => self.record_sent(sent).await,
+ Event::Deleted(deleted) => self.record_deleted(deleted).await,
+ }
+ }
+
+ async fn record_sent(&mut self, sent: &Sent) -> Result<(), sqlx::Error> {
+ let Message {
+ id,
+ conversation,
+ sender,
+ body,
+ sent,
+ deleted: _,
+ } = &sent.message;
+
+ sqlx::query!(
r#"
insert into message
- (id, conversation, sender, sent_at, sent_sequence, body, last_sequence)
- values ($1, $2, $3, $4, $5, $6, $7)
- returning
- id as "id: Id",
- conversation as "conversation: conversation::Id",
- sender as "sender: user::Id",
- sent_at as "sent_at: DateTime",
- sent_sequence as "sent_sequence: Sequence",
- body as "body: Body"
+ (id, conversation, sender, body, sent_at, sent_sequence, last_sequence)
+ values ($1, $2, $3, $4, $5, $6, $6)
"#,
id,
- conversation.id,
- sender.id,
- sent.at,
- sent.sequence,
+ conversation,
+ sender,
body,
+ sent.at,
sent.sequence,
)
- .map(|row| History {
- message: Message {
- sent: Instant::new(row.sent_at, row.sent_sequence),
- conversation: row.conversation,
- sender: row.sender,
- id: row.id,
- body: row.body.unwrap_or_default(),
- deleted: None,
- },
- })
- .fetch_one(&mut *self.0)
+ .execute(&mut *self.0)
.await?;
- Ok(message)
+ Ok(())
+ }
+
+ async fn record_deleted(&mut self, deleted: &Deleted) -> Result<(), sqlx::Error> {
+ let Deleted { instant, id } = deleted;
+
+ sqlx::query!(
+ r#"
+ insert into message_deleted (id, deleted_at, deleted_sequence)
+ values ($1, $2, $3)
+ "#,
+ id,
+ instant.at,
+ instant.sequence,
+ )
+ .execute(&mut *self.0)
+ .await?;
+
+ // Small social responsibility hack here: when a message is deleted, its body is
+ // retconned to have been the empty string. Someone reading the event stream
+ // afterwards, or looking at messages in the conversation, cannot retrieve the
+ // "deleted" message by ignoring the deletion event.
+ sqlx::query!(
+ r#"
+ update message
+ set body = '', last_sequence = max(last_sequence, $1)
+ where id = $2
+ "#,
+ instant.sequence,
+ id,
+ )
+ .execute(&mut *self.0)
+ .await?;
+
+ Ok(())
}
pub async fn live(
@@ -178,45 +215,6 @@ impl Messages<'_> {
Ok(message)
}
- pub async fn delete(
- &mut self,
- message: &Message,
- deleted: &Instant,
- ) -> Result<History, sqlx::Error> {
- sqlx::query!(
- r#"
- insert into message_deleted (id, deleted_at, deleted_sequence)
- values ($1, $2, $3)
- "#,
- message.id,
- deleted.at,
- deleted.sequence,
- )
- .execute(&mut *self.0)
- .await?;
-
- // Small social responsibility hack here: when a message is deleted, its body is
- // retconned to have been the empty string. Someone reading the event stream
- // afterwards, or looking at messages in the conversation, cannot retrieve the
- // "deleted" message by ignoring the deletion event.
- sqlx::query!(
- r#"
- update message
- set body = '', last_sequence = max(last_sequence, $1)
- where id = $2
- returning id as "id: Id"
- "#,
- deleted.sequence,
- message.id,
- )
- .fetch_one(&mut *self.0)
- .await?;
-
- let message = self.by_id(&message.id).await?;
-
- Ok(message)
- }
-
pub async fn purge(&mut self, purge_at: &DateTime) -> Result<(), sqlx::Error> {
let messages = sqlx::query_scalar!(
r#"