From ba96974bdebd6d4ec345907d49944b5ee644ed47 Mon Sep 17 00:00:00 2001 From: Owen Jacobson Date: Wed, 9 Oct 2024 00:57:31 -0400 Subject: Provide a view of logins to clients. --- src/event/app.rs | 12 +++++++++++- src/event/mod.rs | 13 +++++++++++-- src/event/routes/test.rs | 32 ++++++++++++++++---------------- 3 files changed, 38 insertions(+), 19 deletions(-) (limited to 'src/event') diff --git a/src/event/app.rs b/src/event/app.rs index 141037d..951ce25 100644 --- a/src/event/app.rs +++ b/src/event/app.rs @@ -9,6 +9,7 @@ use sqlx::sqlite::SqlitePool; use super::{broadcaster::Broadcaster, Event, ResumePoint, Sequence, Sequenced}; use crate::{ channel::{self, repo::Provider as _}, + login::{self, repo::Provider as _}, message::{self, repo::Provider as _}, }; @@ -33,6 +34,14 @@ impl<'a> Events<'a> { let mut tx = self.db.begin().await?; + let logins = tx.logins().replay(resume_at).await?; + let login_events = logins + .iter() + .map(login::History::events) + .kmerge_by(Sequence::merge) + .filter(Sequence::after(resume_at)) + .map(Event::from); + let channels = tx.channels().replay(resume_at).await?; let channel_events = channels .iter() @@ -49,7 +58,8 @@ impl<'a> Events<'a> { .filter(Sequence::after(resume_at)) .map(Event::from); - let replay_events = channel_events + let replay_events = login_events + .merge_by(channel_events, Sequence::merge) .merge_by(message_events, Sequence::merge) .collect::>(); let resume_live_at = replay_events.last().map(Sequenced::sequence); diff --git a/src/event/mod.rs b/src/event/mod.rs index 698e55a..69c7a10 100644 --- a/src/event/mod.rs +++ b/src/event/mod.rs @@ -1,13 +1,14 @@ -use crate::{channel, message}; +use crate::{channel, login, message}; pub mod app; -pub mod broadcaster; +mod broadcaster; mod extract; pub mod repo; mod routes; mod sequence; pub use self::{ + broadcaster::Broadcaster, routes::router, sequence::{Instant, Sequence, Sequenced}, }; @@ -17,6 +18,7 @@ pub type ResumePoint = Option; #[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] #[serde(tag = "type", rename_all = "snake_case")] pub enum Event { + Login(login::Event), Channel(channel::Event), Message(message::Event), } @@ -24,12 +26,19 @@ pub enum Event { impl Sequenced for Event { fn instant(&self) -> Instant { match self { + Self::Login(event) => event.instant(), Self::Channel(event) => event.instant(), Self::Message(event) => event.instant(), } } } +impl From for Event { + fn from(event: login::Event) -> Self { + Self::Login(event) + } +} + impl From for Event { fn from(event: channel::Event) -> Self { Self::Channel(event) diff --git a/src/event/routes/test.rs b/src/event/routes/test.rs index ba9953e..209a016 100644 --- a/src/event/routes/test.rs +++ b/src/event/routes/test.rs @@ -15,13 +15,13 @@ async fn includes_historical_message() { // Set up the environment let app = fixtures::scratch_app().await; - let sender = fixtures::login::create(&app).await; + let sender = fixtures::login::create(&app, &fixtures::now()).await; let channel = fixtures::channel::create(&app, &fixtures::now()).await; let message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await; // Call the endpoint - let subscriber_creds = fixtures::login::create_with_password(&app).await; + let subscriber_creds = fixtures::login::create_with_password(&app, &fixtures::now()).await; let subscriber = fixtures::identity::identity(&app, &subscriber_creds, &fixtures::now()).await; let routes::Events(events) = routes::events(State(app), subscriber, None, Query::default()) .await @@ -48,7 +48,7 @@ async fn includes_live_message() { // Call the endpoint - let subscriber_creds = fixtures::login::create_with_password(&app).await; + let subscriber_creds = fixtures::login::create_with_password(&app, &fixtures::now()).await; let subscriber = fixtures::identity::identity(&app, &subscriber_creds, &fixtures::now()).await; let routes::Events(events) = routes::events(State(app.clone()), subscriber, None, Query::default()) @@ -57,7 +57,7 @@ async fn includes_live_message() { // Verify the semantics - let sender = fixtures::login::create(&app).await; + let sender = fixtures::login::create(&app, &fixtures::now()).await; let message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await; let event = events @@ -75,7 +75,7 @@ async fn includes_multiple_channels() { // Set up the environment let app = fixtures::scratch_app().await; - let sender = fixtures::login::create(&app).await; + let sender = fixtures::login::create(&app, &fixtures::now()).await; let channels = [ fixtures::channel::create(&app, &fixtures::now()).await, @@ -94,7 +94,7 @@ async fn includes_multiple_channels() { // Call the endpoint - let subscriber_creds = fixtures::login::create_with_password(&app).await; + let subscriber_creds = fixtures::login::create_with_password(&app, &fixtures::now()).await; let subscriber = fixtures::identity::identity(&app, &subscriber_creds, &fixtures::now()).await; let routes::Events(events) = routes::events(State(app), subscriber, None, Query::default()) .await @@ -122,7 +122,7 @@ async fn sequential_messages() { let app = fixtures::scratch_app().await; let channel = fixtures::channel::create(&app, &fixtures::now()).await; - let sender = fixtures::login::create(&app).await; + let sender = fixtures::login::create(&app, &fixtures::now()).await; let messages = vec![ fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, @@ -132,7 +132,7 @@ async fn sequential_messages() { // Call the endpoint - let subscriber_creds = fixtures::login::create_with_password(&app).await; + let subscriber_creds = fixtures::login::create_with_password(&app, &fixtures::now()).await; let subscriber = fixtures::identity::identity(&app, &subscriber_creds, &fixtures::now()).await; let routes::Events(events) = routes::events(State(app), subscriber, None, Query::default()) .await @@ -166,7 +166,7 @@ async fn resumes_from() { let app = fixtures::scratch_app().await; let channel = fixtures::channel::create(&app, &fixtures::now()).await; - let sender = fixtures::login::create(&app).await; + let sender = fixtures::login::create(&app, &fixtures::now()).await; let initial_message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await; @@ -177,7 +177,7 @@ async fn resumes_from() { // Call the endpoint - let subscriber_creds = fixtures::login::create_with_password(&app).await; + let subscriber_creds = fixtures::login::create_with_password(&app, &fixtures::now()).await; let subscriber = fixtures::identity::identity(&app, &subscriber_creds, &fixtures::now()).await; let resume_at = { @@ -248,13 +248,13 @@ async fn serial_resume() { // Set up the environment let app = fixtures::scratch_app().await; - let sender = fixtures::login::create(&app).await; + let sender = fixtures::login::create(&app, &fixtures::now()).await; let channel_a = fixtures::channel::create(&app, &fixtures::now()).await; let channel_b = fixtures::channel::create(&app, &fixtures::now()).await; // Call the endpoint - let subscriber_creds = fixtures::login::create_with_password(&app).await; + let subscriber_creds = fixtures::login::create_with_password(&app, &fixtures::now()).await; let subscriber = fixtures::identity::identity(&app, &subscriber_creds, &fixtures::now()).await; let resume_at = { @@ -372,11 +372,11 @@ async fn terminates_on_token_expiry() { let app = fixtures::scratch_app().await; let channel = fixtures::channel::create(&app, &fixtures::now()).await; - let sender = fixtures::login::create(&app).await; + let sender = fixtures::login::create(&app, &fixtures::now()).await; // Subscribe via the endpoint - let subscriber_creds = fixtures::login::create_with_password(&app).await; + let subscriber_creds = fixtures::login::create_with_password(&app, &fixtures::now()).await; let subscriber = fixtures::identity::identity(&app, &subscriber_creds, &fixtures::ancient()).await; @@ -417,11 +417,11 @@ async fn terminates_on_logout() { let app = fixtures::scratch_app().await; let channel = fixtures::channel::create(&app, &fixtures::now()).await; - let sender = fixtures::login::create(&app).await; + let sender = fixtures::login::create(&app, &fixtures::now()).await; // Subscribe via the endpoint - let subscriber_creds = fixtures::login::create_with_password(&app).await; + let subscriber_creds = fixtures::login::create_with_password(&app, &fixtures::now()).await; let subscriber_token = fixtures::identity::logged_in(&app, &subscriber_creds, &fixtures::now()).await; let subscriber = -- cgit v1.2.3