diff options
Diffstat (limited to 'src/channel')
| -rw-r--r-- | src/channel/app.rs | 11 | ||||
| -rw-r--r-- | src/channel/mod.rs | 6 | ||||
| -rw-r--r-- | src/channel/repo.rs | 74 | ||||
| -rw-r--r-- | src/channel/routes/test/on_send.rs | 2 |
4 files changed, 58 insertions, 35 deletions
diff --git a/src/channel/app.rs b/src/channel/app.rs index ef0a63f..b7e3a10 100644 --- a/src/channel/app.rs +++ b/src/channel/app.rs @@ -19,10 +19,10 @@ impl<'a> Channels<'a> { pub async fn create(&self, name: &str, created_at: &DateTime) -> Result<Channel, CreateError> { let mut tx = self.db.begin().await?; - let created_sequence = tx.sequence().next().await?; + let created = tx.sequence().next(created_at).await?; let channel = tx .channels() - .create(name, created_at, created_sequence) + .create(name, &created) .await .map_err(|err| CreateError::from_duplicate_name(err, name))?; tx.commit().await?; @@ -50,11 +50,8 @@ impl<'a> Channels<'a> { let mut events = Vec::with_capacity(expired.len()); for channel in expired { - let deleted_sequence = tx.sequence().next().await?; - let event = tx - .channels() - .delete(&channel, relative_to, deleted_sequence) - .await?; + let deleted = tx.sequence().next(relative_to).await?; + let event = tx.channels().delete(&channel, &deleted).await?; events.push(event); } diff --git a/src/channel/mod.rs b/src/channel/mod.rs index 2672084..4baa7e3 100644 --- a/src/channel/mod.rs +++ b/src/channel/mod.rs @@ -1,4 +1,4 @@ -use crate::{clock::DateTime, event::Sequence}; +use crate::event::Instant; pub mod app; mod id; @@ -12,7 +12,5 @@ pub struct Channel { pub id: Id, pub name: String, #[serde(skip)] - pub created_at: DateTime, - #[serde(skip)] - pub created_sequence: Sequence, + pub created: Instant, } diff --git a/src/channel/repo.rs b/src/channel/repo.rs index 18cd81f..c000b56 100644 --- a/src/channel/repo.rs +++ b/src/channel/repo.rs @@ -3,7 +3,7 @@ use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction}; use crate::{ channel::{Channel, Id}, clock::DateTime, - event::{types, Sequence}, + event::{types, Instant, Sequence}, }; pub trait Provider { @@ -19,15 +19,9 @@ impl<'c> Provider for Transaction<'c, Sqlite> { pub struct Channels<'t>(&'t mut SqliteConnection); impl<'c> Channels<'c> { - pub async fn create( - &mut self, - name: &str, - created_at: &DateTime, - created_sequence: Sequence, - ) -> Result<Channel, sqlx::Error> { + pub async fn create(&mut self, name: &str, created: &Instant) -> Result<Channel, sqlx::Error> { let id = Id::generate(); - let channel = sqlx::query_as!( - Channel, + let channel = sqlx::query!( r#" insert into channel (id, name, created_at, created_sequence) @@ -40,9 +34,17 @@ impl<'c> Channels<'c> { "#, id, name, - created_at, - created_sequence, + created.at, + created.sequence, ) + .map(|row| Channel { + id: row.id, + name: row.name, + created: Instant { + at: row.created_at, + sequence: row.created_sequence, + }, + }) .fetch_one(&mut *self.0) .await?; @@ -50,8 +52,7 @@ impl<'c> Channels<'c> { } pub async fn by_id(&mut self, channel: &Id) -> Result<Channel, sqlx::Error> { - let channel = sqlx::query_as!( - Channel, + let channel = sqlx::query!( r#" select id as "id: Id", @@ -63,6 +64,14 @@ impl<'c> Channels<'c> { "#, channel, ) + .map(|row| Channel { + id: row.id, + name: row.name, + created: Instant { + at: row.created_at, + sequence: row.created_sequence, + }, + }) .fetch_one(&mut *self.0) .await?; @@ -73,8 +82,7 @@ impl<'c> Channels<'c> { &mut self, resume_point: Option<Sequence>, ) -> Result<Vec<Channel>, sqlx::Error> { - let channels = sqlx::query_as!( - Channel, + let channels = sqlx::query!( r#" select id as "id: Id", @@ -87,6 +95,14 @@ impl<'c> Channels<'c> { "#, resume_point, ) + .map(|row| Channel { + id: row.id, + name: row.name, + created: Instant { + at: row.created_at, + sequence: row.created_sequence, + }, + }) .fetch_all(&mut *self.0) .await?; @@ -97,8 +113,7 @@ impl<'c> Channels<'c> { &mut self, resume_at: Option<Sequence>, ) -> Result<Vec<Channel>, sqlx::Error> { - let channels = sqlx::query_as!( - Channel, + let channels = sqlx::query!( r#" select id as "id: Id", @@ -110,6 +125,14 @@ impl<'c> Channels<'c> { "#, resume_at, ) + .map(|row| Channel { + id: row.id, + name: row.name, + created: Instant { + at: row.created_at, + sequence: row.created_sequence, + }, + }) .fetch_all(&mut *self.0) .await?; @@ -119,8 +142,7 @@ impl<'c> Channels<'c> { pub async fn delete( &mut self, channel: &Channel, - deleted_at: &DateTime, - deleted_sequence: Sequence, + deleted: &Instant, ) -> Result<types::ChannelEvent, sqlx::Error> { let channel = channel.id.clone(); sqlx::query_scalar!( @@ -135,15 +157,13 @@ impl<'c> Channels<'c> { .await?; Ok(types::ChannelEvent { - sequence: deleted_sequence, - at: *deleted_at, + instant: *deleted, data: types::DeletedEvent { channel }.into(), }) } pub async fn expired(&mut self, expired_at: &DateTime) -> Result<Vec<Channel>, sqlx::Error> { - let channels = sqlx::query_as!( - Channel, + let channels = sqlx::query!( r#" select channel.id as "id: Id", @@ -157,6 +177,14 @@ impl<'c> Channels<'c> { "#, expired_at, ) + .map(|row| Channel { + id: row.id, + name: row.name, + created: Instant { + at: row.created_at, + sequence: row.created_sequence, + }, + }) .fetch_all(&mut *self.0) .await?; diff --git a/src/channel/routes/test/on_send.rs b/src/channel/routes/test/on_send.rs index 6f844cd..33ec3b7 100644 --- a/src/channel/routes/test/on_send.rs +++ b/src/channel/routes/test/on_send.rs @@ -52,7 +52,7 @@ async fn messages_in_order() { let events = events.collect::<Vec<_>>().immediately().await; for ((sent_at, message), event) in requests.into_iter().zip(events) { - assert_eq!(*sent_at, event.at); + assert_eq!(*sent_at, event.instant.at); assert!(matches!( event.data, types::ChannelEventData::Message(event_message) |
