summaryrefslogtreecommitdiff
path: root/src/channel
diff options
context:
space:
mode:
authorOwen Jacobson <owen@grimoire.ca>2024-10-02 12:25:36 -0400
committerOwen Jacobson <owen@grimoire.ca>2024-10-03 19:25:41 -0400
commitec804134c33aedb001c426c5f42f43f53c47848f (patch)
treec62b59ab5cdd438f47a5f9cc35fdc712d362af19 /src/channel
parent469613872f6fb19f4579b387e19b2bc38fa52f51 (diff)
Represent channels and messages using a split "History" and "Snapshot" model.
This separates the code that figures out what happened to an entity from the code that represents it to a user, and makes it easier to compute a snapshot at a point in time (for things like bootstrap). It also makes the internal logic a bit easier to follow, since it's easier to tell whether you're working with a point in time or with the whole recorded history. This hefty.
Diffstat (limited to 'src/channel')
-rw-r--r--src/channel/app.rs35
-rw-r--r--src/channel/event.rs48
-rw-r--r--src/channel/history.rs42
-rw-r--r--src/channel/mod.rs15
-rw-r--r--src/channel/repo.rs94
-rw-r--r--src/channel/routes.rs11
-rw-r--r--src/channel/routes/test/on_create.rs6
-rw-r--r--src/channel/routes/test/on_send.rs13
-rw-r--r--src/channel/snapshot.rs38
9 files changed, 228 insertions, 74 deletions
diff --git a/src/channel/app.rs b/src/channel/app.rs
index b7e3a10..6ce826b 100644
--- a/src/channel/app.rs
+++ b/src/channel/app.rs
@@ -1,10 +1,11 @@
use chrono::TimeDelta;
+use itertools::Itertools;
use sqlx::sqlite::SqlitePool;
use crate::{
channel::{repo::Provider as _, Channel},
clock::DateTime,
- event::{broadcaster::Broadcaster, repo::Provider as _, types::ChannelEvent, Sequence},
+ event::{broadcaster::Broadcaster, repo::Provider as _, Sequence},
};
pub struct Channels<'a> {
@@ -27,10 +28,11 @@ impl<'a> Channels<'a> {
.map_err(|err| CreateError::from_duplicate_name(err, name))?;
tx.commit().await?;
- self.events
- .broadcast(&ChannelEvent::created(channel.clone()));
+ for event in channel.events() {
+ self.events.broadcast(event);
+ }
- Ok(channel)
+ Ok(channel.snapshot())
}
pub async fn all(&self, resume_point: Option<Sequence>) -> Result<Vec<Channel>, InternalError> {
@@ -38,6 +40,16 @@ impl<'a> Channels<'a> {
let channels = tx.channels().all(resume_point).await?;
tx.commit().await?;
+ let channels = channels
+ .into_iter()
+ .filter_map(|channel| {
+ channel
+ .events()
+ .filter(Sequence::up_to(resume_point))
+ .collect()
+ })
+ .collect();
+
Ok(channels)
}
@@ -51,14 +63,21 @@ impl<'a> Channels<'a> {
let mut events = Vec::with_capacity(expired.len());
for channel in expired {
let deleted = tx.sequence().next(relative_to).await?;
- let event = tx.channels().delete(&channel, &deleted).await?;
- events.push(event);
+ let channel = tx.channels().delete(&channel, &deleted).await?;
+ events.push(
+ channel
+ .events()
+ .filter(Sequence::start_from(deleted.sequence)),
+ );
}
tx.commit().await?;
- for event in events {
- self.events.broadcast(&event);
+ for event in events
+ .into_iter()
+ .kmerge_by(|a, b| a.instant.sequence < b.instant.sequence)
+ {
+ self.events.broadcast(event);
}
Ok(())
diff --git a/src/channel/event.rs b/src/channel/event.rs
new file mode 100644
index 0000000..9c54174
--- /dev/null
+++ b/src/channel/event.rs
@@ -0,0 +1,48 @@
+use super::Channel;
+use crate::{
+ channel,
+ event::{Instant, Sequenced},
+};
+
+#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
+pub struct Event {
+ #[serde(flatten)]
+ pub instant: Instant,
+ #[serde(flatten)]
+ pub kind: Kind,
+}
+
+impl Sequenced for Event {
+ fn instant(&self) -> Instant {
+ self.instant
+ }
+}
+
+#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
+#[serde(tag = "type", rename_all = "snake_case")]
+pub enum Kind {
+ Created(Created),
+ Deleted(Deleted),
+}
+
+#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
+pub struct Created {
+ pub channel: Channel,
+}
+
+impl From<Created> for Kind {
+ fn from(event: Created) -> Self {
+ Self::Created(event)
+ }
+}
+
+#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
+pub struct Deleted {
+ pub channel: channel::Id,
+}
+
+impl From<Deleted> for Kind {
+ fn from(event: Deleted) -> Self {
+ Self::Deleted(event)
+ }
+}
diff --git a/src/channel/history.rs b/src/channel/history.rs
new file mode 100644
index 0000000..3cc7d9d
--- /dev/null
+++ b/src/channel/history.rs
@@ -0,0 +1,42 @@
+use super::{
+ event::{Created, Deleted, Event},
+ Channel,
+};
+use crate::event::Instant;
+
+#[derive(Clone, Debug, Eq, PartialEq)]
+pub struct History {
+ pub channel: Channel,
+ pub created: Instant,
+ pub deleted: Option<Instant>,
+}
+
+impl History {
+ fn created(&self) -> Event {
+ Event {
+ instant: self.created,
+ kind: Created {
+ channel: self.channel.clone(),
+ }
+ .into(),
+ }
+ }
+
+ fn deleted(&self) -> Option<Event> {
+ self.deleted.map(|instant| Event {
+ instant,
+ kind: Deleted {
+ channel: self.channel.id.clone(),
+ }
+ .into(),
+ })
+ }
+
+ pub fn events(&self) -> impl Iterator<Item = Event> {
+ [self.created()].into_iter().chain(self.deleted())
+ }
+
+ pub fn snapshot(&self) -> Channel {
+ self.channel.clone()
+ }
+}
diff --git a/src/channel/mod.rs b/src/channel/mod.rs
index 4baa7e3..eb8200b 100644
--- a/src/channel/mod.rs
+++ b/src/channel/mod.rs
@@ -1,16 +1,9 @@
-use crate::event::Instant;
-
pub mod app;
+pub mod event;
+mod history;
mod id;
pub mod repo;
mod routes;
+mod snapshot;
-pub use self::{id::Id, routes::router};
-
-#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
-pub struct Channel {
- pub id: Id,
- pub name: String,
- #[serde(skip)]
- pub created: Instant,
-}
+pub use self::{event::Event, history::History, id::Id, routes::router, snapshot::Channel};
diff --git a/src/channel/repo.rs b/src/channel/repo.rs
index c000b56..8bb761b 100644
--- a/src/channel/repo.rs
+++ b/src/channel/repo.rs
@@ -1,9 +1,9 @@
use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction};
use crate::{
- channel::{Channel, Id},
+ channel::{Channel, History, Id},
clock::DateTime,
- event::{types, Instant, Sequence},
+ event::{Instant, Sequence},
};
pub trait Provider {
@@ -19,7 +19,7 @@ impl<'c> Provider for Transaction<'c, Sqlite> {
pub struct Channels<'t>(&'t mut SqliteConnection);
impl<'c> Channels<'c> {
- pub async fn create(&mut self, name: &str, created: &Instant) -> Result<Channel, sqlx::Error> {
+ pub async fn create(&mut self, name: &str, created: &Instant) -> Result<History, sqlx::Error> {
let id = Id::generate();
let channel = sqlx::query!(
r#"
@@ -37,13 +37,16 @@ impl<'c> Channels<'c> {
created.at,
created.sequence,
)
- .map(|row| Channel {
- id: row.id,
- name: row.name,
+ .map(|row| History {
+ channel: Channel {
+ id: row.id,
+ name: row.name,
+ },
created: Instant {
at: row.created_at,
sequence: row.created_sequence,
},
+ deleted: None,
})
.fetch_one(&mut *self.0)
.await?;
@@ -51,7 +54,7 @@ impl<'c> Channels<'c> {
Ok(channel)
}
- pub async fn by_id(&mut self, channel: &Id) -> Result<Channel, sqlx::Error> {
+ pub async fn by_id(&mut self, channel: &Id) -> Result<History, sqlx::Error> {
let channel = sqlx::query!(
r#"
select
@@ -64,13 +67,16 @@ impl<'c> Channels<'c> {
"#,
channel,
)
- .map(|row| Channel {
- id: row.id,
- name: row.name,
+ .map(|row| History {
+ channel: Channel {
+ id: row.id,
+ name: row.name,
+ },
created: Instant {
at: row.created_at,
sequence: row.created_sequence,
},
+ deleted: None,
})
.fetch_one(&mut *self.0)
.await?;
@@ -81,7 +87,7 @@ impl<'c> Channels<'c> {
pub async fn all(
&mut self,
resume_point: Option<Sequence>,
- ) -> Result<Vec<Channel>, sqlx::Error> {
+ ) -> Result<Vec<History>, sqlx::Error> {
let channels = sqlx::query!(
r#"
select
@@ -95,13 +101,16 @@ impl<'c> Channels<'c> {
"#,
resume_point,
)
- .map(|row| Channel {
- id: row.id,
- name: row.name,
+ .map(|row| History {
+ channel: Channel {
+ id: row.id,
+ name: row.name,
+ },
created: Instant {
at: row.created_at,
sequence: row.created_sequence,
},
+ deleted: None,
})
.fetch_all(&mut *self.0)
.await?;
@@ -112,7 +121,7 @@ impl<'c> Channels<'c> {
pub async fn replay(
&mut self,
resume_at: Option<Sequence>,
- ) -> Result<Vec<Channel>, sqlx::Error> {
+ ) -> Result<Vec<History>, sqlx::Error> {
let channels = sqlx::query!(
r#"
select
@@ -125,13 +134,16 @@ impl<'c> Channels<'c> {
"#,
resume_at,
)
- .map(|row| Channel {
- id: row.id,
- name: row.name,
+ .map(|row| History {
+ channel: Channel {
+ id: row.id,
+ name: row.name,
+ },
created: Instant {
at: row.created_at,
sequence: row.created_sequence,
},
+ deleted: None,
})
.fetch_all(&mut *self.0)
.await?;
@@ -141,35 +153,43 @@ impl<'c> Channels<'c> {
pub async fn delete(
&mut self,
- channel: &Channel,
+ channel: &Id,
deleted: &Instant,
- ) -> Result<types::ChannelEvent, sqlx::Error> {
- let channel = channel.id.clone();
- sqlx::query_scalar!(
+ ) -> Result<History, sqlx::Error> {
+ let channel = sqlx::query!(
r#"
delete from channel
where id = $1
- returning 1 as "row: i64"
+ returning
+ id as "id: Id",
+ name,
+ created_at as "created_at: DateTime",
+ created_sequence as "created_sequence: Sequence"
"#,
channel,
)
+ .map(|row| History {
+ channel: Channel {
+ id: row.id,
+ name: row.name,
+ },
+ created: Instant {
+ at: row.created_at,
+ sequence: row.created_sequence,
+ },
+ deleted: Some(*deleted),
+ })
.fetch_one(&mut *self.0)
.await?;
- Ok(types::ChannelEvent {
- instant: *deleted,
- data: types::DeletedEvent { channel }.into(),
- })
+ Ok(channel)
}
- pub async fn expired(&mut self, expired_at: &DateTime) -> Result<Vec<Channel>, sqlx::Error> {
- let channels = sqlx::query!(
+ pub async fn expired(&mut self, expired_at: &DateTime) -> Result<Vec<Id>, sqlx::Error> {
+ let channels = sqlx::query_scalar!(
r#"
select
- channel.id as "id: Id",
- channel.name,
- channel.created_at as "created_at: DateTime",
- channel.created_sequence as "created_sequence: Sequence"
+ channel.id as "id: Id"
from channel
left join message
where created_at < $1
@@ -177,14 +197,6 @@ impl<'c> Channels<'c> {
"#,
expired_at,
)
- .map(|row| Channel {
- id: row.id,
- name: row.name,
- created: Instant {
- at: row.created_at,
- sequence: row.created_sequence,
- },
- })
.fetch_all(&mut *self.0)
.await?;
diff --git a/src/channel/routes.rs b/src/channel/routes.rs
index 5d8b61e..5bb1ee9 100644
--- a/src/channel/routes.rs
+++ b/src/channel/routes.rs
@@ -13,8 +13,9 @@ use crate::{
channel::{self, Channel},
clock::RequestedAt,
error::Internal,
- event::{app::EventsError, Sequence},
+ event::Sequence,
login::Login,
+ message::app::Error as MessageError,
};
#[cfg(test)]
@@ -99,8 +100,8 @@ async fn on_send(
login: Login,
Json(request): Json<SendRequest>,
) -> Result<StatusCode, ErrorResponse> {
- app.events()
- .send(&login, &channel, &request.message, &sent_at)
+ app.messages()
+ .send(&channel, &login, &sent_at, &request.message)
.await
// Could impl `From` here, but it's more code and this is used once.
.map_err(ErrorResponse)?;
@@ -109,13 +110,13 @@ async fn on_send(
}
#[derive(Debug)]
-struct ErrorResponse(EventsError);
+struct ErrorResponse(MessageError);
impl IntoResponse for ErrorResponse {
fn into_response(self) -> Response {
let Self(error) = self;
match error {
- not_found @ EventsError::ChannelNotFound(_) => {
+ not_found @ MessageError::ChannelNotFound(_) => {
(StatusCode::NOT_FOUND, not_found.to_string()).into_response()
}
other => Internal::from(other).into_response(),
diff --git a/src/channel/routes/test/on_create.rs b/src/channel/routes/test/on_create.rs
index 9988932..5733c9e 100644
--- a/src/channel/routes/test/on_create.rs
+++ b/src/channel/routes/test/on_create.rs
@@ -3,7 +3,7 @@ use futures::stream::StreamExt as _;
use crate::{
channel::{app, routes},
- event::types,
+ event,
test::fixtures::{self, future::Immediately as _},
};
@@ -50,8 +50,8 @@ async fn new_channel() {
.expect("creation event published");
assert!(matches!(
- event.data,
- types::ChannelEventData::Created(event)
+ event.kind,
+ event::Kind::ChannelCreated(event)
if event.channel == response_channel
));
}
diff --git a/src/channel/routes/test/on_send.rs b/src/channel/routes/test/on_send.rs
index 33ec3b7..1027b29 100644
--- a/src/channel/routes/test/on_send.rs
+++ b/src/channel/routes/test/on_send.rs
@@ -4,7 +4,8 @@ use futures::stream::StreamExt;
use crate::{
channel,
channel::routes,
- event::{app, types},
+ event,
+ message::app,
test::fixtures::{self, future::Immediately as _},
};
@@ -54,10 +55,10 @@ async fn messages_in_order() {
for ((sent_at, message), event) in requests.into_iter().zip(events) {
assert_eq!(*sent_at, event.instant.at);
assert!(matches!(
- event.data,
- types::ChannelEventData::Message(event_message)
- if event_message.sender == sender
- && event_message.message.body == message
+ event.kind,
+ event::Kind::MessageSent(event)
+ if event.message.sender == sender
+ && event.message.body == message
));
}
}
@@ -90,6 +91,6 @@ async fn nonexistent_channel() {
assert!(matches!(
error,
- app::EventsError::ChannelNotFound(error_channel) if channel == error_channel
+ app::Error::ChannelNotFound(error_channel) if channel == error_channel
));
}
diff --git a/src/channel/snapshot.rs b/src/channel/snapshot.rs
new file mode 100644
index 0000000..6462f25
--- /dev/null
+++ b/src/channel/snapshot.rs
@@ -0,0 +1,38 @@
+use super::{
+ event::{Created, Event, Kind},
+ Id,
+};
+
+#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
+pub struct Channel {
+ pub id: Id,
+ pub name: String,
+}
+
+impl Channel {
+ fn apply(state: Option<Self>, event: Event) -> Option<Self> {
+ match (state, event.kind) {
+ (None, Kind::Created(event)) => Some(event.into()),
+ (Some(channel), Kind::Deleted(event)) if channel.id == event.channel => None,
+ (state, event) => panic!("invalid channel event {event:#?} for state {state:#?}"),
+ }
+ }
+}
+
+impl FromIterator<Event> for Option<Channel> {
+ fn from_iter<I: IntoIterator<Item = Event>>(events: I) -> Self {
+ events.into_iter().fold(None, Channel::apply)
+ }
+}
+
+impl From<&Created> for Channel {
+ fn from(event: &Created) -> Self {
+ event.channel.clone()
+ }
+}
+
+impl From<Created> for Channel {
+ fn from(event: Created) -> Self {
+ event.channel
+ }
+}