summaryrefslogtreecommitdiff
path: root/src/event
diff options
context:
space:
mode:
authorOwen Jacobson <owen@grimoire.ca>2024-10-02 01:31:43 -0400
committerOwen Jacobson <owen@grimoire.ca>2024-10-02 01:31:43 -0400
commit469613872f6fb19f4579b387e19b2bc38fa52f51 (patch)
tree16edc3e8fac1a418c4b9ed5450167a793a7d6c8b /src/event
parent6f07e6869bbf62903ac83c9bc061e7bde997e6a8 (diff)
Package up common event fields as Instant
Diffstat (limited to 'src/event')
-rw-r--r--src/event/app.rs18
-rw-r--r--src/event/mod.rs9
-rw-r--r--src/event/repo/message.rs42
-rw-r--r--src/event/repo/sequence.rs12
-rw-r--r--src/event/routes.rs2
-rw-r--r--src/event/routes/test.rs8
-rw-r--r--src/event/types.rs22
7 files changed, 63 insertions, 50 deletions
diff --git a/src/event/app.rs b/src/event/app.rs
index 3d35f1a..5e9e79a 100644
--- a/src/event/app.rs
+++ b/src/event/app.rs
@@ -42,10 +42,10 @@ impl<'a> Events<'a> {
.by_id(channel)
.await
.not_found(|| EventsError::ChannelNotFound(channel.clone()))?;
- let sent_sequence = tx.sequence().next().await?;
+ let sent = tx.sequence().next(sent_at).await?;
let event = tx
.message_events()
- .create(login, &channel, sent_at, sent_sequence, body)
+ .create(login, &channel, &sent, body)
.await?;
tx.commit().await?;
@@ -62,10 +62,10 @@ impl<'a> Events<'a> {
let mut events = Vec::with_capacity(expired.len());
for (channel, message) in expired {
- let deleted_sequence = tx.sequence().next().await?;
+ let deleted = tx.sequence().next(relative_to).await?;
let event = tx
.message_events()
- .delete(&channel, &message, relative_to, deleted_sequence)
+ .delete(&channel, &message, &deleted)
.await?;
events.push(event);
}
@@ -93,7 +93,9 @@ impl<'a> Events<'a> {
let channel_events = channels
.into_iter()
.map(ChannelEvent::created)
- .filter(move |event| resume_at.map_or(true, |resume_at| event.sequence > resume_at));
+ .filter(move |event| {
+ resume_at.map_or(true, |resume_at| Sequence::from(event) > resume_at)
+ });
let message_events = tx.message_events().replay(resume_at).await?;
@@ -101,8 +103,8 @@ impl<'a> Events<'a> {
.into_iter()
.chain(message_events.into_iter())
.collect::<Vec<_>>();
- replay_events.sort_by_key(|event| event.sequence);
- let resume_live_at = replay_events.last().map(|event| event.sequence);
+ replay_events.sort_by_key(|event| Sequence::from(event));
+ let resume_live_at = replay_events.last().map(Sequence::from);
let replay = stream::iter(replay_events);
@@ -124,7 +126,7 @@ impl<'a> Events<'a> {
fn resume(
resume_at: Option<Sequence>,
) -> impl for<'m> FnMut(&'m types::ChannelEvent) -> future::Ready<bool> {
- move |event| future::ready(resume_at < Some(event.sequence))
+ move |event| future::ready(resume_at < Some(Sequence::from(event)))
}
}
diff --git a/src/event/mod.rs b/src/event/mod.rs
index 7ad3f9c..c982d3a 100644
--- a/src/event/mod.rs
+++ b/src/event/mod.rs
@@ -6,4 +6,13 @@ mod routes;
mod sequence;
pub mod types;
+use crate::clock::DateTime;
+
pub use self::{routes::router, sequence::Sequence};
+
+#[derive(Clone, Copy, Debug, Eq, PartialEq, serde::Serialize)]
+pub struct Instant {
+ pub at: DateTime,
+ #[serde(skip)]
+ pub sequence: Sequence,
+}
diff --git a/src/event/repo/message.rs b/src/event/repo/message.rs
index f051fec..f29c8a4 100644
--- a/src/event/repo/message.rs
+++ b/src/event/repo/message.rs
@@ -3,7 +3,7 @@ use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction};
use crate::{
channel::{self, Channel},
clock::DateTime,
- event::{types, Sequence},
+ event::{types, Instant, Sequence},
login::{self, Login},
message::{self, Message},
};
@@ -25,8 +25,7 @@ impl<'c> Events<'c> {
&mut self,
sender: &Login,
channel: &Channel,
- sent_at: &DateTime,
- sent_sequence: Sequence,
+ sent: &Instant,
body: &str,
) -> Result<types::ChannelEvent, sqlx::Error> {
let id = message::Id::generate();
@@ -46,13 +45,15 @@ impl<'c> Events<'c> {
id,
channel.id,
sender.id,
- sent_at,
- sent_sequence,
+ sent.at,
+ sent.sequence,
body,
)
.map(|row| types::ChannelEvent {
- sequence: row.sent_sequence,
- at: row.sent_at,
+ instant: Instant {
+ at: row.sent_at,
+ sequence: row.sent_sequence,
+ },
data: types::MessageEvent {
channel: channel.clone(),
sender: sender.clone(),
@@ -73,8 +74,7 @@ impl<'c> Events<'c> {
&mut self,
channel: &Channel,
message: &message::Id,
- deleted_at: &DateTime,
- deleted_sequence: Sequence,
+ deleted: &Instant,
) -> Result<types::ChannelEvent, sqlx::Error> {
sqlx::query_scalar!(
r#"
@@ -88,8 +88,10 @@ impl<'c> Events<'c> {
.await?;
Ok(types::ChannelEvent {
- sequence: deleted_sequence,
- at: *deleted_at,
+ instant: Instant {
+ at: deleted.at,
+ sequence: deleted.sequence,
+ },
data: types::MessageDeletedEvent {
channel: channel.clone(),
message: message.clone(),
@@ -122,8 +124,10 @@ impl<'c> Events<'c> {
Channel {
id: row.channel_id,
name: row.channel_name,
- created_at: row.channel_created_at,
- created_sequence: row.channel_created_sequence,
+ created: Instant {
+ at: row.channel_created_at,
+ sequence: row.channel_created_sequence,
+ },
},
row.message,
)
@@ -160,14 +164,18 @@ impl<'c> Events<'c> {
resume_at,
)
.map(|row| types::ChannelEvent {
- sequence: row.sent_sequence,
- at: row.sent_at,
+ instant: Instant {
+ at: row.sent_at,
+ sequence: row.sent_sequence,
+ },
data: types::MessageEvent {
channel: Channel {
id: row.channel_id,
name: row.channel_name,
- created_at: row.channel_created_at,
- created_sequence: row.channel_created_sequence,
+ created: Instant {
+ at: row.channel_created_at,
+ sequence: row.channel_created_sequence,
+ },
},
sender: Login {
id: row.sender_id,
diff --git a/src/event/repo/sequence.rs b/src/event/repo/sequence.rs
index c985869..40d6a53 100644
--- a/src/event/repo/sequence.rs
+++ b/src/event/repo/sequence.rs
@@ -1,6 +1,9 @@
use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction};
-use crate::event::Sequence;
+use crate::{
+ clock::DateTime,
+ event::{Instant, Sequence},
+};
pub trait Provider {
fn sequence(&mut self) -> Sequences;
@@ -15,7 +18,7 @@ impl<'c> Provider for Transaction<'c, Sqlite> {
pub struct Sequences<'t>(&'t mut SqliteConnection);
impl<'c> Sequences<'c> {
- pub async fn next(&mut self) -> Result<Sequence, sqlx::Error> {
+ pub async fn next(&mut self, at: &DateTime) -> Result<Instant, sqlx::Error> {
let next = sqlx::query_scalar!(
r#"
update event_sequence
@@ -26,7 +29,10 @@ impl<'c> Sequences<'c> {
.fetch_one(&mut *self.0)
.await?;
- Ok(next)
+ Ok(Instant {
+ at: *at,
+ sequence: next,
+ })
}
pub async fn current(&mut self) -> Result<Sequence, sqlx::Error> {
diff --git a/src/event/routes.rs b/src/event/routes.rs
index 50ac435..c87bfb2 100644
--- a/src/event/routes.rs
+++ b/src/event/routes.rs
@@ -66,7 +66,7 @@ impl TryFrom<types::ChannelEvent> for sse::Event {
type Error = serde_json::Error;
fn try_from(event: types::ChannelEvent) -> Result<Self, Self::Error> {
- let id = serde_json::to_string(&event.sequence)?;
+ let id = serde_json::to_string(&Sequence::from(&event))?;
let data = serde_json::to_string_pretty(&event)?;
let event = Self::default().id(id).data(data);
diff --git a/src/event/routes/test.rs b/src/event/routes/test.rs
index d1ac3b4..68b55cc 100644
--- a/src/event/routes/test.rs
+++ b/src/event/routes/test.rs
@@ -6,7 +6,7 @@ use futures::{
};
use crate::{
- event::routes,
+ event::{routes, Sequence},
test::fixtures::{self, future::Immediately as _},
};
@@ -192,7 +192,7 @@ async fn resumes_from() {
assert_eq!(initial_message, event);
- event.sequence
+ Sequence::from(&event)
};
// Resume after disconnect
@@ -276,7 +276,7 @@ async fn serial_resume() {
let event = events.last().expect("this vec is non-empty");
- event.sequence
+ Sequence::from(event)
};
// Resume after disconnect
@@ -312,7 +312,7 @@ async fn serial_resume() {
let event = events.last().expect("this vec is non-empty");
- event.sequence
+ Sequence::from(event)
};
// Resume after disconnect a second time
diff --git a/src/event/types.rs b/src/event/types.rs
index cd7dea6..2324dc1 100644
--- a/src/event/types.rs
+++ b/src/event/types.rs
@@ -1,16 +1,14 @@
use crate::{
channel::{self, Channel},
- clock::DateTime,
- event::Sequence,
+ event::{Instant, Sequence},
login::Login,
message::{self, Message},
};
#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
pub struct ChannelEvent {
- #[serde(skip)]
- pub sequence: Sequence,
- pub at: DateTime,
+ #[serde(flatten)]
+ pub instant: Instant,
#[serde(flatten)]
pub data: ChannelEventData,
}
@@ -18,25 +16,15 @@ pub struct ChannelEvent {
impl ChannelEvent {
pub fn created(channel: Channel) -> Self {
Self {
- at: channel.created_at,
- sequence: channel.created_sequence,
+ instant: channel.created,
data: CreatedEvent { channel }.into(),
}
}
-
- pub fn channel_id(&self) -> &channel::Id {
- match &self.data {
- ChannelEventData::Created(event) => &event.channel.id,
- ChannelEventData::Message(event) => &event.channel.id,
- ChannelEventData::MessageDeleted(event) => &event.channel.id,
- ChannelEventData::Deleted(event) => &event.channel,
- }
- }
}
impl<'c> From<&'c ChannelEvent> for Sequence {
fn from(event: &'c ChannelEvent) -> Self {
- event.sequence
+ event.instant.sequence
}
}