From 357116366c1307bedaac6a3dfe9c5ed8e0e0c210 Mon Sep 17 00:00:00 2001 From: Owen Jacobson Date: Wed, 2 Oct 2024 00:41:25 -0400 Subject: First pass on reorganizing the backend. This is primarily renames and repackagings. --- src/event/mod.rs | 9 +++++++++ 1 file changed, 9 insertions(+) create mode 100644 src/event/mod.rs (limited to 'src/event/mod.rs') diff --git a/src/event/mod.rs b/src/event/mod.rs new file mode 100644 index 0000000..7ad3f9c --- /dev/null +++ b/src/event/mod.rs @@ -0,0 +1,9 @@ +pub mod app; +pub mod broadcaster; +mod extract; +pub mod repo; +mod routes; +mod sequence; +pub mod types; + +pub use self::{routes::router, sequence::Sequence}; -- cgit v1.2.3 From 469613872f6fb19f4579b387e19b2bc38fa52f51 Mon Sep 17 00:00:00 2001 From: Owen Jacobson Date: Wed, 2 Oct 2024 01:31:43 -0400 Subject: Package up common event fields as Instant --- src/channel/app.rs | 11 +++--- src/channel/mod.rs | 6 ++-- src/channel/repo.rs | 74 ++++++++++++++++++++++++++------------ src/channel/routes/test/on_send.rs | 2 +- src/event/app.rs | 18 +++++----- src/event/mod.rs | 9 +++++ src/event/repo/message.rs | 42 +++++++++++++--------- src/event/repo/sequence.rs | 12 +++++-- src/event/routes.rs | 2 +- src/event/routes/test.rs | 8 ++--- src/event/types.rs | 22 +++--------- 11 files changed, 121 insertions(+), 85 deletions(-) (limited to 'src/event/mod.rs') diff --git a/src/channel/app.rs b/src/channel/app.rs index ef0a63f..b7e3a10 100644 --- a/src/channel/app.rs +++ b/src/channel/app.rs @@ -19,10 +19,10 @@ impl<'a> Channels<'a> { pub async fn create(&self, name: &str, created_at: &DateTime) -> Result { let mut tx = self.db.begin().await?; - let created_sequence = tx.sequence().next().await?; + let created = tx.sequence().next(created_at).await?; let channel = tx .channels() - .create(name, created_at, created_sequence) + .create(name, &created) .await .map_err(|err| CreateError::from_duplicate_name(err, name))?; tx.commit().await?; @@ -50,11 +50,8 @@ impl<'a> Channels<'a> { let mut events = Vec::with_capacity(expired.len()); for channel in expired { - let deleted_sequence = tx.sequence().next().await?; - let event = tx - .channels() - .delete(&channel, relative_to, deleted_sequence) - .await?; + let deleted = tx.sequence().next(relative_to).await?; + let event = tx.channels().delete(&channel, &deleted).await?; events.push(event); } diff --git a/src/channel/mod.rs b/src/channel/mod.rs index 2672084..4baa7e3 100644 --- a/src/channel/mod.rs +++ b/src/channel/mod.rs @@ -1,4 +1,4 @@ -use crate::{clock::DateTime, event::Sequence}; +use crate::event::Instant; pub mod app; mod id; @@ -12,7 +12,5 @@ pub struct Channel { pub id: Id, pub name: String, #[serde(skip)] - pub created_at: DateTime, - #[serde(skip)] - pub created_sequence: Sequence, + pub created: Instant, } diff --git a/src/channel/repo.rs b/src/channel/repo.rs index 18cd81f..c000b56 100644 --- a/src/channel/repo.rs +++ b/src/channel/repo.rs @@ -3,7 +3,7 @@ use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction}; use crate::{ channel::{Channel, Id}, clock::DateTime, - event::{types, Sequence}, + event::{types, Instant, Sequence}, }; pub trait Provider { @@ -19,15 +19,9 @@ impl<'c> Provider for Transaction<'c, Sqlite> { pub struct Channels<'t>(&'t mut SqliteConnection); impl<'c> Channels<'c> { - pub async fn create( - &mut self, - name: &str, - created_at: &DateTime, - created_sequence: Sequence, - ) -> Result { + pub async fn create(&mut self, name: &str, created: &Instant) -> Result { let id = Id::generate(); - let channel = sqlx::query_as!( - Channel, + let channel = sqlx::query!( r#" insert into channel (id, name, created_at, created_sequence) @@ -40,9 +34,17 @@ impl<'c> Channels<'c> { "#, id, name, - created_at, - created_sequence, + created.at, + created.sequence, ) + .map(|row| Channel { + id: row.id, + name: row.name, + created: Instant { + at: row.created_at, + sequence: row.created_sequence, + }, + }) .fetch_one(&mut *self.0) .await?; @@ -50,8 +52,7 @@ impl<'c> Channels<'c> { } pub async fn by_id(&mut self, channel: &Id) -> Result { - let channel = sqlx::query_as!( - Channel, + let channel = sqlx::query!( r#" select id as "id: Id", @@ -63,6 +64,14 @@ impl<'c> Channels<'c> { "#, channel, ) + .map(|row| Channel { + id: row.id, + name: row.name, + created: Instant { + at: row.created_at, + sequence: row.created_sequence, + }, + }) .fetch_one(&mut *self.0) .await?; @@ -73,8 +82,7 @@ impl<'c> Channels<'c> { &mut self, resume_point: Option, ) -> Result, sqlx::Error> { - let channels = sqlx::query_as!( - Channel, + let channels = sqlx::query!( r#" select id as "id: Id", @@ -87,6 +95,14 @@ impl<'c> Channels<'c> { "#, resume_point, ) + .map(|row| Channel { + id: row.id, + name: row.name, + created: Instant { + at: row.created_at, + sequence: row.created_sequence, + }, + }) .fetch_all(&mut *self.0) .await?; @@ -97,8 +113,7 @@ impl<'c> Channels<'c> { &mut self, resume_at: Option, ) -> Result, sqlx::Error> { - let channels = sqlx::query_as!( - Channel, + let channels = sqlx::query!( r#" select id as "id: Id", @@ -110,6 +125,14 @@ impl<'c> Channels<'c> { "#, resume_at, ) + .map(|row| Channel { + id: row.id, + name: row.name, + created: Instant { + at: row.created_at, + sequence: row.created_sequence, + }, + }) .fetch_all(&mut *self.0) .await?; @@ -119,8 +142,7 @@ impl<'c> Channels<'c> { pub async fn delete( &mut self, channel: &Channel, - deleted_at: &DateTime, - deleted_sequence: Sequence, + deleted: &Instant, ) -> Result { let channel = channel.id.clone(); sqlx::query_scalar!( @@ -135,15 +157,13 @@ impl<'c> Channels<'c> { .await?; Ok(types::ChannelEvent { - sequence: deleted_sequence, - at: *deleted_at, + instant: *deleted, data: types::DeletedEvent { channel }.into(), }) } pub async fn expired(&mut self, expired_at: &DateTime) -> Result, sqlx::Error> { - let channels = sqlx::query_as!( - Channel, + let channels = sqlx::query!( r#" select channel.id as "id: Id", @@ -157,6 +177,14 @@ impl<'c> Channels<'c> { "#, expired_at, ) + .map(|row| Channel { + id: row.id, + name: row.name, + created: Instant { + at: row.created_at, + sequence: row.created_sequence, + }, + }) .fetch_all(&mut *self.0) .await?; diff --git a/src/channel/routes/test/on_send.rs b/src/channel/routes/test/on_send.rs index 6f844cd..33ec3b7 100644 --- a/src/channel/routes/test/on_send.rs +++ b/src/channel/routes/test/on_send.rs @@ -52,7 +52,7 @@ async fn messages_in_order() { let events = events.collect::>().immediately().await; for ((sent_at, message), event) in requests.into_iter().zip(events) { - assert_eq!(*sent_at, event.at); + assert_eq!(*sent_at, event.instant.at); assert!(matches!( event.data, types::ChannelEventData::Message(event_message) diff --git a/src/event/app.rs b/src/event/app.rs index 3d35f1a..5e9e79a 100644 --- a/src/event/app.rs +++ b/src/event/app.rs @@ -42,10 +42,10 @@ impl<'a> Events<'a> { .by_id(channel) .await .not_found(|| EventsError::ChannelNotFound(channel.clone()))?; - let sent_sequence = tx.sequence().next().await?; + let sent = tx.sequence().next(sent_at).await?; let event = tx .message_events() - .create(login, &channel, sent_at, sent_sequence, body) + .create(login, &channel, &sent, body) .await?; tx.commit().await?; @@ -62,10 +62,10 @@ impl<'a> Events<'a> { let mut events = Vec::with_capacity(expired.len()); for (channel, message) in expired { - let deleted_sequence = tx.sequence().next().await?; + let deleted = tx.sequence().next(relative_to).await?; let event = tx .message_events() - .delete(&channel, &message, relative_to, deleted_sequence) + .delete(&channel, &message, &deleted) .await?; events.push(event); } @@ -93,7 +93,9 @@ impl<'a> Events<'a> { let channel_events = channels .into_iter() .map(ChannelEvent::created) - .filter(move |event| resume_at.map_or(true, |resume_at| event.sequence > resume_at)); + .filter(move |event| { + resume_at.map_or(true, |resume_at| Sequence::from(event) > resume_at) + }); let message_events = tx.message_events().replay(resume_at).await?; @@ -101,8 +103,8 @@ impl<'a> Events<'a> { .into_iter() .chain(message_events.into_iter()) .collect::>(); - replay_events.sort_by_key(|event| event.sequence); - let resume_live_at = replay_events.last().map(|event| event.sequence); + replay_events.sort_by_key(|event| Sequence::from(event)); + let resume_live_at = replay_events.last().map(Sequence::from); let replay = stream::iter(replay_events); @@ -124,7 +126,7 @@ impl<'a> Events<'a> { fn resume( resume_at: Option, ) -> impl for<'m> FnMut(&'m types::ChannelEvent) -> future::Ready { - move |event| future::ready(resume_at < Some(event.sequence)) + move |event| future::ready(resume_at < Some(Sequence::from(event))) } } diff --git a/src/event/mod.rs b/src/event/mod.rs index 7ad3f9c..c982d3a 100644 --- a/src/event/mod.rs +++ b/src/event/mod.rs @@ -6,4 +6,13 @@ mod routes; mod sequence; pub mod types; +use crate::clock::DateTime; + pub use self::{routes::router, sequence::Sequence}; + +#[derive(Clone, Copy, Debug, Eq, PartialEq, serde::Serialize)] +pub struct Instant { + pub at: DateTime, + #[serde(skip)] + pub sequence: Sequence, +} diff --git a/src/event/repo/message.rs b/src/event/repo/message.rs index f051fec..f29c8a4 100644 --- a/src/event/repo/message.rs +++ b/src/event/repo/message.rs @@ -3,7 +3,7 @@ use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction}; use crate::{ channel::{self, Channel}, clock::DateTime, - event::{types, Sequence}, + event::{types, Instant, Sequence}, login::{self, Login}, message::{self, Message}, }; @@ -25,8 +25,7 @@ impl<'c> Events<'c> { &mut self, sender: &Login, channel: &Channel, - sent_at: &DateTime, - sent_sequence: Sequence, + sent: &Instant, body: &str, ) -> Result { let id = message::Id::generate(); @@ -46,13 +45,15 @@ impl<'c> Events<'c> { id, channel.id, sender.id, - sent_at, - sent_sequence, + sent.at, + sent.sequence, body, ) .map(|row| types::ChannelEvent { - sequence: row.sent_sequence, - at: row.sent_at, + instant: Instant { + at: row.sent_at, + sequence: row.sent_sequence, + }, data: types::MessageEvent { channel: channel.clone(), sender: sender.clone(), @@ -73,8 +74,7 @@ impl<'c> Events<'c> { &mut self, channel: &Channel, message: &message::Id, - deleted_at: &DateTime, - deleted_sequence: Sequence, + deleted: &Instant, ) -> Result { sqlx::query_scalar!( r#" @@ -88,8 +88,10 @@ impl<'c> Events<'c> { .await?; Ok(types::ChannelEvent { - sequence: deleted_sequence, - at: *deleted_at, + instant: Instant { + at: deleted.at, + sequence: deleted.sequence, + }, data: types::MessageDeletedEvent { channel: channel.clone(), message: message.clone(), @@ -122,8 +124,10 @@ impl<'c> Events<'c> { Channel { id: row.channel_id, name: row.channel_name, - created_at: row.channel_created_at, - created_sequence: row.channel_created_sequence, + created: Instant { + at: row.channel_created_at, + sequence: row.channel_created_sequence, + }, }, row.message, ) @@ -160,14 +164,18 @@ impl<'c> Events<'c> { resume_at, ) .map(|row| types::ChannelEvent { - sequence: row.sent_sequence, - at: row.sent_at, + instant: Instant { + at: row.sent_at, + sequence: row.sent_sequence, + }, data: types::MessageEvent { channel: Channel { id: row.channel_id, name: row.channel_name, - created_at: row.channel_created_at, - created_sequence: row.channel_created_sequence, + created: Instant { + at: row.channel_created_at, + sequence: row.channel_created_sequence, + }, }, sender: Login { id: row.sender_id, diff --git a/src/event/repo/sequence.rs b/src/event/repo/sequence.rs index c985869..40d6a53 100644 --- a/src/event/repo/sequence.rs +++ b/src/event/repo/sequence.rs @@ -1,6 +1,9 @@ use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction}; -use crate::event::Sequence; +use crate::{ + clock::DateTime, + event::{Instant, Sequence}, +}; pub trait Provider { fn sequence(&mut self) -> Sequences; @@ -15,7 +18,7 @@ impl<'c> Provider for Transaction<'c, Sqlite> { pub struct Sequences<'t>(&'t mut SqliteConnection); impl<'c> Sequences<'c> { - pub async fn next(&mut self) -> Result { + pub async fn next(&mut self, at: &DateTime) -> Result { let next = sqlx::query_scalar!( r#" update event_sequence @@ -26,7 +29,10 @@ impl<'c> Sequences<'c> { .fetch_one(&mut *self.0) .await?; - Ok(next) + Ok(Instant { + at: *at, + sequence: next, + }) } pub async fn current(&mut self) -> Result { diff --git a/src/event/routes.rs b/src/event/routes.rs index 50ac435..c87bfb2 100644 --- a/src/event/routes.rs +++ b/src/event/routes.rs @@ -66,7 +66,7 @@ impl TryFrom for sse::Event { type Error = serde_json::Error; fn try_from(event: types::ChannelEvent) -> Result { - let id = serde_json::to_string(&event.sequence)?; + let id = serde_json::to_string(&Sequence::from(&event))?; let data = serde_json::to_string_pretty(&event)?; let event = Self::default().id(id).data(data); diff --git a/src/event/routes/test.rs b/src/event/routes/test.rs index d1ac3b4..68b55cc 100644 --- a/src/event/routes/test.rs +++ b/src/event/routes/test.rs @@ -6,7 +6,7 @@ use futures::{ }; use crate::{ - event::routes, + event::{routes, Sequence}, test::fixtures::{self, future::Immediately as _}, }; @@ -192,7 +192,7 @@ async fn resumes_from() { assert_eq!(initial_message, event); - event.sequence + Sequence::from(&event) }; // Resume after disconnect @@ -276,7 +276,7 @@ async fn serial_resume() { let event = events.last().expect("this vec is non-empty"); - event.sequence + Sequence::from(event) }; // Resume after disconnect @@ -312,7 +312,7 @@ async fn serial_resume() { let event = events.last().expect("this vec is non-empty"); - event.sequence + Sequence::from(event) }; // Resume after disconnect a second time diff --git a/src/event/types.rs b/src/event/types.rs index cd7dea6..2324dc1 100644 --- a/src/event/types.rs +++ b/src/event/types.rs @@ -1,16 +1,14 @@ use crate::{ channel::{self, Channel}, - clock::DateTime, - event::Sequence, + event::{Instant, Sequence}, login::Login, message::{self, Message}, }; #[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] pub struct ChannelEvent { - #[serde(skip)] - pub sequence: Sequence, - pub at: DateTime, + #[serde(flatten)] + pub instant: Instant, #[serde(flatten)] pub data: ChannelEventData, } @@ -18,25 +16,15 @@ pub struct ChannelEvent { impl ChannelEvent { pub fn created(channel: Channel) -> Self { Self { - at: channel.created_at, - sequence: channel.created_sequence, + instant: channel.created, data: CreatedEvent { channel }.into(), } } - - pub fn channel_id(&self) -> &channel::Id { - match &self.data { - ChannelEventData::Created(event) => &event.channel.id, - ChannelEventData::Message(event) => &event.channel.id, - ChannelEventData::MessageDeleted(event) => &event.channel.id, - ChannelEventData::Deleted(event) => &event.channel, - } - } } impl<'c> From<&'c ChannelEvent> for Sequence { fn from(event: &'c ChannelEvent) -> Self { - event.sequence + event.instant.sequence } } -- cgit v1.2.3 From ec804134c33aedb001c426c5f42f43f53c47848f Mon Sep 17 00:00:00 2001 From: Owen Jacobson Date: Wed, 2 Oct 2024 12:25:36 -0400 Subject: Represent channels and messages using a split "History" and "Snapshot" model. This separates the code that figures out what happened to an entity from the code that represents it to a user, and makes it easier to compute a snapshot at a point in time (for things like bootstrap). It also makes the internal logic a bit easier to follow, since it's easier to tell whether you're working with a point in time or with the whole recorded history. This hefty. --- ...86018161718e2a6788413bffffb252de3e1959f341.json | 38 ++++ ...4abfbb9e06a5889460743202ca7956acabf006843e.json | 20 ++ ...2ba8e082c21c9e3d124e5340dec036edd341d94e0f.json | 26 +++ ...05a144db4901c302bbcd3e76da6c61742ac44345c9.json | 44 ----- ...0b1a8433a8ca334f1d666b104823e3fb0c08efb2cc.json | 32 +++ ...6854398e2151ba2dba10c03a9d2d93184141f1425c.json | 44 ----- ...4eedb8229360ba78f2607d25e7e2ee5db5c759a5a3.json | 62 ++++++ ...b8df33083c2da765dfda3023c78c25c06735670457.json | 38 ---- ...72b18b7af92ac1f1faed9df41df90f57cb596cfe7c.json | 74 ------- ...df5de8c7d9ad11bb09a0d0d243181d97c79d771071.json | 20 ++ ...6f1c2744bdb7a93c39ebcf15087c89bba6be71f7cb.json | 20 -- ...99e837106c799e84015425286b79f42e4001d8a4c7.json | 62 ++++++ ...76cdd9450fd1e8b4f2425cfda141d72fd94d3c39f9.json | 20 -- Cargo.lock | 10 + Cargo.toml | 1 + src/app.rs | 5 + src/broadcast.rs | 4 +- src/channel/app.rs | 35 +++- src/channel/event.rs | 48 +++++ src/channel/history.rs | 42 ++++ src/channel/mod.rs | 15 +- src/channel/repo.rs | 94 +++++---- src/channel/routes.rs | 11 +- src/channel/routes/test/on_create.rs | 6 +- src/channel/routes/test/on_send.rs | 13 +- src/channel/snapshot.rs | 38 ++++ src/event/app.rs | 122 +++--------- src/event/broadcaster.rs | 4 +- src/event/mod.rs | 73 ++++++- src/event/repo.rs | 50 +++++ src/event/repo/message.rs | 196 ------------------- src/event/repo/mod.rs | 4 - src/event/repo/sequence.rs | 50 ----- src/event/routes.rs | 14 +- src/event/routes/test.rs | 98 ++++++---- src/event/sequence.rs | 59 ++++++ src/event/types.rs | 85 -------- src/expire.rs | 2 +- src/message/app.rs | 88 +++++++++ src/message/event.rs | 50 +++++ src/message/history.rs | 43 +++++ src/message/mod.rs | 13 +- src/message/repo.rs | 214 +++++++++++++++++++++ src/message/snapshot.rs | 74 +++++++ src/test/fixtures/event.rs | 11 ++ src/test/fixtures/filter.rs | 10 +- src/test/fixtures/message.rs | 13 +- src/test/fixtures/mod.rs | 1 + src/token/app.rs | 4 +- 49 files changed, 1277 insertions(+), 823 deletions(-) create mode 100644 .sqlx/query-1654b05159c27f74cb333586018161718e2a6788413bffffb252de3e1959f341.json create mode 100644 .sqlx/query-33f9a143409e6f436ed6b64abfbb9e06a5889460743202ca7956acabf006843e.json create mode 100644 .sqlx/query-45449846ea98e892c6e58f2ba8e082c21c9e3d124e5340dec036edd341d94e0f.json delete mode 100644 .sqlx/query-4715007e2395ad30433b7405a144db4901c302bbcd3e76da6c61742ac44345c9.json create mode 100644 .sqlx/query-4d4dce1b034f4a540f49490b1a8433a8ca334f1d666b104823e3fb0c08efb2cc.json delete mode 100644 .sqlx/query-5244f04bc270fc8d3cd4116854398e2151ba2dba10c03a9d2d93184141f1425c.json create mode 100644 .sqlx/query-5c53579fa431b6e184faf94eedb8229360ba78f2607d25e7e2ee5db5c759a5a3.json delete mode 100644 .sqlx/query-74f0bad30dcec743d77309b8df33083c2da765dfda3023c78c25c06735670457.json delete mode 100644 .sqlx/query-7e816ede017bc2635c11ab72b18b7af92ac1f1faed9df41df90f57cb596cfe7c.json create mode 100644 .sqlx/query-b7e05e2b2eb5484c954bcedf5de8c7d9ad11bb09a0d0d243181d97c79d771071.json delete mode 100644 .sqlx/query-d382215ac9e9d8d2c9b5eb6f1c2744bdb7a93c39ebcf15087c89bba6be71f7cb.json create mode 100644 .sqlx/query-e93702ad922c7ce802499e99e837106c799e84015425286b79f42e4001d8a4c7.json delete mode 100644 .sqlx/query-f5d5b3ec3554a80230e29676cdd9450fd1e8b4f2425cfda141d72fd94d3c39f9.json create mode 100644 src/channel/event.rs create mode 100644 src/channel/history.rs create mode 100644 src/channel/snapshot.rs create mode 100644 src/event/repo.rs delete mode 100644 src/event/repo/message.rs delete mode 100644 src/event/repo/mod.rs delete mode 100644 src/event/repo/sequence.rs delete mode 100644 src/event/types.rs create mode 100644 src/message/app.rs create mode 100644 src/message/event.rs create mode 100644 src/message/history.rs create mode 100644 src/message/repo.rs create mode 100644 src/message/snapshot.rs create mode 100644 src/test/fixtures/event.rs (limited to 'src/event/mod.rs') diff --git a/.sqlx/query-1654b05159c27f74cb333586018161718e2a6788413bffffb252de3e1959f341.json b/.sqlx/query-1654b05159c27f74cb333586018161718e2a6788413bffffb252de3e1959f341.json new file mode 100644 index 0000000..cc716ed --- /dev/null +++ b/.sqlx/query-1654b05159c27f74cb333586018161718e2a6788413bffffb252de3e1959f341.json @@ -0,0 +1,38 @@ +{ + "db_name": "SQLite", + "query": "\n delete from channel\n where id = $1\n returning\n id as \"id: Id\",\n name,\n created_at as \"created_at: DateTime\",\n created_sequence as \"created_sequence: Sequence\"\n ", + "describe": { + "columns": [ + { + "name": "id: Id", + "ordinal": 0, + "type_info": "Text" + }, + { + "name": "name", + "ordinal": 1, + "type_info": "Text" + }, + { + "name": "created_at: DateTime", + "ordinal": 2, + "type_info": "Text" + }, + { + "name": "created_sequence: Sequence", + "ordinal": 3, + "type_info": "Integer" + } + ], + "parameters": { + "Right": 1 + }, + "nullable": [ + false, + false, + false, + false + ] + }, + "hash": "1654b05159c27f74cb333586018161718e2a6788413bffffb252de3e1959f341" +} diff --git a/.sqlx/query-33f9a143409e6f436ed6b64abfbb9e06a5889460743202ca7956acabf006843e.json b/.sqlx/query-33f9a143409e6f436ed6b64abfbb9e06a5889460743202ca7956acabf006843e.json new file mode 100644 index 0000000..1480953 --- /dev/null +++ b/.sqlx/query-33f9a143409e6f436ed6b64abfbb9e06a5889460743202ca7956acabf006843e.json @@ -0,0 +1,20 @@ +{ + "db_name": "SQLite", + "query": "\n delete from message\n where\n id = $1\n returning 1 as \"deleted: i64\"\n ", + "describe": { + "columns": [ + { + "name": "deleted: i64", + "ordinal": 0, + "type_info": "Null" + } + ], + "parameters": { + "Right": 1 + }, + "nullable": [ + null + ] + }, + "hash": "33f9a143409e6f436ed6b64abfbb9e06a5889460743202ca7956acabf006843e" +} diff --git a/.sqlx/query-45449846ea98e892c6e58f2ba8e082c21c9e3d124e5340dec036edd341d94e0f.json b/.sqlx/query-45449846ea98e892c6e58f2ba8e082c21c9e3d124e5340dec036edd341d94e0f.json new file mode 100644 index 0000000..2974cb0 --- /dev/null +++ b/.sqlx/query-45449846ea98e892c6e58f2ba8e082c21c9e3d124e5340dec036edd341d94e0f.json @@ -0,0 +1,26 @@ +{ + "db_name": "SQLite", + "query": "\n\t\t\t\tinsert into message\n\t\t\t\t\t(id, channel, sender, sent_at, sent_sequence, body)\n\t\t\t\tvalues ($1, $2, $3, $4, $5, $6)\n\t\t\t\treturning\n\t\t\t\t\tid as \"id: Id\",\n\t\t\t\t\tbody\n\t\t\t", + "describe": { + "columns": [ + { + "name": "id: Id", + "ordinal": 0, + "type_info": "Text" + }, + { + "name": "body", + "ordinal": 1, + "type_info": "Text" + } + ], + "parameters": { + "Right": 6 + }, + "nullable": [ + false, + false + ] + }, + "hash": "45449846ea98e892c6e58f2ba8e082c21c9e3d124e5340dec036edd341d94e0f" +} diff --git a/.sqlx/query-4715007e2395ad30433b7405a144db4901c302bbcd3e76da6c61742ac44345c9.json b/.sqlx/query-4715007e2395ad30433b7405a144db4901c302bbcd3e76da6c61742ac44345c9.json deleted file mode 100644 index 494e1db..0000000 --- a/.sqlx/query-4715007e2395ad30433b7405a144db4901c302bbcd3e76da6c61742ac44345c9.json +++ /dev/null @@ -1,44 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n\t\t\t\tinsert into message\n\t\t\t\t\t(id, channel, sender, sent_at, sent_sequence, body)\n\t\t\t\tvalues ($1, $2, $3, $4, $5, $6)\n\t\t\t\treturning\n\t\t\t\t\tid as \"id: message::Id\",\n\t\t\t\t\tsender as \"sender: login::Id\",\n sent_at as \"sent_at: DateTime\",\n sent_sequence as \"sent_sequence: Sequence\",\n\t\t\t\t\tbody\n\t\t\t", - "describe": { - "columns": [ - { - "name": "id: message::Id", - "ordinal": 0, - "type_info": "Text" - }, - { - "name": "sender: login::Id", - "ordinal": 1, - "type_info": "Text" - }, - { - "name": "sent_at: DateTime", - "ordinal": 2, - "type_info": "Text" - }, - { - "name": "sent_sequence: Sequence", - "ordinal": 3, - "type_info": "Integer" - }, - { - "name": "body", - "ordinal": 4, - "type_info": "Text" - } - ], - "parameters": { - "Right": 6 - }, - "nullable": [ - false, - false, - false, - false, - false - ] - }, - "hash": "4715007e2395ad30433b7405a144db4901c302bbcd3e76da6c61742ac44345c9" -} diff --git a/.sqlx/query-4d4dce1b034f4a540f49490b1a8433a8ca334f1d666b104823e3fb0c08efb2cc.json b/.sqlx/query-4d4dce1b034f4a540f49490b1a8433a8ca334f1d666b104823e3fb0c08efb2cc.json new file mode 100644 index 0000000..fb5f94b --- /dev/null +++ b/.sqlx/query-4d4dce1b034f4a540f49490b1a8433a8ca334f1d666b104823e3fb0c08efb2cc.json @@ -0,0 +1,32 @@ +{ + "db_name": "SQLite", + "query": "\n select\n channel.id as \"channel_id: channel::Id\",\n channel.name as \"channel_name\",\n message.id as \"message: Id\"\n from message\n join channel on message.channel = channel.id\n where sent_at < $1\n ", + "describe": { + "columns": [ + { + "name": "channel_id: channel::Id", + "ordinal": 0, + "type_info": "Text" + }, + { + "name": "channel_name", + "ordinal": 1, + "type_info": "Text" + }, + { + "name": "message: Id", + "ordinal": 2, + "type_info": "Text" + } + ], + "parameters": { + "Right": 1 + }, + "nullable": [ + false, + false, + false + ] + }, + "hash": "4d4dce1b034f4a540f49490b1a8433a8ca334f1d666b104823e3fb0c08efb2cc" +} diff --git a/.sqlx/query-5244f04bc270fc8d3cd4116854398e2151ba2dba10c03a9d2d93184141f1425c.json b/.sqlx/query-5244f04bc270fc8d3cd4116854398e2151ba2dba10c03a9d2d93184141f1425c.json deleted file mode 100644 index 820b43f..0000000 --- a/.sqlx/query-5244f04bc270fc8d3cd4116854398e2151ba2dba10c03a9d2d93184141f1425c.json +++ /dev/null @@ -1,44 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n select\n channel.id as \"channel_id: channel::Id\",\n channel.name as \"channel_name\",\n channel.created_at as \"channel_created_at: DateTime\",\n channel.created_sequence as \"channel_created_sequence: Sequence\",\n message.id as \"message: message::Id\"\n from message\n join channel on message.channel = channel.id\n join login as sender on message.sender = sender.id\n where sent_at < $1\n ", - "describe": { - "columns": [ - { - "name": "channel_id: channel::Id", - "ordinal": 0, - "type_info": "Text" - }, - { - "name": "channel_name", - "ordinal": 1, - "type_info": "Text" - }, - { - "name": "channel_created_at: DateTime", - "ordinal": 2, - "type_info": "Text" - }, - { - "name": "channel_created_sequence: Sequence", - "ordinal": 3, - "type_info": "Integer" - }, - { - "name": "message: message::Id", - "ordinal": 4, - "type_info": "Text" - } - ], - "parameters": { - "Right": 1 - }, - "nullable": [ - false, - false, - false, - false, - false - ] - }, - "hash": "5244f04bc270fc8d3cd4116854398e2151ba2dba10c03a9d2d93184141f1425c" -} diff --git a/.sqlx/query-5c53579fa431b6e184faf94eedb8229360ba78f2607d25e7e2ee5db5c759a5a3.json b/.sqlx/query-5c53579fa431b6e184faf94eedb8229360ba78f2607d25e7e2ee5db5c759a5a3.json new file mode 100644 index 0000000..4ca6786 --- /dev/null +++ b/.sqlx/query-5c53579fa431b6e184faf94eedb8229360ba78f2607d25e7e2ee5db5c759a5a3.json @@ -0,0 +1,62 @@ +{ + "db_name": "SQLite", + "query": "\n select\n channel.id as \"channel_id: channel::Id\",\n channel.name as \"channel_name\",\n sender.id as \"sender_id: login::Id\",\n sender.name as \"sender_name\",\n message.id as \"id: Id\",\n message.body,\n sent_at as \"sent_at: DateTime\",\n sent_sequence as \"sent_sequence: Sequence\"\n from message\n join channel on message.channel = channel.id\n join login as sender on message.sender = sender.id\n where coalesce(message.sent_sequence > $1, true)\n ", + "describe": { + "columns": [ + { + "name": "channel_id: channel::Id", + "ordinal": 0, + "type_info": "Text" + }, + { + "name": "channel_name", + "ordinal": 1, + "type_info": "Text" + }, + { + "name": "sender_id: login::Id", + "ordinal": 2, + "type_info": "Text" + }, + { + "name": "sender_name", + "ordinal": 3, + "type_info": "Text" + }, + { + "name": "id: Id", + "ordinal": 4, + "type_info": "Text" + }, + { + "name": "body", + "ordinal": 5, + "type_info": "Text" + }, + { + "name": "sent_at: DateTime", + "ordinal": 6, + "type_info": "Text" + }, + { + "name": "sent_sequence: Sequence", + "ordinal": 7, + "type_info": "Integer" + } + ], + "parameters": { + "Right": 1 + }, + "nullable": [ + false, + false, + false, + false, + false, + false, + false, + false + ] + }, + "hash": "5c53579fa431b6e184faf94eedb8229360ba78f2607d25e7e2ee5db5c759a5a3" +} diff --git a/.sqlx/query-74f0bad30dcec743d77309b8df33083c2da765dfda3023c78c25c06735670457.json b/.sqlx/query-74f0bad30dcec743d77309b8df33083c2da765dfda3023c78c25c06735670457.json deleted file mode 100644 index b34443f..0000000 --- a/.sqlx/query-74f0bad30dcec743d77309b8df33083c2da765dfda3023c78c25c06735670457.json +++ /dev/null @@ -1,38 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n select\n channel.id as \"id: Id\",\n channel.name,\n channel.created_at as \"created_at: DateTime\",\n channel.created_sequence as \"created_sequence: Sequence\"\n from channel\n left join message\n where created_at < $1\n and message.id is null\n ", - "describe": { - "columns": [ - { - "name": "id: Id", - "ordinal": 0, - "type_info": "Text" - }, - { - "name": "name", - "ordinal": 1, - "type_info": "Text" - }, - { - "name": "created_at: DateTime", - "ordinal": 2, - "type_info": "Text" - }, - { - "name": "created_sequence: Sequence", - "ordinal": 3, - "type_info": "Integer" - } - ], - "parameters": { - "Right": 1 - }, - "nullable": [ - false, - false, - false, - false - ] - }, - "hash": "74f0bad30dcec743d77309b8df33083c2da765dfda3023c78c25c06735670457" -} diff --git a/.sqlx/query-7e816ede017bc2635c11ab72b18b7af92ac1f1faed9df41df90f57cb596cfe7c.json b/.sqlx/query-7e816ede017bc2635c11ab72b18b7af92ac1f1faed9df41df90f57cb596cfe7c.json deleted file mode 100644 index f546438..0000000 --- a/.sqlx/query-7e816ede017bc2635c11ab72b18b7af92ac1f1faed9df41df90f57cb596cfe7c.json +++ /dev/null @@ -1,74 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n\t\t\t\tselect\n\t\t\t\t\tmessage.id as \"id: message::Id\",\n channel.id as \"channel_id: channel::Id\",\n channel.name as \"channel_name\",\n channel.created_at as \"channel_created_at: DateTime\",\n channel.created_sequence as \"channel_created_sequence: Sequence\",\n\t\t\t\t\tsender.id as \"sender_id: login::Id\",\n\t\t\t\t\tsender.name as sender_name,\n message.sent_at as \"sent_at: DateTime\",\n message.sent_sequence as \"sent_sequence: Sequence\",\n message.body\n\t\t\t\tfrom message\n join channel on message.channel = channel.id\n\t\t\t\t\tjoin login as sender on message.sender = sender.id\n\t\t\t\twhere coalesce(message.sent_sequence > $1, true)\n\t\t\t\torder by sent_sequence asc\n\t\t\t", - "describe": { - "columns": [ - { - "name": "id: message::Id", - "ordinal": 0, - "type_info": "Text" - }, - { - "name": "channel_id: channel::Id", - "ordinal": 1, - "type_info": "Text" - }, - { - "name": "channel_name", - "ordinal": 2, - "type_info": "Text" - }, - { - "name": "channel_created_at: DateTime", - "ordinal": 3, - "type_info": "Text" - }, - { - "name": "channel_created_sequence: Sequence", - "ordinal": 4, - "type_info": "Integer" - }, - { - "name": "sender_id: login::Id", - "ordinal": 5, - "type_info": "Text" - }, - { - "name": "sender_name", - "ordinal": 6, - "type_info": "Text" - }, - { - "name": "sent_at: DateTime", - "ordinal": 7, - "type_info": "Text" - }, - { - "name": "sent_sequence: Sequence", - "ordinal": 8, - "type_info": "Integer" - }, - { - "name": "body", - "ordinal": 9, - "type_info": "Text" - } - ], - "parameters": { - "Right": 1 - }, - "nullable": [ - false, - false, - false, - false, - false, - false, - false, - false, - false, - false - ] - }, - "hash": "7e816ede017bc2635c11ab72b18b7af92ac1f1faed9df41df90f57cb596cfe7c" -} diff --git a/.sqlx/query-b7e05e2b2eb5484c954bcedf5de8c7d9ad11bb09a0d0d243181d97c79d771071.json b/.sqlx/query-b7e05e2b2eb5484c954bcedf5de8c7d9ad11bb09a0d0d243181d97c79d771071.json new file mode 100644 index 0000000..b82727f --- /dev/null +++ b/.sqlx/query-b7e05e2b2eb5484c954bcedf5de8c7d9ad11bb09a0d0d243181d97c79d771071.json @@ -0,0 +1,20 @@ +{ + "db_name": "SQLite", + "query": "\n select\n channel.id as \"id: Id\"\n from channel\n left join message\n where created_at < $1\n and message.id is null\n ", + "describe": { + "columns": [ + { + "name": "id: Id", + "ordinal": 0, + "type_info": "Text" + } + ], + "parameters": { + "Right": 1 + }, + "nullable": [ + false + ] + }, + "hash": "b7e05e2b2eb5484c954bcedf5de8c7d9ad11bb09a0d0d243181d97c79d771071" +} diff --git a/.sqlx/query-d382215ac9e9d8d2c9b5eb6f1c2744bdb7a93c39ebcf15087c89bba6be71f7cb.json b/.sqlx/query-d382215ac9e9d8d2c9b5eb6f1c2744bdb7a93c39ebcf15087c89bba6be71f7cb.json deleted file mode 100644 index 1d448d4..0000000 --- a/.sqlx/query-d382215ac9e9d8d2c9b5eb6f1c2744bdb7a93c39ebcf15087c89bba6be71f7cb.json +++ /dev/null @@ -1,20 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n delete from channel\n where id = $1\n returning 1 as \"row: i64\"\n ", - "describe": { - "columns": [ - { - "name": "row: i64", - "ordinal": 0, - "type_info": "Null" - } - ], - "parameters": { - "Right": 1 - }, - "nullable": [ - null - ] - }, - "hash": "d382215ac9e9d8d2c9b5eb6f1c2744bdb7a93c39ebcf15087c89bba6be71f7cb" -} diff --git a/.sqlx/query-e93702ad922c7ce802499e99e837106c799e84015425286b79f42e4001d8a4c7.json b/.sqlx/query-e93702ad922c7ce802499e99e837106c799e84015425286b79f42e4001d8a4c7.json new file mode 100644 index 0000000..288a657 --- /dev/null +++ b/.sqlx/query-e93702ad922c7ce802499e99e837106c799e84015425286b79f42e4001d8a4c7.json @@ -0,0 +1,62 @@ +{ + "db_name": "SQLite", + "query": "\n select\n channel.id as \"channel_id: channel::Id\",\n channel.name as \"channel_name\",\n sender.id as \"sender_id: login::Id\",\n sender.name as \"sender_name\",\n message.id as \"id: Id\",\n message.body,\n sent_at as \"sent_at: DateTime\",\n sent_sequence as \"sent_sequence: Sequence\"\n from message\n join channel on message.channel = channel.id\n join login as sender on message.sender = sender.id\n where message.id = $1\n and message.channel = $2\n ", + "describe": { + "columns": [ + { + "name": "channel_id: channel::Id", + "ordinal": 0, + "type_info": "Text" + }, + { + "name": "channel_name", + "ordinal": 1, + "type_info": "Text" + }, + { + "name": "sender_id: login::Id", + "ordinal": 2, + "type_info": "Text" + }, + { + "name": "sender_name", + "ordinal": 3, + "type_info": "Text" + }, + { + "name": "id: Id", + "ordinal": 4, + "type_info": "Text" + }, + { + "name": "body", + "ordinal": 5, + "type_info": "Text" + }, + { + "name": "sent_at: DateTime", + "ordinal": 6, + "type_info": "Text" + }, + { + "name": "sent_sequence: Sequence", + "ordinal": 7, + "type_info": "Integer" + } + ], + "parameters": { + "Right": 2 + }, + "nullable": [ + false, + false, + false, + false, + false, + false, + false, + false + ] + }, + "hash": "e93702ad922c7ce802499e99e837106c799e84015425286b79f42e4001d8a4c7" +} diff --git a/.sqlx/query-f5d5b3ec3554a80230e29676cdd9450fd1e8b4f2425cfda141d72fd94d3c39f9.json b/.sqlx/query-f5d5b3ec3554a80230e29676cdd9450fd1e8b4f2425cfda141d72fd94d3c39f9.json deleted file mode 100644 index 7b1d2d8..0000000 --- a/.sqlx/query-f5d5b3ec3554a80230e29676cdd9450fd1e8b4f2425cfda141d72fd94d3c39f9.json +++ /dev/null @@ -1,20 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n delete from message\n where id = $1\n returning 1 as \"row: i64\"\n ", - "describe": { - "columns": [ - { - "name": "row: i64", - "ordinal": 0, - "type_info": "Null" - } - ], - "parameters": { - "Right": 1 - }, - "nullable": [ - null - ] - }, - "hash": "f5d5b3ec3554a80230e29676cdd9450fd1e8b4f2425cfda141d72fd94d3c39f9" -} diff --git a/Cargo.lock b/Cargo.lock index aa2120a..b1f0582 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -787,6 +787,7 @@ dependencies = [ "faker_rand", "futures", "headers", + "itertools", "password-hash", "rand", "rand_core", @@ -956,6 +957,15 @@ version = "1.70.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7943c866cc5cd64cbc25b2e01621d07fa8eb2a1a23160ee81ce38704e97b8ecf" +[[package]] +name = "itertools" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "413ee7dfc52ee1a4949ceeb7dbc8a33f2d6c088194d9f922fb8318faf1f01186" +dependencies = [ + "either", +] + [[package]] name = "itoa" version = "1.0.11" diff --git a/Cargo.toml b/Cargo.toml index cb46b41..2b2e774 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,6 +12,7 @@ chrono = { version = "0.4.38", features = ["serde"] } clap = { version = "4.5.18", features = ["derive", "env"] } futures = "0.3.30" headers = "0.4.0" +itertools = "0.13.0" password-hash = { version = "0.5.0", features = ["std"] } rand = "0.8.5" rand_core = { version = "0.6.4", features = ["getrandom"] } diff --git a/src/app.rs b/src/app.rs index 5542e5f..186e5f8 100644 --- a/src/app.rs +++ b/src/app.rs @@ -4,6 +4,7 @@ use crate::{ channel::app::Channels, event::{app::Events, broadcaster::Broadcaster as EventBroadcaster}, login::app::Logins, + message::app::Messages, token::{app::Tokens, broadcaster::Broadcaster as TokenBroadcaster}, }; @@ -35,6 +36,10 @@ impl App { Logins::new(&self.db) } + pub const fn messages(&self) -> Messages { + Messages::new(&self.db, &self.events) + } + pub const fn tokens(&self) -> Tokens { Tokens::new(&self.db, &self.tokens) } diff --git a/src/broadcast.rs b/src/broadcast.rs index 083a301..bedc263 100644 --- a/src/broadcast.rs +++ b/src/broadcast.rs @@ -32,7 +32,7 @@ where { // panic: if ``message.channel.id`` has not been previously registered, // and was not part of the initial set of channels. - pub fn broadcast(&self, message: &M) { + pub fn broadcast(&self, message: impl Into) { let tx = self.sender(); // Per the Tokio docs, the returned error is only used to indicate that @@ -42,7 +42,7 @@ where // // The successful return value, which includes the number of active // receivers, also isn't that interesting to us. - let _ = tx.send(message.clone()); + let _ = tx.send(message.into()); } // panic: if ``channel`` has not been previously registered, and was not diff --git a/src/channel/app.rs b/src/channel/app.rs index b7e3a10..6ce826b 100644 --- a/src/channel/app.rs +++ b/src/channel/app.rs @@ -1,10 +1,11 @@ use chrono::TimeDelta; +use itertools::Itertools; use sqlx::sqlite::SqlitePool; use crate::{ channel::{repo::Provider as _, Channel}, clock::DateTime, - event::{broadcaster::Broadcaster, repo::Provider as _, types::ChannelEvent, Sequence}, + event::{broadcaster::Broadcaster, repo::Provider as _, Sequence}, }; pub struct Channels<'a> { @@ -27,10 +28,11 @@ impl<'a> Channels<'a> { .map_err(|err| CreateError::from_duplicate_name(err, name))?; tx.commit().await?; - self.events - .broadcast(&ChannelEvent::created(channel.clone())); + for event in channel.events() { + self.events.broadcast(event); + } - Ok(channel) + Ok(channel.snapshot()) } pub async fn all(&self, resume_point: Option) -> Result, InternalError> { @@ -38,6 +40,16 @@ impl<'a> Channels<'a> { let channels = tx.channels().all(resume_point).await?; tx.commit().await?; + let channels = channels + .into_iter() + .filter_map(|channel| { + channel + .events() + .filter(Sequence::up_to(resume_point)) + .collect() + }) + .collect(); + Ok(channels) } @@ -51,14 +63,21 @@ impl<'a> Channels<'a> { let mut events = Vec::with_capacity(expired.len()); for channel in expired { let deleted = tx.sequence().next(relative_to).await?; - let event = tx.channels().delete(&channel, &deleted).await?; - events.push(event); + let channel = tx.channels().delete(&channel, &deleted).await?; + events.push( + channel + .events() + .filter(Sequence::start_from(deleted.sequence)), + ); } tx.commit().await?; - for event in events { - self.events.broadcast(&event); + for event in events + .into_iter() + .kmerge_by(|a, b| a.instant.sequence < b.instant.sequence) + { + self.events.broadcast(event); } Ok(()) diff --git a/src/channel/event.rs b/src/channel/event.rs new file mode 100644 index 0000000..9c54174 --- /dev/null +++ b/src/channel/event.rs @@ -0,0 +1,48 @@ +use super::Channel; +use crate::{ + channel, + event::{Instant, Sequenced}, +}; + +#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] +pub struct Event { + #[serde(flatten)] + pub instant: Instant, + #[serde(flatten)] + pub kind: Kind, +} + +impl Sequenced for Event { + fn instant(&self) -> Instant { + self.instant + } +} + +#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] +#[serde(tag = "type", rename_all = "snake_case")] +pub enum Kind { + Created(Created), + Deleted(Deleted), +} + +#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] +pub struct Created { + pub channel: Channel, +} + +impl From for Kind { + fn from(event: Created) -> Self { + Self::Created(event) + } +} + +#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] +pub struct Deleted { + pub channel: channel::Id, +} + +impl From for Kind { + fn from(event: Deleted) -> Self { + Self::Deleted(event) + } +} diff --git a/src/channel/history.rs b/src/channel/history.rs new file mode 100644 index 0000000..3cc7d9d --- /dev/null +++ b/src/channel/history.rs @@ -0,0 +1,42 @@ +use super::{ + event::{Created, Deleted, Event}, + Channel, +}; +use crate::event::Instant; + +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct History { + pub channel: Channel, + pub created: Instant, + pub deleted: Option, +} + +impl History { + fn created(&self) -> Event { + Event { + instant: self.created, + kind: Created { + channel: self.channel.clone(), + } + .into(), + } + } + + fn deleted(&self) -> Option { + self.deleted.map(|instant| Event { + instant, + kind: Deleted { + channel: self.channel.id.clone(), + } + .into(), + }) + } + + pub fn events(&self) -> impl Iterator { + [self.created()].into_iter().chain(self.deleted()) + } + + pub fn snapshot(&self) -> Channel { + self.channel.clone() + } +} diff --git a/src/channel/mod.rs b/src/channel/mod.rs index 4baa7e3..eb8200b 100644 --- a/src/channel/mod.rs +++ b/src/channel/mod.rs @@ -1,16 +1,9 @@ -use crate::event::Instant; - pub mod app; +pub mod event; +mod history; mod id; pub mod repo; mod routes; +mod snapshot; -pub use self::{id::Id, routes::router}; - -#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] -pub struct Channel { - pub id: Id, - pub name: String, - #[serde(skip)] - pub created: Instant, -} +pub use self::{event::Event, history::History, id::Id, routes::router, snapshot::Channel}; diff --git a/src/channel/repo.rs b/src/channel/repo.rs index c000b56..8bb761b 100644 --- a/src/channel/repo.rs +++ b/src/channel/repo.rs @@ -1,9 +1,9 @@ use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction}; use crate::{ - channel::{Channel, Id}, + channel::{Channel, History, Id}, clock::DateTime, - event::{types, Instant, Sequence}, + event::{Instant, Sequence}, }; pub trait Provider { @@ -19,7 +19,7 @@ impl<'c> Provider for Transaction<'c, Sqlite> { pub struct Channels<'t>(&'t mut SqliteConnection); impl<'c> Channels<'c> { - pub async fn create(&mut self, name: &str, created: &Instant) -> Result { + pub async fn create(&mut self, name: &str, created: &Instant) -> Result { let id = Id::generate(); let channel = sqlx::query!( r#" @@ -37,13 +37,16 @@ impl<'c> Channels<'c> { created.at, created.sequence, ) - .map(|row| Channel { - id: row.id, - name: row.name, + .map(|row| History { + channel: Channel { + id: row.id, + name: row.name, + }, created: Instant { at: row.created_at, sequence: row.created_sequence, }, + deleted: None, }) .fetch_one(&mut *self.0) .await?; @@ -51,7 +54,7 @@ impl<'c> Channels<'c> { Ok(channel) } - pub async fn by_id(&mut self, channel: &Id) -> Result { + pub async fn by_id(&mut self, channel: &Id) -> Result { let channel = sqlx::query!( r#" select @@ -64,13 +67,16 @@ impl<'c> Channels<'c> { "#, channel, ) - .map(|row| Channel { - id: row.id, - name: row.name, + .map(|row| History { + channel: Channel { + id: row.id, + name: row.name, + }, created: Instant { at: row.created_at, sequence: row.created_sequence, }, + deleted: None, }) .fetch_one(&mut *self.0) .await?; @@ -81,7 +87,7 @@ impl<'c> Channels<'c> { pub async fn all( &mut self, resume_point: Option, - ) -> Result, sqlx::Error> { + ) -> Result, sqlx::Error> { let channels = sqlx::query!( r#" select @@ -95,13 +101,16 @@ impl<'c> Channels<'c> { "#, resume_point, ) - .map(|row| Channel { - id: row.id, - name: row.name, + .map(|row| History { + channel: Channel { + id: row.id, + name: row.name, + }, created: Instant { at: row.created_at, sequence: row.created_sequence, }, + deleted: None, }) .fetch_all(&mut *self.0) .await?; @@ -112,7 +121,7 @@ impl<'c> Channels<'c> { pub async fn replay( &mut self, resume_at: Option, - ) -> Result, sqlx::Error> { + ) -> Result, sqlx::Error> { let channels = sqlx::query!( r#" select @@ -125,13 +134,16 @@ impl<'c> Channels<'c> { "#, resume_at, ) - .map(|row| Channel { - id: row.id, - name: row.name, + .map(|row| History { + channel: Channel { + id: row.id, + name: row.name, + }, created: Instant { at: row.created_at, sequence: row.created_sequence, }, + deleted: None, }) .fetch_all(&mut *self.0) .await?; @@ -141,35 +153,43 @@ impl<'c> Channels<'c> { pub async fn delete( &mut self, - channel: &Channel, + channel: &Id, deleted: &Instant, - ) -> Result { - let channel = channel.id.clone(); - sqlx::query_scalar!( + ) -> Result { + let channel = sqlx::query!( r#" delete from channel where id = $1 - returning 1 as "row: i64" + returning + id as "id: Id", + name, + created_at as "created_at: DateTime", + created_sequence as "created_sequence: Sequence" "#, channel, ) + .map(|row| History { + channel: Channel { + id: row.id, + name: row.name, + }, + created: Instant { + at: row.created_at, + sequence: row.created_sequence, + }, + deleted: Some(*deleted), + }) .fetch_one(&mut *self.0) .await?; - Ok(types::ChannelEvent { - instant: *deleted, - data: types::DeletedEvent { channel }.into(), - }) + Ok(channel) } - pub async fn expired(&mut self, expired_at: &DateTime) -> Result, sqlx::Error> { - let channels = sqlx::query!( + pub async fn expired(&mut self, expired_at: &DateTime) -> Result, sqlx::Error> { + let channels = sqlx::query_scalar!( r#" select - channel.id as "id: Id", - channel.name, - channel.created_at as "created_at: DateTime", - channel.created_sequence as "created_sequence: Sequence" + channel.id as "id: Id" from channel left join message where created_at < $1 @@ -177,14 +197,6 @@ impl<'c> Channels<'c> { "#, expired_at, ) - .map(|row| Channel { - id: row.id, - name: row.name, - created: Instant { - at: row.created_at, - sequence: row.created_sequence, - }, - }) .fetch_all(&mut *self.0) .await?; diff --git a/src/channel/routes.rs b/src/channel/routes.rs index 5d8b61e..5bb1ee9 100644 --- a/src/channel/routes.rs +++ b/src/channel/routes.rs @@ -13,8 +13,9 @@ use crate::{ channel::{self, Channel}, clock::RequestedAt, error::Internal, - event::{app::EventsError, Sequence}, + event::Sequence, login::Login, + message::app::Error as MessageError, }; #[cfg(test)] @@ -99,8 +100,8 @@ async fn on_send( login: Login, Json(request): Json, ) -> Result { - app.events() - .send(&login, &channel, &request.message, &sent_at) + app.messages() + .send(&channel, &login, &sent_at, &request.message) .await // Could impl `From` here, but it's more code and this is used once. .map_err(ErrorResponse)?; @@ -109,13 +110,13 @@ async fn on_send( } #[derive(Debug)] -struct ErrorResponse(EventsError); +struct ErrorResponse(MessageError); impl IntoResponse for ErrorResponse { fn into_response(self) -> Response { let Self(error) = self; match error { - not_found @ EventsError::ChannelNotFound(_) => { + not_found @ MessageError::ChannelNotFound(_) => { (StatusCode::NOT_FOUND, not_found.to_string()).into_response() } other => Internal::from(other).into_response(), diff --git a/src/channel/routes/test/on_create.rs b/src/channel/routes/test/on_create.rs index 9988932..5733c9e 100644 --- a/src/channel/routes/test/on_create.rs +++ b/src/channel/routes/test/on_create.rs @@ -3,7 +3,7 @@ use futures::stream::StreamExt as _; use crate::{ channel::{app, routes}, - event::types, + event, test::fixtures::{self, future::Immediately as _}, }; @@ -50,8 +50,8 @@ async fn new_channel() { .expect("creation event published"); assert!(matches!( - event.data, - types::ChannelEventData::Created(event) + event.kind, + event::Kind::ChannelCreated(event) if event.channel == response_channel )); } diff --git a/src/channel/routes/test/on_send.rs b/src/channel/routes/test/on_send.rs index 33ec3b7..1027b29 100644 --- a/src/channel/routes/test/on_send.rs +++ b/src/channel/routes/test/on_send.rs @@ -4,7 +4,8 @@ use futures::stream::StreamExt; use crate::{ channel, channel::routes, - event::{app, types}, + event, + message::app, test::fixtures::{self, future::Immediately as _}, }; @@ -54,10 +55,10 @@ async fn messages_in_order() { for ((sent_at, message), event) in requests.into_iter().zip(events) { assert_eq!(*sent_at, event.instant.at); assert!(matches!( - event.data, - types::ChannelEventData::Message(event_message) - if event_message.sender == sender - && event_message.message.body == message + event.kind, + event::Kind::MessageSent(event) + if event.message.sender == sender + && event.message.body == message )); } } @@ -90,6 +91,6 @@ async fn nonexistent_channel() { assert!(matches!( error, - app::EventsError::ChannelNotFound(error_channel) if channel == error_channel + app::Error::ChannelNotFound(error_channel) if channel == error_channel )); } diff --git a/src/channel/snapshot.rs b/src/channel/snapshot.rs new file mode 100644 index 0000000..6462f25 --- /dev/null +++ b/src/channel/snapshot.rs @@ -0,0 +1,38 @@ +use super::{ + event::{Created, Event, Kind}, + Id, +}; + +#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] +pub struct Channel { + pub id: Id, + pub name: String, +} + +impl Channel { + fn apply(state: Option, event: Event) -> Option { + match (state, event.kind) { + (None, Kind::Created(event)) => Some(event.into()), + (Some(channel), Kind::Deleted(event)) if channel.id == event.channel => None, + (state, event) => panic!("invalid channel event {event:#?} for state {state:#?}"), + } + } +} + +impl FromIterator for Option { + fn from_iter>(events: I) -> Self { + events.into_iter().fold(None, Channel::apply) + } +} + +impl From<&Created> for Channel { + fn from(event: &Created) -> Self { + event.channel.clone() + } +} + +impl From for Channel { + fn from(event: Created) -> Self { + event.channel + } +} diff --git a/src/event/app.rs b/src/event/app.rs index 5e9e79a..e58bea9 100644 --- a/src/event/app.rs +++ b/src/event/app.rs @@ -1,22 +1,15 @@ -use chrono::TimeDelta; use futures::{ future, stream::{self, StreamExt as _}, Stream, }; +use itertools::Itertools as _; use sqlx::sqlite::SqlitePool; -use super::{ - broadcaster::Broadcaster, - repo::message::Provider as _, - types::{self, ChannelEvent}, -}; +use super::{broadcaster::Broadcaster, Event, Sequence, Sequenced}; use crate::{ channel::{self, repo::Provider as _}, - clock::DateTime, - db::NotFound as _, - event::{repo::Provider as _, Sequence}, - login::Login, + message::{self, repo::Provider as _}, }; pub struct Events<'a> { @@ -29,111 +22,52 @@ impl<'a> Events<'a> { Self { db, events } } - pub async fn send( - &self, - login: &Login, - channel: &channel::Id, - body: &str, - sent_at: &DateTime, - ) -> Result { - let mut tx = self.db.begin().await?; - let channel = tx - .channels() - .by_id(channel) - .await - .not_found(|| EventsError::ChannelNotFound(channel.clone()))?; - let sent = tx.sequence().next(sent_at).await?; - let event = tx - .message_events() - .create(login, &channel, &sent, body) - .await?; - tx.commit().await?; - - self.events.broadcast(&event); - Ok(event) - } - - pub async fn expire(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> { - // Somewhat arbitrarily, expire after 90 days. - let expire_at = relative_to.to_owned() - TimeDelta::days(90); - - let mut tx = self.db.begin().await?; - let expired = tx.message_events().expired(&expire_at).await?; - - let mut events = Vec::with_capacity(expired.len()); - for (channel, message) in expired { - let deleted = tx.sequence().next(relative_to).await?; - let event = tx - .message_events() - .delete(&channel, &message, &deleted) - .await?; - events.push(event); - } - - tx.commit().await?; - - for event in events { - self.events.broadcast(&event); - } - - Ok(()) - } - pub async fn subscribe( &self, resume_at: Option, - ) -> Result + std::fmt::Debug, sqlx::Error> { + ) -> Result + std::fmt::Debug, sqlx::Error> { // Subscribe before retrieving, to catch messages broadcast while we're // querying the DB. We'll prune out duplicates later. let live_messages = self.events.subscribe(); let mut tx = self.db.begin().await?; - let channels = tx.channels().replay(resume_at).await?; + let channels = tx.channels().replay(resume_at).await?; let channel_events = channels - .into_iter() - .map(ChannelEvent::created) - .filter(move |event| { - resume_at.map_or(true, |resume_at| Sequence::from(event) > resume_at) - }); - - let message_events = tx.message_events().replay(resume_at).await?; - - let mut replay_events = channel_events - .into_iter() - .chain(message_events.into_iter()) + .iter() + .map(channel::History::events) + .kmerge_by(|a, b| a.instant.sequence < b.instant.sequence) + .filter(Sequence::after(resume_at)) + .map(Event::from); + + let messages = tx.messages().replay(resume_at).await?; + let message_events = messages + .iter() + .map(message::History::events) + .kmerge_by(|a, b| a.instant.sequence < b.instant.sequence) + .filter(Sequence::after(resume_at)) + .map(Event::from); + + let replay_events = channel_events + .merge_by(message_events, |a, b| { + a.instant.sequence < b.instant.sequence + }) .collect::>(); - replay_events.sort_by_key(|event| Sequence::from(event)); - let resume_live_at = replay_events.last().map(Sequence::from); + let resume_live_at = replay_events.last().map(Sequenced::sequence); let replay = stream::iter(replay_events); - // no skip_expired or resume transforms for stored_messages, as it's - // constructed not to contain messages meeting either criterion. - // - // * skip_expired is redundant with the `tx.broadcasts().expire(…)` call; - // * resume is redundant with the resume_at argument to - // `tx.broadcasts().replay(…)`. let live_messages = live_messages // Filtering on the broadcast resume point filters out messages // before resume_at, and filters out messages duplicated from - // stored_messages. + // `replay_events`. .filter(Self::resume(resume_live_at)); Ok(replay.chain(live_messages)) } - fn resume( - resume_at: Option, - ) -> impl for<'m> FnMut(&'m types::ChannelEvent) -> future::Ready { - move |event| future::ready(resume_at < Some(Sequence::from(event))) + fn resume(resume_at: Option) -> impl for<'m> FnMut(&'m Event) -> future::Ready { + let filter = Sequence::after(resume_at); + move |event| future::ready(filter(event)) } } - -#[derive(Debug, thiserror::Error)] -pub enum EventsError { - #[error("channel {0} not found")] - ChannelNotFound(channel::Id), - #[error(transparent)] - DatabaseError(#[from] sqlx::Error), -} diff --git a/src/event/broadcaster.rs b/src/event/broadcaster.rs index 92f631f..de2513a 100644 --- a/src/event/broadcaster.rs +++ b/src/event/broadcaster.rs @@ -1,3 +1,3 @@ -use crate::{broadcast, event::types}; +use crate::broadcast; -pub type Broadcaster = broadcast::Broadcaster; +pub type Broadcaster = broadcast::Broadcaster; diff --git a/src/event/mod.rs b/src/event/mod.rs index c982d3a..1503b77 100644 --- a/src/event/mod.rs +++ b/src/event/mod.rs @@ -1,18 +1,75 @@ +use crate::{channel, message}; + pub mod app; pub mod broadcaster; mod extract; pub mod repo; mod routes; mod sequence; -pub mod types; -use crate::clock::DateTime; +pub use self::{ + routes::router, + sequence::{Instant, Sequence, Sequenced}, +}; + +#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] +pub struct Event { + #[serde(flatten)] + pub instant: Instant, + #[serde(flatten)] + pub kind: Kind, +} + +impl Sequenced for Event { + fn instant(&self) -> Instant { + self.instant + } +} + +impl From for Event { + fn from(event: channel::Event) -> Self { + Self { + instant: event.instant, + kind: event.kind.into(), + } + } +} + +impl From for Event { + fn from(event: message::Event) -> Self { + Self { + instant: event.instant, + kind: event.kind.into(), + } + } +} -pub use self::{routes::router, sequence::Sequence}; +#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] +#[serde(tag = "type", rename_all = "snake_case")] +pub enum Kind { + #[serde(rename = "created")] + ChannelCreated(channel::event::Created), + #[serde(rename = "message")] + MessageSent(message::event::Sent), + MessageDeleted(message::event::Deleted), + #[serde(rename = "deleted")] + ChannelDeleted(channel::event::Deleted), +} + +impl From for Kind { + fn from(kind: channel::event::Kind) -> Self { + match kind { + channel::event::Kind::Created(created) => Self::ChannelCreated(created), + channel::event::Kind::Deleted(deleted) => Self::ChannelDeleted(deleted), + } + } +} -#[derive(Clone, Copy, Debug, Eq, PartialEq, serde::Serialize)] -pub struct Instant { - pub at: DateTime, - #[serde(skip)] - pub sequence: Sequence, +impl From for Kind { + fn from(kind: message::event::Kind) -> Self { + match kind { + message::event::Kind::Sent(created) => Self::MessageSent(created), + message::event::Kind::Deleted(deleted) => Self::MessageDeleted(deleted), + } + } } diff --git a/src/event/repo.rs b/src/event/repo.rs new file mode 100644 index 0000000..40d6a53 --- /dev/null +++ b/src/event/repo.rs @@ -0,0 +1,50 @@ +use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction}; + +use crate::{ + clock::DateTime, + event::{Instant, Sequence}, +}; + +pub trait Provider { + fn sequence(&mut self) -> Sequences; +} + +impl<'c> Provider for Transaction<'c, Sqlite> { + fn sequence(&mut self) -> Sequences { + Sequences(self) + } +} + +pub struct Sequences<'t>(&'t mut SqliteConnection); + +impl<'c> Sequences<'c> { + pub async fn next(&mut self, at: &DateTime) -> Result { + let next = sqlx::query_scalar!( + r#" + update event_sequence + set last_value = last_value + 1 + returning last_value as "next_value: Sequence" + "#, + ) + .fetch_one(&mut *self.0) + .await?; + + Ok(Instant { + at: *at, + sequence: next, + }) + } + + pub async fn current(&mut self) -> Result { + let next = sqlx::query_scalar!( + r#" + select last_value as "last_value: Sequence" + from event_sequence + "#, + ) + .fetch_one(&mut *self.0) + .await?; + + Ok(next) + } +} diff --git a/src/event/repo/message.rs b/src/event/repo/message.rs deleted file mode 100644 index f29c8a4..0000000 --- a/src/event/repo/message.rs +++ /dev/null @@ -1,196 +0,0 @@ -use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction}; - -use crate::{ - channel::{self, Channel}, - clock::DateTime, - event::{types, Instant, Sequence}, - login::{self, Login}, - message::{self, Message}, -}; - -pub trait Provider { - fn message_events(&mut self) -> Events; -} - -impl<'c> Provider for Transaction<'c, Sqlite> { - fn message_events(&mut self) -> Events { - Events(self) - } -} - -pub struct Events<'t>(&'t mut SqliteConnection); - -impl<'c> Events<'c> { - pub async fn create( - &mut self, - sender: &Login, - channel: &Channel, - sent: &Instant, - body: &str, - ) -> Result { - let id = message::Id::generate(); - - let message = sqlx::query!( - r#" - insert into message - (id, channel, sender, sent_at, sent_sequence, body) - values ($1, $2, $3, $4, $5, $6) - returning - id as "id: message::Id", - sender as "sender: login::Id", - sent_at as "sent_at: DateTime", - sent_sequence as "sent_sequence: Sequence", - body - "#, - id, - channel.id, - sender.id, - sent.at, - sent.sequence, - body, - ) - .map(|row| types::ChannelEvent { - instant: Instant { - at: row.sent_at, - sequence: row.sent_sequence, - }, - data: types::MessageEvent { - channel: channel.clone(), - sender: sender.clone(), - message: Message { - id: row.id, - body: row.body, - }, - } - .into(), - }) - .fetch_one(&mut *self.0) - .await?; - - Ok(message) - } - - pub async fn delete( - &mut self, - channel: &Channel, - message: &message::Id, - deleted: &Instant, - ) -> Result { - sqlx::query_scalar!( - r#" - delete from message - where id = $1 - returning 1 as "row: i64" - "#, - message, - ) - .fetch_one(&mut *self.0) - .await?; - - Ok(types::ChannelEvent { - instant: Instant { - at: deleted.at, - sequence: deleted.sequence, - }, - data: types::MessageDeletedEvent { - channel: channel.clone(), - message: message.clone(), - } - .into(), - }) - } - - pub async fn expired( - &mut self, - expire_at: &DateTime, - ) -> Result, sqlx::Error> { - let messages = sqlx::query!( - r#" - select - channel.id as "channel_id: channel::Id", - channel.name as "channel_name", - channel.created_at as "channel_created_at: DateTime", - channel.created_sequence as "channel_created_sequence: Sequence", - message.id as "message: message::Id" - from message - join channel on message.channel = channel.id - join login as sender on message.sender = sender.id - where sent_at < $1 - "#, - expire_at, - ) - .map(|row| { - ( - Channel { - id: row.channel_id, - name: row.channel_name, - created: Instant { - at: row.channel_created_at, - sequence: row.channel_created_sequence, - }, - }, - row.message, - ) - }) - .fetch_all(&mut *self.0) - .await?; - - Ok(messages) - } - - pub async fn replay( - &mut self, - resume_at: Option, - ) -> Result, sqlx::Error> { - let events = sqlx::query!( - r#" - select - message.id as "id: message::Id", - channel.id as "channel_id: channel::Id", - channel.name as "channel_name", - channel.created_at as "channel_created_at: DateTime", - channel.created_sequence as "channel_created_sequence: Sequence", - sender.id as "sender_id: login::Id", - sender.name as sender_name, - message.sent_at as "sent_at: DateTime", - message.sent_sequence as "sent_sequence: Sequence", - message.body - from message - join channel on message.channel = channel.id - join login as sender on message.sender = sender.id - where coalesce(message.sent_sequence > $1, true) - order by sent_sequence asc - "#, - resume_at, - ) - .map(|row| types::ChannelEvent { - instant: Instant { - at: row.sent_at, - sequence: row.sent_sequence, - }, - data: types::MessageEvent { - channel: Channel { - id: row.channel_id, - name: row.channel_name, - created: Instant { - at: row.channel_created_at, - sequence: row.channel_created_sequence, - }, - }, - sender: Login { - id: row.sender_id, - name: row.sender_name, - }, - message: Message { - id: row.id, - body: row.body, - }, - } - .into(), - }) - .fetch_all(&mut *self.0) - .await?; - - Ok(events) - } -} diff --git a/src/event/repo/mod.rs b/src/event/repo/mod.rs deleted file mode 100644 index cee840c..0000000 --- a/src/event/repo/mod.rs +++ /dev/null @@ -1,4 +0,0 @@ -pub mod message; -mod sequence; - -pub use self::sequence::Provider; diff --git a/src/event/repo/sequence.rs b/src/event/repo/sequence.rs deleted file mode 100644 index 40d6a53..0000000 --- a/src/event/repo/sequence.rs +++ /dev/null @@ -1,50 +0,0 @@ -use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction}; - -use crate::{ - clock::DateTime, - event::{Instant, Sequence}, -}; - -pub trait Provider { - fn sequence(&mut self) -> Sequences; -} - -impl<'c> Provider for Transaction<'c, Sqlite> { - fn sequence(&mut self) -> Sequences { - Sequences(self) - } -} - -pub struct Sequences<'t>(&'t mut SqliteConnection); - -impl<'c> Sequences<'c> { - pub async fn next(&mut self, at: &DateTime) -> Result { - let next = sqlx::query_scalar!( - r#" - update event_sequence - set last_value = last_value + 1 - returning last_value as "next_value: Sequence" - "#, - ) - .fetch_one(&mut *self.0) - .await?; - - Ok(Instant { - at: *at, - sequence: next, - }) - } - - pub async fn current(&mut self) -> Result { - let next = sqlx::query_scalar!( - r#" - select last_value as "last_value: Sequence" - from event_sequence - "#, - ) - .fetch_one(&mut *self.0) - .await?; - - Ok(next) - } -} diff --git a/src/event/routes.rs b/src/event/routes.rs index c87bfb2..5b9c7e3 100644 --- a/src/event/routes.rs +++ b/src/event/routes.rs @@ -10,11 +10,11 @@ use axum::{ use axum_extra::extract::Query; use futures::stream::{Stream, StreamExt as _}; -use super::{extract::LastEventId, types}; +use super::{extract::LastEventId, Event}; use crate::{ app::App, error::{Internal, Unauthorized}, - event::Sequence, + event::{Sequence, Sequenced as _}, token::{app::ValidateError, extract::Identity}, }; @@ -35,7 +35,7 @@ async fn events( identity: Identity, last_event_id: Option>, Query(query): Query, -) -> Result + std::fmt::Debug>, EventsError> { +) -> Result + std::fmt::Debug>, EventsError> { let resume_at = last_event_id .map(LastEventId::into_inner) .or(query.resume_point); @@ -51,7 +51,7 @@ struct Events(S); impl IntoResponse for Events where - S: Stream + Send + 'static, + S: Stream + Send + 'static, { fn into_response(self) -> Response { let Self(stream) = self; @@ -62,11 +62,11 @@ where } } -impl TryFrom for sse::Event { +impl TryFrom for sse::Event { type Error = serde_json::Error; - fn try_from(event: types::ChannelEvent) -> Result { - let id = serde_json::to_string(&Sequence::from(&event))?; + fn try_from(event: Event) -> Result { + let id = serde_json::to_string(&event.sequence())?; let data = serde_json::to_string_pretty(&event)?; let event = Self::default().id(id).data(data); diff --git a/src/event/routes/test.rs b/src/event/routes/test.rs index 68b55cc..ba9953e 100644 --- a/src/event/routes/test.rs +++ b/src/event/routes/test.rs @@ -6,7 +6,7 @@ use futures::{ }; use crate::{ - event::{routes, Sequence}, + event::{routes, Sequenced as _}, test::fixtures::{self, future::Immediately as _}, }; @@ -17,7 +17,7 @@ async fn includes_historical_message() { let app = fixtures::scratch_app().await; let sender = fixtures::login::create(&app).await; let channel = fixtures::channel::create(&app, &fixtures::now()).await; - let message = fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await; + let message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await; // Call the endpoint @@ -36,7 +36,7 @@ async fn includes_historical_message() { .await .expect("delivered stored message"); - assert_eq!(message, event); + assert!(fixtures::event::message_sent(&event, &message)); } #[tokio::test] @@ -58,7 +58,7 @@ async fn includes_live_message() { // Verify the semantics let sender = fixtures::login::create(&app).await; - let message = fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await; + let message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await; let event = events .filter(fixtures::filter::messages()) @@ -67,7 +67,7 @@ async fn includes_live_message() { .await .expect("delivered live message"); - assert_eq!(message, event); + assert!(fixtures::event::message_sent(&event, &message)); } #[tokio::test] @@ -87,7 +87,7 @@ async fn includes_multiple_channels() { let app = app.clone(); let sender = sender.clone(); let channel = channel.clone(); - async move { fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await } + async move { fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await } }) .collect::>() .await; @@ -110,7 +110,9 @@ async fn includes_multiple_channels() { .await; for message in &messages { - assert!(events.iter().any(|event| { event == message })); + assert!(events + .iter() + .any(|event| fixtures::event::message_sent(event, message))); } } @@ -123,9 +125,9 @@ async fn sequential_messages() { let sender = fixtures::login::create(&app).await; let messages = vec![ - fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await, - fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await, - fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await, + fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, ]; // Call the endpoint @@ -138,7 +140,13 @@ async fn sequential_messages() { // Verify the structure of the response. - let mut events = events.filter(|event| future::ready(messages.contains(event))); + let mut events = events.filter(|event| { + future::ready( + messages + .iter() + .any(|message| fixtures::event::message_sent(event, message)), + ) + }); // Verify delivery in order for message in &messages { @@ -148,7 +156,7 @@ async fn sequential_messages() { .await .expect("undelivered messages remaining"); - assert_eq!(message, &event); + assert!(fixtures::event::message_sent(&event, message)); } } @@ -160,11 +168,11 @@ async fn resumes_from() { let channel = fixtures::channel::create(&app, &fixtures::now()).await; let sender = fixtures::login::create(&app).await; - let initial_message = fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await; + let initial_message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await; let later_messages = vec![ - fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await, - fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await, + fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, ]; // Call the endpoint @@ -190,9 +198,9 @@ async fn resumes_from() { .await .expect("delivered events"); - assert_eq!(initial_message, event); + assert!(fixtures::event::message_sent(&event, &initial_message)); - Sequence::from(&event) + event.sequence() }; // Resume after disconnect @@ -214,7 +222,9 @@ async fn resumes_from() { .await; for message in &later_messages { - assert!(events.iter().any(|event| event == message)); + assert!(events + .iter() + .any(|event| fixtures::event::message_sent(event, message))); } } @@ -249,8 +259,8 @@ async fn serial_resume() { let resume_at = { let initial_messages = [ - fixtures::message::send(&app, &sender, &channel_a, &fixtures::now()).await, - fixtures::message::send(&app, &sender, &channel_b, &fixtures::now()).await, + fixtures::message::send(&app, &channel_a, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &channel_b, &sender, &fixtures::now()).await, ]; // First subscription @@ -271,12 +281,14 @@ async fn serial_resume() { .await; for message in &initial_messages { - assert!(events.iter().any(|event| event == message)); + assert!(events + .iter() + .any(|event| fixtures::event::message_sent(event, message))); } let event = events.last().expect("this vec is non-empty"); - Sequence::from(event) + event.sequence() }; // Resume after disconnect @@ -285,8 +297,8 @@ async fn serial_resume() { // Note that channel_b does not appear here. The buggy behaviour // would be masked if channel_b happened to send a new message // into the resumed event stream. - fixtures::message::send(&app, &sender, &channel_a, &fixtures::now()).await, - fixtures::message::send(&app, &sender, &channel_a, &fixtures::now()).await, + fixtures::message::send(&app, &channel_a, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &channel_a, &sender, &fixtures::now()).await, ]; // Second subscription @@ -307,12 +319,14 @@ async fn serial_resume() { .await; for message in &resume_messages { - assert!(events.iter().any(|event| event == message)); + assert!(events + .iter() + .any(|event| fixtures::event::message_sent(event, message))); } let event = events.last().expect("this vec is non-empty"); - Sequence::from(event) + event.sequence() }; // Resume after disconnect a second time @@ -321,8 +335,8 @@ async fn serial_resume() { // problem. The resume point should before both of these messages, but // after _all_ prior messages. let final_messages = [ - fixtures::message::send(&app, &sender, &channel_a, &fixtures::now()).await, - fixtures::message::send(&app, &sender, &channel_b, &fixtures::now()).await, + fixtures::message::send(&app, &channel_a, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &channel_b, &sender, &fixtures::now()).await, ]; // Third subscription @@ -345,7 +359,9 @@ async fn serial_resume() { // This set of messages, in particular, _should not_ include any prior // messages from `initial_messages` or `resume_messages`. for message in &final_messages { - assert!(events.iter().any(|event| event == message)); + assert!(events + .iter() + .any(|event| fixtures::event::message_sent(event, message))); } }; } @@ -378,13 +394,17 @@ async fn terminates_on_token_expiry() { // These should not be delivered. let messages = [ - fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await, - fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await, - fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await, + fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, ]; assert!(events - .filter(|event| future::ready(messages.contains(event))) + .filter(|event| future::ready( + messages + .iter() + .any(|message| fixtures::event::message_sent(event, message)) + )) .next() .immediately() .await @@ -425,13 +445,17 @@ async fn terminates_on_logout() { // These should not be delivered. let messages = [ - fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await, - fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await, - fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await, + fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, ]; assert!(events - .filter(|event| future::ready(messages.contains(event))) + .filter(|event| future::ready( + messages + .iter() + .any(|message| fixtures::event::message_sent(event, message)) + )) .next() .immediately() .await diff --git a/src/event/sequence.rs b/src/event/sequence.rs index 9ebddd7..c566156 100644 --- a/src/event/sequence.rs +++ b/src/event/sequence.rs @@ -1,5 +1,20 @@ use std::fmt; +use crate::clock::DateTime; + +#[derive(Clone, Copy, Debug, Eq, PartialEq, serde::Serialize)] +pub struct Instant { + pub at: DateTime, + #[serde(skip)] + pub sequence: Sequence, +} + +impl From for Sequence { + fn from(instant: Instant) -> Self { + instant.sequence + } +} + #[derive( Clone, Copy, @@ -22,3 +37,47 @@ impl fmt::Display for Sequence { value.fmt(f) } } + +impl Sequence { + pub fn up_to(resume_point: Option) -> impl for<'e> Fn(&'e E) -> bool + where + E: Sequenced, + { + move |event| resume_point.map_or(true, |resume_point| event.sequence() <= resume_point) + } + + pub fn after(resume_point: Option) -> impl for<'e> Fn(&'e E) -> bool + where + E: Sequenced, + { + move |event| resume_point < Some(event.sequence()) + } + + pub fn start_from(resume_point: Self) -> impl for<'e> Fn(&'e E) -> bool + where + E: Sequenced, + { + move |event| resume_point <= event.sequence() + } +} + +pub trait Sequenced { + fn instant(&self) -> Instant; + + fn sequence(&self) -> Sequence { + self.instant().into() + } +} + +impl Sequenced for &E +where + E: Sequenced, +{ + fn instant(&self) -> Instant { + (*self).instant() + } + + fn sequence(&self) -> Sequence { + (*self).sequence() + } +} diff --git a/src/event/types.rs b/src/event/types.rs deleted file mode 100644 index 2324dc1..0000000 --- a/src/event/types.rs +++ /dev/null @@ -1,85 +0,0 @@ -use crate::{ - channel::{self, Channel}, - event::{Instant, Sequence}, - login::Login, - message::{self, Message}, -}; - -#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] -pub struct ChannelEvent { - #[serde(flatten)] - pub instant: Instant, - #[serde(flatten)] - pub data: ChannelEventData, -} - -impl ChannelEvent { - pub fn created(channel: Channel) -> Self { - Self { - instant: channel.created, - data: CreatedEvent { channel }.into(), - } - } -} - -impl<'c> From<&'c ChannelEvent> for Sequence { - fn from(event: &'c ChannelEvent) -> Self { - event.instant.sequence - } -} - -#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] -#[serde(tag = "type", rename_all = "snake_case")] -pub enum ChannelEventData { - Created(CreatedEvent), - Message(MessageEvent), - MessageDeleted(MessageDeletedEvent), - Deleted(DeletedEvent), -} - -#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] -pub struct CreatedEvent { - pub channel: Channel, -} - -impl From for ChannelEventData { - fn from(event: CreatedEvent) -> Self { - Self::Created(event) - } -} - -#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] -pub struct MessageEvent { - pub channel: Channel, - pub sender: Login, - pub message: Message, -} - -impl From for ChannelEventData { - fn from(event: MessageEvent) -> Self { - Self::Message(event) - } -} - -#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] -pub struct MessageDeletedEvent { - pub channel: Channel, - pub message: message::Id, -} - -impl From for ChannelEventData { - fn from(event: MessageDeletedEvent) -> Self { - Self::MessageDeleted(event) - } -} - -#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] -pub struct DeletedEvent { - pub channel: channel::Id, -} - -impl From for ChannelEventData { - fn from(event: DeletedEvent) -> Self { - Self::Deleted(event) - } -} diff --git a/src/expire.rs b/src/expire.rs index a8eb8ad..e50bcb4 100644 --- a/src/expire.rs +++ b/src/expire.rs @@ -14,7 +14,7 @@ pub async fn middleware( next: Next, ) -> Result { app.tokens().expire(&expired_at).await?; - app.events().expire(&expired_at).await?; + app.messages().expire(&expired_at).await?; app.channels().expire(&expired_at).await?; Ok(next.run(req).await) } diff --git a/src/message/app.rs b/src/message/app.rs new file mode 100644 index 0000000..51f772e --- /dev/null +++ b/src/message/app.rs @@ -0,0 +1,88 @@ +use chrono::TimeDelta; +use itertools::Itertools; +use sqlx::sqlite::SqlitePool; + +use super::{repo::Provider as _, Message}; +use crate::{ + channel::{self, repo::Provider as _}, + clock::DateTime, + db::NotFound as _, + event::{broadcaster::Broadcaster, repo::Provider as _, Sequence}, + login::Login, +}; + +pub struct Messages<'a> { + db: &'a SqlitePool, + events: &'a Broadcaster, +} + +impl<'a> Messages<'a> { + pub const fn new(db: &'a SqlitePool, events: &'a Broadcaster) -> Self { + Self { db, events } + } + + pub async fn send( + &self, + channel: &channel::Id, + sender: &Login, + sent_at: &DateTime, + body: &str, + ) -> Result { + let mut tx = self.db.begin().await?; + let channel = tx + .channels() + .by_id(channel) + .await + .not_found(|| Error::ChannelNotFound(channel.clone()))?; + let sent = tx.sequence().next(sent_at).await?; + let message = tx + .messages() + .create(&channel.snapshot(), sender, &sent, body) + .await?; + tx.commit().await?; + + for event in message.events() { + self.events.broadcast(event); + } + + Ok(message.snapshot()) + } + + pub async fn expire(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> { + // Somewhat arbitrarily, expire after 90 days. + let expire_at = relative_to.to_owned() - TimeDelta::days(90); + + let mut tx = self.db.begin().await?; + let expired = tx.messages().expired(&expire_at).await?; + + let mut events = Vec::with_capacity(expired.len()); + for (channel, message) in expired { + let deleted = tx.sequence().next(relative_to).await?; + let message = tx.messages().delete(&channel, &message, &deleted).await?; + events.push( + message + .events() + .filter(Sequence::start_from(deleted.sequence)), + ); + } + + tx.commit().await?; + + for event in events + .into_iter() + .kmerge_by(|a, b| a.instant.sequence < b.instant.sequence) + { + self.events.broadcast(event); + } + + Ok(()) + } +} + +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error("channel {0} not found")] + ChannelNotFound(channel::Id), + #[error(transparent)] + DatabaseError(#[from] sqlx::Error), +} diff --git a/src/message/event.rs b/src/message/event.rs new file mode 100644 index 0000000..bcc2238 --- /dev/null +++ b/src/message/event.rs @@ -0,0 +1,50 @@ +use super::{snapshot::Message, Id}; +use crate::{ + channel::Channel, + event::{Instant, Sequenced}, +}; + +#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] +pub struct Event { + #[serde(flatten)] + pub instant: Instant, + #[serde(flatten)] + pub kind: Kind, +} + +impl Sequenced for Event { + fn instant(&self) -> Instant { + self.instant + } +} + +#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] +#[serde(tag = "type", rename_all = "snake_case")] +pub enum Kind { + Sent(Sent), + Deleted(Deleted), +} + +#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] +pub struct Sent { + #[serde(flatten)] + pub message: Message, +} + +impl From for Kind { + fn from(event: Sent) -> Self { + Self::Sent(event) + } +} + +#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] +pub struct Deleted { + pub channel: Channel, + pub message: Id, +} + +impl From for Kind { + fn from(event: Deleted) -> Self { + Self::Deleted(event) + } +} diff --git a/src/message/history.rs b/src/message/history.rs new file mode 100644 index 0000000..5aca47e --- /dev/null +++ b/src/message/history.rs @@ -0,0 +1,43 @@ +use super::{ + event::{Deleted, Event, Sent}, + Message, +}; +use crate::event::Instant; + +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct History { + pub message: Message, + pub sent: Instant, + pub deleted: Option, +} + +impl History { + fn sent(&self) -> Event { + Event { + instant: self.sent, + kind: Sent { + message: self.message.clone(), + } + .into(), + } + } + + fn deleted(&self) -> Option { + self.deleted.map(|instant| Event { + instant, + kind: Deleted { + channel: self.message.channel.clone(), + message: self.message.id.clone(), + } + .into(), + }) + } + + pub fn events(&self) -> impl Iterator { + [self.sent()].into_iter().chain(self.deleted()) + } + + pub fn snapshot(&self) -> Message { + self.message.clone() + } +} diff --git a/src/message/mod.rs b/src/message/mod.rs index 9a9bf14..52d56c1 100644 --- a/src/message/mod.rs +++ b/src/message/mod.rs @@ -1,9 +1,8 @@ +pub mod app; +pub mod event; +mod history; mod id; +pub mod repo; +mod snapshot; -pub use self::id::Id; - -#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] -pub struct Message { - pub id: Id, - pub body: String, -} +pub use self::{event::Event, history::History, id::Id, snapshot::Message}; diff --git a/src/message/repo.rs b/src/message/repo.rs new file mode 100644 index 0000000..3b2b8f7 --- /dev/null +++ b/src/message/repo.rs @@ -0,0 +1,214 @@ +use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction}; + +use super::{snapshot::Message, History, Id}; +use crate::{ + channel::{self, Channel}, + clock::DateTime, + event::{Instant, Sequence}, + login::{self, Login}, +}; + +pub trait Provider { + fn messages(&mut self) -> Messages; +} + +impl<'c> Provider for Transaction<'c, Sqlite> { + fn messages(&mut self) -> Messages { + Messages(self) + } +} + +pub struct Messages<'t>(&'t mut SqliteConnection); + +impl<'c> Messages<'c> { + pub async fn create( + &mut self, + channel: &Channel, + sender: &Login, + sent: &Instant, + body: &str, + ) -> Result { + let id = Id::generate(); + + let message = sqlx::query!( + r#" + insert into message + (id, channel, sender, sent_at, sent_sequence, body) + values ($1, $2, $3, $4, $5, $6) + returning + id as "id: Id", + body + "#, + id, + channel.id, + sender.id, + sent.at, + sent.sequence, + body, + ) + .map(|row| History { + message: Message { + channel: channel.clone(), + sender: sender.clone(), + id: row.id, + body: row.body, + }, + sent: *sent, + deleted: None, + }) + .fetch_one(&mut *self.0) + .await?; + + Ok(message) + } + + async fn by_id(&mut self, channel: &Channel, message: &Id) -> Result { + let message = sqlx::query!( + r#" + select + channel.id as "channel_id: channel::Id", + channel.name as "channel_name", + sender.id as "sender_id: login::Id", + sender.name as "sender_name", + message.id as "id: Id", + message.body, + sent_at as "sent_at: DateTime", + sent_sequence as "sent_sequence: Sequence" + from message + join channel on message.channel = channel.id + join login as sender on message.sender = sender.id + where message.id = $1 + and message.channel = $2 + "#, + message, + channel.id, + ) + .map(|row| History { + message: Message { + channel: Channel { + id: row.channel_id, + name: row.channel_name, + }, + sender: Login { + id: row.sender_id, + name: row.sender_name, + }, + id: row.id, + body: row.body, + }, + sent: Instant { + at: row.sent_at, + sequence: row.sent_sequence, + }, + deleted: None, + }) + .fetch_one(&mut *self.0) + .await?; + + Ok(message) + } + + pub async fn delete( + &mut self, + channel: &Channel, + message: &Id, + deleted: &Instant, + ) -> Result { + let history = self.by_id(channel, message).await?; + + sqlx::query_scalar!( + r#" + delete from message + where + id = $1 + returning 1 as "deleted: i64" + "#, + history.message.id, + ) + .fetch_one(&mut *self.0) + .await?; + + Ok(History { + deleted: Some(*deleted), + ..history + }) + } + + pub async fn expired( + &mut self, + expire_at: &DateTime, + ) -> Result, sqlx::Error> { + let messages = sqlx::query!( + r#" + select + channel.id as "channel_id: channel::Id", + channel.name as "channel_name", + message.id as "message: Id" + from message + join channel on message.channel = channel.id + where sent_at < $1 + "#, + expire_at, + ) + .map(|row| { + ( + Channel { + id: row.channel_id, + name: row.channel_name, + }, + row.message, + ) + }) + .fetch_all(&mut *self.0) + .await?; + + Ok(messages) + } + + pub async fn replay( + &mut self, + resume_at: Option, + ) -> Result, sqlx::Error> { + let messages = sqlx::query!( + r#" + select + channel.id as "channel_id: channel::Id", + channel.name as "channel_name", + sender.id as "sender_id: login::Id", + sender.name as "sender_name", + message.id as "id: Id", + message.body, + sent_at as "sent_at: DateTime", + sent_sequence as "sent_sequence: Sequence" + from message + join channel on message.channel = channel.id + join login as sender on message.sender = sender.id + where coalesce(message.sent_sequence > $1, true) + "#, + resume_at, + ) + .map(|row| History { + message: Message { + channel: Channel { + id: row.channel_id, + name: row.channel_name, + }, + sender: Login { + id: row.sender_id, + name: row.sender_name, + }, + id: row.id, + body: row.body, + }, + sent: Instant { + at: row.sent_at, + sequence: row.sent_sequence, + }, + deleted: None, + }) + .fetch_all(&mut *self.0) + .await?; + + Ok(messages) + } +} diff --git a/src/message/snapshot.rs b/src/message/snapshot.rs new file mode 100644 index 0000000..3adccbe --- /dev/null +++ b/src/message/snapshot.rs @@ -0,0 +1,74 @@ +use super::{ + event::{Event, Kind, Sent}, + Id, +}; +use crate::{channel::Channel, login::Login}; + +#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] +#[serde(into = "self::serialize::Message")] +pub struct Message { + pub channel: Channel, + pub sender: Login, + pub id: Id, + pub body: String, +} + +mod serialize { + use crate::{channel::Channel, login::Login, message::Id}; + + #[derive(serde::Serialize)] + pub struct Message { + channel: Channel, + sender: Login, + #[allow(clippy::struct_field_names)] + // Deliberately redundant with the module path; this produces a specific serialization. + message: MessageData, + } + + #[derive(serde::Serialize)] + pub struct MessageData { + id: Id, + body: String, + } + + impl From for Message { + fn from(message: super::Message) -> Self { + Self { + channel: message.channel, + sender: message.sender, + message: MessageData { + id: message.id, + body: message.body, + }, + } + } + } +} + +impl Message { + fn apply(state: Option, event: Event) -> Option { + match (state, event.kind) { + (None, Kind::Sent(event)) => Some(event.into()), + (Some(message), Kind::Deleted(event)) if message.id == event.message => None, + (state, event) => panic!("invalid message event {event:#?} for state {state:#?}"), + } + } +} + +impl FromIterator for Option { + fn from_iter>(events: I) -> Self { + events.into_iter().fold(None, Message::apply) + } +} + +impl From<&Sent> for Message { + fn from(event: &Sent) -> Self { + event.message.clone() + } +} + +impl From for Message { + fn from(event: Sent) -> Self { + event.message + } +} diff --git a/src/test/fixtures/event.rs b/src/test/fixtures/event.rs new file mode 100644 index 0000000..09f0490 --- /dev/null +++ b/src/test/fixtures/event.rs @@ -0,0 +1,11 @@ +use crate::{ + event::{Event, Kind}, + message::Message, +}; + +pub fn message_sent(event: &Event, message: &Message) -> bool { + matches!( + &event.kind, + Kind::MessageSent(event) if message == &event.into() + ) +} diff --git a/src/test/fixtures/filter.rs b/src/test/fixtures/filter.rs index d1939a5..6e62aea 100644 --- a/src/test/fixtures/filter.rs +++ b/src/test/fixtures/filter.rs @@ -1,11 +1,11 @@ use futures::future; -use crate::event::types; +use crate::event::{Event, Kind}; -pub fn messages() -> impl FnMut(&types::ChannelEvent) -> future::Ready { - |event| future::ready(matches!(event.data, types::ChannelEventData::Message(_))) +pub fn messages() -> impl FnMut(&Event) -> future::Ready { + |event| future::ready(matches!(event.kind, Kind::MessageSent(_))) } -pub fn created() -> impl FnMut(&types::ChannelEvent) -> future::Ready { - |event| future::ready(matches!(event.data, types::ChannelEventData::Created(_))) +pub fn created() -> impl FnMut(&Event) -> future::Ready { + |event| future::ready(matches!(event.kind, Kind::ChannelCreated(_))) } diff --git a/src/test/fixtures/message.rs b/src/test/fixtures/message.rs index fd50887..381b10b 100644 --- a/src/test/fixtures/message.rs +++ b/src/test/fixtures/message.rs @@ -1,17 +1,12 @@ use faker_rand::lorem::Paragraphs; -use crate::{app::App, channel::Channel, clock::RequestedAt, event::types, login::Login}; +use crate::{app::App, channel::Channel, clock::RequestedAt, login::Login, message::Message}; -pub async fn send( - app: &App, - login: &Login, - channel: &Channel, - sent_at: &RequestedAt, -) -> types::ChannelEvent { +pub async fn send(app: &App, channel: &Channel, login: &Login, sent_at: &RequestedAt) -> Message { let body = propose(); - app.events() - .send(login, &channel.id, &body, sent_at) + app.messages() + .send(&channel.id, login, sent_at, &body) .await .expect("should succeed if the channel exists") } diff --git a/src/test/fixtures/mod.rs b/src/test/fixtures/mod.rs index 76467ab..c5efa9b 100644 --- a/src/test/fixtures/mod.rs +++ b/src/test/fixtures/mod.rs @@ -3,6 +3,7 @@ use chrono::{TimeDelta, Utc}; use crate::{app::App, clock::RequestedAt, db}; pub mod channel; +pub mod event; pub mod filter; pub mod future; pub mod identity; diff --git a/src/token/app.rs b/src/token/app.rs index 030ec69..5c4fcd5 100644 --- a/src/token/app.rs +++ b/src/token/app.rs @@ -127,7 +127,7 @@ impl<'a> Tokens<'a> { tx.commit().await?; for event in tokens.into_iter().map(event::TokenRevoked::from) { - self.tokens.broadcast(&event); + self.tokens.broadcast(event); } Ok(()) @@ -139,7 +139,7 @@ impl<'a> Tokens<'a> { tx.commit().await?; self.tokens - .broadcast(&event::TokenRevoked::from(token.clone())); + .broadcast(event::TokenRevoked::from(token.clone())); Ok(()) } -- cgit v1.2.3 From 617172576b95bbb935a75f98a98787da5a4e9a9d Mon Sep 17 00:00:00 2001 From: Owen Jacobson Date: Thu, 3 Oct 2024 20:44:07 -0400 Subject: List messages per channel. --- ...4a808afb115f6e47d545dfbeb18f9c54e6eb15eb80.json | 20 ------ ...4a2137be57b3a45fe38a675262ceaaebb3d346a9ca.json | 62 ++++++++++++++++++ docs/api.md | 30 +++++++++ src/channel/app.rs | 44 +++++++++++-- src/channel/repo.rs | 7 +- src/channel/routes.rs | 76 ++++++++++++++++++---- src/event/app.rs | 8 +-- src/event/mod.rs | 2 +- src/event/sequence.rs | 7 ++ src/message/app.rs | 4 +- src/message/event.rs | 27 +++++++- src/message/history.rs | 4 +- src/message/repo.rs | 57 ++++++++++++---- src/message/snapshot.rs | 4 +- 14 files changed, 281 insertions(+), 71 deletions(-) delete mode 100644 .sqlx/query-46403b84bfc79a53aec36b4a808afb115f6e47d545dfbeb18f9c54e6eb15eb80.json create mode 100644 .sqlx/query-9606853f2ea9f776f7e4384a2137be57b3a45fe38a675262ceaaebb3d346a9ca.json (limited to 'src/event/mod.rs') diff --git a/.sqlx/query-46403b84bfc79a53aec36b4a808afb115f6e47d545dfbeb18f9c54e6eb15eb80.json b/.sqlx/query-46403b84bfc79a53aec36b4a808afb115f6e47d545dfbeb18f9c54e6eb15eb80.json deleted file mode 100644 index ee0f235..0000000 --- a/.sqlx/query-46403b84bfc79a53aec36b4a808afb115f6e47d545dfbeb18f9c54e6eb15eb80.json +++ /dev/null @@ -1,20 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n select\n message.id as \"id: Id\"\n from message\n join channel on message.channel = channel.id\n where channel.id = $1\n order by message.sent_sequence\n ", - "describe": { - "columns": [ - { - "name": "id: Id", - "ordinal": 0, - "type_info": "Text" - } - ], - "parameters": { - "Right": 1 - }, - "nullable": [ - false - ] - }, - "hash": "46403b84bfc79a53aec36b4a808afb115f6e47d545dfbeb18f9c54e6eb15eb80" -} diff --git a/.sqlx/query-9606853f2ea9f776f7e4384a2137be57b3a45fe38a675262ceaaebb3d346a9ca.json b/.sqlx/query-9606853f2ea9f776f7e4384a2137be57b3a45fe38a675262ceaaebb3d346a9ca.json new file mode 100644 index 0000000..82246ac --- /dev/null +++ b/.sqlx/query-9606853f2ea9f776f7e4384a2137be57b3a45fe38a675262ceaaebb3d346a9ca.json @@ -0,0 +1,62 @@ +{ + "db_name": "SQLite", + "query": "\n select\n channel.id as \"channel_id: channel::Id\",\n channel.name as \"channel_name\",\n sender.id as \"sender_id: login::Id\",\n sender.name as \"sender_name\",\n message.id as \"id: Id\",\n message.body,\n sent_at as \"sent_at: DateTime\",\n sent_sequence as \"sent_sequence: Sequence\"\n from message\n join channel on message.channel = channel.id\n join login as sender on message.sender = sender.id\n where channel.id = $1\n and coalesce(message.sent_sequence <= $2, true)\n order by message.sent_sequence\n ", + "describe": { + "columns": [ + { + "name": "channel_id: channel::Id", + "ordinal": 0, + "type_info": "Text" + }, + { + "name": "channel_name", + "ordinal": 1, + "type_info": "Text" + }, + { + "name": "sender_id: login::Id", + "ordinal": 2, + "type_info": "Text" + }, + { + "name": "sender_name", + "ordinal": 3, + "type_info": "Text" + }, + { + "name": "id: Id", + "ordinal": 4, + "type_info": "Text" + }, + { + "name": "body", + "ordinal": 5, + "type_info": "Text" + }, + { + "name": "sent_at: DateTime", + "ordinal": 6, + "type_info": "Text" + }, + { + "name": "sent_sequence: Sequence", + "ordinal": 7, + "type_info": "Integer" + } + ], + "parameters": { + "Right": 2 + }, + "nullable": [ + false, + false, + false, + false, + false, + false, + false, + false + ] + }, + "hash": "9606853f2ea9f776f7e4384a2137be57b3a45fe38a675262ceaaebb3d346a9ca" +} diff --git a/docs/api.md b/docs/api.md index ef211bc..e8c8c8c 100644 --- a/docs/api.md +++ b/docs/api.md @@ -127,6 +127,36 @@ Channel names must be unique. If a channel with the same name already exists, th The API delivers events to clients to update them on other clients' actions and messages. While there is no specific delivery deadline, messages are delivered as soon as possible on a best-effort basis, and the event system allows clients to replay events or resume interrupted streams, to allow recovery if a message is lost. +### `GET /api/channels/:channel/messages` + +Retrieves historical messages in a channel. + +The `:channel` placeholder must be a channel ID, as returned by `GET /api/channels` or `POST /api/channels`. + +#### Query parameters + +This endpoint accepts an optional `resume_point` query parameter. If provided, the value must be the value obtained from the `/api/boot` method. This parameter will restrict the returned list to messages as they existed at a fixed point in time, with any later changes only appearing in the event stream. + +#### On success + +Responds with a list of message objects, one per message: + +```json +[ + { + "at": "2024-09-27T23:19:10.208147Z", + "sender": { + "id": "L1234abcd", + "name": "example username" + }, + "message": { + "id": "M1312acab", + "body": "beep" + } + } +] +``` + ### `POST /api/channels/:channel` Sends a chat message to a channel. It will be relayed to clients subscribed to the channel's events, and recorded for replay. diff --git a/src/channel/app.rs b/src/channel/app.rs index 24be2ff..b3bfbee 100644 --- a/src/channel/app.rs +++ b/src/channel/app.rs @@ -7,7 +7,7 @@ use crate::{ clock::DateTime, db::NotFound, event::{broadcaster::Broadcaster, repo::Provider as _, Event, Sequence, Sequenced}, - message::repo::Provider as _, + message::{repo::Provider as _, Message}, }; pub struct Channels<'a> { @@ -54,22 +54,52 @@ impl<'a> Channels<'a> { Ok(channels) } - pub async fn delete(&self, channel: &Id, deleted_at: &DateTime) -> Result<(), DeleteError> { + pub async fn messages( + &self, + channel: &Id, + resume_point: Option, + ) -> Result, Error> { + let mut tx = self.db.begin().await?; + let channel = tx + .channels() + .by_id(channel) + .await + .not_found(|| Error::NotFound(channel.clone()))? + .snapshot(); + + let messages = tx + .messages() + .in_channel(&channel, resume_point) + .await? + .into_iter() + .filter_map(|message| { + message + .events() + .filter(Sequence::up_to(resume_point)) + .collect() + }) + .collect(); + + Ok(messages) + } + + pub async fn delete(&self, channel: &Id, deleted_at: &DateTime) -> Result<(), Error> { let mut tx = self.db.begin().await?; let channel = tx .channels() .by_id(channel) .await - .not_found(|| DeleteError::NotFound(channel.clone()))? + .not_found(|| Error::NotFound(channel.clone()))? .snapshot(); let mut events = Vec::new(); - let messages = tx.messages().in_channel(&channel).await?; + let messages = tx.messages().in_channel(&channel, None).await?; for message in messages { + let message = message.snapshot(); let deleted = tx.sequence().next(deleted_at).await?; - let message = tx.messages().delete(&message, &deleted).await?; + let message = tx.messages().delete(&message.id, &deleted).await?; events.extend( message .events() @@ -117,7 +147,7 @@ impl<'a> Channels<'a> { self.events.broadcast( events .into_iter() - .kmerge_by(|a, b| a.sequence() < b.sequence()) + .kmerge_by(Sequence::merge) .map(Event::from) .collect::>(), ); @@ -135,7 +165,7 @@ pub enum CreateError { } #[derive(Debug, thiserror::Error)] -pub enum DeleteError { +pub enum Error { #[error("channel {0} not found")] NotFound(Id), #[error(transparent)] diff --git a/src/channel/repo.rs b/src/channel/repo.rs index 8bb761b..2b48436 100644 --- a/src/channel/repo.rs +++ b/src/channel/repo.rs @@ -84,10 +84,7 @@ impl<'c> Channels<'c> { Ok(channel) } - pub async fn all( - &mut self, - resume_point: Option, - ) -> Result, sqlx::Error> { + pub async fn all(&mut self, resume_at: Option) -> Result, sqlx::Error> { let channels = sqlx::query!( r#" select @@ -99,7 +96,7 @@ impl<'c> Channels<'c> { where coalesce(created_sequence <= $1, true) order by channel.name "#, - resume_point, + resume_at, ) .map(|row| History { channel: Channel { diff --git a/src/channel/routes.rs b/src/channel/routes.rs index bce634e..23c0602 100644 --- a/src/channel/routes.rs +++ b/src/channel/routes.rs @@ -7,13 +7,14 @@ use axum::{ }; use axum_extra::extract::Query; -use super::{ - app::{self, DeleteError}, - Channel, Id, -}; +use super::{app, Channel, Id}; use crate::{ - app::App, clock::RequestedAt, error::Internal, event::Sequence, login::Login, - message::app::SendError, + app::App, + clock::RequestedAt, + error::Internal, + event::{Instant, Sequence}, + login::Login, + message::{self, app::SendError}, }; #[cfg(test)] @@ -25,17 +26,18 @@ pub fn router() -> Router { .route("/api/channels", post(on_create)) .route("/api/channels/:channel", post(on_send)) .route("/api/channels/:channel", delete(on_delete)) + .route("/api/channels/:channel/messages", get(messages)) } #[derive(Default, serde::Deserialize)] -struct ListQuery { +struct ResumeQuery { resume_point: Option, } async fn list( State(app): State, _: Login, - Query(query): Query, + Query(query): Query, ) -> Result { let channels = app.channels().all(query.resume_point).await?; let response = Channels(channels); @@ -127,7 +129,7 @@ async fn on_delete( Path(channel): Path, RequestedAt(deleted_at): RequestedAt, _: Login, -) -> Result { +) -> Result { app.channels().delete(&channel, &deleted_at).await?; Ok(StatusCode::ACCEPTED) @@ -135,16 +137,66 @@ async fn on_delete( #[derive(Debug, thiserror::Error)] #[error(transparent)] -struct DeleteErrorResponse(#[from] DeleteError); +struct ErrorResponse(#[from] app::Error); -impl IntoResponse for DeleteErrorResponse { +impl IntoResponse for ErrorResponse { fn into_response(self) -> Response { let Self(error) = self; match error { - not_found @ DeleteError::NotFound(_) => { + not_found @ app::Error::NotFound(_) => { (StatusCode::NOT_FOUND, not_found.to_string()).into_response() } other => Internal::from(other).into_response(), } } } + +async fn messages( + State(app): State, + Path(channel): Path, + _: Login, + Query(query): Query, +) -> Result { + let messages = app + .channels() + .messages(&channel, query.resume_point) + .await?; + let response = Messages( + messages + .into_iter() + .map(|message| MessageView { + sent: message.sent, + sender: message.sender, + message: MessageInner { + id: message.id, + body: message.body, + }, + }) + .collect(), + ); + + Ok(response) +} + +struct Messages(Vec); + +#[derive(serde::Serialize)] +struct MessageView { + #[serde(flatten)] + sent: Instant, + sender: Login, + message: MessageInner, +} + +#[derive(serde::Serialize)] +struct MessageInner { + id: message::Id, + body: String, +} + +impl IntoResponse for Messages { + fn into_response(self) -> Response { + let Self(messages) = self; + Json(messages).into_response() + } +} diff --git a/src/event/app.rs b/src/event/app.rs index 32f0a97..d664ec7 100644 --- a/src/event/app.rs +++ b/src/event/app.rs @@ -36,7 +36,7 @@ impl<'a> Events<'a> { let channel_events = channels .iter() .map(channel::History::events) - .kmerge_by(|a, b| a.instant.sequence < b.instant.sequence) + .kmerge_by(Sequence::merge) .filter(Sequence::after(resume_at)) .map(Event::from); @@ -44,14 +44,12 @@ impl<'a> Events<'a> { let message_events = messages .iter() .map(message::History::events) - .kmerge_by(|a, b| a.instant.sequence < b.instant.sequence) + .kmerge_by(Sequence::merge) .filter(Sequence::after(resume_at)) .map(Event::from); let replay_events = channel_events - .merge_by(message_events, |a, b| { - a.instant.sequence < b.instant.sequence - }) + .merge_by(message_events, Sequence::merge) .collect::>(); let resume_live_at = replay_events.last().map(Sequenced::sequence); diff --git a/src/event/mod.rs b/src/event/mod.rs index 1503b77..1349fe6 100644 --- a/src/event/mod.rs +++ b/src/event/mod.rs @@ -38,7 +38,7 @@ impl From for Event { impl From for Event { fn from(event: message::Event) -> Self { Self { - instant: event.instant, + instant: event.instant(), kind: event.kind.into(), } } diff --git a/src/event/sequence.rs b/src/event/sequence.rs index c566156..fbe3711 100644 --- a/src/event/sequence.rs +++ b/src/event/sequence.rs @@ -59,6 +59,13 @@ impl Sequence { { move |event| resume_point <= event.sequence() } + + pub fn merge(a: &E, b: &E) -> bool + where + E: Sequenced, + { + a.sequence() < b.sequence() + } } pub trait Sequenced { diff --git a/src/message/app.rs b/src/message/app.rs index 1d34c14..fd6a334 100644 --- a/src/message/app.rs +++ b/src/message/app.rs @@ -7,7 +7,7 @@ use crate::{ channel::{self, repo::Provider as _}, clock::DateTime, db::NotFound as _, - event::{broadcaster::Broadcaster, repo::Provider as _, Event, Sequence}, + event::{broadcaster::Broadcaster, repo::Provider as _, Event, Sequence, Sequenced}, login::Login, }; @@ -87,7 +87,7 @@ impl<'a> Messages<'a> { self.events.broadcast( events .into_iter() - .kmerge_by(|a, b| a.instant.sequence < b.instant.sequence) + .kmerge_by(Sequence::merge) .map(Event::from) .collect::>(), ); diff --git a/src/message/event.rs b/src/message/event.rs index bcc2238..66db9b0 100644 --- a/src/message/event.rs +++ b/src/message/event.rs @@ -6,15 +6,13 @@ use crate::{ #[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] pub struct Event { - #[serde(flatten)] - pub instant: Instant, #[serde(flatten)] pub kind: Kind, } impl Sequenced for Event { fn instant(&self) -> Instant { - self.instant + self.kind.instant() } } @@ -25,12 +23,27 @@ pub enum Kind { Deleted(Deleted), } +impl Sequenced for Kind { + fn instant(&self) -> Instant { + match self { + Self::Sent(sent) => sent.instant(), + Self::Deleted(deleted) => deleted.instant(), + } + } +} + #[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] pub struct Sent { #[serde(flatten)] pub message: Message, } +impl Sequenced for Sent { + fn instant(&self) -> Instant { + self.message.sent + } +} + impl From for Kind { fn from(event: Sent) -> Self { Self::Sent(event) @@ -39,10 +52,18 @@ impl From for Kind { #[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] pub struct Deleted { + #[serde(flatten)] + pub instant: Instant, pub channel: Channel, pub message: Id, } +impl Sequenced for Deleted { + fn instant(&self) -> Instant { + self.instant + } +} + impl From for Kind { fn from(event: Deleted) -> Self { Self::Deleted(event) diff --git a/src/message/history.rs b/src/message/history.rs index 5aca47e..89fc6b1 100644 --- a/src/message/history.rs +++ b/src/message/history.rs @@ -7,14 +7,12 @@ use crate::event::Instant; #[derive(Clone, Debug, Eq, PartialEq)] pub struct History { pub message: Message, - pub sent: Instant, pub deleted: Option, } impl History { fn sent(&self) -> Event { Event { - instant: self.sent, kind: Sent { message: self.message.clone(), } @@ -24,8 +22,8 @@ impl History { fn deleted(&self) -> Option { self.deleted.map(|instant| Event { - instant, kind: Deleted { + instant, channel: self.message.channel.clone(), message: self.message.id.clone(), } diff --git a/src/message/repo.rs b/src/message/repo.rs index ae41736..fc835c8 100644 --- a/src/message/repo.rs +++ b/src/message/repo.rs @@ -48,12 +48,12 @@ impl<'c> Messages<'c> { ) .map(|row| History { message: Message { + sent: *sent, channel: channel.clone(), sender: sender.clone(), id: row.id, body: row.body, }, - sent: *sent, deleted: None, }) .fetch_one(&mut *self.0) @@ -62,18 +62,51 @@ impl<'c> Messages<'c> { Ok(message) } - pub async fn in_channel(&mut self, channel: &Channel) -> Result, sqlx::Error> { - let messages = sqlx::query_scalar!( + pub async fn in_channel( + &mut self, + channel: &Channel, + resume_at: Option, + ) -> Result, sqlx::Error> { + let messages = sqlx::query!( r#" select - message.id as "id: Id" + channel.id as "channel_id: channel::Id", + channel.name as "channel_name", + sender.id as "sender_id: login::Id", + sender.name as "sender_name", + message.id as "id: Id", + message.body, + sent_at as "sent_at: DateTime", + sent_sequence as "sent_sequence: Sequence" from message join channel on message.channel = channel.id + join login as sender on message.sender = sender.id where channel.id = $1 + and coalesce(message.sent_sequence <= $2, true) order by message.sent_sequence "#, channel.id, + resume_at, ) + .map(|row| History { + message: Message { + sent: Instant { + at: row.sent_at, + sequence: row.sent_sequence, + }, + channel: Channel { + id: row.channel_id, + name: row.channel_name, + }, + sender: Login { + id: row.sender_id, + name: row.sender_name, + }, + id: row.id, + body: row.body, + }, + deleted: None, + }) .fetch_all(&mut *self.0) .await?; @@ -101,6 +134,10 @@ impl<'c> Messages<'c> { ) .map(|row| History { message: Message { + sent: Instant { + at: row.sent_at, + sequence: row.sent_sequence, + }, channel: Channel { id: row.channel_id, name: row.channel_name, @@ -112,10 +149,6 @@ impl<'c> Messages<'c> { id: row.id, body: row.body, }, - sent: Instant { - at: row.sent_at, - sequence: row.sent_sequence, - }, deleted: None, }) .fetch_one(&mut *self.0) @@ -189,6 +222,10 @@ impl<'c> Messages<'c> { ) .map(|row| History { message: Message { + sent: Instant { + at: row.sent_at, + sequence: row.sent_sequence, + }, channel: Channel { id: row.channel_id, name: row.channel_name, @@ -200,10 +237,6 @@ impl<'c> Messages<'c> { id: row.id, body: row.body, }, - sent: Instant { - at: row.sent_at, - sequence: row.sent_sequence, - }, deleted: None, }) .fetch_all(&mut *self.0) diff --git a/src/message/snapshot.rs b/src/message/snapshot.rs index 3adccbe..522c1aa 100644 --- a/src/message/snapshot.rs +++ b/src/message/snapshot.rs @@ -2,11 +2,13 @@ use super::{ event::{Event, Kind, Sent}, Id, }; -use crate::{channel::Channel, login::Login}; +use crate::{channel::Channel, event::Instant, login::Login}; #[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] #[serde(into = "self::serialize::Message")] pub struct Message { + #[serde(skip)] + pub sent: Instant, pub channel: Channel, pub sender: Login, pub id: Id, -- cgit v1.2.3