diff options
Diffstat (limited to 'src/events')
| -rw-r--r-- | src/events/app.rs | 163 | ||||
| -rw-r--r-- | src/events/broadcaster.rs | 3 | ||||
| -rw-r--r-- | src/events/extract.rs | 85 | ||||
| -rw-r--r-- | src/events/mod.rs | 8 | ||||
| -rw-r--r-- | src/events/repo/message.rs | 198 | ||||
| -rw-r--r-- | src/events/repo/mod.rs | 1 | ||||
| -rw-r--r-- | src/events/routes.rs | 69 | ||||
| -rw-r--r-- | src/events/routes/test.rs | 425 | ||||
| -rw-r--r-- | src/events/types.rs | 170 |
9 files changed, 0 insertions, 1122 deletions
diff --git a/src/events/app.rs b/src/events/app.rs deleted file mode 100644 index db7f430..0000000 --- a/src/events/app.rs +++ /dev/null @@ -1,163 +0,0 @@ -use std::collections::BTreeMap; - -use chrono::TimeDelta; -use futures::{ - future, - stream::{self, StreamExt as _}, - Stream, -}; -use sqlx::sqlite::SqlitePool; - -use super::{ - broadcaster::Broadcaster, - repo::message::Provider as _, - types::{self, ChannelEvent, ResumePoint}, -}; -use crate::{ - clock::DateTime, - repo::{ - channel::{self, Provider as _}, - error::NotFound as _, - login::Login, - }, -}; - -pub struct Events<'a> { - db: &'a SqlitePool, - events: &'a Broadcaster, -} - -impl<'a> Events<'a> { - pub const fn new(db: &'a SqlitePool, events: &'a Broadcaster) -> Self { - Self { db, events } - } - - pub async fn send( - &self, - login: &Login, - channel: &channel::Id, - body: &str, - sent_at: &DateTime, - ) -> Result<types::ChannelEvent, EventsError> { - let mut tx = self.db.begin().await?; - let channel = tx - .channels() - .by_id(channel) - .await - .not_found(|| EventsError::ChannelNotFound(channel.clone()))?; - let event = tx - .message_events() - .create(login, &channel, body, sent_at) - .await?; - tx.commit().await?; - - self.events.broadcast(&event); - Ok(event) - } - - pub async fn expire(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> { - // Somewhat arbitrarily, expire after 90 days. - let expire_at = relative_to.to_owned() - TimeDelta::days(90); - - let mut tx = self.db.begin().await?; - let expired = tx.message_events().expired(&expire_at).await?; - - let mut events = Vec::with_capacity(expired.len()); - for (channel, message) in expired { - let sequence = tx.message_events().assign_sequence(&channel).await?; - let event = tx - .message_events() - .delete_expired(&channel, &message, sequence, relative_to) - .await?; - events.push(event); - } - - tx.commit().await?; - - for event in events { - self.events.broadcast(&event); - } - - Ok(()) - } - - pub async fn subscribe( - &self, - resume_at: ResumePoint, - ) -> Result<impl Stream<Item = types::ResumableEvent> + std::fmt::Debug, sqlx::Error> { - let mut tx = self.db.begin().await?; - let channels = tx.channels().all().await?; - - let created_events = { - let resume_at = resume_at.clone(); - let channels = channels.clone(); - stream::iter( - channels - .into_iter() - .map(ChannelEvent::created) - .filter(move |event| resume_at.not_after(event)), - ) - }; - - // Subscribe before retrieving, to catch messages broadcast while we're - // querying the DB. We'll prune out duplicates later. - let live_messages = self.events.subscribe(); - - let mut replays = BTreeMap::new(); - let mut resume_live_at = resume_at.clone(); - for channel in channels { - let replay = tx - .message_events() - .replay(&channel, resume_at.get(&channel.id)) - .await?; - - if let Some(last) = replay.last() { - resume_live_at.advance(last); - } - - replays.insert(channel.id.clone(), replay); - } - - let replay = stream::select_all(replays.into_values().map(stream::iter)); - - // no skip_expired or resume transforms for stored_messages, as it's - // constructed not to contain messages meeting either criterion. - // - // * skip_expired is redundant with the `tx.broadcasts().expire(…)` call; - // * resume is redundant with the resume_at argument to - // `tx.broadcasts().replay(…)`. - let live_messages = live_messages - // Filtering on the broadcast resume point filters out messages - // before resume_at, and filters out messages duplicated from - // stored_messages. - .filter(Self::resume(resume_live_at)); - - Ok(created_events.chain(replay).chain(live_messages).scan( - resume_at, - |resume_point, event| { - match event.data { - types::ChannelEventData::Deleted(_) => resume_point.forget(&event), - _ => resume_point.advance(&event), - } - - let event = types::ResumableEvent(resume_point.clone(), event); - - future::ready(Some(event)) - }, - )) - } - - fn resume( - resume_at: ResumePoint, - ) -> impl for<'m> FnMut(&'m types::ChannelEvent) -> future::Ready<bool> { - move |event| future::ready(resume_at.not_after(event)) - } -} - -#[derive(Debug, thiserror::Error)] -pub enum EventsError { - #[error("channel {0} not found")] - ChannelNotFound(channel::Id), - #[error(transparent)] - DatabaseError(#[from] sqlx::Error), -} diff --git a/src/events/broadcaster.rs b/src/events/broadcaster.rs deleted file mode 100644 index 6b664cb..0000000 --- a/src/events/broadcaster.rs +++ /dev/null @@ -1,3 +0,0 @@ -use crate::{broadcast, events::types}; - -pub type Broadcaster = broadcast::Broadcaster<types::ChannelEvent>; diff --git a/src/events/extract.rs b/src/events/extract.rs deleted file mode 100644 index e3021e2..0000000 --- a/src/events/extract.rs +++ /dev/null @@ -1,85 +0,0 @@ -use std::ops::Deref; - -use axum::{ - extract::FromRequestParts, - http::{request::Parts, HeaderName, HeaderValue}, -}; -use axum_extra::typed_header::TypedHeader; -use serde::{de::DeserializeOwned, Serialize}; - -// A typed header. When used as a bare extractor, reads from the -// `Last-Event-Id` HTTP header. -pub struct LastEventId<T>(pub T); - -static LAST_EVENT_ID: HeaderName = HeaderName::from_static("last-event-id"); - -impl<T> headers::Header for LastEventId<T> -where - T: Serialize + DeserializeOwned, -{ - fn name() -> &'static HeaderName { - &LAST_EVENT_ID - } - - fn decode<'i, I>(values: &mut I) -> Result<Self, headers::Error> - where - I: Iterator<Item = &'i HeaderValue>, - { - let value = values.next().ok_or_else(headers::Error::invalid)?; - let value = value.to_str().map_err(|_| headers::Error::invalid())?; - let value = serde_json::from_str(value).map_err(|_| headers::Error::invalid())?; - Ok(Self(value)) - } - - fn encode<E>(&self, values: &mut E) - where - E: Extend<HeaderValue>, - { - let Self(value) = self; - // Must panic or suppress; the trait provides no other options. - let value = serde_json::to_string(value).expect("value can be encoded as JSON"); - let value = HeaderValue::from_str(&value).expect("LastEventId is a valid header value"); - - values.extend(std::iter::once(value)); - } -} - -#[async_trait::async_trait] -impl<S, T> FromRequestParts<S> for LastEventId<T> -where - S: Send + Sync, - T: Serialize + DeserializeOwned, -{ - type Rejection = <TypedHeader<Self> as FromRequestParts<S>>::Rejection; - - async fn from_request_parts(parts: &mut Parts, state: &S) -> Result<Self, Self::Rejection> { - // This is purely for ergonomics: it allows `RequestedAt` to be extracted - // without having to wrap it in `Extension<>`. Callers _can_ still do that, - // but they aren't forced to. - let TypedHeader(requested_at) = TypedHeader::from_request_parts(parts, state).await?; - - Ok(requested_at) - } -} - -impl<T> Deref for LastEventId<T> { - type Target = T; - - fn deref(&self) -> &Self::Target { - let Self(header) = self; - header - } -} - -impl<T> From<T> for LastEventId<T> { - fn from(value: T) -> Self { - Self(value) - } -} - -impl<T> LastEventId<T> { - pub fn into_inner(self) -> T { - let Self(value) = self; - value - } -} diff --git a/src/events/mod.rs b/src/events/mod.rs deleted file mode 100644 index 711ae64..0000000 --- a/src/events/mod.rs +++ /dev/null @@ -1,8 +0,0 @@ -pub mod app; -pub mod broadcaster; -mod extract; -pub mod repo; -mod routes; -pub mod types; - -pub use self::routes::router; diff --git a/src/events/repo/message.rs b/src/events/repo/message.rs deleted file mode 100644 index f8bae2b..0000000 --- a/src/events/repo/message.rs +++ /dev/null @@ -1,198 +0,0 @@ -use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction}; - -use crate::{ - clock::DateTime, - events::types::{self, Sequence}, - repo::{ - channel::{self, Channel}, - login::{self, Login}, - message::{self, Message}, - }, -}; - -pub trait Provider { - fn message_events(&mut self) -> Events; -} - -impl<'c> Provider for Transaction<'c, Sqlite> { - fn message_events(&mut self) -> Events { - Events(self) - } -} - -pub struct Events<'t>(&'t mut SqliteConnection); - -impl<'c> Events<'c> { - pub async fn create( - &mut self, - sender: &Login, - channel: &Channel, - body: &str, - sent_at: &DateTime, - ) -> Result<types::ChannelEvent, sqlx::Error> { - let sequence = self.assign_sequence(channel).await?; - - let id = message::Id::generate(); - - let message = sqlx::query!( - r#" - insert into message - (id, channel, sequence, sender, body, sent_at) - values ($1, $2, $3, $4, $5, $6) - returning - id as "id: message::Id", - sequence as "sequence: Sequence", - sender as "sender: login::Id", - body, - sent_at as "sent_at: DateTime" - "#, - id, - channel.id, - sequence, - sender.id, - body, - sent_at, - ) - .map(|row| types::ChannelEvent { - sequence: row.sequence, - at: row.sent_at, - data: types::MessageEvent { - channel: channel.clone(), - sender: sender.clone(), - message: Message { - id: row.id, - body: row.body, - }, - } - .into(), - }) - .fetch_one(&mut *self.0) - .await?; - - Ok(message) - } - - pub async fn assign_sequence(&mut self, channel: &Channel) -> Result<Sequence, sqlx::Error> { - let next = sqlx::query_scalar!( - r#" - update channel - set last_sequence = last_sequence + 1 - where id = $1 - returning last_sequence as "next_sequence: Sequence" - "#, - channel.id, - ) - .fetch_one(&mut *self.0) - .await?; - - Ok(next) - } - - pub async fn delete_expired( - &mut self, - channel: &Channel, - message: &message::Id, - sequence: Sequence, - deleted_at: &DateTime, - ) -> Result<types::ChannelEvent, sqlx::Error> { - sqlx::query_scalar!( - r#" - delete from message - where id = $1 - returning 1 as "row: i64" - "#, - message, - ) - .fetch_one(&mut *self.0) - .await?; - - Ok(types::ChannelEvent { - sequence, - at: *deleted_at, - data: types::MessageDeletedEvent { - channel: channel.clone(), - message: message.clone(), - } - .into(), - }) - } - - pub async fn expired( - &mut self, - expire_at: &DateTime, - ) -> Result<Vec<(Channel, message::Id)>, sqlx::Error> { - let messages = sqlx::query!( - r#" - select - channel.id as "channel_id: channel::Id", - channel.name as "channel_name", - channel.created_at as "channel_created_at: DateTime", - message.id as "message: message::Id" - from message - join channel on message.channel = channel.id - join login as sender on message.sender = sender.id - where sent_at < $1 - "#, - expire_at, - ) - .map(|row| { - ( - Channel { - id: row.channel_id, - name: row.channel_name, - created_at: row.channel_created_at, - }, - row.message, - ) - }) - .fetch_all(&mut *self.0) - .await?; - - Ok(messages) - } - - pub async fn replay( - &mut self, - channel: &Channel, - resume_at: Option<Sequence>, - ) -> Result<Vec<types::ChannelEvent>, sqlx::Error> { - let events = sqlx::query!( - r#" - select - message.id as "id: message::Id", - sequence as "sequence: Sequence", - login.id as "sender_id: login::Id", - login.name as sender_name, - message.body, - message.sent_at as "sent_at: DateTime" - from message - join login on message.sender = login.id - where channel = $1 - and coalesce(sequence > $2, true) - order by sequence asc - "#, - channel.id, - resume_at, - ) - .map(|row| types::ChannelEvent { - sequence: row.sequence, - at: row.sent_at, - data: types::MessageEvent { - channel: channel.clone(), - sender: login::Login { - id: row.sender_id, - name: row.sender_name, - }, - message: Message { - id: row.id, - body: row.body, - }, - } - .into(), - }) - .fetch_all(&mut *self.0) - .await?; - - Ok(events) - } -} diff --git a/src/events/repo/mod.rs b/src/events/repo/mod.rs deleted file mode 100644 index e216a50..0000000 --- a/src/events/repo/mod.rs +++ /dev/null @@ -1 +0,0 @@ -pub mod message; diff --git a/src/events/routes.rs b/src/events/routes.rs deleted file mode 100644 index ec9dae2..0000000 --- a/src/events/routes.rs +++ /dev/null @@ -1,69 +0,0 @@ -use axum::{ - extract::State, - response::{ - sse::{self, Sse}, - IntoResponse, Response, - }, - routing::get, - Router, -}; -use futures::stream::{Stream, StreamExt as _}; - -use super::{ - extract::LastEventId, - types::{self, ResumePoint}, -}; -use crate::{app::App, error::Internal, login::extract::Identity}; - -#[cfg(test)] -mod test; - -pub fn router() -> Router<App> { - Router::new().route("/api/events", get(events)) -} - -async fn events( - State(app): State<App>, - identity: Identity, - last_event_id: Option<LastEventId<ResumePoint>>, -) -> Result<Events<impl Stream<Item = types::ResumableEvent> + std::fmt::Debug>, Internal> { - let resume_at = last_event_id - .map(LastEventId::into_inner) - .unwrap_or_default(); - - let stream = app.events().subscribe(resume_at).await?; - let stream = app.logins().limit_stream(identity.token, stream); - - Ok(Events(stream)) -} - -#[derive(Debug)] -struct Events<S>(S); - -impl<S> IntoResponse for Events<S> -where - S: Stream<Item = types::ResumableEvent> + Send + 'static, -{ - fn into_response(self) -> Response { - let Self(stream) = self; - let stream = stream.map(sse::Event::try_from); - Sse::new(stream) - .keep_alive(sse::KeepAlive::default()) - .into_response() - } -} - -impl TryFrom<types::ResumableEvent> for sse::Event { - type Error = serde_json::Error; - - fn try_from(value: types::ResumableEvent) -> Result<Self, Self::Error> { - let types::ResumableEvent(resume_at, data) = value; - - let id = serde_json::to_string(&resume_at)?; - let data = serde_json::to_string_pretty(&data)?; - - let event = Self::default().id(id).data(data); - - Ok(event) - } -} diff --git a/src/events/routes/test.rs b/src/events/routes/test.rs deleted file mode 100644 index 820192d..0000000 --- a/src/events/routes/test.rs +++ /dev/null @@ -1,425 +0,0 @@ -use axum::extract::State; -use futures::{ - future, - stream::{self, StreamExt as _}, -}; - -use crate::{ - events::{routes, types}, - test::fixtures::{self, future::Immediately as _}, -}; - -#[tokio::test] -async fn includes_historical_message() { - // Set up the environment - - let app = fixtures::scratch_app().await; - let sender = fixtures::login::create(&app).await; - let channel = fixtures::channel::create(&app, &fixtures::now()).await; - let message = fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await; - - // Call the endpoint - - let subscriber_creds = fixtures::login::create_with_password(&app).await; - let subscriber = fixtures::identity::identity(&app, &subscriber_creds, &fixtures::now()).await; - let routes::Events(events) = routes::events(State(app), subscriber, None) - .await - .expect("subscribe never fails"); - - // Verify the structure of the response. - - let types::ResumableEvent(_, event) = events - .filter(fixtures::filter::messages()) - .next() - .immediately() - .await - .expect("delivered stored message"); - - assert_eq!(message, event); -} - -#[tokio::test] -async fn includes_live_message() { - // Set up the environment - - let app = fixtures::scratch_app().await; - let channel = fixtures::channel::create(&app, &fixtures::now()).await; - - // Call the endpoint - - let subscriber_creds = fixtures::login::create_with_password(&app).await; - let subscriber = fixtures::identity::identity(&app, &subscriber_creds, &fixtures::now()).await; - let routes::Events(events) = routes::events(State(app.clone()), subscriber, None) - .await - .expect("subscribe never fails"); - - // Verify the semantics - - let sender = fixtures::login::create(&app).await; - let message = fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await; - - let types::ResumableEvent(_, event) = events - .filter(fixtures::filter::messages()) - .next() - .immediately() - .await - .expect("delivered live message"); - - assert_eq!(message, event); -} - -#[tokio::test] -async fn includes_multiple_channels() { - // Set up the environment - - let app = fixtures::scratch_app().await; - let sender = fixtures::login::create(&app).await; - - let channels = [ - fixtures::channel::create(&app, &fixtures::now()).await, - fixtures::channel::create(&app, &fixtures::now()).await, - ]; - - let messages = stream::iter(channels) - .then(|channel| { - let app = app.clone(); - let sender = sender.clone(); - let channel = channel.clone(); - async move { fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await } - }) - .collect::<Vec<_>>() - .await; - - // Call the endpoint - - let subscriber_creds = fixtures::login::create_with_password(&app).await; - let subscriber = fixtures::identity::identity(&app, &subscriber_creds, &fixtures::now()).await; - let routes::Events(events) = routes::events(State(app), subscriber, None) - .await - .expect("subscribe never fails"); - - // Verify the structure of the response. - - let events = events - .filter(fixtures::filter::messages()) - .take(messages.len()) - .collect::<Vec<_>>() - .immediately() - .await; - - for message in &messages { - assert!(events - .iter() - .any(|types::ResumableEvent(_, event)| { event == message })); - } -} - -#[tokio::test] -async fn sequential_messages() { - // Set up the environment - - let app = fixtures::scratch_app().await; - let channel = fixtures::channel::create(&app, &fixtures::now()).await; - let sender = fixtures::login::create(&app).await; - - let messages = vec![ - fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await, - fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await, - fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await, - ]; - - // Call the endpoint - - let subscriber_creds = fixtures::login::create_with_password(&app).await; - let subscriber = fixtures::identity::identity(&app, &subscriber_creds, &fixtures::now()).await; - let routes::Events(events) = routes::events(State(app), subscriber, None) - .await - .expect("subscribe never fails"); - - // Verify the structure of the response. - - let mut events = - events.filter(|types::ResumableEvent(_, event)| future::ready(messages.contains(event))); - - // Verify delivery in order - for message in &messages { - let types::ResumableEvent(_, event) = events - .next() - .immediately() - .await - .expect("undelivered messages remaining"); - - assert_eq!(message, &event); - } -} - -#[tokio::test] -async fn resumes_from() { - // Set up the environment - - let app = fixtures::scratch_app().await; - let channel = fixtures::channel::create(&app, &fixtures::now()).await; - let sender = fixtures::login::create(&app).await; - - let initial_message = fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await; - - let later_messages = vec![ - fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await, - fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await, - ]; - - // Call the endpoint - - let subscriber_creds = fixtures::login::create_with_password(&app).await; - let subscriber = fixtures::identity::identity(&app, &subscriber_creds, &fixtures::now()).await; - - let resume_at = { - // First subscription - let routes::Events(events) = routes::events(State(app.clone()), subscriber.clone(), None) - .await - .expect("subscribe never fails"); - - let types::ResumableEvent(last_event_id, event) = events - .filter(fixtures::filter::messages()) - .next() - .immediately() - .await - .expect("delivered events"); - - assert_eq!(initial_message, event); - - last_event_id - }; - - // Resume after disconnect - let routes::Events(resumed) = routes::events(State(app), subscriber, Some(resume_at.into())) - .await - .expect("subscribe never fails"); - - // Verify the structure of the response. - - let events = resumed - .take(later_messages.len()) - .collect::<Vec<_>>() - .immediately() - .await; - - for message in &later_messages { - assert!(events - .iter() - .any(|types::ResumableEvent(_, event)| event == message)); - } -} - -// This test verifies a real bug I hit developing the vector-of-sequences -// approach to resuming events. A small omission caused the event IDs in a -// resumed stream to _omit_ channels that were in the original stream until -// those channels also appeared in the resumed stream. -// -// Clients would see something like -// * In the original stream, Cfoo=5,Cbar=8 -// * In the resumed stream, Cfoo=6 (no Cbar sequence number) -// -// Disconnecting and reconnecting a second time, using event IDs from that -// initial period of the first resume attempt, would then cause the second -// resume attempt to restart all other channels from the beginning, and not -// from where the first disconnection happened. -// -// This is a real and valid behaviour for clients! -#[tokio::test] -async fn serial_resume() { - // Set up the environment - - let app = fixtures::scratch_app().await; - let sender = fixtures::login::create(&app).await; - let channel_a = fixtures::channel::create(&app, &fixtures::now()).await; - let channel_b = fixtures::channel::create(&app, &fixtures::now()).await; - - // Call the endpoint - - let subscriber_creds = fixtures::login::create_with_password(&app).await; - let subscriber = fixtures::identity::identity(&app, &subscriber_creds, &fixtures::now()).await; - - let resume_at = { - let initial_messages = [ - fixtures::message::send(&app, &sender, &channel_a, &fixtures::now()).await, - fixtures::message::send(&app, &sender, &channel_b, &fixtures::now()).await, - ]; - - // First subscription - let routes::Events(events) = routes::events(State(app.clone()), subscriber.clone(), None) - .await - .expect("subscribe never fails"); - - let events = events - .filter(fixtures::filter::messages()) - .take(initial_messages.len()) - .collect::<Vec<_>>() - .immediately() - .await; - - for message in &initial_messages { - assert!(events - .iter() - .any(|types::ResumableEvent(_, event)| event == message)); - } - - let types::ResumableEvent(id, _) = events.last().expect("this vec is non-empty"); - - id.to_owned() - }; - - // Resume after disconnect - let resume_at = { - let resume_messages = [ - // Note that channel_b does not appear here. The buggy behaviour - // would be masked if channel_b happened to send a new message - // into the resumed event stream. - fixtures::message::send(&app, &sender, &channel_a, &fixtures::now()).await, - fixtures::message::send(&app, &sender, &channel_a, &fixtures::now()).await, - ]; - - // Second subscription - let routes::Events(events) = routes::events( - State(app.clone()), - subscriber.clone(), - Some(resume_at.into()), - ) - .await - .expect("subscribe never fails"); - - let events = events - .filter(fixtures::filter::messages()) - .take(resume_messages.len()) - .collect::<Vec<_>>() - .immediately() - .await; - - for message in &resume_messages { - assert!(events - .iter() - .any(|types::ResumableEvent(_, event)| event == message)); - } - - let types::ResumableEvent(id, _) = events.last().expect("this vec is non-empty"); - - id.to_owned() - }; - - // Resume after disconnect a second time - { - // At this point, we can send on either channel and demonstrate the - // problem. The resume point should before both of these messages, but - // after _all_ prior messages. - let final_messages = [ - fixtures::message::send(&app, &sender, &channel_a, &fixtures::now()).await, - fixtures::message::send(&app, &sender, &channel_b, &fixtures::now()).await, - ]; - - // Third subscription - let routes::Events(events) = routes::events( - State(app.clone()), - subscriber.clone(), - Some(resume_at.into()), - ) - .await - .expect("subscribe never fails"); - - let events = events - .filter(fixtures::filter::messages()) - .take(final_messages.len()) - .collect::<Vec<_>>() - .immediately() - .await; - - // This set of messages, in particular, _should not_ include any prior - // messages from `initial_messages` or `resume_messages`. - for message in &final_messages { - assert!(events - .iter() - .any(|types::ResumableEvent(_, event)| event == message)); - } - }; -} - -#[tokio::test] -async fn terminates_on_token_expiry() { - // Set up the environment - - let app = fixtures::scratch_app().await; - let channel = fixtures::channel::create(&app, &fixtures::now()).await; - let sender = fixtures::login::create(&app).await; - - // Subscribe via the endpoint - - let subscriber_creds = fixtures::login::create_with_password(&app).await; - let subscriber = - fixtures::identity::identity(&app, &subscriber_creds, &fixtures::ancient()).await; - - let routes::Events(events) = routes::events(State(app.clone()), subscriber, None) - .await - .expect("subscribe never fails"); - - // Verify the resulting stream's behaviour - - app.logins() - .expire(&fixtures::now()) - .await - .expect("expiring tokens succeeds"); - - // These should not be delivered. - let messages = [ - fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await, - fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await, - fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await, - ]; - - assert!(events - .filter(|types::ResumableEvent(_, event)| future::ready(messages.contains(event))) - .next() - .immediately() - .await - .is_none()); -} - -#[tokio::test] -async fn terminates_on_logout() { - // Set up the environment - - let app = fixtures::scratch_app().await; - let channel = fixtures::channel::create(&app, &fixtures::now()).await; - let sender = fixtures::login::create(&app).await; - - // Subscribe via the endpoint - - let subscriber_creds = fixtures::login::create_with_password(&app).await; - let subscriber_token = - fixtures::identity::logged_in(&app, &subscriber_creds, &fixtures::now()).await; - let subscriber = - fixtures::identity::from_token(&app, &subscriber_token, &fixtures::now()).await; - - let routes::Events(events) = routes::events(State(app.clone()), subscriber.clone(), None) - .await - .expect("subscribe never fails"); - - // Verify the resulting stream's behaviour - - app.logins() - .logout(&subscriber.token) - .await - .expect("expiring tokens succeeds"); - - // These should not be delivered. - let messages = [ - fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await, - fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await, - fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await, - ]; - - assert!(events - .filter(|types::ResumableEvent(_, event)| future::ready(messages.contains(event))) - .next() - .immediately() - .await - .is_none()); -} diff --git a/src/events/types.rs b/src/events/types.rs deleted file mode 100644 index d954512..0000000 --- a/src/events/types.rs +++ /dev/null @@ -1,170 +0,0 @@ -use std::collections::BTreeMap; - -use crate::{ - clock::DateTime, - repo::{ - channel::{self, Channel}, - login::Login, - message, - }, -}; - -#[derive( - Debug, - Default, - Eq, - Ord, - PartialEq, - PartialOrd, - Clone, - Copy, - serde::Serialize, - serde::Deserialize, - sqlx::Type, -)] -#[serde(transparent)] -#[sqlx(transparent)] -pub struct Sequence(i64); - -impl Sequence { - pub fn next(self) -> Self { - let Self(current) = self; - Self(current + 1) - } -} - -// For the purposes of event replay, a resume point is a vector of resume -// elements. A resume element associates a channel (by ID) with the latest event -// seen in that channel so far. Replaying the event stream can restart at a -// predictable point - hence the name. These values can be serialized and sent -// to the client as JSON dicts, then rehydrated to recover the resume point at a -// later time. -// -// Using a sorted map ensures that there is a canonical representation for -// each resume point. -#[derive(Clone, Debug, Default, PartialEq, PartialOrd, serde::Deserialize, serde::Serialize)] -#[serde(transparent)] -pub struct ResumePoint(BTreeMap<channel::Id, Sequence>); - -impl ResumePoint { - pub fn advance<'e>(&mut self, event: impl Into<ResumeElement<'e>>) { - let Self(elements) = self; - let ResumeElement(channel, sequence) = event.into(); - elements.insert(channel.clone(), sequence); - } - - pub fn forget<'e>(&mut self, event: impl Into<ResumeElement<'e>>) { - let Self(elements) = self; - let ResumeElement(channel, _) = event.into(); - elements.remove(channel); - } - - pub fn get(&self, channel: &channel::Id) -> Option<Sequence> { - let Self(elements) = self; - elements.get(channel).copied() - } - - pub fn not_after<'e>(&self, event: impl Into<ResumeElement<'e>>) -> bool { - let Self(elements) = self; - let ResumeElement(channel, sequence) = event.into(); - - elements - .get(channel) - .map_or(true, |resume_at| resume_at < &sequence) - } -} - -pub struct ResumeElement<'i>(&'i channel::Id, Sequence); - -#[derive(Clone, Debug)] -pub struct ResumableEvent(pub ResumePoint, pub ChannelEvent); - -#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] -pub struct ChannelEvent { - #[serde(skip)] - pub sequence: Sequence, - pub at: DateTime, - #[serde(flatten)] - pub data: ChannelEventData, -} - -impl ChannelEvent { - pub fn created(channel: Channel) -> Self { - Self { - at: channel.created_at, - sequence: Sequence::default(), - data: CreatedEvent { channel }.into(), - } - } - - pub fn channel_id(&self) -> &channel::Id { - match &self.data { - ChannelEventData::Created(event) => &event.channel.id, - ChannelEventData::Message(event) => &event.channel.id, - ChannelEventData::MessageDeleted(event) => &event.channel.id, - ChannelEventData::Deleted(event) => &event.channel, - } - } -} - -impl<'c> From<&'c ChannelEvent> for ResumeElement<'c> { - fn from(event: &'c ChannelEvent) -> Self { - Self(event.channel_id(), event.sequence) - } -} - -#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] -#[serde(tag = "type", rename_all = "snake_case")] -pub enum ChannelEventData { - Created(CreatedEvent), - Message(MessageEvent), - MessageDeleted(MessageDeletedEvent), - Deleted(DeletedEvent), -} - -#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] -pub struct CreatedEvent { - pub channel: Channel, -} - -impl From<CreatedEvent> for ChannelEventData { - fn from(event: CreatedEvent) -> Self { - Self::Created(event) - } -} - -#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] -pub struct MessageEvent { - pub channel: Channel, - pub sender: Login, - pub message: message::Message, -} - -impl From<MessageEvent> for ChannelEventData { - fn from(event: MessageEvent) -> Self { - Self::Message(event) - } -} - -#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] -pub struct MessageDeletedEvent { - pub channel: Channel, - pub message: message::Id, -} - -impl From<MessageDeletedEvent> for ChannelEventData { - fn from(event: MessageDeletedEvent) -> Self { - Self::MessageDeleted(event) - } -} - -#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] -pub struct DeletedEvent { - pub channel: channel::Id, -} - -impl From<DeletedEvent> for ChannelEventData { - fn from(event: DeletedEvent) -> Self { - Self::Deleted(event) - } -} |
