Diffstat (limited to 'src')
-rw-r--r--   src/broadcast.rs                       43
-rw-r--r--   src/conversation/app.rs                87
-rw-r--r--   src/conversation/history.rs            44
-rw-r--r--   src/conversation/repo.rs              138
-rw-r--r--   src/event/sequence.rs                  12
-rw-r--r--   src/login/app.rs                        7
-rw-r--r--   src/message/app.rs                     83
-rw-r--r--   src/message/handlers/delete/mod.rs      7
-rw-r--r--   src/message/handlers/delete/test.rs    18
-rw-r--r--   src/message/history.rs                 70
-rw-r--r--   src/message/repo.rs                   148
-rw-r--r--   src/token/app.rs                        7
-rw-r--r--   src/user/create.rs                     39
-rw-r--r--   src/user/history.rs                    22
-rw-r--r--   src/user/repo.rs                       50
15 files changed, 458 insertions, 317 deletions
diff --git a/src/broadcast.rs b/src/broadcast.rs
index 6e1f04d..dae7641 100644
--- a/src/broadcast.rs
+++ b/src/broadcast.rs
@@ -1,16 +1,11 @@
-use std::sync::{Arc, Mutex};
-
use futures::{Stream, future, stream::StreamExt as _};
use tokio::sync::broadcast::{Sender, channel};
use tokio_stream::wrappers::{BroadcastStream, errors::BroadcastStreamRecvError};
-// Clones will share the same sender.
+// Clones will share the same channel.
#[derive(Clone)]
pub struct Broadcaster<M> {
- // The use of std::sync::Mutex, and not tokio::sync::Mutex, follows Tokio's
- // own advice: <https://tokio.rs/tokio/tutorial/shared-state>. Methods that
- // lock it must be sync.
- senders: Arc<Mutex<Sender<M>>>,
+ sender: Sender<M>,
}
impl<M> Default for Broadcaster<M>
@@ -20,9 +15,7 @@ where
fn default() -> Self {
let sender = Self::make_sender();
- Self {
- senders: Arc::new(Mutex::new(sender)),
- }
+ Self { sender }
}
}
@@ -30,9 +23,7 @@ impl<M> Broadcaster<M>
where
M: Clone + Send + std::fmt::Debug + 'static,
{
- pub fn broadcast(&self, message: impl Into<M>) {
- let tx = self.sender();
-
+ pub fn broadcast(&self, message: M) {
// Per the Tokio docs, the returned error is only used to indicate that
// there are no receivers. In this use case, that's fine; a lack of
// listening consumers (chat clients) when a message is sent isn't an
@@ -40,11 +31,29 @@ where
//
// The successful return value, which includes the number of active
// receivers, also isn't that interesting to us.
- let _ = tx.send(message.into());
+ let _ = self.sender.send(message);
+ }
+
+ // If `M` can be built from an iterator (a `Vec`, say) and `I` is an iterable whose items
+ // convert into `M`'s element type, this collects the passed `messages` into a single `M`,
+ // converting each element as it goes, and broadcasts that one `M` containing everything
+ // collected.
+ //
+ // This is mostly meant for handling synchronized entity events, which tend to be generated as
+ // iterables of domain-specific event types, like `user::Event`, but broadcast as `Vec<event::Event>`
+ // for consumption by outside clients.
+ pub fn broadcast_from<I, E>(&self, messages: I)
+ where
+ I: IntoIterator,
+ M: FromIterator<E>,
+ E: From<I::Item>,
+ {
+ let message = messages.into_iter().map(Into::into).collect();
+ self.broadcast(message);
}
pub fn subscribe(&self) -> impl Stream<Item = M> + std::fmt::Debug + use<M> {
- let rx = self.sender().subscribe();
+ let rx = self.sender.subscribe();
BroadcastStream::from(rx).scan((), |(), r| {
// The following could technically be `r.ok()`, and is exactly
@@ -65,10 +74,6 @@ where
})
}
- fn sender(&self) -> Sender<M> {
- self.senders.lock().unwrap().clone()
- }
-
fn make_sender() -> Sender<M> {
// Queue depth of 16 chosen entirely arbitrarily. Don't read too much
// into it.
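
The refactor above leans on tokio's broadcast::Sender being cheaply cloneable in its own right, so the wrapper no longer needs any interior locking. A minimal standalone sketch of the resulting shape, using stand-in types and a trivial payload rather than this crate's module (the real subscribe() additionally wraps the receiver in a BroadcastStream and drops lagged-receiver errors, omitted here):

    use tokio::sync::broadcast;

    // tokio's broadcast::Sender is already cheaply cloneable, so a
    // #[derive(Clone)] wrapper can hold it directly; no Arc<Mutex<_>> is
    // needed for shared fan-out.
    #[derive(Clone)]
    struct Broadcaster<M> {
        sender: broadcast::Sender<M>,
    }

    impl<M: Clone + Send + 'static> Broadcaster<M> {
        fn new() -> Self {
            // Queue depth chosen arbitrarily, as in the real module.
            let (sender, _) = broadcast::channel(16);
            Self { sender }
        }

        fn broadcast(&self, message: M) {
            // A send error only means there are currently no subscribers.
            let _ = self.sender.send(message);
        }

        // Collect an iterator of events into a single `M` and send it once.
        fn broadcast_from<I, E>(&self, messages: I)
        where
            I: IntoIterator,
            M: FromIterator<E>,
            E: From<I::Item>,
        {
            self.broadcast(messages.into_iter().map(E::from).collect());
        }
    }

    fn main() {
        let events = Broadcaster::<Vec<String>>::new();
        let mut rx = events.sender.subscribe();
        events.broadcast_from(vec!["created".to_string(), "deleted".to_string()]);
        assert_eq!(
            rx.try_recv().unwrap(),
            vec!["created".to_string(), "deleted".to_string()]
        );
    }
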
diff --git a/src/conversation/app.rs b/src/conversation/app.rs
index 81ccdcf..26886af 100644
--- a/src/conversation/app.rs
+++ b/src/conversation/app.rs
@@ -3,15 +3,15 @@ use itertools::Itertools;
use sqlx::sqlite::SqlitePool;
use super::{
- Conversation, Id,
+ Conversation, History, Id, history,
repo::{LoadError, Provider as _},
validate,
};
use crate::{
clock::DateTime,
db::{Duplicate as _, NotFound as _},
- event::{Broadcaster, Event, Sequence, repo::Provider as _},
- message::{self, repo::Provider as _},
+ event::{Broadcaster, Sequence, repo::Provider as _},
+ message::repo::Provider as _,
name::{self, Name},
};
@@ -36,15 +36,20 @@ impl<'a> Conversations<'a> {
let mut tx = self.db.begin().await?;
let created = tx.sequence().next(created_at).await?;
- let conversation = tx
- .conversations()
- .create(name, &created)
+ let conversation = History::begin(name, created);
+
+ // This filter technically includes every event in the history, but it's easier to follow if
+ // the various event-manipulating app methods are consistent, and it's harmless to have an
+ // always-satisfied filter.
+ let events = conversation.events().filter(Sequence::start_from(created));
+ tx.conversations()
+ .record_events(events.clone())
.await
.duplicate(|| CreateError::DuplicateName(name.clone()))?;
+
tx.commit().await?;
- self.events
- .broadcast(conversation.events().map(Event::from).collect::<Vec<_>>());
+ self.events.broadcast_from(events);
Ok(conversation.as_created())
}
@@ -78,33 +83,28 @@ impl<'a> Conversations<'a> {
.by_id(conversation)
.await
.not_found(|| DeleteError::NotFound(conversation.clone()))?;
- conversation
- .as_snapshot()
- .ok_or_else(|| DeleteError::Deleted(conversation.id().clone()))?;
-
- let mut events = Vec::new();
let messages = tx.messages().live(&conversation).await?;
+ let deleted_at = tx.sequence().next(deleted_at).await?;
+
let has_messages = messages
.iter()
- .map(message::History::as_snapshot)
+ .map(|message| message.as_of(deleted_at))
.any(|message| message.is_some());
if has_messages {
return Err(DeleteError::NotEmpty(conversation.id().clone()));
}
- let deleted = tx.sequence().next(deleted_at).await?;
- let conversation = tx.conversations().delete(&conversation, &deleted).await?;
- events.extend(
- conversation
- .events()
- .filter(Sequence::start_from(deleted.sequence))
- .map(Event::from),
- );
+ let conversation = conversation.delete(deleted_at)?;
+
+ let events = conversation
+ .events()
+ .filter(Sequence::start_from(deleted_at));
+ tx.conversations().record_events(events.clone()).await?;
tx.commit().await?;
- self.events.broadcast(events);
+ self.events.broadcast_from(events);
Ok(())
}
@@ -120,23 +120,20 @@ impl<'a> Conversations<'a> {
let mut events = Vec::with_capacity(expired.len());
for conversation in expired {
let deleted = tx.sequence().next(relative_to).await?;
- let conversation = tx.conversations().delete(&conversation, &deleted).await?;
- events.push(
- conversation
- .events()
- .filter(Sequence::start_from(deleted.sequence)),
- );
+ let conversation = conversation.delete(deleted)?;
+
+ let conversation_events = conversation.events().filter(Sequence::start_from(deleted));
+ tx.conversations()
+ .record_events(conversation_events.clone())
+ .await?;
+
+ events.push(conversation_events);
}
tx.commit().await?;
- self.events.broadcast(
- events
- .into_iter()
- .kmerge_by(Sequence::merge)
- .map(Event::from)
- .collect::<Vec<_>>(),
- );
+ self.events
+ .broadcast_from(events.into_iter().kmerge_by(Sequence::merge));
Ok(())
}
@@ -218,8 +215,18 @@ impl From<LoadError> for DeleteError {
}
}
+impl From<history::DeleteError> for DeleteError {
+ fn from(error: history::DeleteError) -> Self {
+ match error {
+ history::DeleteError::Deleted(conversation) => Self::Deleted(conversation.id().clone()),
+ }
+ }
+}
+
#[derive(Debug, thiserror::Error)]
pub enum ExpireError {
+ #[error("tried to expire already-deleted conversation: {0}")]
+ Deleted(Id),
#[error(transparent)]
Database(#[from] sqlx::Error),
#[error(transparent)]
@@ -234,3 +241,11 @@ impl From<LoadError> for ExpireError {
}
}
}
+
+impl From<history::DeleteError> for ExpireError {
+ fn from(error: history::DeleteError) -> Self {
+ match error {
+ history::DeleteError::Deleted(conversation) => Self::Deleted(conversation.id().clone()),
+ }
+ }
+}
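
The create, delete, and expire paths above now share one write shape: build or update the history in memory, filter its events from the new instant, record those events inside the open transaction, and broadcast the same (cloneable) iterator only once the commit succeeds. A simplified sketch of that ordering, with placeholder types standing in for the crate's repositories and broadcaster:

    #[derive(Clone, Debug)]
    struct Event(u64);

    struct Repo {
        recorded: Vec<Event>,
    }

    impl Repo {
        fn record_events(&mut self, events: impl IntoIterator<Item = Event>) {
            self.recorded.extend(events);
        }
    }

    fn broadcast_from(events: impl IntoIterator<Item = Event>) -> usize {
        events.into_iter().count()
    }

    fn main() {
        let created = 1u64;
        // A brand-new history carries only its creation event, so the filter
        // is always satisfied -- it is kept so create/delete/expire read alike.
        let history = vec![Event(1)];
        let events = history.into_iter().filter(move |event| event.0 >= created);

        let mut repo = Repo { recorded: Vec::new() };
        repo.record_events(events.clone()); // recorded inside the transaction
        // ...tx.commit() would happen here...
        let broadcast = broadcast_from(events); // broadcast only after the commit
        assert_eq!(repo.recorded.len(), 1);
        assert_eq!(broadcast, 1);
    }
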
diff --git a/src/conversation/history.rs b/src/conversation/history.rs
index 746a1b0..5cba9ca 100644
--- a/src/conversation/history.rs
+++ b/src/conversation/history.rs
@@ -4,13 +4,49 @@ use super::{
Conversation, Id,
event::{Created, Deleted, Event},
};
-use crate::event::Sequence;
+use crate::{
+ event::{Instant, Sequence},
+ name::Name,
+};
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct History {
pub conversation: Conversation,
}
+// Lifecycle interface
+impl History {
+ pub fn begin(name: &Name, created: Instant) -> Self {
+ Self {
+ conversation: Conversation {
+ id: Id::generate(),
+ name: name.clone(),
+ created,
+ deleted: None,
+ },
+ }
+ }
+
+ pub fn delete(self, deleted: Instant) -> Result<Self, DeleteError> {
+ if self.conversation.deleted.is_none() {
+ Ok(Self {
+ conversation: Conversation {
+ deleted: Some(deleted),
+ ..self.conversation
+ },
+ })
+ } else {
+ Err(DeleteError::Deleted(self))
+ }
+ }
+}
+
+#[derive(Debug, thiserror::Error)]
+pub enum DeleteError {
+ #[error("conversation {} already deleted", .0.conversation.id)]
+ Deleted(History),
+}
+
// State interface
impl History {
pub fn id(&self) -> &Id {
@@ -30,9 +66,7 @@ impl History {
where
S: Into<Sequence>,
{
- self.events()
- .filter(Sequence::up_to(sequence.into()))
- .collect()
+ self.events().filter(Sequence::up_to(sequence)).collect()
}
// Snapshot of this conversation as of all events recorded in this history.
@@ -43,7 +77,7 @@ impl History {
// Event factories
impl History {
- pub fn events(&self) -> impl Iterator<Item = Event> + use<> {
+ pub fn events(&self) -> impl Iterator<Item = Event> + Clone + use<> {
[self.created()]
.into_iter()
.merge_by(self.deleted(), Sequence::merge)
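
With begin and delete living on History, the lifecycle decisions are pure functions over in-memory values; the repository only records the resulting events. A standalone sketch of that shape, with simplified stand-in fields and a plain enum in place of the thiserror-derived error:

    // Stand-ins for the crate's Instant and Conversation; the real Instant
    // carries a timestamp plus sequence number.
    #[derive(Clone, Copy, Debug, PartialEq)]
    struct Instant(u64);

    #[derive(Clone, Debug)]
    struct Conversation {
        name: String,
        created: Instant,
        deleted: Option<Instant>,
    }

    #[derive(Clone, Debug)]
    struct History {
        conversation: Conversation,
    }

    #[derive(Debug)]
    enum DeleteError {
        Deleted(History),
    }

    impl History {
        // Start a brand-new history: the conversation exists and is not deleted.
        fn begin(name: &str, created: Instant) -> Self {
            Self {
                conversation: Conversation {
                    name: name.to_owned(),
                    created,
                    deleted: None,
                },
            }
        }

        // Deleting is a pure state transition; deleting twice is a domain
        // error rather than something the repository has to detect.
        fn delete(self, deleted: Instant) -> Result<Self, DeleteError> {
            if self.conversation.deleted.is_none() {
                Ok(Self {
                    conversation: Conversation {
                        deleted: Some(deleted),
                        ..self.conversation
                    },
                })
            } else {
                Err(DeleteError::Deleted(self))
            }
        }
    }

    fn main() {
        let history = History::begin("lobby", Instant(1));
        assert_eq!(history.conversation.name, "lobby");
        assert_eq!(history.conversation.created, Instant(1));

        let history = history.delete(Instant(2)).expect("first delete succeeds");
        assert!(matches!(
            history.delete(Instant(3)),
            Err(DeleteError::Deleted(_))
        ));
    }
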
diff --git a/src/conversation/repo.rs b/src/conversation/repo.rs
index 7e38b62..cb66bf8 100644
--- a/src/conversation/repo.rs
+++ b/src/conversation/repo.rs
@@ -1,9 +1,12 @@
use futures::stream::{StreamExt as _, TryStreamExt as _};
use sqlx::{SqliteConnection, Transaction, sqlite::Sqlite};
+use super::{
+ Conversation, Event, History, Id,
+ event::{Created, Deleted},
+};
use crate::{
clock::DateTime,
- conversation::{Conversation, History, Id},
db::NotFound,
event::{Instant, Sequence},
name::{self, Name},
@@ -22,22 +25,41 @@ impl Provider for Transaction<'_, Sqlite> {
pub struct Conversations<'t>(&'t mut SqliteConnection);
impl Conversations<'_> {
- pub async fn create(&mut self, name: &Name, created: &Instant) -> Result<History, sqlx::Error> {
- let id = Id::generate();
- let name = name.clone();
+ pub async fn record_events(
+ &mut self,
+ events: impl IntoIterator<Item = Event>,
+ ) -> Result<(), sqlx::Error> {
+ for event in events {
+ self.record_event(&event).await?;
+ }
+ Ok(())
+ }
+
+ pub async fn record_event(&mut self, event: &Event) -> Result<(), sqlx::Error> {
+ match event {
+ Event::Created(created) => self.record_created(created).await,
+ Event::Deleted(deleted) => self.record_deleted(deleted).await,
+ }
+ }
+
+ async fn record_created(&mut self, created: &Created) -> Result<(), sqlx::Error> {
+ let Conversation {
+ id,
+ created,
+ name,
+ deleted: _,
+ } = &created.conversation;
let display_name = name.display();
let canonical_name = name.canonical();
- let created = *created;
sqlx::query!(
r#"
insert into conversation (id, created_at, created_sequence, last_sequence)
- values ($1, $2, $3, $4)
+ values ($1, $2, $3, $3)
"#,
id,
created.at,
created.sequence,
- created.sequence,
)
.execute(&mut *self.0)
.await?;
@@ -54,16 +76,50 @@ impl Conversations<'_> {
.execute(&mut *self.0)
.await?;
- let conversation = History {
- conversation: Conversation {
- created,
- id,
- name: name.clone(),
- deleted: None,
- },
- };
+ Ok(())
+ }
- Ok(conversation)
+ async fn record_deleted(&mut self, deleted: &Deleted) -> Result<(), sqlx::Error> {
+ let Deleted { instant, id } = deleted;
+ sqlx::query!(
+ r#"
+ update conversation
+ set last_sequence = max(last_sequence, $1)
+ where id = $2
+ "#,
+ instant.sequence,
+ id,
+ )
+ .execute(&mut *self.0)
+ .await?;
+
+ sqlx::query!(
+ r#"
+ insert into conversation_deleted (id, deleted_at, deleted_sequence)
+ values ($1, $2, $3)
+ "#,
+ id,
+ instant.at,
+ instant.sequence,
+ )
+ .execute(&mut *self.0)
+ .await?;
+
+ // Small social responsibility hack here: when a conversation is deleted, its
+ // name is retconned to have been the empty string. Someone reading the event
+ // stream afterwards, or looking at conversations via the API, cannot retrieve
+ // the "deleted" conversation's information by ignoring the deletion event.
+ sqlx::query!(
+ r#"
+ delete from conversation_name
+ where id = $1
+ "#,
+ id,
+ )
+ .execute(&mut *self.0)
+ .await?;
+
+ Ok(())
}
pub async fn by_id(&mut self, conversation: &Id) -> Result<History, LoadError> {
@@ -179,56 +235,6 @@ impl Conversations<'_> {
Ok(conversations)
}
- pub async fn delete(
- &mut self,
- conversation: &History,
- deleted: &Instant,
- ) -> Result<History, LoadError> {
- let id = conversation.id();
- sqlx::query!(
- r#"
- update conversation
- set last_sequence = max(last_sequence, $1)
- where id = $2
- returning id as "id: Id"
- "#,
- deleted.sequence,
- id,
- )
- .fetch_one(&mut *self.0)
- .await?;
-
- sqlx::query!(
- r#"
- insert into conversation_deleted (id, deleted_at, deleted_sequence)
- values ($1, $2, $3)
- "#,
- id,
- deleted.at,
- deleted.sequence,
- )
- .execute(&mut *self.0)
- .await?;
-
- // Small social responsibility hack here: when a conversation is deleted, its
- // name is retconned to have been the empty string. Someone reading the event
- // stream afterwards, or looking at conversations via the API, cannot retrieve
- // the "deleted" conversation's information by ignoring the deletion event.
- sqlx::query!(
- r#"
- delete from conversation_name
- where id = $1
- "#,
- id,
- )
- .execute(&mut *self.0)
- .await?;
-
- let conversation = self.by_id(id).await?;
-
- Ok(conversation)
- }
-
pub async fn purge(&mut self, purge_at: &DateTime) -> Result<(), sqlx::Error> {
let conversations = sqlx::query_scalar!(
r#"
diff --git a/src/event/sequence.rs b/src/event/sequence.rs
index 77281c2..9a0ea5d 100644
--- a/src/event/sequence.rs
+++ b/src/event/sequence.rs
@@ -50,24 +50,30 @@ impl fmt::Display for Sequence {
}
impl Sequence {
- pub fn up_to<E>(resume_point: Sequence) -> impl for<'e> Fn(&'e E) -> bool
+ pub fn up_to<P, E>(resume_point: P) -> impl for<'e> Fn(&'e E) -> bool + Clone
where
+ P: Into<Self>,
E: Sequenced,
{
+ let resume_point = resume_point.into();
move |event| event.sequence() <= resume_point
}
- pub fn after<E>(resume_point: Sequence) -> impl for<'e> Fn(&'e E) -> bool
+ pub fn after<P, E>(resume_point: P) -> impl for<'e> Fn(&'e E) -> bool + Clone
where
+ P: Into<Self>,
E: Sequenced,
{
+ let resume_point = resume_point.into();
move |event| resume_point < event.sequence()
}
- pub fn start_from<E>(resume_point: Self) -> impl for<'e> Fn(&'e E) -> bool
+ pub fn start_from<P, E>(resume_point: P) -> impl for<'e> Fn(&'e E) -> bool + Clone
where
+ P: Into<Self>,
E: Sequenced,
{
+ let resume_point = resume_point.into();
move |event| resume_point <= event.sequence()
}
diff --git a/src/login/app.rs b/src/login/app.rs
index 77d4ac3..e471000 100644
--- a/src/login/app.rs
+++ b/src/login/app.rs
@@ -80,9 +80,10 @@ impl<'a> Logins<'a> {
tx.tokens().create(&token, &secret).await?;
tx.commit().await?;
- for event in revoked.into_iter().map(TokenEvent::Revoked) {
- self.token_events.broadcast(event);
- }
+ revoked
+ .into_iter()
+ .map(TokenEvent::Revoked)
+ .for_each(|event| self.token_events.broadcast(event));
Ok(secret)
} else {
diff --git a/src/message/app.rs b/src/message/app.rs
index 9100224..647152e 100644
--- a/src/message/app.rs
+++ b/src/message/app.rs
@@ -2,12 +2,12 @@ use chrono::TimeDelta;
use itertools::Itertools;
use sqlx::sqlite::SqlitePool;
-use super::{Body, Id, Message, repo::Provider as _};
+use super::{Body, History, Id, Message, history, repo::Provider as _};
use crate::{
clock::DateTime,
conversation::{self, repo::Provider as _},
db::NotFound as _,
- event::{Broadcaster, Event, Sequence, repo::Provider as _},
+ event::{Broadcaster, Sequence, repo::Provider as _},
login::Login,
name,
user::{self, repo::Provider as _},
@@ -52,14 +52,17 @@ impl<'a> Messages<'a> {
let sent = tx.sequence().next(sent_at).await?;
let conversation = conversation.as_of(sent).ok_or_else(conversation_deleted)?;
let sender = sender.as_of(sent).ok_or_else(sender_deleted)?;
- let message = tx
- .messages()
- .create(&conversation, &sender, &sent, body)
- .await?;
+ let message = History::begin(&conversation, &sender, body, sent);
+
+ // This filter technically includes every event in the history, but it's easier to follow if
+ // the various event-manipulating app methods are consistent, and it's harmless to have an
+ // always-satisfied filter.
+ let events = message.events().filter(Sequence::start_from(sent));
+ tx.messages().record_events(events.clone()).await?;
+
tx.commit().await?;
- self.events
- .broadcast(message.events().map(Event::from).collect::<Vec<_>>());
+ self.events.broadcast_from(events);
Ok(message.as_sent())
}
@@ -71,38 +74,24 @@ impl<'a> Messages<'a> {
deleted_at: &DateTime,
) -> Result<(), DeleteError> {
let message_not_found = || DeleteError::MessageNotFound(message.clone());
- let message_deleted = || DeleteError::Deleted(message.clone());
- let deleter_not_found = || DeleteError::UserNotFound(deleted_by.id.clone().into());
- let deleter_deleted = || DeleteError::UserDeleted(deleted_by.id.clone().into());
let not_sender = || DeleteError::NotSender(deleted_by.id.clone().into());
let mut tx = self.db.begin().await?;
+
let message = tx
.messages()
.by_id(message)
.await
.not_found(message_not_found)?;
- let deleted_by = tx
- .users()
- .by_login(deleted_by)
- .await
- .not_found(deleter_not_found)?;
-
- let deleted = tx.sequence().next(deleted_at).await?;
- let message = message.as_of(deleted).ok_or_else(message_deleted)?;
- let deleted_by = deleted_by.as_of(deleted).ok_or_else(deleter_deleted)?;
+ if message.sender() == &deleted_by.id {
+ let deleted_at = tx.sequence().next(deleted_at).await?;
+ let message = message.delete(deleted_at)?;
- if message.sender == deleted_by.id {
- let message = tx.messages().delete(&message, &deleted).await?;
+ let events = message.events().filter(Sequence::start_from(deleted_at));
+ tx.messages().record_events(events.clone()).await?;
tx.commit().await?;
- self.events.broadcast(
- message
- .events()
- .filter(Sequence::start_from(deleted.sequence))
- .map(Event::from)
- .collect::<Vec<_>>(),
- );
+ self.events.broadcast_from(events);
Ok(())
} else {
@@ -120,25 +109,23 @@ impl<'a> Messages<'a> {
let mut events = Vec::with_capacity(expired.len());
for message in expired {
let deleted = tx.sequence().next(relative_to).await?;
- if let Some(message) = message.as_of(deleted) {
- let message = tx.messages().delete(&message, &deleted).await?;
- events.push(
- message
+ match message.delete(deleted) {
+ Ok(message) => {
+ let message_events = message
.events()
- .filter(Sequence::start_from(deleted.sequence)),
- );
+ .filter(Sequence::start_from(deleted.sequence));
+ tx.messages().record_events(message_events.clone()).await?;
+
+ events.push(message_events);
+ }
+ Err(history::DeleteError::Deleted(_)) => {}
}
}
tx.commit().await?;
- self.events.broadcast(
- events
- .into_iter()
- .kmerge_by(Sequence::merge)
- .map(Event::from)
- .collect::<Vec<_>>(),
- );
+ self.events
+ .broadcast_from(events.into_iter().kmerge_by(Sequence::merge));
Ok(())
}
@@ -195,10 +182,6 @@ impl From<user::repo::LoadError> for SendError {
pub enum DeleteError {
#[error("message {0} not found")]
MessageNotFound(Id),
- #[error("user {0} not found")]
- UserNotFound(user::Id),
- #[error("user {0} deleted")]
- UserDeleted(user::Id),
#[error("user {0} not the message's sender")]
NotSender(user::Id),
#[error("message {0} deleted")]
@@ -218,3 +201,11 @@ impl From<user::repo::LoadError> for DeleteError {
}
}
}
+
+impl From<history::DeleteError> for DeleteError {
+ fn from(error: history::DeleteError) -> Self {
+ match error {
+ history::DeleteError::Deleted(message) => Self::Deleted(message.id().clone()),
+ }
+ }
+}
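
In the expire path above, each expired message (or conversation) contributes its own already sequence-ordered event iterator, and kmerge_by interleaves them into one globally ordered stream before broadcasting. A tiny standalone example of that k-way merge, with a plain closure standing in for the crate's Sequence::merge predicate:

    use itertools::Itertools as _;

    fn main() {
        // Each expired entity yields its own, already sequence-ordered events.
        let per_entity = vec![
            vec![(1u64, "conversation A created"), (5, "conversation A deleted")],
            vec![(2, "conversation B created"), (3, "conversation B deleted")],
            vec![(4, "conversation C created")],
        ];

        // k-way merge by sequence number, smallest first.
        let merged: Vec<u64> = per_entity
            .into_iter()
            .kmerge_by(|a, b| a.0 <= b.0)
            .map(|(sequence, _)| sequence)
            .collect();

        assert_eq!(merged, vec![1, 2, 3, 4, 5]);
    }
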
diff --git a/src/message/handlers/delete/mod.rs b/src/message/handlers/delete/mod.rs
index 606f502..3e9a212 100644
--- a/src/message/handlers/delete/mod.rs
+++ b/src/message/handlers/delete/mod.rs
@@ -51,10 +51,9 @@ impl IntoResponse for Error {
DeleteError::MessageNotFound(_) | DeleteError::Deleted(_) => {
NotFound(error).into_response()
}
- DeleteError::UserNotFound(_)
- | DeleteError::UserDeleted(_)
- | DeleteError::Database(_)
- | DeleteError::Name(_) => Internal::from(error).into_response(),
+ DeleteError::Database(_) | DeleteError::Name(_) => {
+ Internal::from(error).into_response()
+ }
}
}
}
diff --git a/src/message/handlers/delete/test.rs b/src/message/handlers/delete/test.rs
index d0e1794..05d9344 100644
--- a/src/message/handlers/delete/test.rs
+++ b/src/message/handlers/delete/test.rs
@@ -70,23 +70,23 @@ pub async fn delete_deleted() {
// Set up the environment
let app = fixtures::scratch_app().await;
- let sender = fixtures::user::create(&app, &fixtures::now()).await;
+ let sender = fixtures::identity::create(&app, &fixtures::now()).await;
let conversation = fixtures::conversation::create(&app, &fixtures::now()).await;
- let message = fixtures::message::send(&app, &conversation, &sender, &fixtures::now()).await;
+ let message =
+ fixtures::message::send(&app, &conversation, &sender.login, &fixtures::now()).await;
app.messages()
- .delete(&sender, &message.id, &fixtures::now())
+ .delete(&sender.login, &message.id, &fixtures::now())
.await
.expect("deleting a recently-sent message succeeds");
// Send the request
- let deleter = fixtures::identity::create(&app, &fixtures::now()).await;
let super::Error(error) = super::handler(
State(app.clone()),
Path(message.id.clone()),
fixtures::now(),
- deleter,
+ sender,
)
.await
.expect_err("deleting a deleted message fails");
@@ -101,9 +101,10 @@ pub async fn delete_expired() {
// Set up the environment
let app = fixtures::scratch_app().await;
- let sender = fixtures::user::create(&app, &fixtures::ancient()).await;
+ let sender = fixtures::identity::create(&app, &fixtures::ancient()).await;
let conversation = fixtures::conversation::create(&app, &fixtures::ancient()).await;
- let message = fixtures::message::send(&app, &conversation, &sender, &fixtures::ancient()).await;
+ let message =
+ fixtures::message::send(&app, &conversation, &sender.login, &fixtures::ancient()).await;
app.messages()
.expire(&fixtures::now())
@@ -112,12 +113,11 @@ pub async fn delete_expired() {
// Send the request
- let deleter = fixtures::identity::create(&app, &fixtures::now()).await;
let super::Error(error) = super::handler(
State(app.clone()),
Path(message.id.clone()),
fixtures::now(),
- deleter,
+ sender,
)
.await
.expect_err("deleting an expired message fails");
diff --git a/src/message/history.rs b/src/message/history.rs
index 2abdf2c..92cecc9 100644
--- a/src/message/history.rs
+++ b/src/message/history.rs
@@ -1,18 +1,67 @@
use itertools::Itertools as _;
use super::{
- Message,
+ Body, Id, Message,
event::{Deleted, Event, Sent},
};
-use crate::event::Sequence;
+use crate::{
+ conversation::Conversation,
+ event::{Instant, Sequence},
+ user::{self, User},
+};
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct History {
pub message: Message,
}
+// Lifecycle interface
+impl History {
+ pub fn begin(conversation: &Conversation, sender: &User, body: &Body, sent: Instant) -> Self {
+ Self {
+ message: Message {
+ id: Id::generate(),
+ conversation: conversation.id.clone(),
+ sender: sender.id.clone(),
+ body: body.clone(),
+ sent,
+ deleted: None,
+ },
+ }
+ }
+
+ pub fn delete(self, deleted: Instant) -> Result<Self, DeleteError> {
+ if self.message.deleted.is_none() {
+ Ok(Self {
+ message: Message {
+ deleted: Some(deleted),
+ ..self.message
+ },
+ })
+ } else {
+ Err(DeleteError::Deleted(self.into()))
+ }
+ }
+}
+
+#[derive(Debug, thiserror::Error)]
+pub enum DeleteError {
+ #[error("message {} already deleted", .0.message.id)]
+ // Payload is boxed here to avoid copying an entire `History` around in any errors this error
+ // gets chained into. See <https://rust-lang.github.io/rust-clippy/master/index.html#result_large_err>.
+ Deleted(Box<History>),
+}
+
// State interface
impl History {
+ pub fn id(&self) -> &Id {
+ &self.message.id
+ }
+
+ pub fn sender(&self) -> &user::Id {
+ &self.message.sender
+ }
+
// Snapshot of this message as it was when sent. (Note to the future: it's okay
// if this returns a redacted or modified version of the message. If we
// implement message editing by redacting the original body, then this should
@@ -30,15 +79,16 @@ impl History {
.filter(Sequence::up_to(sequence.into()))
.collect()
}
-
- // Snapshot of this message as of all events recorded in this history.
- pub fn as_snapshot(&self) -> Option<Message> {
- self.events().collect()
- }
}
// Events interface
impl History {
+ pub fn events(&self) -> impl Iterator<Item = Event> + Clone + use<> {
+ [self.sent()]
+ .into_iter()
+ .merge_by(self.deleted(), Sequence::merge)
+ }
+
fn sent(&self) -> Event {
Sent {
message: self.message.clone(),
@@ -55,10 +105,4 @@ impl History {
.into()
})
}
-
- pub fn events(&self) -> impl Iterator<Item = Event> + use<> {
- [self.sent()]
- .into_iter()
- .merge_by(self.deleted(), Sequence::merge)
- }
}
diff --git a/src/message/repo.rs b/src/message/repo.rs
index 83bf0d5..4f66bdc 100644
--- a/src/message/repo.rs
+++ b/src/message/repo.rs
@@ -1,11 +1,14 @@
use sqlx::{SqliteConnection, Transaction, sqlite::Sqlite};
-use super::{Body, History, Id, snapshot::Message};
+use super::{
+ Body, Event, History, Id, Message,
+ event::{Deleted, Sent},
+};
use crate::{
clock::DateTime,
- conversation::{self, Conversation},
+ conversation,
event::{Instant, Sequence},
- user::{self, User},
+ user,
};
pub trait Provider {
@@ -21,50 +24,84 @@ impl Provider for Transaction<'_, Sqlite> {
pub struct Messages<'t>(&'t mut SqliteConnection);
impl Messages<'_> {
- pub async fn create(
+ pub async fn record_events(
&mut self,
- conversation: &Conversation,
- sender: &User,
- sent: &Instant,
- body: &Body,
- ) -> Result<History, sqlx::Error> {
- let id = Id::generate();
+ events: impl IntoIterator<Item = Event>,
+ ) -> Result<(), sqlx::Error> {
+ for event in events {
+ self.record_event(&event).await?;
+ }
+ Ok(())
+ }
- let message = sqlx::query!(
+ pub async fn record_event(&mut self, event: &Event) -> Result<(), sqlx::Error> {
+ match event {
+ Event::Sent(sent) => self.record_sent(sent).await,
+ Event::Deleted(deleted) => self.record_deleted(deleted).await,
+ }
+ }
+
+ async fn record_sent(&mut self, sent: &Sent) -> Result<(), sqlx::Error> {
+ let Message {
+ id,
+ conversation,
+ sender,
+ body,
+ sent,
+ deleted: _,
+ } = &sent.message;
+
+ sqlx::query!(
r#"
insert into message
- (id, conversation, sender, sent_at, sent_sequence, body, last_sequence)
- values ($1, $2, $3, $4, $5, $6, $7)
- returning
- id as "id: Id",
- conversation as "conversation: conversation::Id",
- sender as "sender: user::Id",
- sent_at as "sent_at: DateTime",
- sent_sequence as "sent_sequence: Sequence",
- body as "body: Body"
+ (id, conversation, sender, body, sent_at, sent_sequence, last_sequence)
+ values ($1, $2, $3, $4, $5, $6, $6)
"#,
id,
- conversation.id,
- sender.id,
- sent.at,
- sent.sequence,
+ conversation,
+ sender,
body,
+ sent.at,
sent.sequence,
)
- .map(|row| History {
- message: Message {
- sent: Instant::new(row.sent_at, row.sent_sequence),
- conversation: row.conversation,
- sender: row.sender,
- id: row.id,
- body: row.body.unwrap_or_default(),
- deleted: None,
- },
- })
- .fetch_one(&mut *self.0)
+ .execute(&mut *self.0)
.await?;
- Ok(message)
+ Ok(())
+ }
+
+ async fn record_deleted(&mut self, deleted: &Deleted) -> Result<(), sqlx::Error> {
+ let Deleted { instant, id } = deleted;
+
+ sqlx::query!(
+ r#"
+ insert into message_deleted (id, deleted_at, deleted_sequence)
+ values ($1, $2, $3)
+ "#,
+ id,
+ instant.at,
+ instant.sequence,
+ )
+ .execute(&mut *self.0)
+ .await?;
+
+ // Small social responsibility hack here: when a message is deleted, its body is
+ // retconned to have been the empty string. Someone reading the event stream
+ // afterwards, or looking at messages in the conversation, cannot retrieve the
+ // "deleted" message by ignoring the deletion event.
+ sqlx::query!(
+ r#"
+ update message
+ set body = '', last_sequence = max(last_sequence, $1)
+ where id = $2
+ "#,
+ instant.sequence,
+ id,
+ )
+ .execute(&mut *self.0)
+ .await?;
+
+ Ok(())
}
pub async fn live(
@@ -178,45 +215,6 @@ impl Messages<'_> {
Ok(message)
}
- pub async fn delete(
- &mut self,
- message: &Message,
- deleted: &Instant,
- ) -> Result<History, sqlx::Error> {
- sqlx::query!(
- r#"
- insert into message_deleted (id, deleted_at, deleted_sequence)
- values ($1, $2, $3)
- "#,
- message.id,
- deleted.at,
- deleted.sequence,
- )
- .execute(&mut *self.0)
- .await?;
-
- // Small social responsibility hack here: when a message is deleted, its body is
- // retconned to have been the empty string. Someone reading the event stream
- // afterwards, or looking at messages in the conversation, cannot retrieve the
- // "deleted" message by ignoring the deletion event.
- sqlx::query!(
- r#"
- update message
- set body = '', last_sequence = max(last_sequence, $1)
- where id = $2
- returning id as "id: Id"
- "#,
- deleted.sequence,
- message.id,
- )
- .fetch_one(&mut *self.0)
- .await?;
-
- let message = self.by_id(&message.id).await?;
-
- Ok(message)
- }
-
pub async fn purge(&mut self, purge_at: &DateTime) -> Result<(), sqlx::Error> {
let messages = sqlx::query_scalar!(
r#"
diff --git a/src/token/app.rs b/src/token/app.rs
index fb5d712..1d68f32 100644
--- a/src/token/app.rs
+++ b/src/token/app.rs
@@ -102,9 +102,10 @@ impl<'a> Tokens<'a> {
let tokens = tx.tokens().expire(&expire_at).await?;
tx.commit().await?;
- for event in tokens.into_iter().map(TokenEvent::Revoked) {
- self.token_events.broadcast(event);
- }
+ tokens
+ .into_iter()
+ .map(TokenEvent::Revoked)
+ .for_each(|event| self.token_events.broadcast(event));
Ok(())
}
diff --git a/src/user/create.rs b/src/user/create.rs
index 5c060c9..d6656e5 100644
--- a/src/user/create.rs
+++ b/src/user/create.rs
@@ -3,7 +3,7 @@ use sqlx::{Transaction, sqlite::Sqlite};
use super::{History, repo::Provider as _, validate};
use crate::{
clock::DateTime,
- event::{Broadcaster, Event, repo::Provider as _},
+ event::{Broadcaster, Event, Sequence, repo::Provider as _},
login::{self, Login, repo::Provider as _},
name::Name,
password::{Password, StoredHash},
@@ -54,7 +54,10 @@ pub struct Validated<'a> {
}
impl Validated<'_> {
- pub async fn store(self, tx: &mut Transaction<'_, Sqlite>) -> Result<Stored, sqlx::Error> {
+ pub async fn store(
+ self,
+ tx: &mut Transaction<'_, Sqlite>,
+ ) -> Result<Stored<impl IntoIterator<Item = Event> + use<>>, sqlx::Error> {
let Self {
name,
password,
@@ -63,28 +66,40 @@ impl Validated<'_> {
let login = Login {
id: login::Id::generate(),
- name: name.to_owned(),
+ name: name.clone(),
};
+ tx.logins().create(&login, &password).await?;
let created = tx.sequence().next(created_at).await?;
- tx.logins().create(&login, &password).await?;
- let user = tx.users().create(&login, &created).await?;
+ let user = History::begin(&login, created);
+
+ let events = user.events().filter(Sequence::start_from(created));
+ tx.users().record_events(events.clone()).await?;
- Ok(Stored { user, login })
+ Ok(Stored {
+ events: events.map(Event::from),
+ login,
+ })
}
}
#[must_use = "dropping a user creation attempt is likely a mistake"]
-pub struct Stored {
- user: History,
+pub struct Stored<E> {
+ events: E,
login: Login,
}
-impl Stored {
- pub fn publish(self, broadcaster: &Broadcaster) {
- let Self { user, login: _ } = self;
+impl<E> Stored<E>
+where
+ E: IntoIterator<Item = Event>,
+{
+ pub fn publish(self, events: &Broadcaster) {
+ let Self {
+ events: user_events,
+ login: _,
+ } = self;
- broadcaster.broadcast(user.events().map(Event::from).collect::<Vec<_>>());
+ events.broadcast_from(user_events);
}
pub fn login(&self) -> &Login {
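
Stored is now generic over the event iterator produced while the transaction is open, so publish can forward exactly those events to the broadcaster after the commit, without re-deriving them from a History. A simplified standalone sketch of that handoff, with a Vec standing in for the Broadcaster and no real database:

    #[derive(Clone, Debug, PartialEq)]
    struct Event(String);

    #[must_use = "dropping a user creation attempt is likely a mistake"]
    struct Stored<E> {
        events: E,
    }

    impl<E> Stored<E>
    where
        E: IntoIterator<Item = Event>,
    {
        fn publish(self, sink: &mut Vec<Event>) {
            // In the crate this is events.broadcast_from(user_events); a Vec
            // stands in for the Broadcaster here.
            sink.extend(self.events);
        }
    }

    fn store(name: &str) -> Stored<impl IntoIterator<Item = Event>> {
        // Imagine the row inserts happening here, inside a transaction.
        let events = vec![Event(format!("user {name} created"))];
        Stored { events }
    }

    fn main() {
        let mut broadcast_log = Vec::new();
        let stored = store("alice");
        // ...the transaction commits here...
        stored.publish(&mut broadcast_log);
        assert_eq!(broadcast_log, vec![Event("user alice created".into())]);
    }
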
diff --git a/src/user/history.rs b/src/user/history.rs
index f58e9c7..7c06a2d 100644
--- a/src/user/history.rs
+++ b/src/user/history.rs
@@ -2,7 +2,10 @@ use super::{
User,
event::{Created, Event},
};
-use crate::event::{Instant, Sequence};
+use crate::{
+ event::{Instant, Sequence},
+ login::Login,
+};
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct History {
@@ -10,6 +13,21 @@ pub struct History {
pub created: Instant,
}
+// Lifecycle interface
+impl History {
+ pub fn begin(login: &Login, created: Instant) -> Self {
+ let Login { id, name } = login.clone();
+
+ Self {
+ user: User {
+ id: id.into(),
+ name,
+ },
+ created,
+ }
+ }
+}
+
// State interface
impl History {
pub fn as_of<S>(&self, sequence: S) -> Option<User>
@@ -32,7 +50,7 @@ impl History {
.into()
}
- pub fn events(&self) -> impl Iterator<Item = Event> + use<> {
+ pub fn events(&self) -> impl Iterator<Item = Event> + Clone + use<> {
[self.created()].into_iter()
}
}
diff --git a/src/user/repo.rs b/src/user/repo.rs
index aaf3b73..292d72e 100644
--- a/src/user/repo.rs
+++ b/src/user/repo.rs
@@ -1,13 +1,13 @@
use futures::stream::{StreamExt as _, TryStreamExt as _};
use sqlx::{SqliteConnection, Transaction, sqlite::Sqlite};
+use super::{Event, History, Id, User, event::Created};
use crate::{
clock::DateTime,
db::NotFound,
event::{Instant, Sequence},
login::Login,
name::{self, Name},
- user::{History, Id, User},
};
pub trait Provider {
@@ -23,30 +23,39 @@ impl Provider for Transaction<'_, Sqlite> {
pub struct Users<'t>(&'t mut SqliteConnection);
impl Users<'_> {
- pub async fn create(
+ pub async fn record_events(
&mut self,
- login: &Login,
- created: &Instant,
- ) -> Result<History, sqlx::Error> {
+ events: impl IntoIterator<Item = Event>,
+ ) -> Result<(), sqlx::Error> {
+ for event in events {
+ self.record_event(&event).await?;
+ }
+ Ok(())
+ }
+
+ pub async fn record_event(&mut self, event: &Event) -> Result<(), sqlx::Error> {
+ match event {
+ Event::Created(created) => self.record_created(created).await,
+ }
+ }
+
+ async fn record_created(&mut self, created: &Created) -> Result<(), sqlx::Error> {
+ let Created { user, instant } = created;
+
sqlx::query!(
r#"
- insert into user (id, created_sequence, created_at)
+ insert
+ into user (id, created_at, created_sequence)
values ($1, $2, $3)
"#,
- login.id,
- created.sequence,
- created.at,
+ user.id,
+ instant.at,
+ instant.sequence,
)
.execute(&mut *self.0)
.await?;
- Ok(History {
- user: User {
- id: login.id.clone().into(),
- name: login.name.clone(),
- },
- created: *created,
- })
+ Ok(())
}
pub async fn by_login(&mut self, login: &Login) -> Result<History, LoadError> {
@@ -86,12 +95,11 @@ impl Users<'_> {
id as "id: Id",
login.display_name as "display_name: String",
login.canonical_name as "canonical_name: String",
- user.created_sequence as "created_sequence: Sequence",
- user.created_at as "created_at: DateTime"
+ user.created_at as "created_at: DateTime",
+ user.created_sequence as "created_sequence: Sequence"
from user
join login using (id)
where user.created_sequence <= $1
- order by canonical_name
"#,
resume_at,
)
@@ -119,8 +127,8 @@ impl Users<'_> {
id as "id: Id",
login.display_name as "display_name: String",
login.canonical_name as "canonical_name: String",
- user.created_sequence as "created_sequence: Sequence",
- user.created_at as "created_at: DateTime"
+ user.created_at as "created_at: DateTime",
+ user.created_sequence as "created_sequence: Sequence"
from user
join login using (id)
where user.created_sequence > $1