diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/app.rs | 2 | ||||
| -rw-r--r-- | src/channel/app.rs | 18 | ||||
| -rw-r--r-- | src/channel/routes.rs | 3 | ||||
| -rw-r--r-- | src/channel/routes/test/list.rs | 6 | ||||
| -rw-r--r-- | src/channel/routes/test/on_create.rs | 47 | ||||
| -rw-r--r-- | src/channel/routes/test/on_send.rs | 2 | ||||
| -rw-r--r-- | src/events/app.rs | 24 | ||||
| -rw-r--r-- | src/events/routes/test.rs | 18 | ||||
| -rw-r--r-- | src/events/types.rs | 40 | ||||
| -rw-r--r-- | src/repo/channel.rs | 30 | ||||
| -rw-r--r-- | src/test/fixtures/channel.rs | 6 |
11 files changed, 146 insertions, 50 deletions
@@ -29,6 +29,6 @@ impl App { } pub const fn channels(&self) -> Channels { - Channels::new(&self.db) + Channels::new(&self.db, &self.broadcaster) } } diff --git a/src/channel/app.rs b/src/channel/app.rs index 6bad158..1eeca79 100644 --- a/src/channel/app.rs +++ b/src/channel/app.rs @@ -1,25 +1,33 @@ use sqlx::sqlite::SqlitePool; -use crate::repo::channel::{Channel, Provider as _}; +use crate::{ + clock::DateTime, + events::{broadcaster::Broadcaster, types::ChannelEvent}, + repo::channel::{Channel, Provider as _}, +}; pub struct Channels<'a> { db: &'a SqlitePool, + broadcaster: &'a Broadcaster, } impl<'a> Channels<'a> { - pub const fn new(db: &'a SqlitePool) -> Self { - Self { db } + pub const fn new(db: &'a SqlitePool, broadcaster: &'a Broadcaster) -> Self { + Self { db, broadcaster } } - pub async fn create(&self, name: &str) -> Result<Channel, CreateError> { + pub async fn create(&self, name: &str, created_at: &DateTime) -> Result<Channel, CreateError> { let mut tx = self.db.begin().await?; let channel = tx .channels() - .create(name) + .create(name, created_at) .await .map_err(|err| CreateError::from_duplicate_name(err, name))?; tx.commit().await?; + self.broadcaster + .broadcast(&ChannelEvent::created(channel.clone())); + Ok(channel) } diff --git a/src/channel/routes.rs b/src/channel/routes.rs index f524e62..1f8db5a 100644 --- a/src/channel/routes.rs +++ b/src/channel/routes.rs @@ -52,11 +52,12 @@ struct CreateRequest { async fn on_create( State(app): State<App>, _: Login, // requires auth, but doesn't actually care who you are + RequestedAt(created_at): RequestedAt, Json(form): Json<CreateRequest>, ) -> Result<Json<Channel>, CreateError> { let channel = app .channels() - .create(&form.name) + .create(&form.name, &created_at) .await .map_err(CreateError)?; diff --git a/src/channel/routes/test/list.rs b/src/channel/routes/test/list.rs index f7f7b44..bc94024 100644 --- a/src/channel/routes/test/list.rs +++ b/src/channel/routes/test/list.rs @@ -26,7 +26,7 @@ async fn one_channel() { let app = fixtures::scratch_app().await; let viewer = fixtures::login::create(&app).await; - let channel = fixtures::channel::create(&app).await; + let channel = fixtures::channel::create(&app, &fixtures::now()).await; // Call the endpoint @@ -46,8 +46,8 @@ async fn multiple_channels() { let app = fixtures::scratch_app().await; let viewer = fixtures::login::create(&app).await; let channels = vec![ - fixtures::channel::create(&app).await, - fixtures::channel::create(&app).await, + fixtures::channel::create(&app, &fixtures::now()).await, + fixtures::channel::create(&app, &fixtures::now()).await, ]; // Call the endpoint diff --git a/src/channel/routes/test/on_create.rs b/src/channel/routes/test/on_create.rs index 23885c0..bb6697f 100644 --- a/src/channel/routes/test/on_create.rs +++ b/src/channel/routes/test/on_create.rs @@ -1,8 +1,10 @@ use axum::extract::{Json, State}; +use futures::{future, stream::StreamExt as _}; use crate::{ channel::{app, routes}, - test::fixtures, + events::types, + test::fixtures::{self, future::Immediately as _}, }; #[tokio::test] @@ -16,10 +18,14 @@ async fn new_channel() { let name = fixtures::channel::propose(); let request = routes::CreateRequest { name }; - let Json(response_channel) = - routes::on_create(State(app.clone()), creator, Json(request.clone())) - .await - .expect("new channel in an empty app"); + let Json(response_channel) = routes::on_create( + State(app.clone()), + creator, + fixtures::now(), + Json(request.clone()), + ) + .await + .expect("new channel in an empty app"); // Verify the structure of the response @@ -28,8 +34,23 @@ async fn new_channel() { // Verify the semantics let channels = app.channels().all().await.expect("always succeeds"); - assert!(channels.contains(&response_channel)); + + let mut events = app + .events() + .subscribe(&fixtures::now(), types::ResumePoint::default()) + .await + .expect("subscribing never fails") + .filter(|types::ResumableEvent(_, event)| future::ready(event.channel == response_channel)); + + let types::ResumableEvent(_, event) = events + .next() + .immediately() + .await + .expect("creation event published"); + + assert_eq!(types::Sequence::default(), event.sequence); + assert_eq!(types::ChannelEventData::Created, event.data); } #[tokio::test] @@ -38,15 +59,19 @@ async fn duplicate_name() { let app = fixtures::scratch_app().await; let creator = fixtures::login::create(&app).await; - let channel = fixtures::channel::create(&app).await; + let channel = fixtures::channel::create(&app, &fixtures::now()).await; // Call the endpoint let request = routes::CreateRequest { name: channel.name }; - let routes::CreateError(error) = - routes::on_create(State(app.clone()), creator, Json(request.clone())) - .await - .expect_err("duplicate channel name"); + let routes::CreateError(error) = routes::on_create( + State(app.clone()), + creator, + fixtures::now(), + Json(request.clone()), + ) + .await + .expect_err("duplicate channel name"); // Verify the structure of the response diff --git a/src/channel/routes/test/on_send.rs b/src/channel/routes/test/on_send.rs index 5d87bdc..e4de0f1 100644 --- a/src/channel/routes/test/on_send.rs +++ b/src/channel/routes/test/on_send.rs @@ -14,7 +14,7 @@ async fn messages_in_order() { let app = fixtures::scratch_app().await; let sender = fixtures::login::create(&app).await; - let channel = fixtures::channel::create(&app).await; + let channel = fixtures::channel::create(&app, &fixtures::now()).await; // Call the endpoint (twice) diff --git a/src/events/app.rs b/src/events/app.rs index 043a29b..134e86a 100644 --- a/src/events/app.rs +++ b/src/events/app.rs @@ -11,7 +11,7 @@ use sqlx::sqlite::SqlitePool; use super::{ broadcaster::Broadcaster, repo::message::Provider as _, - types::{self, ResumePoint}, + types::{self, ChannelEvent, ResumePoint}, }; use crate::{ clock::DateTime, @@ -66,6 +66,17 @@ impl<'a> Events<'a> { let mut tx = self.db.begin().await?; let channels = tx.channels().all().await?; + let created_events = { + let resume_at = resume_at.clone(); + let channels = channels.clone(); + stream::iter( + channels + .into_iter() + .map(ChannelEvent::created) + .filter(move |event| resume_at.not_after(event)), + ) + }; + // Subscribe before retrieving, to catch messages broadcast while we're // querying the DB. We'll prune out duplicates later. let live_messages = self.broadcaster.subscribe(); @@ -104,9 +115,9 @@ impl<'a> Events<'a> { // stored_messages. .filter(Self::resume(resume_live_at)); - Ok(replay - .chain(live_messages) - .scan(resume_at, |resume_point, event| { + Ok(created_events.chain(replay).chain(live_messages).scan( + resume_at, + |resume_point, event| { let channel = &event.channel.id; let sequence = event.sequence; resume_point.advance(channel, sequence); @@ -114,13 +125,14 @@ impl<'a> Events<'a> { let event = types::ResumableEvent(resume_point.clone(), event); future::ready(Some(event)) - })) + }, + )) } fn resume( resume_at: ResumePoint, ) -> impl for<'m> FnMut(&'m types::ChannelEvent) -> future::Ready<bool> { - move |event| future::ready(resume_at < event.sequence()) + move |event| future::ready(resume_at.not_after(event)) } fn skip_expired( expire_at: &DateTime, diff --git a/src/events/routes/test.rs b/src/events/routes/test.rs index f289225..55ada95 100644 --- a/src/events/routes/test.rs +++ b/src/events/routes/test.rs @@ -15,7 +15,7 @@ async fn includes_historical_message() { let app = fixtures::scratch_app().await; let sender = fixtures::login::create(&app).await; - let channel = fixtures::channel::create(&app).await; + let channel = fixtures::channel::create(&app, &fixtures::now()).await; let message = fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await; // Call the endpoint @@ -42,7 +42,7 @@ async fn includes_live_message() { // Set up the environment let app = fixtures::scratch_app().await; - let channel = fixtures::channel::create(&app).await; + let channel = fixtures::channel::create(&app, &fixtures::now()).await; // Call the endpoint @@ -75,8 +75,8 @@ async fn includes_multiple_channels() { let sender = fixtures::login::create(&app).await; let channels = [ - fixtures::channel::create(&app).await, - fixtures::channel::create(&app).await, + fixtures::channel::create(&app, &fixtures::now()).await, + fixtures::channel::create(&app, &fixtures::now()).await, ]; let messages = stream::iter(channels) @@ -117,7 +117,7 @@ async fn sequential_messages() { // Set up the environment let app = fixtures::scratch_app().await; - let channel = fixtures::channel::create(&app).await; + let channel = fixtures::channel::create(&app, &fixtures::now()).await; let sender = fixtures::login::create(&app).await; let messages = vec![ @@ -156,7 +156,7 @@ async fn resumes_from() { // Set up the environment let app = fixtures::scratch_app().await; - let channel = fixtures::channel::create(&app).await; + let channel = fixtures::channel::create(&app, &fixtures::now()).await; let sender = fixtures::login::create(&app).await; let initial_message = fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await; @@ -229,8 +229,8 @@ async fn serial_resume() { let app = fixtures::scratch_app().await; let sender = fixtures::login::create(&app).await; - let channel_a = fixtures::channel::create(&app).await; - let channel_b = fixtures::channel::create(&app).await; + let channel_a = fixtures::channel::create(&app, &fixtures::now()).await; + let channel_b = fixtures::channel::create(&app, &fixtures::now()).await; // Call the endpoint @@ -346,7 +346,7 @@ async fn removes_expired_messages() { // Set up the environment let app = fixtures::scratch_app().await; let sender = fixtures::login::create(&app).await; - let channel = fixtures::channel::create(&app).await; + let channel = fixtures::channel::create(&app, &fixtures::now()).await; fixtures::message::send(&app, &sender, &channel, &fixtures::ancient()).await; let message = fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await; diff --git a/src/events/types.rs b/src/events/types.rs index 6747afc..7c0e0a4 100644 --- a/src/events/types.rs +++ b/src/events/types.rs @@ -11,6 +11,7 @@ use crate::{ #[derive( Debug, + Default, Eq, Ord, PartialEq, @@ -59,7 +60,30 @@ impl ResumePoint { let Self(elements) = self; elements.get(channel).copied() } + + pub fn not_after(&self, event: impl ResumeElement) -> bool { + let Self(elements) = self; + let (channel, sequence) = event.element(); + + elements + .get(channel) + .map_or(true, |resume_at| resume_at < &sequence) + } } + +pub trait ResumeElement { + fn element(&self) -> (&channel::Id, Sequence); +} + +impl<T> ResumeElement for &T +where + T: ResumeElement, +{ + fn element(&self) -> (&channel::Id, Sequence) { + (*self).element() + } +} + #[derive(Clone, Debug)] pub struct ResumableEvent(pub ResumePoint, pub ChannelEvent); @@ -74,14 +98,26 @@ pub struct ChannelEvent { } impl ChannelEvent { - pub fn sequence(&self) -> ResumePoint { - ResumePoint::singleton(&self.channel.id, self.sequence) + pub fn created(channel: Channel) -> Self { + Self { + at: channel.created_at, + sequence: Sequence::default(), + channel, + data: ChannelEventData::Created, + } + } +} + +impl ResumeElement for ChannelEvent { + fn element(&self) -> (&channel::Id, Sequence) { + (&self.channel.id, self.sequence) } } #[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] #[serde(tag = "type", rename_all = "snake_case")] pub enum ChannelEventData { + Created, Message(MessageEvent), } diff --git a/src/repo/channel.rs b/src/repo/channel.rs index d223dab..e85b898 100644 --- a/src/repo/channel.rs +++ b/src/repo/channel.rs @@ -2,7 +2,7 @@ use std::fmt; use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction}; -use crate::id::Id as BaseId; +use crate::{clock::DateTime, id::Id as BaseId}; pub trait Provider { fn channels(&mut self) -> Channels; @@ -20,22 +20,32 @@ pub struct Channels<'t>(&'t mut SqliteConnection); pub struct Channel { pub id: Id, pub name: String, + #[serde(skip)] + pub created_at: DateTime, } impl<'c> Channels<'c> { - pub async fn create(&mut self, name: &str) -> Result<Channel, sqlx::Error> { + pub async fn create( + &mut self, + name: &str, + created_at: &DateTime, + ) -> Result<Channel, sqlx::Error> { let id = Id::generate(); let channel = sqlx::query_as!( Channel, r#" insert - into channel (id, name) - values ($1, $2) - returning id as "id: Id", name + into channel (id, name, created_at) + values ($1, $2, $3) + returning + id as "id: Id", + name, + created_at as "created_at: DateTime" "#, id, name, + created_at, ) .fetch_one(&mut *self.0) .await?; @@ -47,7 +57,10 @@ impl<'c> Channels<'c> { let channel = sqlx::query_as!( Channel, r#" - select id as "id: Id", name + select + id as "id: Id", + name, + created_at as "created_at: DateTime" from channel where id = $1 "#, @@ -64,8 +77,9 @@ impl<'c> Channels<'c> { Channel, r#" select - channel.id as "id: Id", - channel.name + id as "id: Id", + name, + created_at as "created_at: DateTime" from channel order by channel.name "#, diff --git a/src/test/fixtures/channel.rs b/src/test/fixtures/channel.rs index 0558395..8744470 100644 --- a/src/test/fixtures/channel.rs +++ b/src/test/fixtures/channel.rs @@ -4,12 +4,12 @@ use faker_rand::{ }; use rand; -use crate::{app::App, repo::channel::Channel}; +use crate::{app::App, clock::RequestedAt, repo::channel::Channel}; -pub async fn create(app: &App) -> Channel { +pub async fn create(app: &App, created_at: &RequestedAt) -> Channel { let name = propose(); app.channels() - .create(&name) + .create(&name, created_at) .await .expect("should always succeed if the channel is actually new") } |
