diff options
| author | Owen Jacobson <owen@grimoire.ca> | 2024-10-02 01:31:43 -0400 |
|---|---|---|
| committer | Owen Jacobson <owen@grimoire.ca> | 2024-10-02 01:31:43 -0400 |
| commit | 469613872f6fb19f4579b387e19b2bc38fa52f51 (patch) | |
| tree | 16edc3e8fac1a418c4b9ed5450167a793a7d6c8b /src | |
| parent | 6f07e6869bbf62903ac83c9bc061e7bde997e6a8 (diff) | |
Package up common event fields as Instant
Diffstat (limited to 'src')
| -rw-r--r-- | src/channel/app.rs | 11 | ||||
| -rw-r--r-- | src/channel/mod.rs | 6 | ||||
| -rw-r--r-- | src/channel/repo.rs | 74 | ||||
| -rw-r--r-- | src/channel/routes/test/on_send.rs | 2 | ||||
| -rw-r--r-- | src/event/app.rs | 18 | ||||
| -rw-r--r-- | src/event/mod.rs | 9 | ||||
| -rw-r--r-- | src/event/repo/message.rs | 42 | ||||
| -rw-r--r-- | src/event/repo/sequence.rs | 12 | ||||
| -rw-r--r-- | src/event/routes.rs | 2 | ||||
| -rw-r--r-- | src/event/routes/test.rs | 8 | ||||
| -rw-r--r-- | src/event/types.rs | 22 |
11 files changed, 121 insertions, 85 deletions
diff --git a/src/channel/app.rs b/src/channel/app.rs index ef0a63f..b7e3a10 100644 --- a/src/channel/app.rs +++ b/src/channel/app.rs @@ -19,10 +19,10 @@ impl<'a> Channels<'a> { pub async fn create(&self, name: &str, created_at: &DateTime) -> Result<Channel, CreateError> { let mut tx = self.db.begin().await?; - let created_sequence = tx.sequence().next().await?; + let created = tx.sequence().next(created_at).await?; let channel = tx .channels() - .create(name, created_at, created_sequence) + .create(name, &created) .await .map_err(|err| CreateError::from_duplicate_name(err, name))?; tx.commit().await?; @@ -50,11 +50,8 @@ impl<'a> Channels<'a> { let mut events = Vec::with_capacity(expired.len()); for channel in expired { - let deleted_sequence = tx.sequence().next().await?; - let event = tx - .channels() - .delete(&channel, relative_to, deleted_sequence) - .await?; + let deleted = tx.sequence().next(relative_to).await?; + let event = tx.channels().delete(&channel, &deleted).await?; events.push(event); } diff --git a/src/channel/mod.rs b/src/channel/mod.rs index 2672084..4baa7e3 100644 --- a/src/channel/mod.rs +++ b/src/channel/mod.rs @@ -1,4 +1,4 @@ -use crate::{clock::DateTime, event::Sequence}; +use crate::event::Instant; pub mod app; mod id; @@ -12,7 +12,5 @@ pub struct Channel { pub id: Id, pub name: String, #[serde(skip)] - pub created_at: DateTime, - #[serde(skip)] - pub created_sequence: Sequence, + pub created: Instant, } diff --git a/src/channel/repo.rs b/src/channel/repo.rs index 18cd81f..c000b56 100644 --- a/src/channel/repo.rs +++ b/src/channel/repo.rs @@ -3,7 +3,7 @@ use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction}; use crate::{ channel::{Channel, Id}, clock::DateTime, - event::{types, Sequence}, + event::{types, Instant, Sequence}, }; pub trait Provider { @@ -19,15 +19,9 @@ impl<'c> Provider for Transaction<'c, Sqlite> { pub struct Channels<'t>(&'t mut SqliteConnection); impl<'c> Channels<'c> { - pub async fn create( - &mut self, - name: &str, - created_at: &DateTime, - created_sequence: Sequence, - ) -> Result<Channel, sqlx::Error> { + pub async fn create(&mut self, name: &str, created: &Instant) -> Result<Channel, sqlx::Error> { let id = Id::generate(); - let channel = sqlx::query_as!( - Channel, + let channel = sqlx::query!( r#" insert into channel (id, name, created_at, created_sequence) @@ -40,9 +34,17 @@ impl<'c> Channels<'c> { "#, id, name, - created_at, - created_sequence, + created.at, + created.sequence, ) + .map(|row| Channel { + id: row.id, + name: row.name, + created: Instant { + at: row.created_at, + sequence: row.created_sequence, + }, + }) .fetch_one(&mut *self.0) .await?; @@ -50,8 +52,7 @@ impl<'c> Channels<'c> { } pub async fn by_id(&mut self, channel: &Id) -> Result<Channel, sqlx::Error> { - let channel = sqlx::query_as!( - Channel, + let channel = sqlx::query!( r#" select id as "id: Id", @@ -63,6 +64,14 @@ impl<'c> Channels<'c> { "#, channel, ) + .map(|row| Channel { + id: row.id, + name: row.name, + created: Instant { + at: row.created_at, + sequence: row.created_sequence, + }, + }) .fetch_one(&mut *self.0) .await?; @@ -73,8 +82,7 @@ impl<'c> Channels<'c> { &mut self, resume_point: Option<Sequence>, ) -> Result<Vec<Channel>, sqlx::Error> { - let channels = sqlx::query_as!( - Channel, + let channels = sqlx::query!( r#" select id as "id: Id", @@ -87,6 +95,14 @@ impl<'c> Channels<'c> { "#, resume_point, ) + .map(|row| Channel { + id: row.id, + name: row.name, + created: Instant { + at: row.created_at, + sequence: row.created_sequence, + }, + }) .fetch_all(&mut *self.0) .await?; @@ -97,8 +113,7 @@ impl<'c> Channels<'c> { &mut self, resume_at: Option<Sequence>, ) -> Result<Vec<Channel>, sqlx::Error> { - let channels = sqlx::query_as!( - Channel, + let channels = sqlx::query!( r#" select id as "id: Id", @@ -110,6 +125,14 @@ impl<'c> Channels<'c> { "#, resume_at, ) + .map(|row| Channel { + id: row.id, + name: row.name, + created: Instant { + at: row.created_at, + sequence: row.created_sequence, + }, + }) .fetch_all(&mut *self.0) .await?; @@ -119,8 +142,7 @@ impl<'c> Channels<'c> { pub async fn delete( &mut self, channel: &Channel, - deleted_at: &DateTime, - deleted_sequence: Sequence, + deleted: &Instant, ) -> Result<types::ChannelEvent, sqlx::Error> { let channel = channel.id.clone(); sqlx::query_scalar!( @@ -135,15 +157,13 @@ impl<'c> Channels<'c> { .await?; Ok(types::ChannelEvent { - sequence: deleted_sequence, - at: *deleted_at, + instant: *deleted, data: types::DeletedEvent { channel }.into(), }) } pub async fn expired(&mut self, expired_at: &DateTime) -> Result<Vec<Channel>, sqlx::Error> { - let channels = sqlx::query_as!( - Channel, + let channels = sqlx::query!( r#" select channel.id as "id: Id", @@ -157,6 +177,14 @@ impl<'c> Channels<'c> { "#, expired_at, ) + .map(|row| Channel { + id: row.id, + name: row.name, + created: Instant { + at: row.created_at, + sequence: row.created_sequence, + }, + }) .fetch_all(&mut *self.0) .await?; diff --git a/src/channel/routes/test/on_send.rs b/src/channel/routes/test/on_send.rs index 6f844cd..33ec3b7 100644 --- a/src/channel/routes/test/on_send.rs +++ b/src/channel/routes/test/on_send.rs @@ -52,7 +52,7 @@ async fn messages_in_order() { let events = events.collect::<Vec<_>>().immediately().await; for ((sent_at, message), event) in requests.into_iter().zip(events) { - assert_eq!(*sent_at, event.at); + assert_eq!(*sent_at, event.instant.at); assert!(matches!( event.data, types::ChannelEventData::Message(event_message) diff --git a/src/event/app.rs b/src/event/app.rs index 3d35f1a..5e9e79a 100644 --- a/src/event/app.rs +++ b/src/event/app.rs @@ -42,10 +42,10 @@ impl<'a> Events<'a> { .by_id(channel) .await .not_found(|| EventsError::ChannelNotFound(channel.clone()))?; - let sent_sequence = tx.sequence().next().await?; + let sent = tx.sequence().next(sent_at).await?; let event = tx .message_events() - .create(login, &channel, sent_at, sent_sequence, body) + .create(login, &channel, &sent, body) .await?; tx.commit().await?; @@ -62,10 +62,10 @@ impl<'a> Events<'a> { let mut events = Vec::with_capacity(expired.len()); for (channel, message) in expired { - let deleted_sequence = tx.sequence().next().await?; + let deleted = tx.sequence().next(relative_to).await?; let event = tx .message_events() - .delete(&channel, &message, relative_to, deleted_sequence) + .delete(&channel, &message, &deleted) .await?; events.push(event); } @@ -93,7 +93,9 @@ impl<'a> Events<'a> { let channel_events = channels .into_iter() .map(ChannelEvent::created) - .filter(move |event| resume_at.map_or(true, |resume_at| event.sequence > resume_at)); + .filter(move |event| { + resume_at.map_or(true, |resume_at| Sequence::from(event) > resume_at) + }); let message_events = tx.message_events().replay(resume_at).await?; @@ -101,8 +103,8 @@ impl<'a> Events<'a> { .into_iter() .chain(message_events.into_iter()) .collect::<Vec<_>>(); - replay_events.sort_by_key(|event| event.sequence); - let resume_live_at = replay_events.last().map(|event| event.sequence); + replay_events.sort_by_key(|event| Sequence::from(event)); + let resume_live_at = replay_events.last().map(Sequence::from); let replay = stream::iter(replay_events); @@ -124,7 +126,7 @@ impl<'a> Events<'a> { fn resume( resume_at: Option<Sequence>, ) -> impl for<'m> FnMut(&'m types::ChannelEvent) -> future::Ready<bool> { - move |event| future::ready(resume_at < Some(event.sequence)) + move |event| future::ready(resume_at < Some(Sequence::from(event))) } } diff --git a/src/event/mod.rs b/src/event/mod.rs index 7ad3f9c..c982d3a 100644 --- a/src/event/mod.rs +++ b/src/event/mod.rs @@ -6,4 +6,13 @@ mod routes; mod sequence; pub mod types; +use crate::clock::DateTime; + pub use self::{routes::router, sequence::Sequence}; + +#[derive(Clone, Copy, Debug, Eq, PartialEq, serde::Serialize)] +pub struct Instant { + pub at: DateTime, + #[serde(skip)] + pub sequence: Sequence, +} diff --git a/src/event/repo/message.rs b/src/event/repo/message.rs index f051fec..f29c8a4 100644 --- a/src/event/repo/message.rs +++ b/src/event/repo/message.rs @@ -3,7 +3,7 @@ use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction}; use crate::{ channel::{self, Channel}, clock::DateTime, - event::{types, Sequence}, + event::{types, Instant, Sequence}, login::{self, Login}, message::{self, Message}, }; @@ -25,8 +25,7 @@ impl<'c> Events<'c> { &mut self, sender: &Login, channel: &Channel, - sent_at: &DateTime, - sent_sequence: Sequence, + sent: &Instant, body: &str, ) -> Result<types::ChannelEvent, sqlx::Error> { let id = message::Id::generate(); @@ -46,13 +45,15 @@ impl<'c> Events<'c> { id, channel.id, sender.id, - sent_at, - sent_sequence, + sent.at, + sent.sequence, body, ) .map(|row| types::ChannelEvent { - sequence: row.sent_sequence, - at: row.sent_at, + instant: Instant { + at: row.sent_at, + sequence: row.sent_sequence, + }, data: types::MessageEvent { channel: channel.clone(), sender: sender.clone(), @@ -73,8 +74,7 @@ impl<'c> Events<'c> { &mut self, channel: &Channel, message: &message::Id, - deleted_at: &DateTime, - deleted_sequence: Sequence, + deleted: &Instant, ) -> Result<types::ChannelEvent, sqlx::Error> { sqlx::query_scalar!( r#" @@ -88,8 +88,10 @@ impl<'c> Events<'c> { .await?; Ok(types::ChannelEvent { - sequence: deleted_sequence, - at: *deleted_at, + instant: Instant { + at: deleted.at, + sequence: deleted.sequence, + }, data: types::MessageDeletedEvent { channel: channel.clone(), message: message.clone(), @@ -122,8 +124,10 @@ impl<'c> Events<'c> { Channel { id: row.channel_id, name: row.channel_name, - created_at: row.channel_created_at, - created_sequence: row.channel_created_sequence, + created: Instant { + at: row.channel_created_at, + sequence: row.channel_created_sequence, + }, }, row.message, ) @@ -160,14 +164,18 @@ impl<'c> Events<'c> { resume_at, ) .map(|row| types::ChannelEvent { - sequence: row.sent_sequence, - at: row.sent_at, + instant: Instant { + at: row.sent_at, + sequence: row.sent_sequence, + }, data: types::MessageEvent { channel: Channel { id: row.channel_id, name: row.channel_name, - created_at: row.channel_created_at, - created_sequence: row.channel_created_sequence, + created: Instant { + at: row.channel_created_at, + sequence: row.channel_created_sequence, + }, }, sender: Login { id: row.sender_id, diff --git a/src/event/repo/sequence.rs b/src/event/repo/sequence.rs index c985869..40d6a53 100644 --- a/src/event/repo/sequence.rs +++ b/src/event/repo/sequence.rs @@ -1,6 +1,9 @@ use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction}; -use crate::event::Sequence; +use crate::{ + clock::DateTime, + event::{Instant, Sequence}, +}; pub trait Provider { fn sequence(&mut self) -> Sequences; @@ -15,7 +18,7 @@ impl<'c> Provider for Transaction<'c, Sqlite> { pub struct Sequences<'t>(&'t mut SqliteConnection); impl<'c> Sequences<'c> { - pub async fn next(&mut self) -> Result<Sequence, sqlx::Error> { + pub async fn next(&mut self, at: &DateTime) -> Result<Instant, sqlx::Error> { let next = sqlx::query_scalar!( r#" update event_sequence @@ -26,7 +29,10 @@ impl<'c> Sequences<'c> { .fetch_one(&mut *self.0) .await?; - Ok(next) + Ok(Instant { + at: *at, + sequence: next, + }) } pub async fn current(&mut self) -> Result<Sequence, sqlx::Error> { diff --git a/src/event/routes.rs b/src/event/routes.rs index 50ac435..c87bfb2 100644 --- a/src/event/routes.rs +++ b/src/event/routes.rs @@ -66,7 +66,7 @@ impl TryFrom<types::ChannelEvent> for sse::Event { type Error = serde_json::Error; fn try_from(event: types::ChannelEvent) -> Result<Self, Self::Error> { - let id = serde_json::to_string(&event.sequence)?; + let id = serde_json::to_string(&Sequence::from(&event))?; let data = serde_json::to_string_pretty(&event)?; let event = Self::default().id(id).data(data); diff --git a/src/event/routes/test.rs b/src/event/routes/test.rs index d1ac3b4..68b55cc 100644 --- a/src/event/routes/test.rs +++ b/src/event/routes/test.rs @@ -6,7 +6,7 @@ use futures::{ }; use crate::{ - event::routes, + event::{routes, Sequence}, test::fixtures::{self, future::Immediately as _}, }; @@ -192,7 +192,7 @@ async fn resumes_from() { assert_eq!(initial_message, event); - event.sequence + Sequence::from(&event) }; // Resume after disconnect @@ -276,7 +276,7 @@ async fn serial_resume() { let event = events.last().expect("this vec is non-empty"); - event.sequence + Sequence::from(event) }; // Resume after disconnect @@ -312,7 +312,7 @@ async fn serial_resume() { let event = events.last().expect("this vec is non-empty"); - event.sequence + Sequence::from(event) }; // Resume after disconnect a second time diff --git a/src/event/types.rs b/src/event/types.rs index cd7dea6..2324dc1 100644 --- a/src/event/types.rs +++ b/src/event/types.rs @@ -1,16 +1,14 @@ use crate::{ channel::{self, Channel}, - clock::DateTime, - event::Sequence, + event::{Instant, Sequence}, login::Login, message::{self, Message}, }; #[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] pub struct ChannelEvent { - #[serde(skip)] - pub sequence: Sequence, - pub at: DateTime, + #[serde(flatten)] + pub instant: Instant, #[serde(flatten)] pub data: ChannelEventData, } @@ -18,25 +16,15 @@ pub struct ChannelEvent { impl ChannelEvent { pub fn created(channel: Channel) -> Self { Self { - at: channel.created_at, - sequence: channel.created_sequence, + instant: channel.created, data: CreatedEvent { channel }.into(), } } - - pub fn channel_id(&self) -> &channel::Id { - match &self.data { - ChannelEventData::Created(event) => &event.channel.id, - ChannelEventData::Message(event) => &event.channel.id, - ChannelEventData::MessageDeleted(event) => &event.channel.id, - ChannelEventData::Deleted(event) => &event.channel, - } - } } impl<'c> From<&'c ChannelEvent> for Sequence { fn from(event: &'c ChannelEvent) -> Self { - event.sequence + event.instant.sequence } } |
