summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorOwen Jacobson <owen@grimoire.ca>2024-10-02 12:25:36 -0400
committerOwen Jacobson <owen@grimoire.ca>2024-10-03 19:25:41 -0400
commitec804134c33aedb001c426c5f42f43f53c47848f (patch)
treec62b59ab5cdd438f47a5f9cc35fdc712d362af19 /src
parent469613872f6fb19f4579b387e19b2bc38fa52f51 (diff)
Represent channels and messages using a split "History" and "Snapshot" model.
This separates the code that figures out what happened to an entity from the code that represents it to a user, and makes it easier to compute a snapshot at a point in time (for things like bootstrap). It also makes the internal logic a bit easier to follow, since it's easier to tell whether you're working with a point in time or with the whole recorded history. This hefty.
Diffstat (limited to 'src')
-rw-r--r--src/app.rs5
-rw-r--r--src/broadcast.rs4
-rw-r--r--src/channel/app.rs35
-rw-r--r--src/channel/event.rs48
-rw-r--r--src/channel/history.rs42
-rw-r--r--src/channel/mod.rs15
-rw-r--r--src/channel/repo.rs94
-rw-r--r--src/channel/routes.rs11
-rw-r--r--src/channel/routes/test/on_create.rs6
-rw-r--r--src/channel/routes/test/on_send.rs13
-rw-r--r--src/channel/snapshot.rs38
-rw-r--r--src/event/app.rs122
-rw-r--r--src/event/broadcaster.rs4
-rw-r--r--src/event/mod.rs73
-rw-r--r--src/event/repo.rs (renamed from src/event/repo/sequence.rs)0
-rw-r--r--src/event/repo/message.rs196
-rw-r--r--src/event/repo/mod.rs4
-rw-r--r--src/event/routes.rs14
-rw-r--r--src/event/routes/test.rs98
-rw-r--r--src/event/sequence.rs59
-rw-r--r--src/event/types.rs85
-rw-r--r--src/expire.rs2
-rw-r--r--src/message/app.rs88
-rw-r--r--src/message/event.rs50
-rw-r--r--src/message/history.rs43
-rw-r--r--src/message/mod.rs13
-rw-r--r--src/message/repo.rs214
-rw-r--r--src/message/snapshot.rs74
-rw-r--r--src/test/fixtures/event.rs11
-rw-r--r--src/test/fixtures/filter.rs10
-rw-r--r--src/test/fixtures/message.rs13
-rw-r--r--src/test/fixtures/mod.rs1
-rw-r--r--src/token/app.rs4
33 files changed, 956 insertions, 533 deletions
diff --git a/src/app.rs b/src/app.rs
index 5542e5f..186e5f8 100644
--- a/src/app.rs
+++ b/src/app.rs
@@ -4,6 +4,7 @@ use crate::{
channel::app::Channels,
event::{app::Events, broadcaster::Broadcaster as EventBroadcaster},
login::app::Logins,
+ message::app::Messages,
token::{app::Tokens, broadcaster::Broadcaster as TokenBroadcaster},
};
@@ -35,6 +36,10 @@ impl App {
Logins::new(&self.db)
}
+ pub const fn messages(&self) -> Messages {
+ Messages::new(&self.db, &self.events)
+ }
+
pub const fn tokens(&self) -> Tokens {
Tokens::new(&self.db, &self.tokens)
}
diff --git a/src/broadcast.rs b/src/broadcast.rs
index 083a301..bedc263 100644
--- a/src/broadcast.rs
+++ b/src/broadcast.rs
@@ -32,7 +32,7 @@ where
{
// panic: if ``message.channel.id`` has not been previously registered,
// and was not part of the initial set of channels.
- pub fn broadcast(&self, message: &M) {
+ pub fn broadcast(&self, message: impl Into<M>) {
let tx = self.sender();
// Per the Tokio docs, the returned error is only used to indicate that
@@ -42,7 +42,7 @@ where
//
// The successful return value, which includes the number of active
// receivers, also isn't that interesting to us.
- let _ = tx.send(message.clone());
+ let _ = tx.send(message.into());
}
// panic: if ``channel`` has not been previously registered, and was not
diff --git a/src/channel/app.rs b/src/channel/app.rs
index b7e3a10..6ce826b 100644
--- a/src/channel/app.rs
+++ b/src/channel/app.rs
@@ -1,10 +1,11 @@
use chrono::TimeDelta;
+use itertools::Itertools;
use sqlx::sqlite::SqlitePool;
use crate::{
channel::{repo::Provider as _, Channel},
clock::DateTime,
- event::{broadcaster::Broadcaster, repo::Provider as _, types::ChannelEvent, Sequence},
+ event::{broadcaster::Broadcaster, repo::Provider as _, Sequence},
};
pub struct Channels<'a> {
@@ -27,10 +28,11 @@ impl<'a> Channels<'a> {
.map_err(|err| CreateError::from_duplicate_name(err, name))?;
tx.commit().await?;
- self.events
- .broadcast(&ChannelEvent::created(channel.clone()));
+ for event in channel.events() {
+ self.events.broadcast(event);
+ }
- Ok(channel)
+ Ok(channel.snapshot())
}
pub async fn all(&self, resume_point: Option<Sequence>) -> Result<Vec<Channel>, InternalError> {
@@ -38,6 +40,16 @@ impl<'a> Channels<'a> {
let channels = tx.channels().all(resume_point).await?;
tx.commit().await?;
+ let channels = channels
+ .into_iter()
+ .filter_map(|channel| {
+ channel
+ .events()
+ .filter(Sequence::up_to(resume_point))
+ .collect()
+ })
+ .collect();
+
Ok(channels)
}
@@ -51,14 +63,21 @@ impl<'a> Channels<'a> {
let mut events = Vec::with_capacity(expired.len());
for channel in expired {
let deleted = tx.sequence().next(relative_to).await?;
- let event = tx.channels().delete(&channel, &deleted).await?;
- events.push(event);
+ let channel = tx.channels().delete(&channel, &deleted).await?;
+ events.push(
+ channel
+ .events()
+ .filter(Sequence::start_from(deleted.sequence)),
+ );
}
tx.commit().await?;
- for event in events {
- self.events.broadcast(&event);
+ for event in events
+ .into_iter()
+ .kmerge_by(|a, b| a.instant.sequence < b.instant.sequence)
+ {
+ self.events.broadcast(event);
}
Ok(())
diff --git a/src/channel/event.rs b/src/channel/event.rs
new file mode 100644
index 0000000..9c54174
--- /dev/null
+++ b/src/channel/event.rs
@@ -0,0 +1,48 @@
+use super::Channel;
+use crate::{
+ channel,
+ event::{Instant, Sequenced},
+};
+
+#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
+pub struct Event {
+ #[serde(flatten)]
+ pub instant: Instant,
+ #[serde(flatten)]
+ pub kind: Kind,
+}
+
+impl Sequenced for Event {
+ fn instant(&self) -> Instant {
+ self.instant
+ }
+}
+
+#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
+#[serde(tag = "type", rename_all = "snake_case")]
+pub enum Kind {
+ Created(Created),
+ Deleted(Deleted),
+}
+
+#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
+pub struct Created {
+ pub channel: Channel,
+}
+
+impl From<Created> for Kind {
+ fn from(event: Created) -> Self {
+ Self::Created(event)
+ }
+}
+
+#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
+pub struct Deleted {
+ pub channel: channel::Id,
+}
+
+impl From<Deleted> for Kind {
+ fn from(event: Deleted) -> Self {
+ Self::Deleted(event)
+ }
+}
diff --git a/src/channel/history.rs b/src/channel/history.rs
new file mode 100644
index 0000000..3cc7d9d
--- /dev/null
+++ b/src/channel/history.rs
@@ -0,0 +1,42 @@
+use super::{
+ event::{Created, Deleted, Event},
+ Channel,
+};
+use crate::event::Instant;
+
+#[derive(Clone, Debug, Eq, PartialEq)]
+pub struct History {
+ pub channel: Channel,
+ pub created: Instant,
+ pub deleted: Option<Instant>,
+}
+
+impl History {
+ fn created(&self) -> Event {
+ Event {
+ instant: self.created,
+ kind: Created {
+ channel: self.channel.clone(),
+ }
+ .into(),
+ }
+ }
+
+ fn deleted(&self) -> Option<Event> {
+ self.deleted.map(|instant| Event {
+ instant,
+ kind: Deleted {
+ channel: self.channel.id.clone(),
+ }
+ .into(),
+ })
+ }
+
+ pub fn events(&self) -> impl Iterator<Item = Event> {
+ [self.created()].into_iter().chain(self.deleted())
+ }
+
+ pub fn snapshot(&self) -> Channel {
+ self.channel.clone()
+ }
+}
diff --git a/src/channel/mod.rs b/src/channel/mod.rs
index 4baa7e3..eb8200b 100644
--- a/src/channel/mod.rs
+++ b/src/channel/mod.rs
@@ -1,16 +1,9 @@
-use crate::event::Instant;
-
pub mod app;
+pub mod event;
+mod history;
mod id;
pub mod repo;
mod routes;
+mod snapshot;
-pub use self::{id::Id, routes::router};
-
-#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
-pub struct Channel {
- pub id: Id,
- pub name: String,
- #[serde(skip)]
- pub created: Instant,
-}
+pub use self::{event::Event, history::History, id::Id, routes::router, snapshot::Channel};
diff --git a/src/channel/repo.rs b/src/channel/repo.rs
index c000b56..8bb761b 100644
--- a/src/channel/repo.rs
+++ b/src/channel/repo.rs
@@ -1,9 +1,9 @@
use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction};
use crate::{
- channel::{Channel, Id},
+ channel::{Channel, History, Id},
clock::DateTime,
- event::{types, Instant, Sequence},
+ event::{Instant, Sequence},
};
pub trait Provider {
@@ -19,7 +19,7 @@ impl<'c> Provider for Transaction<'c, Sqlite> {
pub struct Channels<'t>(&'t mut SqliteConnection);
impl<'c> Channels<'c> {
- pub async fn create(&mut self, name: &str, created: &Instant) -> Result<Channel, sqlx::Error> {
+ pub async fn create(&mut self, name: &str, created: &Instant) -> Result<History, sqlx::Error> {
let id = Id::generate();
let channel = sqlx::query!(
r#"
@@ -37,13 +37,16 @@ impl<'c> Channels<'c> {
created.at,
created.sequence,
)
- .map(|row| Channel {
- id: row.id,
- name: row.name,
+ .map(|row| History {
+ channel: Channel {
+ id: row.id,
+ name: row.name,
+ },
created: Instant {
at: row.created_at,
sequence: row.created_sequence,
},
+ deleted: None,
})
.fetch_one(&mut *self.0)
.await?;
@@ -51,7 +54,7 @@ impl<'c> Channels<'c> {
Ok(channel)
}
- pub async fn by_id(&mut self, channel: &Id) -> Result<Channel, sqlx::Error> {
+ pub async fn by_id(&mut self, channel: &Id) -> Result<History, sqlx::Error> {
let channel = sqlx::query!(
r#"
select
@@ -64,13 +67,16 @@ impl<'c> Channels<'c> {
"#,
channel,
)
- .map(|row| Channel {
- id: row.id,
- name: row.name,
+ .map(|row| History {
+ channel: Channel {
+ id: row.id,
+ name: row.name,
+ },
created: Instant {
at: row.created_at,
sequence: row.created_sequence,
},
+ deleted: None,
})
.fetch_one(&mut *self.0)
.await?;
@@ -81,7 +87,7 @@ impl<'c> Channels<'c> {
pub async fn all(
&mut self,
resume_point: Option<Sequence>,
- ) -> Result<Vec<Channel>, sqlx::Error> {
+ ) -> Result<Vec<History>, sqlx::Error> {
let channels = sqlx::query!(
r#"
select
@@ -95,13 +101,16 @@ impl<'c> Channels<'c> {
"#,
resume_point,
)
- .map(|row| Channel {
- id: row.id,
- name: row.name,
+ .map(|row| History {
+ channel: Channel {
+ id: row.id,
+ name: row.name,
+ },
created: Instant {
at: row.created_at,
sequence: row.created_sequence,
},
+ deleted: None,
})
.fetch_all(&mut *self.0)
.await?;
@@ -112,7 +121,7 @@ impl<'c> Channels<'c> {
pub async fn replay(
&mut self,
resume_at: Option<Sequence>,
- ) -> Result<Vec<Channel>, sqlx::Error> {
+ ) -> Result<Vec<History>, sqlx::Error> {
let channels = sqlx::query!(
r#"
select
@@ -125,13 +134,16 @@ impl<'c> Channels<'c> {
"#,
resume_at,
)
- .map(|row| Channel {
- id: row.id,
- name: row.name,
+ .map(|row| History {
+ channel: Channel {
+ id: row.id,
+ name: row.name,
+ },
created: Instant {
at: row.created_at,
sequence: row.created_sequence,
},
+ deleted: None,
})
.fetch_all(&mut *self.0)
.await?;
@@ -141,35 +153,43 @@ impl<'c> Channels<'c> {
pub async fn delete(
&mut self,
- channel: &Channel,
+ channel: &Id,
deleted: &Instant,
- ) -> Result<types::ChannelEvent, sqlx::Error> {
- let channel = channel.id.clone();
- sqlx::query_scalar!(
+ ) -> Result<History, sqlx::Error> {
+ let channel = sqlx::query!(
r#"
delete from channel
where id = $1
- returning 1 as "row: i64"
+ returning
+ id as "id: Id",
+ name,
+ created_at as "created_at: DateTime",
+ created_sequence as "created_sequence: Sequence"
"#,
channel,
)
+ .map(|row| History {
+ channel: Channel {
+ id: row.id,
+ name: row.name,
+ },
+ created: Instant {
+ at: row.created_at,
+ sequence: row.created_sequence,
+ },
+ deleted: Some(*deleted),
+ })
.fetch_one(&mut *self.0)
.await?;
- Ok(types::ChannelEvent {
- instant: *deleted,
- data: types::DeletedEvent { channel }.into(),
- })
+ Ok(channel)
}
- pub async fn expired(&mut self, expired_at: &DateTime) -> Result<Vec<Channel>, sqlx::Error> {
- let channels = sqlx::query!(
+ pub async fn expired(&mut self, expired_at: &DateTime) -> Result<Vec<Id>, sqlx::Error> {
+ let channels = sqlx::query_scalar!(
r#"
select
- channel.id as "id: Id",
- channel.name,
- channel.created_at as "created_at: DateTime",
- channel.created_sequence as "created_sequence: Sequence"
+ channel.id as "id: Id"
from channel
left join message
where created_at < $1
@@ -177,14 +197,6 @@ impl<'c> Channels<'c> {
"#,
expired_at,
)
- .map(|row| Channel {
- id: row.id,
- name: row.name,
- created: Instant {
- at: row.created_at,
- sequence: row.created_sequence,
- },
- })
.fetch_all(&mut *self.0)
.await?;
diff --git a/src/channel/routes.rs b/src/channel/routes.rs
index 5d8b61e..5bb1ee9 100644
--- a/src/channel/routes.rs
+++ b/src/channel/routes.rs
@@ -13,8 +13,9 @@ use crate::{
channel::{self, Channel},
clock::RequestedAt,
error::Internal,
- event::{app::EventsError, Sequence},
+ event::Sequence,
login::Login,
+ message::app::Error as MessageError,
};
#[cfg(test)]
@@ -99,8 +100,8 @@ async fn on_send(
login: Login,
Json(request): Json<SendRequest>,
) -> Result<StatusCode, ErrorResponse> {
- app.events()
- .send(&login, &channel, &request.message, &sent_at)
+ app.messages()
+ .send(&channel, &login, &sent_at, &request.message)
.await
// Could impl `From` here, but it's more code and this is used once.
.map_err(ErrorResponse)?;
@@ -109,13 +110,13 @@ async fn on_send(
}
#[derive(Debug)]
-struct ErrorResponse(EventsError);
+struct ErrorResponse(MessageError);
impl IntoResponse for ErrorResponse {
fn into_response(self) -> Response {
let Self(error) = self;
match error {
- not_found @ EventsError::ChannelNotFound(_) => {
+ not_found @ MessageError::ChannelNotFound(_) => {
(StatusCode::NOT_FOUND, not_found.to_string()).into_response()
}
other => Internal::from(other).into_response(),
diff --git a/src/channel/routes/test/on_create.rs b/src/channel/routes/test/on_create.rs
index 9988932..5733c9e 100644
--- a/src/channel/routes/test/on_create.rs
+++ b/src/channel/routes/test/on_create.rs
@@ -3,7 +3,7 @@ use futures::stream::StreamExt as _;
use crate::{
channel::{app, routes},
- event::types,
+ event,
test::fixtures::{self, future::Immediately as _},
};
@@ -50,8 +50,8 @@ async fn new_channel() {
.expect("creation event published");
assert!(matches!(
- event.data,
- types::ChannelEventData::Created(event)
+ event.kind,
+ event::Kind::ChannelCreated(event)
if event.channel == response_channel
));
}
diff --git a/src/channel/routes/test/on_send.rs b/src/channel/routes/test/on_send.rs
index 33ec3b7..1027b29 100644
--- a/src/channel/routes/test/on_send.rs
+++ b/src/channel/routes/test/on_send.rs
@@ -4,7 +4,8 @@ use futures::stream::StreamExt;
use crate::{
channel,
channel::routes,
- event::{app, types},
+ event,
+ message::app,
test::fixtures::{self, future::Immediately as _},
};
@@ -54,10 +55,10 @@ async fn messages_in_order() {
for ((sent_at, message), event) in requests.into_iter().zip(events) {
assert_eq!(*sent_at, event.instant.at);
assert!(matches!(
- event.data,
- types::ChannelEventData::Message(event_message)
- if event_message.sender == sender
- && event_message.message.body == message
+ event.kind,
+ event::Kind::MessageSent(event)
+ if event.message.sender == sender
+ && event.message.body == message
));
}
}
@@ -90,6 +91,6 @@ async fn nonexistent_channel() {
assert!(matches!(
error,
- app::EventsError::ChannelNotFound(error_channel) if channel == error_channel
+ app::Error::ChannelNotFound(error_channel) if channel == error_channel
));
}
diff --git a/src/channel/snapshot.rs b/src/channel/snapshot.rs
new file mode 100644
index 0000000..6462f25
--- /dev/null
+++ b/src/channel/snapshot.rs
@@ -0,0 +1,38 @@
+use super::{
+ event::{Created, Event, Kind},
+ Id,
+};
+
+#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
+pub struct Channel {
+ pub id: Id,
+ pub name: String,
+}
+
+impl Channel {
+ fn apply(state: Option<Self>, event: Event) -> Option<Self> {
+ match (state, event.kind) {
+ (None, Kind::Created(event)) => Some(event.into()),
+ (Some(channel), Kind::Deleted(event)) if channel.id == event.channel => None,
+ (state, event) => panic!("invalid channel event {event:#?} for state {state:#?}"),
+ }
+ }
+}
+
+impl FromIterator<Event> for Option<Channel> {
+ fn from_iter<I: IntoIterator<Item = Event>>(events: I) -> Self {
+ events.into_iter().fold(None, Channel::apply)
+ }
+}
+
+impl From<&Created> for Channel {
+ fn from(event: &Created) -> Self {
+ event.channel.clone()
+ }
+}
+
+impl From<Created> for Channel {
+ fn from(event: Created) -> Self {
+ event.channel
+ }
+}
diff --git a/src/event/app.rs b/src/event/app.rs
index 5e9e79a..e58bea9 100644
--- a/src/event/app.rs
+++ b/src/event/app.rs
@@ -1,22 +1,15 @@
-use chrono::TimeDelta;
use futures::{
future,
stream::{self, StreamExt as _},
Stream,
};
+use itertools::Itertools as _;
use sqlx::sqlite::SqlitePool;
-use super::{
- broadcaster::Broadcaster,
- repo::message::Provider as _,
- types::{self, ChannelEvent},
-};
+use super::{broadcaster::Broadcaster, Event, Sequence, Sequenced};
use crate::{
channel::{self, repo::Provider as _},
- clock::DateTime,
- db::NotFound as _,
- event::{repo::Provider as _, Sequence},
- login::Login,
+ message::{self, repo::Provider as _},
};
pub struct Events<'a> {
@@ -29,111 +22,52 @@ impl<'a> Events<'a> {
Self { db, events }
}
- pub async fn send(
- &self,
- login: &Login,
- channel: &channel::Id,
- body: &str,
- sent_at: &DateTime,
- ) -> Result<types::ChannelEvent, EventsError> {
- let mut tx = self.db.begin().await?;
- let channel = tx
- .channels()
- .by_id(channel)
- .await
- .not_found(|| EventsError::ChannelNotFound(channel.clone()))?;
- let sent = tx.sequence().next(sent_at).await?;
- let event = tx
- .message_events()
- .create(login, &channel, &sent, body)
- .await?;
- tx.commit().await?;
-
- self.events.broadcast(&event);
- Ok(event)
- }
-
- pub async fn expire(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> {
- // Somewhat arbitrarily, expire after 90 days.
- let expire_at = relative_to.to_owned() - TimeDelta::days(90);
-
- let mut tx = self.db.begin().await?;
- let expired = tx.message_events().expired(&expire_at).await?;
-
- let mut events = Vec::with_capacity(expired.len());
- for (channel, message) in expired {
- let deleted = tx.sequence().next(relative_to).await?;
- let event = tx
- .message_events()
- .delete(&channel, &message, &deleted)
- .await?;
- events.push(event);
- }
-
- tx.commit().await?;
-
- for event in events {
- self.events.broadcast(&event);
- }
-
- Ok(())
- }
-
pub async fn subscribe(
&self,
resume_at: Option<Sequence>,
- ) -> Result<impl Stream<Item = types::ChannelEvent> + std::fmt::Debug, sqlx::Error> {
+ ) -> Result<impl Stream<Item = Event> + std::fmt::Debug, sqlx::Error> {
// Subscribe before retrieving, to catch messages broadcast while we're
// querying the DB. We'll prune out duplicates later.
let live_messages = self.events.subscribe();
let mut tx = self.db.begin().await?;
- let channels = tx.channels().replay(resume_at).await?;
+ let channels = tx.channels().replay(resume_at).await?;
let channel_events = channels
- .into_iter()
- .map(ChannelEvent::created)
- .filter(move |event| {
- resume_at.map_or(true, |resume_at| Sequence::from(event) > resume_at)
- });
-
- let message_events = tx.message_events().replay(resume_at).await?;
-
- let mut replay_events = channel_events
- .into_iter()
- .chain(message_events.into_iter())
+ .iter()
+ .map(channel::History::events)
+ .kmerge_by(|a, b| a.instant.sequence < b.instant.sequence)
+ .filter(Sequence::after(resume_at))
+ .map(Event::from);
+
+ let messages = tx.messages().replay(resume_at).await?;
+ let message_events = messages
+ .iter()
+ .map(message::History::events)
+ .kmerge_by(|a, b| a.instant.sequence < b.instant.sequence)
+ .filter(Sequence::after(resume_at))
+ .map(Event::from);
+
+ let replay_events = channel_events
+ .merge_by(message_events, |a, b| {
+ a.instant.sequence < b.instant.sequence
+ })
.collect::<Vec<_>>();
- replay_events.sort_by_key(|event| Sequence::from(event));
- let resume_live_at = replay_events.last().map(Sequence::from);
+ let resume_live_at = replay_events.last().map(Sequenced::sequence);
let replay = stream::iter(replay_events);
- // no skip_expired or resume transforms for stored_messages, as it's
- // constructed not to contain messages meeting either criterion.
- //
- // * skip_expired is redundant with the `tx.broadcasts().expire(…)` call;
- // * resume is redundant with the resume_at argument to
- // `tx.broadcasts().replay(…)`.
let live_messages = live_messages
// Filtering on the broadcast resume point filters out messages
// before resume_at, and filters out messages duplicated from
- // stored_messages.
+ // `replay_events`.
.filter(Self::resume(resume_live_at));
Ok(replay.chain(live_messages))
}
- fn resume(
- resume_at: Option<Sequence>,
- ) -> impl for<'m> FnMut(&'m types::ChannelEvent) -> future::Ready<bool> {
- move |event| future::ready(resume_at < Some(Sequence::from(event)))
+ fn resume(resume_at: Option<Sequence>) -> impl for<'m> FnMut(&'m Event) -> future::Ready<bool> {
+ let filter = Sequence::after(resume_at);
+ move |event| future::ready(filter(event))
}
}
-
-#[derive(Debug, thiserror::Error)]
-pub enum EventsError {
- #[error("channel {0} not found")]
- ChannelNotFound(channel::Id),
- #[error(transparent)]
- DatabaseError(#[from] sqlx::Error),
-}
diff --git a/src/event/broadcaster.rs b/src/event/broadcaster.rs
index 92f631f..de2513a 100644
--- a/src/event/broadcaster.rs
+++ b/src/event/broadcaster.rs
@@ -1,3 +1,3 @@
-use crate::{broadcast, event::types};
+use crate::broadcast;
-pub type Broadcaster = broadcast::Broadcaster<types::ChannelEvent>;
+pub type Broadcaster = broadcast::Broadcaster<super::Event>;
diff --git a/src/event/mod.rs b/src/event/mod.rs
index c982d3a..1503b77 100644
--- a/src/event/mod.rs
+++ b/src/event/mod.rs
@@ -1,18 +1,75 @@
+use crate::{channel, message};
+
pub mod app;
pub mod broadcaster;
mod extract;
pub mod repo;
mod routes;
mod sequence;
-pub mod types;
-use crate::clock::DateTime;
+pub use self::{
+ routes::router,
+ sequence::{Instant, Sequence, Sequenced},
+};
+
+#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
+pub struct Event {
+ #[serde(flatten)]
+ pub instant: Instant,
+ #[serde(flatten)]
+ pub kind: Kind,
+}
+
+impl Sequenced for Event {
+ fn instant(&self) -> Instant {
+ self.instant
+ }
+}
+
+impl From<channel::Event> for Event {
+ fn from(event: channel::Event) -> Self {
+ Self {
+ instant: event.instant,
+ kind: event.kind.into(),
+ }
+ }
+}
+
+impl From<message::Event> for Event {
+ fn from(event: message::Event) -> Self {
+ Self {
+ instant: event.instant,
+ kind: event.kind.into(),
+ }
+ }
+}
-pub use self::{routes::router, sequence::Sequence};
+#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
+#[serde(tag = "type", rename_all = "snake_case")]
+pub enum Kind {
+ #[serde(rename = "created")]
+ ChannelCreated(channel::event::Created),
+ #[serde(rename = "message")]
+ MessageSent(message::event::Sent),
+ MessageDeleted(message::event::Deleted),
+ #[serde(rename = "deleted")]
+ ChannelDeleted(channel::event::Deleted),
+}
+
+impl From<channel::event::Kind> for Kind {
+ fn from(kind: channel::event::Kind) -> Self {
+ match kind {
+ channel::event::Kind::Created(created) => Self::ChannelCreated(created),
+ channel::event::Kind::Deleted(deleted) => Self::ChannelDeleted(deleted),
+ }
+ }
+}
-#[derive(Clone, Copy, Debug, Eq, PartialEq, serde::Serialize)]
-pub struct Instant {
- pub at: DateTime,
- #[serde(skip)]
- pub sequence: Sequence,
+impl From<message::event::Kind> for Kind {
+ fn from(kind: message::event::Kind) -> Self {
+ match kind {
+ message::event::Kind::Sent(created) => Self::MessageSent(created),
+ message::event::Kind::Deleted(deleted) => Self::MessageDeleted(deleted),
+ }
+ }
}
diff --git a/src/event/repo/sequence.rs b/src/event/repo.rs
index 40d6a53..40d6a53 100644
--- a/src/event/repo/sequence.rs
+++ b/src/event/repo.rs
diff --git a/src/event/repo/message.rs b/src/event/repo/message.rs
deleted file mode 100644
index f29c8a4..0000000
--- a/src/event/repo/message.rs
+++ /dev/null
@@ -1,196 +0,0 @@
-use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction};
-
-use crate::{
- channel::{self, Channel},
- clock::DateTime,
- event::{types, Instant, Sequence},
- login::{self, Login},
- message::{self, Message},
-};
-
-pub trait Provider {
- fn message_events(&mut self) -> Events;
-}
-
-impl<'c> Provider for Transaction<'c, Sqlite> {
- fn message_events(&mut self) -> Events {
- Events(self)
- }
-}
-
-pub struct Events<'t>(&'t mut SqliteConnection);
-
-impl<'c> Events<'c> {
- pub async fn create(
- &mut self,
- sender: &Login,
- channel: &Channel,
- sent: &Instant,
- body: &str,
- ) -> Result<types::ChannelEvent, sqlx::Error> {
- let id = message::Id::generate();
-
- let message = sqlx::query!(
- r#"
- insert into message
- (id, channel, sender, sent_at, sent_sequence, body)
- values ($1, $2, $3, $4, $5, $6)
- returning
- id as "id: message::Id",
- sender as "sender: login::Id",
- sent_at as "sent_at: DateTime",
- sent_sequence as "sent_sequence: Sequence",
- body
- "#,
- id,
- channel.id,
- sender.id,
- sent.at,
- sent.sequence,
- body,
- )
- .map(|row| types::ChannelEvent {
- instant: Instant {
- at: row.sent_at,
- sequence: row.sent_sequence,
- },
- data: types::MessageEvent {
- channel: channel.clone(),
- sender: sender.clone(),
- message: Message {
- id: row.id,
- body: row.body,
- },
- }
- .into(),
- })
- .fetch_one(&mut *self.0)
- .await?;
-
- Ok(message)
- }
-
- pub async fn delete(
- &mut self,
- channel: &Channel,
- message: &message::Id,
- deleted: &Instant,
- ) -> Result<types::ChannelEvent, sqlx::Error> {
- sqlx::query_scalar!(
- r#"
- delete from message
- where id = $1
- returning 1 as "row: i64"
- "#,
- message,
- )
- .fetch_one(&mut *self.0)
- .await?;
-
- Ok(types::ChannelEvent {
- instant: Instant {
- at: deleted.at,
- sequence: deleted.sequence,
- },
- data: types::MessageDeletedEvent {
- channel: channel.clone(),
- message: message.clone(),
- }
- .into(),
- })
- }
-
- pub async fn expired(
- &mut self,
- expire_at: &DateTime,
- ) -> Result<Vec<(Channel, message::Id)>, sqlx::Error> {
- let messages = sqlx::query!(
- r#"
- select
- channel.id as "channel_id: channel::Id",
- channel.name as "channel_name",
- channel.created_at as "channel_created_at: DateTime",
- channel.created_sequence as "channel_created_sequence: Sequence",
- message.id as "message: message::Id"
- from message
- join channel on message.channel = channel.id
- join login as sender on message.sender = sender.id
- where sent_at < $1
- "#,
- expire_at,
- )
- .map(|row| {
- (
- Channel {
- id: row.channel_id,
- name: row.channel_name,
- created: Instant {
- at: row.channel_created_at,
- sequence: row.channel_created_sequence,
- },
- },
- row.message,
- )
- })
- .fetch_all(&mut *self.0)
- .await?;
-
- Ok(messages)
- }
-
- pub async fn replay(
- &mut self,
- resume_at: Option<Sequence>,
- ) -> Result<Vec<types::ChannelEvent>, sqlx::Error> {
- let events = sqlx::query!(
- r#"
- select
- message.id as "id: message::Id",
- channel.id as "channel_id: channel::Id",
- channel.name as "channel_name",
- channel.created_at as "channel_created_at: DateTime",
- channel.created_sequence as "channel_created_sequence: Sequence",
- sender.id as "sender_id: login::Id",
- sender.name as sender_name,
- message.sent_at as "sent_at: DateTime",
- message.sent_sequence as "sent_sequence: Sequence",
- message.body
- from message
- join channel on message.channel = channel.id
- join login as sender on message.sender = sender.id
- where coalesce(message.sent_sequence > $1, true)
- order by sent_sequence asc
- "#,
- resume_at,
- )
- .map(|row| types::ChannelEvent {
- instant: Instant {
- at: row.sent_at,
- sequence: row.sent_sequence,
- },
- data: types::MessageEvent {
- channel: Channel {
- id: row.channel_id,
- name: row.channel_name,
- created: Instant {
- at: row.channel_created_at,
- sequence: row.channel_created_sequence,
- },
- },
- sender: Login {
- id: row.sender_id,
- name: row.sender_name,
- },
- message: Message {
- id: row.id,
- body: row.body,
- },
- }
- .into(),
- })
- .fetch_all(&mut *self.0)
- .await?;
-
- Ok(events)
- }
-}
diff --git a/src/event/repo/mod.rs b/src/event/repo/mod.rs
deleted file mode 100644
index cee840c..0000000
--- a/src/event/repo/mod.rs
+++ /dev/null
@@ -1,4 +0,0 @@
-pub mod message;
-mod sequence;
-
-pub use self::sequence::Provider;
diff --git a/src/event/routes.rs b/src/event/routes.rs
index c87bfb2..5b9c7e3 100644
--- a/src/event/routes.rs
+++ b/src/event/routes.rs
@@ -10,11 +10,11 @@ use axum::{
use axum_extra::extract::Query;
use futures::stream::{Stream, StreamExt as _};
-use super::{extract::LastEventId, types};
+use super::{extract::LastEventId, Event};
use crate::{
app::App,
error::{Internal, Unauthorized},
- event::Sequence,
+ event::{Sequence, Sequenced as _},
token::{app::ValidateError, extract::Identity},
};
@@ -35,7 +35,7 @@ async fn events(
identity: Identity,
last_event_id: Option<LastEventId<Sequence>>,
Query(query): Query<EventsQuery>,
-) -> Result<Events<impl Stream<Item = types::ChannelEvent> + std::fmt::Debug>, EventsError> {
+) -> Result<Events<impl Stream<Item = Event> + std::fmt::Debug>, EventsError> {
let resume_at = last_event_id
.map(LastEventId::into_inner)
.or(query.resume_point);
@@ -51,7 +51,7 @@ struct Events<S>(S);
impl<S> IntoResponse for Events<S>
where
- S: Stream<Item = types::ChannelEvent> + Send + 'static,
+ S: Stream<Item = Event> + Send + 'static,
{
fn into_response(self) -> Response {
let Self(stream) = self;
@@ -62,11 +62,11 @@ where
}
}
-impl TryFrom<types::ChannelEvent> for sse::Event {
+impl TryFrom<Event> for sse::Event {
type Error = serde_json::Error;
- fn try_from(event: types::ChannelEvent) -> Result<Self, Self::Error> {
- let id = serde_json::to_string(&Sequence::from(&event))?;
+ fn try_from(event: Event) -> Result<Self, Self::Error> {
+ let id = serde_json::to_string(&event.sequence())?;
let data = serde_json::to_string_pretty(&event)?;
let event = Self::default().id(id).data(data);
diff --git a/src/event/routes/test.rs b/src/event/routes/test.rs
index 68b55cc..ba9953e 100644
--- a/src/event/routes/test.rs
+++ b/src/event/routes/test.rs
@@ -6,7 +6,7 @@ use futures::{
};
use crate::{
- event::{routes, Sequence},
+ event::{routes, Sequenced as _},
test::fixtures::{self, future::Immediately as _},
};
@@ -17,7 +17,7 @@ async fn includes_historical_message() {
let app = fixtures::scratch_app().await;
let sender = fixtures::login::create(&app).await;
let channel = fixtures::channel::create(&app, &fixtures::now()).await;
- let message = fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await;
+ let message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await;
// Call the endpoint
@@ -36,7 +36,7 @@ async fn includes_historical_message() {
.await
.expect("delivered stored message");
- assert_eq!(message, event);
+ assert!(fixtures::event::message_sent(&event, &message));
}
#[tokio::test]
@@ -58,7 +58,7 @@ async fn includes_live_message() {
// Verify the semantics
let sender = fixtures::login::create(&app).await;
- let message = fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await;
+ let message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await;
let event = events
.filter(fixtures::filter::messages())
@@ -67,7 +67,7 @@ async fn includes_live_message() {
.await
.expect("delivered live message");
- assert_eq!(message, event);
+ assert!(fixtures::event::message_sent(&event, &message));
}
#[tokio::test]
@@ -87,7 +87,7 @@ async fn includes_multiple_channels() {
let app = app.clone();
let sender = sender.clone();
let channel = channel.clone();
- async move { fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await }
+ async move { fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await }
})
.collect::<Vec<_>>()
.await;
@@ -110,7 +110,9 @@ async fn includes_multiple_channels() {
.await;
for message in &messages {
- assert!(events.iter().any(|event| { event == message }));
+ assert!(events
+ .iter()
+ .any(|event| fixtures::event::message_sent(event, message)));
}
}
@@ -123,9 +125,9 @@ async fn sequential_messages() {
let sender = fixtures::login::create(&app).await;
let messages = vec![
- fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await,
- fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await,
- fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await,
+ fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
+ fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
+ fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
];
// Call the endpoint
@@ -138,7 +140,13 @@ async fn sequential_messages() {
// Verify the structure of the response.
- let mut events = events.filter(|event| future::ready(messages.contains(event)));
+ let mut events = events.filter(|event| {
+ future::ready(
+ messages
+ .iter()
+ .any(|message| fixtures::event::message_sent(event, message)),
+ )
+ });
// Verify delivery in order
for message in &messages {
@@ -148,7 +156,7 @@ async fn sequential_messages() {
.await
.expect("undelivered messages remaining");
- assert_eq!(message, &event);
+ assert!(fixtures::event::message_sent(&event, message));
}
}
@@ -160,11 +168,11 @@ async fn resumes_from() {
let channel = fixtures::channel::create(&app, &fixtures::now()).await;
let sender = fixtures::login::create(&app).await;
- let initial_message = fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await;
+ let initial_message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await;
let later_messages = vec![
- fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await,
- fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await,
+ fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
+ fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
];
// Call the endpoint
@@ -190,9 +198,9 @@ async fn resumes_from() {
.await
.expect("delivered events");
- assert_eq!(initial_message, event);
+ assert!(fixtures::event::message_sent(&event, &initial_message));
- Sequence::from(&event)
+ event.sequence()
};
// Resume after disconnect
@@ -214,7 +222,9 @@ async fn resumes_from() {
.await;
for message in &later_messages {
- assert!(events.iter().any(|event| event == message));
+ assert!(events
+ .iter()
+ .any(|event| fixtures::event::message_sent(event, message)));
}
}
@@ -249,8 +259,8 @@ async fn serial_resume() {
let resume_at = {
let initial_messages = [
- fixtures::message::send(&app, &sender, &channel_a, &fixtures::now()).await,
- fixtures::message::send(&app, &sender, &channel_b, &fixtures::now()).await,
+ fixtures::message::send(&app, &channel_a, &sender, &fixtures::now()).await,
+ fixtures::message::send(&app, &channel_b, &sender, &fixtures::now()).await,
];
// First subscription
@@ -271,12 +281,14 @@ async fn serial_resume() {
.await;
for message in &initial_messages {
- assert!(events.iter().any(|event| event == message));
+ assert!(events
+ .iter()
+ .any(|event| fixtures::event::message_sent(event, message)));
}
let event = events.last().expect("this vec is non-empty");
- Sequence::from(event)
+ event.sequence()
};
// Resume after disconnect
@@ -285,8 +297,8 @@ async fn serial_resume() {
// Note that channel_b does not appear here. The buggy behaviour
// would be masked if channel_b happened to send a new message
// into the resumed event stream.
- fixtures::message::send(&app, &sender, &channel_a, &fixtures::now()).await,
- fixtures::message::send(&app, &sender, &channel_a, &fixtures::now()).await,
+ fixtures::message::send(&app, &channel_a, &sender, &fixtures::now()).await,
+ fixtures::message::send(&app, &channel_a, &sender, &fixtures::now()).await,
];
// Second subscription
@@ -307,12 +319,14 @@ async fn serial_resume() {
.await;
for message in &resume_messages {
- assert!(events.iter().any(|event| event == message));
+ assert!(events
+ .iter()
+ .any(|event| fixtures::event::message_sent(event, message)));
}
let event = events.last().expect("this vec is non-empty");
- Sequence::from(event)
+ event.sequence()
};
// Resume after disconnect a second time
@@ -321,8 +335,8 @@ async fn serial_resume() {
// problem. The resume point should before both of these messages, but
// after _all_ prior messages.
let final_messages = [
- fixtures::message::send(&app, &sender, &channel_a, &fixtures::now()).await,
- fixtures::message::send(&app, &sender, &channel_b, &fixtures::now()).await,
+ fixtures::message::send(&app, &channel_a, &sender, &fixtures::now()).await,
+ fixtures::message::send(&app, &channel_b, &sender, &fixtures::now()).await,
];
// Third subscription
@@ -345,7 +359,9 @@ async fn serial_resume() {
// This set of messages, in particular, _should not_ include any prior
// messages from `initial_messages` or `resume_messages`.
for message in &final_messages {
- assert!(events.iter().any(|event| event == message));
+ assert!(events
+ .iter()
+ .any(|event| fixtures::event::message_sent(event, message)));
}
};
}
@@ -378,13 +394,17 @@ async fn terminates_on_token_expiry() {
// These should not be delivered.
let messages = [
- fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await,
- fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await,
- fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await,
+ fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
+ fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
+ fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
];
assert!(events
- .filter(|event| future::ready(messages.contains(event)))
+ .filter(|event| future::ready(
+ messages
+ .iter()
+ .any(|message| fixtures::event::message_sent(event, message))
+ ))
.next()
.immediately()
.await
@@ -425,13 +445,17 @@ async fn terminates_on_logout() {
// These should not be delivered.
let messages = [
- fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await,
- fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await,
- fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await,
+ fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
+ fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
+ fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
];
assert!(events
- .filter(|event| future::ready(messages.contains(event)))
+ .filter(|event| future::ready(
+ messages
+ .iter()
+ .any(|message| fixtures::event::message_sent(event, message))
+ ))
.next()
.immediately()
.await
diff --git a/src/event/sequence.rs b/src/event/sequence.rs
index 9ebddd7..c566156 100644
--- a/src/event/sequence.rs
+++ b/src/event/sequence.rs
@@ -1,5 +1,20 @@
use std::fmt;
+use crate::clock::DateTime;
+
+#[derive(Clone, Copy, Debug, Eq, PartialEq, serde::Serialize)]
+pub struct Instant {
+ pub at: DateTime,
+ #[serde(skip)]
+ pub sequence: Sequence,
+}
+
+impl From<Instant> for Sequence {
+ fn from(instant: Instant) -> Self {
+ instant.sequence
+ }
+}
+
#[derive(
Clone,
Copy,
@@ -22,3 +37,47 @@ impl fmt::Display for Sequence {
value.fmt(f)
}
}
+
+impl Sequence {
+ pub fn up_to<E>(resume_point: Option<Self>) -> impl for<'e> Fn(&'e E) -> bool
+ where
+ E: Sequenced,
+ {
+ move |event| resume_point.map_or(true, |resume_point| event.sequence() <= resume_point)
+ }
+
+ pub fn after<E>(resume_point: Option<Self>) -> impl for<'e> Fn(&'e E) -> bool
+ where
+ E: Sequenced,
+ {
+ move |event| resume_point < Some(event.sequence())
+ }
+
+ pub fn start_from<E>(resume_point: Self) -> impl for<'e> Fn(&'e E) -> bool
+ where
+ E: Sequenced,
+ {
+ move |event| resume_point <= event.sequence()
+ }
+}
+
+pub trait Sequenced {
+ fn instant(&self) -> Instant;
+
+ fn sequence(&self) -> Sequence {
+ self.instant().into()
+ }
+}
+
+impl<E> Sequenced for &E
+where
+ E: Sequenced,
+{
+ fn instant(&self) -> Instant {
+ (*self).instant()
+ }
+
+ fn sequence(&self) -> Sequence {
+ (*self).sequence()
+ }
+}
diff --git a/src/event/types.rs b/src/event/types.rs
deleted file mode 100644
index 2324dc1..0000000
--- a/src/event/types.rs
+++ /dev/null
@@ -1,85 +0,0 @@
-use crate::{
- channel::{self, Channel},
- event::{Instant, Sequence},
- login::Login,
- message::{self, Message},
-};
-
-#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
-pub struct ChannelEvent {
- #[serde(flatten)]
- pub instant: Instant,
- #[serde(flatten)]
- pub data: ChannelEventData,
-}
-
-impl ChannelEvent {
- pub fn created(channel: Channel) -> Self {
- Self {
- instant: channel.created,
- data: CreatedEvent { channel }.into(),
- }
- }
-}
-
-impl<'c> From<&'c ChannelEvent> for Sequence {
- fn from(event: &'c ChannelEvent) -> Self {
- event.instant.sequence
- }
-}
-
-#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
-#[serde(tag = "type", rename_all = "snake_case")]
-pub enum ChannelEventData {
- Created(CreatedEvent),
- Message(MessageEvent),
- MessageDeleted(MessageDeletedEvent),
- Deleted(DeletedEvent),
-}
-
-#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
-pub struct CreatedEvent {
- pub channel: Channel,
-}
-
-impl From<CreatedEvent> for ChannelEventData {
- fn from(event: CreatedEvent) -> Self {
- Self::Created(event)
- }
-}
-
-#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
-pub struct MessageEvent {
- pub channel: Channel,
- pub sender: Login,
- pub message: Message,
-}
-
-impl From<MessageEvent> for ChannelEventData {
- fn from(event: MessageEvent) -> Self {
- Self::Message(event)
- }
-}
-
-#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
-pub struct MessageDeletedEvent {
- pub channel: Channel,
- pub message: message::Id,
-}
-
-impl From<MessageDeletedEvent> for ChannelEventData {
- fn from(event: MessageDeletedEvent) -> Self {
- Self::MessageDeleted(event)
- }
-}
-
-#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
-pub struct DeletedEvent {
- pub channel: channel::Id,
-}
-
-impl From<DeletedEvent> for ChannelEventData {
- fn from(event: DeletedEvent) -> Self {
- Self::Deleted(event)
- }
-}
diff --git a/src/expire.rs b/src/expire.rs
index a8eb8ad..e50bcb4 100644
--- a/src/expire.rs
+++ b/src/expire.rs
@@ -14,7 +14,7 @@ pub async fn middleware(
next: Next,
) -> Result<Response, Internal> {
app.tokens().expire(&expired_at).await?;
- app.events().expire(&expired_at).await?;
+ app.messages().expire(&expired_at).await?;
app.channels().expire(&expired_at).await?;
Ok(next.run(req).await)
}
diff --git a/src/message/app.rs b/src/message/app.rs
new file mode 100644
index 0000000..51f772e
--- /dev/null
+++ b/src/message/app.rs
@@ -0,0 +1,88 @@
+use chrono::TimeDelta;
+use itertools::Itertools;
+use sqlx::sqlite::SqlitePool;
+
+use super::{repo::Provider as _, Message};
+use crate::{
+ channel::{self, repo::Provider as _},
+ clock::DateTime,
+ db::NotFound as _,
+ event::{broadcaster::Broadcaster, repo::Provider as _, Sequence},
+ login::Login,
+};
+
+pub struct Messages<'a> {
+ db: &'a SqlitePool,
+ events: &'a Broadcaster,
+}
+
+impl<'a> Messages<'a> {
+ pub const fn new(db: &'a SqlitePool, events: &'a Broadcaster) -> Self {
+ Self { db, events }
+ }
+
+ pub async fn send(
+ &self,
+ channel: &channel::Id,
+ sender: &Login,
+ sent_at: &DateTime,
+ body: &str,
+ ) -> Result<Message, Error> {
+ let mut tx = self.db.begin().await?;
+ let channel = tx
+ .channels()
+ .by_id(channel)
+ .await
+ .not_found(|| Error::ChannelNotFound(channel.clone()))?;
+ let sent = tx.sequence().next(sent_at).await?;
+ let message = tx
+ .messages()
+ .create(&channel.snapshot(), sender, &sent, body)
+ .await?;
+ tx.commit().await?;
+
+ for event in message.events() {
+ self.events.broadcast(event);
+ }
+
+ Ok(message.snapshot())
+ }
+
+ pub async fn expire(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> {
+ // Somewhat arbitrarily, expire after 90 days.
+ let expire_at = relative_to.to_owned() - TimeDelta::days(90);
+
+ let mut tx = self.db.begin().await?;
+ let expired = tx.messages().expired(&expire_at).await?;
+
+ let mut events = Vec::with_capacity(expired.len());
+ for (channel, message) in expired {
+ let deleted = tx.sequence().next(relative_to).await?;
+ let message = tx.messages().delete(&channel, &message, &deleted).await?;
+ events.push(
+ message
+ .events()
+ .filter(Sequence::start_from(deleted.sequence)),
+ );
+ }
+
+ tx.commit().await?;
+
+ for event in events
+ .into_iter()
+ .kmerge_by(|a, b| a.instant.sequence < b.instant.sequence)
+ {
+ self.events.broadcast(event);
+ }
+
+ Ok(())
+ }
+}
+
+#[derive(Debug, thiserror::Error)]
+pub enum Error {
+ #[error("channel {0} not found")]
+ ChannelNotFound(channel::Id),
+ #[error(transparent)]
+ DatabaseError(#[from] sqlx::Error),
+}
diff --git a/src/message/event.rs b/src/message/event.rs
new file mode 100644
index 0000000..bcc2238
--- /dev/null
+++ b/src/message/event.rs
@@ -0,0 +1,50 @@
+use super::{snapshot::Message, Id};
+use crate::{
+ channel::Channel,
+ event::{Instant, Sequenced},
+};
+
+#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
+pub struct Event {
+ #[serde(flatten)]
+ pub instant: Instant,
+ #[serde(flatten)]
+ pub kind: Kind,
+}
+
+impl Sequenced for Event {
+ fn instant(&self) -> Instant {
+ self.instant
+ }
+}
+
+#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
+#[serde(tag = "type", rename_all = "snake_case")]
+pub enum Kind {
+ Sent(Sent),
+ Deleted(Deleted),
+}
+
+#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
+pub struct Sent {
+ #[serde(flatten)]
+ pub message: Message,
+}
+
+impl From<Sent> for Kind {
+ fn from(event: Sent) -> Self {
+ Self::Sent(event)
+ }
+}
+
+#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
+pub struct Deleted {
+ pub channel: Channel,
+ pub message: Id,
+}
+
+impl From<Deleted> for Kind {
+ fn from(event: Deleted) -> Self {
+ Self::Deleted(event)
+ }
+}
diff --git a/src/message/history.rs b/src/message/history.rs
new file mode 100644
index 0000000..5aca47e
--- /dev/null
+++ b/src/message/history.rs
@@ -0,0 +1,43 @@
+use super::{
+ event::{Deleted, Event, Sent},
+ Message,
+};
+use crate::event::Instant;
+
+#[derive(Clone, Debug, Eq, PartialEq)]
+pub struct History {
+ pub message: Message,
+ pub sent: Instant,
+ pub deleted: Option<Instant>,
+}
+
+impl History {
+ fn sent(&self) -> Event {
+ Event {
+ instant: self.sent,
+ kind: Sent {
+ message: self.message.clone(),
+ }
+ .into(),
+ }
+ }
+
+ fn deleted(&self) -> Option<Event> {
+ self.deleted.map(|instant| Event {
+ instant,
+ kind: Deleted {
+ channel: self.message.channel.clone(),
+ message: self.message.id.clone(),
+ }
+ .into(),
+ })
+ }
+
+ pub fn events(&self) -> impl Iterator<Item = Event> {
+ [self.sent()].into_iter().chain(self.deleted())
+ }
+
+ pub fn snapshot(&self) -> Message {
+ self.message.clone()
+ }
+}
diff --git a/src/message/mod.rs b/src/message/mod.rs
index 9a9bf14..52d56c1 100644
--- a/src/message/mod.rs
+++ b/src/message/mod.rs
@@ -1,9 +1,8 @@
+pub mod app;
+pub mod event;
+mod history;
mod id;
+pub mod repo;
+mod snapshot;
-pub use self::id::Id;
-
-#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
-pub struct Message {
- pub id: Id,
- pub body: String,
-}
+pub use self::{event::Event, history::History, id::Id, snapshot::Message};
diff --git a/src/message/repo.rs b/src/message/repo.rs
new file mode 100644
index 0000000..3b2b8f7
--- /dev/null
+++ b/src/message/repo.rs
@@ -0,0 +1,214 @@
+use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction};
+
+use super::{snapshot::Message, History, Id};
+use crate::{
+ channel::{self, Channel},
+ clock::DateTime,
+ event::{Instant, Sequence},
+ login::{self, Login},
+};
+
+pub trait Provider {
+ fn messages(&mut self) -> Messages;
+}
+
+impl<'c> Provider for Transaction<'c, Sqlite> {
+ fn messages(&mut self) -> Messages {
+ Messages(self)
+ }
+}
+
+pub struct Messages<'t>(&'t mut SqliteConnection);
+
+impl<'c> Messages<'c> {
+ pub async fn create(
+ &mut self,
+ channel: &Channel,
+ sender: &Login,
+ sent: &Instant,
+ body: &str,
+ ) -> Result<History, sqlx::Error> {
+ let id = Id::generate();
+
+ let message = sqlx::query!(
+ r#"
+ insert into message
+ (id, channel, sender, sent_at, sent_sequence, body)
+ values ($1, $2, $3, $4, $5, $6)
+ returning
+ id as "id: Id",
+ body
+ "#,
+ id,
+ channel.id,
+ sender.id,
+ sent.at,
+ sent.sequence,
+ body,
+ )
+ .map(|row| History {
+ message: Message {
+ channel: channel.clone(),
+ sender: sender.clone(),
+ id: row.id,
+ body: row.body,
+ },
+ sent: *sent,
+ deleted: None,
+ })
+ .fetch_one(&mut *self.0)
+ .await?;
+
+ Ok(message)
+ }
+
+ async fn by_id(&mut self, channel: &Channel, message: &Id) -> Result<History, sqlx::Error> {
+ let message = sqlx::query!(
+ r#"
+ select
+ channel.id as "channel_id: channel::Id",
+ channel.name as "channel_name",
+ sender.id as "sender_id: login::Id",
+ sender.name as "sender_name",
+ message.id as "id: Id",
+ message.body,
+ sent_at as "sent_at: DateTime",
+ sent_sequence as "sent_sequence: Sequence"
+ from message
+ join channel on message.channel = channel.id
+ join login as sender on message.sender = sender.id
+ where message.id = $1
+ and message.channel = $2
+ "#,
+ message,
+ channel.id,
+ )
+ .map(|row| History {
+ message: Message {
+ channel: Channel {
+ id: row.channel_id,
+ name: row.channel_name,
+ },
+ sender: Login {
+ id: row.sender_id,
+ name: row.sender_name,
+ },
+ id: row.id,
+ body: row.body,
+ },
+ sent: Instant {
+ at: row.sent_at,
+ sequence: row.sent_sequence,
+ },
+ deleted: None,
+ })
+ .fetch_one(&mut *self.0)
+ .await?;
+
+ Ok(message)
+ }
+
+ pub async fn delete(
+ &mut self,
+ channel: &Channel,
+ message: &Id,
+ deleted: &Instant,
+ ) -> Result<History, sqlx::Error> {
+ let history = self.by_id(channel, message).await?;
+
+ sqlx::query_scalar!(
+ r#"
+ delete from message
+ where
+ id = $1
+ returning 1 as "deleted: i64"
+ "#,
+ history.message.id,
+ )
+ .fetch_one(&mut *self.0)
+ .await?;
+
+ Ok(History {
+ deleted: Some(*deleted),
+ ..history
+ })
+ }
+
+ pub async fn expired(
+ &mut self,
+ expire_at: &DateTime,
+ ) -> Result<Vec<(Channel, Id)>, sqlx::Error> {
+ let messages = sqlx::query!(
+ r#"
+ select
+ channel.id as "channel_id: channel::Id",
+ channel.name as "channel_name",
+ message.id as "message: Id"
+ from message
+ join channel on message.channel = channel.id
+ where sent_at < $1
+ "#,
+ expire_at,
+ )
+ .map(|row| {
+ (
+ Channel {
+ id: row.channel_id,
+ name: row.channel_name,
+ },
+ row.message,
+ )
+ })
+ .fetch_all(&mut *self.0)
+ .await?;
+
+ Ok(messages)
+ }
+
+ pub async fn replay(
+ &mut self,
+ resume_at: Option<Sequence>,
+ ) -> Result<Vec<History>, sqlx::Error> {
+ let messages = sqlx::query!(
+ r#"
+ select
+ channel.id as "channel_id: channel::Id",
+ channel.name as "channel_name",
+ sender.id as "sender_id: login::Id",
+ sender.name as "sender_name",
+ message.id as "id: Id",
+ message.body,
+ sent_at as "sent_at: DateTime",
+ sent_sequence as "sent_sequence: Sequence"
+ from message
+ join channel on message.channel = channel.id
+ join login as sender on message.sender = sender.id
+ where coalesce(message.sent_sequence > $1, true)
+ "#,
+ resume_at,
+ )
+ .map(|row| History {
+ message: Message {
+ channel: Channel {
+ id: row.channel_id,
+ name: row.channel_name,
+ },
+ sender: Login {
+ id: row.sender_id,
+ name: row.sender_name,
+ },
+ id: row.id,
+ body: row.body,
+ },
+ sent: Instant {
+ at: row.sent_at,
+ sequence: row.sent_sequence,
+ },
+ deleted: None,
+ })
+ .fetch_all(&mut *self.0)
+ .await?;
+
+ Ok(messages)
+ }
+}
diff --git a/src/message/snapshot.rs b/src/message/snapshot.rs
new file mode 100644
index 0000000..3adccbe
--- /dev/null
+++ b/src/message/snapshot.rs
@@ -0,0 +1,74 @@
+use super::{
+ event::{Event, Kind, Sent},
+ Id,
+};
+use crate::{channel::Channel, login::Login};
+
+#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
+#[serde(into = "self::serialize::Message")]
+pub struct Message {
+ pub channel: Channel,
+ pub sender: Login,
+ pub id: Id,
+ pub body: String,
+}
+
+mod serialize {
+ use crate::{channel::Channel, login::Login, message::Id};
+
+ #[derive(serde::Serialize)]
+ pub struct Message {
+ channel: Channel,
+ sender: Login,
+ #[allow(clippy::struct_field_names)]
+ // Deliberately redundant with the module path; this produces a specific serialization.
+ message: MessageData,
+ }
+
+ #[derive(serde::Serialize)]
+ pub struct MessageData {
+ id: Id,
+ body: String,
+ }
+
+ impl From<super::Message> for Message {
+ fn from(message: super::Message) -> Self {
+ Self {
+ channel: message.channel,
+ sender: message.sender,
+ message: MessageData {
+ id: message.id,
+ body: message.body,
+ },
+ }
+ }
+ }
+}
+
+impl Message {
+ fn apply(state: Option<Self>, event: Event) -> Option<Self> {
+ match (state, event.kind) {
+ (None, Kind::Sent(event)) => Some(event.into()),
+ (Some(message), Kind::Deleted(event)) if message.id == event.message => None,
+ (state, event) => panic!("invalid message event {event:#?} for state {state:#?}"),
+ }
+ }
+}
+
+impl FromIterator<Event> for Option<Message> {
+ fn from_iter<I: IntoIterator<Item = Event>>(events: I) -> Self {
+ events.into_iter().fold(None, Message::apply)
+ }
+}
+
+impl From<&Sent> for Message {
+ fn from(event: &Sent) -> Self {
+ event.message.clone()
+ }
+}
+
+impl From<Sent> for Message {
+ fn from(event: Sent) -> Self {
+ event.message
+ }
+}
diff --git a/src/test/fixtures/event.rs b/src/test/fixtures/event.rs
new file mode 100644
index 0000000..09f0490
--- /dev/null
+++ b/src/test/fixtures/event.rs
@@ -0,0 +1,11 @@
+use crate::{
+ event::{Event, Kind},
+ message::Message,
+};
+
+pub fn message_sent(event: &Event, message: &Message) -> bool {
+ matches!(
+ &event.kind,
+ Kind::MessageSent(event) if message == &event.into()
+ )
+}
diff --git a/src/test/fixtures/filter.rs b/src/test/fixtures/filter.rs
index d1939a5..6e62aea 100644
--- a/src/test/fixtures/filter.rs
+++ b/src/test/fixtures/filter.rs
@@ -1,11 +1,11 @@
use futures::future;
-use crate::event::types;
+use crate::event::{Event, Kind};
-pub fn messages() -> impl FnMut(&types::ChannelEvent) -> future::Ready<bool> {
- |event| future::ready(matches!(event.data, types::ChannelEventData::Message(_)))
+pub fn messages() -> impl FnMut(&Event) -> future::Ready<bool> {
+ |event| future::ready(matches!(event.kind, Kind::MessageSent(_)))
}
-pub fn created() -> impl FnMut(&types::ChannelEvent) -> future::Ready<bool> {
- |event| future::ready(matches!(event.data, types::ChannelEventData::Created(_)))
+pub fn created() -> impl FnMut(&Event) -> future::Ready<bool> {
+ |event| future::ready(matches!(event.kind, Kind::ChannelCreated(_)))
}
diff --git a/src/test/fixtures/message.rs b/src/test/fixtures/message.rs
index fd50887..381b10b 100644
--- a/src/test/fixtures/message.rs
+++ b/src/test/fixtures/message.rs
@@ -1,17 +1,12 @@
use faker_rand::lorem::Paragraphs;
-use crate::{app::App, channel::Channel, clock::RequestedAt, event::types, login::Login};
+use crate::{app::App, channel::Channel, clock::RequestedAt, login::Login, message::Message};
-pub async fn send(
- app: &App,
- login: &Login,
- channel: &Channel,
- sent_at: &RequestedAt,
-) -> types::ChannelEvent {
+pub async fn send(app: &App, channel: &Channel, login: &Login, sent_at: &RequestedAt) -> Message {
let body = propose();
- app.events()
- .send(login, &channel.id, &body, sent_at)
+ app.messages()
+ .send(&channel.id, login, sent_at, &body)
.await
.expect("should succeed if the channel exists")
}
diff --git a/src/test/fixtures/mod.rs b/src/test/fixtures/mod.rs
index 76467ab..c5efa9b 100644
--- a/src/test/fixtures/mod.rs
+++ b/src/test/fixtures/mod.rs
@@ -3,6 +3,7 @@ use chrono::{TimeDelta, Utc};
use crate::{app::App, clock::RequestedAt, db};
pub mod channel;
+pub mod event;
pub mod filter;
pub mod future;
pub mod identity;
diff --git a/src/token/app.rs b/src/token/app.rs
index 030ec69..5c4fcd5 100644
--- a/src/token/app.rs
+++ b/src/token/app.rs
@@ -127,7 +127,7 @@ impl<'a> Tokens<'a> {
tx.commit().await?;
for event in tokens.into_iter().map(event::TokenRevoked::from) {
- self.tokens.broadcast(&event);
+ self.tokens.broadcast(event);
}
Ok(())
@@ -139,7 +139,7 @@ impl<'a> Tokens<'a> {
tx.commit().await?;
self.tokens
- .broadcast(&event::TokenRevoked::from(token.clone()));
+ .broadcast(event::TokenRevoked::from(token.clone()));
Ok(())
}