summaryrefslogtreecommitdiff
path: root/src/channel
diff options
context:
space:
mode:
authorOwen Jacobson <owen@grimoire.ca>2024-10-02 01:31:43 -0400
committerOwen Jacobson <owen@grimoire.ca>2024-10-02 01:31:43 -0400
commit469613872f6fb19f4579b387e19b2bc38fa52f51 (patch)
tree16edc3e8fac1a418c4b9ed5450167a793a7d6c8b /src/channel
parent6f07e6869bbf62903ac83c9bc061e7bde997e6a8 (diff)
Package up common event fields as Instant
Diffstat (limited to 'src/channel')
-rw-r--r--src/channel/app.rs11
-rw-r--r--src/channel/mod.rs6
-rw-r--r--src/channel/repo.rs74
-rw-r--r--src/channel/routes/test/on_send.rs2
4 files changed, 58 insertions, 35 deletions
diff --git a/src/channel/app.rs b/src/channel/app.rs
index ef0a63f..b7e3a10 100644
--- a/src/channel/app.rs
+++ b/src/channel/app.rs
@@ -19,10 +19,10 @@ impl<'a> Channels<'a> {
pub async fn create(&self, name: &str, created_at: &DateTime) -> Result<Channel, CreateError> {
let mut tx = self.db.begin().await?;
- let created_sequence = tx.sequence().next().await?;
+ let created = tx.sequence().next(created_at).await?;
let channel = tx
.channels()
- .create(name, created_at, created_sequence)
+ .create(name, &created)
.await
.map_err(|err| CreateError::from_duplicate_name(err, name))?;
tx.commit().await?;
@@ -50,11 +50,8 @@ impl<'a> Channels<'a> {
let mut events = Vec::with_capacity(expired.len());
for channel in expired {
- let deleted_sequence = tx.sequence().next().await?;
- let event = tx
- .channels()
- .delete(&channel, relative_to, deleted_sequence)
- .await?;
+ let deleted = tx.sequence().next(relative_to).await?;
+ let event = tx.channels().delete(&channel, &deleted).await?;
events.push(event);
}
diff --git a/src/channel/mod.rs b/src/channel/mod.rs
index 2672084..4baa7e3 100644
--- a/src/channel/mod.rs
+++ b/src/channel/mod.rs
@@ -1,4 +1,4 @@
-use crate::{clock::DateTime, event::Sequence};
+use crate::event::Instant;
pub mod app;
mod id;
@@ -12,7 +12,5 @@ pub struct Channel {
pub id: Id,
pub name: String,
#[serde(skip)]
- pub created_at: DateTime,
- #[serde(skip)]
- pub created_sequence: Sequence,
+ pub created: Instant,
}
diff --git a/src/channel/repo.rs b/src/channel/repo.rs
index 18cd81f..c000b56 100644
--- a/src/channel/repo.rs
+++ b/src/channel/repo.rs
@@ -3,7 +3,7 @@ use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction};
use crate::{
channel::{Channel, Id},
clock::DateTime,
- event::{types, Sequence},
+ event::{types, Instant, Sequence},
};
pub trait Provider {
@@ -19,15 +19,9 @@ impl<'c> Provider for Transaction<'c, Sqlite> {
pub struct Channels<'t>(&'t mut SqliteConnection);
impl<'c> Channels<'c> {
- pub async fn create(
- &mut self,
- name: &str,
- created_at: &DateTime,
- created_sequence: Sequence,
- ) -> Result<Channel, sqlx::Error> {
+ pub async fn create(&mut self, name: &str, created: &Instant) -> Result<Channel, sqlx::Error> {
let id = Id::generate();
- let channel = sqlx::query_as!(
- Channel,
+ let channel = sqlx::query!(
r#"
insert
into channel (id, name, created_at, created_sequence)
@@ -40,9 +34,17 @@ impl<'c> Channels<'c> {
"#,
id,
name,
- created_at,
- created_sequence,
+ created.at,
+ created.sequence,
)
+ .map(|row| Channel {
+ id: row.id,
+ name: row.name,
+ created: Instant {
+ at: row.created_at,
+ sequence: row.created_sequence,
+ },
+ })
.fetch_one(&mut *self.0)
.await?;
@@ -50,8 +52,7 @@ impl<'c> Channels<'c> {
}
pub async fn by_id(&mut self, channel: &Id) -> Result<Channel, sqlx::Error> {
- let channel = sqlx::query_as!(
- Channel,
+ let channel = sqlx::query!(
r#"
select
id as "id: Id",
@@ -63,6 +64,14 @@ impl<'c> Channels<'c> {
"#,
channel,
)
+ .map(|row| Channel {
+ id: row.id,
+ name: row.name,
+ created: Instant {
+ at: row.created_at,
+ sequence: row.created_sequence,
+ },
+ })
.fetch_one(&mut *self.0)
.await?;
@@ -73,8 +82,7 @@ impl<'c> Channels<'c> {
&mut self,
resume_point: Option<Sequence>,
) -> Result<Vec<Channel>, sqlx::Error> {
- let channels = sqlx::query_as!(
- Channel,
+ let channels = sqlx::query!(
r#"
select
id as "id: Id",
@@ -87,6 +95,14 @@ impl<'c> Channels<'c> {
"#,
resume_point,
)
+ .map(|row| Channel {
+ id: row.id,
+ name: row.name,
+ created: Instant {
+ at: row.created_at,
+ sequence: row.created_sequence,
+ },
+ })
.fetch_all(&mut *self.0)
.await?;
@@ -97,8 +113,7 @@ impl<'c> Channels<'c> {
&mut self,
resume_at: Option<Sequence>,
) -> Result<Vec<Channel>, sqlx::Error> {
- let channels = sqlx::query_as!(
- Channel,
+ let channels = sqlx::query!(
r#"
select
id as "id: Id",
@@ -110,6 +125,14 @@ impl<'c> Channels<'c> {
"#,
resume_at,
)
+ .map(|row| Channel {
+ id: row.id,
+ name: row.name,
+ created: Instant {
+ at: row.created_at,
+ sequence: row.created_sequence,
+ },
+ })
.fetch_all(&mut *self.0)
.await?;
@@ -119,8 +142,7 @@ impl<'c> Channels<'c> {
pub async fn delete(
&mut self,
channel: &Channel,
- deleted_at: &DateTime,
- deleted_sequence: Sequence,
+ deleted: &Instant,
) -> Result<types::ChannelEvent, sqlx::Error> {
let channel = channel.id.clone();
sqlx::query_scalar!(
@@ -135,15 +157,13 @@ impl<'c> Channels<'c> {
.await?;
Ok(types::ChannelEvent {
- sequence: deleted_sequence,
- at: *deleted_at,
+ instant: *deleted,
data: types::DeletedEvent { channel }.into(),
})
}
pub async fn expired(&mut self, expired_at: &DateTime) -> Result<Vec<Channel>, sqlx::Error> {
- let channels = sqlx::query_as!(
- Channel,
+ let channels = sqlx::query!(
r#"
select
channel.id as "id: Id",
@@ -157,6 +177,14 @@ impl<'c> Channels<'c> {
"#,
expired_at,
)
+ .map(|row| Channel {
+ id: row.id,
+ name: row.name,
+ created: Instant {
+ at: row.created_at,
+ sequence: row.created_sequence,
+ },
+ })
.fetch_all(&mut *self.0)
.await?;
diff --git a/src/channel/routes/test/on_send.rs b/src/channel/routes/test/on_send.rs
index 6f844cd..33ec3b7 100644
--- a/src/channel/routes/test/on_send.rs
+++ b/src/channel/routes/test/on_send.rs
@@ -52,7 +52,7 @@ async fn messages_in_order() {
let events = events.collect::<Vec<_>>().immediately().await;
for ((sent_at, message), event) in requests.into_iter().zip(events) {
- assert_eq!(*sent_at, event.at);
+ assert_eq!(*sent_at, event.instant.at);
assert!(matches!(
event.data,
types::ChannelEventData::Message(event_message)