diff options
| author | Kit La Touche <kit@transneptune.net> | 2024-10-10 13:26:15 -0400 |
|---|---|---|
| committer | Kit La Touche <kit@transneptune.net> | 2024-10-10 13:26:15 -0400 |
| commit | 03f8d9ad603a4e523a0e2a0e60ad62c8725f0875 (patch) | |
| tree | b01543c0c2dadbd4be17320d47fc2e3d2fdb280d /src | |
| parent | efae871b1bdb1e01081a44218281950cf0177f3b (diff) | |
| parent | d173bc08f2b699f58c8cca752ff688ad46f33ced (diff) | |
Merge branch 'main' into wip/path-routing-for-channels
Diffstat (limited to 'src')
40 files changed, 610 insertions, 442 deletions
@@ -3,9 +3,9 @@ use sqlx::sqlite::SqlitePool; use crate::{ boot::app::Boot, channel::app::Channels, - event::{app::Events, broadcaster::Broadcaster as EventBroadcaster}, + event::{self, app::Events}, message::app::Messages, - token::{app::Tokens, broadcaster::Broadcaster as TokenBroadcaster}, + token::{self, app::Tokens}, }; #[cfg(test)] @@ -14,15 +14,19 @@ use crate::login::app::Logins; #[derive(Clone)] pub struct App { db: SqlitePool, - events: EventBroadcaster, - tokens: TokenBroadcaster, + events: event::Broadcaster, + token_events: token::Broadcaster, } impl App { pub fn from(db: SqlitePool) -> Self { - let events = EventBroadcaster::default(); - let tokens = TokenBroadcaster::default(); - Self { db, events, tokens } + let events = event::Broadcaster::default(); + let token_events = token::Broadcaster::default(); + Self { + db, + events, + token_events, + } } } @@ -41,7 +45,7 @@ impl App { #[cfg(test)] pub const fn logins(&self) -> Logins { - Logins::new(&self.db) + Logins::new(&self.db, &self.events) } pub const fn messages(&self) -> Messages { @@ -49,6 +53,6 @@ impl App { } pub const fn tokens(&self) -> Tokens { - Tokens::new(&self.db, &self.tokens) + Tokens::new(&self.db, &self.events, &self.token_events) } } diff --git a/src/boot/app.rs b/src/boot/app.rs index fc84b3a..ef48b2f 100644 --- a/src/boot/app.rs +++ b/src/boot/app.rs @@ -1,8 +1,9 @@ use sqlx::sqlite::SqlitePool; -use super::{Channel, Snapshot}; +use super::Snapshot; use crate::{ - channel::repo::Provider as _, event::repo::Provider as _, message::repo::Provider as _, + channel::repo::Provider as _, event::repo::Provider as _, login::repo::Provider as _, + message::repo::Provider as _, }; pub struct Boot<'a> { @@ -17,38 +18,33 @@ impl<'a> Boot<'a> { pub async fn snapshot(&self) -> Result<Snapshot, sqlx::Error> { let mut tx = self.db.begin().await?; let resume_point = tx.sequence().current().await?; - let channels = tx.channels().all(resume_point.into()).await?; - - let channels = { - let mut snapshots = Vec::with_capacity(channels.len()); - - let channels = channels.into_iter().filter_map(|channel| { - channel - .as_of(resume_point) - .map(|snapshot| (channel, snapshot)) - }); - for (channel, snapshot) in channels { - let messages = tx - .messages() - .in_channel(&channel, resume_point.into()) - .await?; + let logins = tx.logins().all(resume_point.into()).await?; + let channels = tx.channels().all(resume_point.into()).await?; + let messages = tx.messages().all(resume_point.into()).await?; - let messages = messages - .into_iter() - .filter_map(|message| message.as_of(resume_point)); + tx.commit().await?; - snapshots.push(Channel::new(snapshot, messages)); - } + let logins = logins + .into_iter() + .filter_map(|login| login.as_of(resume_point)) + .collect(); - snapshots - }; + let channels = channels + .into_iter() + .filter_map(|channel| channel.as_of(resume_point)) + .collect(); - tx.commit().await?; + let messages = messages + .into_iter() + .filter_map(|message| message.as_of(resume_point)) + .collect(); Ok(Snapshot { resume_point, + logins, channels, + messages, }) } } diff --git a/src/boot/mod.rs b/src/boot/mod.rs index bd0da0a..ed4764a 100644 --- a/src/boot/mod.rs +++ b/src/boot/mod.rs @@ -1,74 +1,14 @@ pub mod app; mod routes; -use crate::{ - channel, - event::{Instant, Sequence}, - login::Login, - message, -}; +use crate::{channel::Channel, event::Sequence, login::Login, message::Message}; pub use self::routes::router; #[derive(serde::Serialize)] pub struct Snapshot { pub resume_point: Sequence, + pub logins: Vec<Login>, pub channels: Vec<Channel>, -} - -#[derive(serde::Serialize)] -pub struct Channel { - pub id: channel::Id, - pub name: String, pub messages: Vec<Message>, } - -impl Channel { - fn new( - channel: channel::Channel, - messages: impl IntoIterator<Item = message::Message>, - ) -> Self { - // The declarations are like this to guarantee that we aren't omitting any important fields from the corresponding types. - let channel::Channel { id, name } = channel; - - Self { - id, - name, - messages: messages.into_iter().map(Message::from).collect(), - } - } -} - -#[derive(serde::Serialize)] -pub struct Message { - #[serde(flatten)] - pub sent: Instant, - pub sender: Login, - // Named this way for serialization reasons - #[allow(clippy::struct_field_names)] - pub message: Body, -} - -impl From<message::Message> for Message { - fn from(message: message::Message) -> Self { - let message::Message { - sent, - channel: _, - sender, - id, - body, - } = message; - - Self { - sent, - sender, - message: Body { id, body }, - } - } -} - -#[derive(serde::Serialize)] -pub struct Body { - id: message::Id, - body: String, -} diff --git a/src/channel/app.rs b/src/channel/app.rs index a9a9e84..cb7ad32 100644 --- a/src/channel/app.rs +++ b/src/channel/app.rs @@ -6,7 +6,7 @@ use super::{repo::Provider as _, Channel, Id}; use crate::{ clock::DateTime, db::NotFound, - event::{broadcaster::Broadcaster, repo::Provider as _, Event, Sequence}, + event::{repo::Provider as _, Broadcaster, Event, Sequence}, message::repo::Provider as _, }; diff --git a/src/channel/event.rs b/src/channel/event.rs index 9c54174..f3dca3e 100644 --- a/src/channel/event.rs +++ b/src/channel/event.rs @@ -5,32 +5,30 @@ use crate::{ }; #[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] -pub struct Event { - #[serde(flatten)] - pub instant: Instant, - #[serde(flatten)] - pub kind: Kind, +#[serde(tag = "event", rename_all = "snake_case")] +pub enum Event { + Created(Created), + Deleted(Deleted), } impl Sequenced for Event { fn instant(&self) -> Instant { - self.instant + match self { + Self::Created(event) => event.instant, + Self::Deleted(event) => event.instant, + } } } #[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] -#[serde(tag = "type", rename_all = "snake_case")] -pub enum Kind { - Created(Created), - Deleted(Deleted), -} - -#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] pub struct Created { + #[serde(flatten)] + pub instant: Instant, + #[serde(flatten)] pub channel: Channel, } -impl From<Created> for Kind { +impl From<Created> for Event { fn from(event: Created) -> Self { Self::Created(event) } @@ -38,10 +36,12 @@ impl From<Created> for Kind { #[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] pub struct Deleted { - pub channel: channel::Id, + #[serde(flatten)] + pub instant: Instant, + pub id: channel::Id, } -impl From<Deleted> for Kind { +impl From<Deleted> for Event { fn from(event: Deleted) -> Self { Self::Deleted(event) } diff --git a/src/channel/history.rs b/src/channel/history.rs index 383fb7b..78b3437 100644 --- a/src/channel/history.rs +++ b/src/channel/history.rs @@ -40,22 +40,20 @@ impl History { } fn created(&self) -> Event { - Event { + Created { instant: self.created, - kind: Created { - channel: self.channel.clone(), - } - .into(), + channel: self.channel.clone(), } + .into() } fn deleted(&self) -> Option<Event> { - self.deleted.map(|instant| Event { - instant, - kind: Deleted { - channel: self.channel.id.clone(), + self.deleted.map(|instant| { + Deleted { + instant, + id: self.channel.id.clone(), } - .into(), + .into() }) } } diff --git a/src/channel/routes.rs b/src/channel/routes.rs index 5d67af8..e97c447 100644 --- a/src/channel/routes.rs +++ b/src/channel/routes.rs @@ -7,7 +7,13 @@ use axum::{ }; use super::{app, Channel, Id}; -use crate::{app::App, clock::RequestedAt, error::Internal, login::Login, message::app::SendError}; +use crate::{ + app::App, + clock::RequestedAt, + error::{Internal, NotFound}, + login::Login, + message::app::SendError, +}; #[cfg(test)] mod test; @@ -56,7 +62,7 @@ impl IntoResponse for CreateError { #[derive(Clone, serde::Deserialize)] struct SendRequest { - message: String, + body: String, } async fn on_send( @@ -67,7 +73,7 @@ async fn on_send( Json(request): Json<SendRequest>, ) -> Result<StatusCode, SendErrorResponse> { app.messages() - .send(&channel, &login, &sent_at, &request.message) + .send(&channel, &login, &sent_at, &request.body) .await?; Ok(StatusCode::ACCEPTED) @@ -81,9 +87,7 @@ impl IntoResponse for SendErrorResponse { fn into_response(self) -> Response { let Self(error) = self; match error { - not_found @ SendError::ChannelNotFound(_) => { - (StatusCode::NOT_FOUND, not_found.to_string()).into_response() - } + not_found @ SendError::ChannelNotFound(_) => NotFound(not_found).into_response(), other => Internal::from(other).into_response(), } } diff --git a/src/channel/routes/test/on_create.rs b/src/channel/routes/test/on_create.rs index ed49017..eeecc7f 100644 --- a/src/channel/routes/test/on_create.rs +++ b/src/channel/routes/test/on_create.rs @@ -2,7 +2,7 @@ use axum::extract::{Json, State}; use futures::stream::StreamExt as _; use crate::{ - channel::{app, routes}, + channel::{self, app, routes}, event, test::fixtures::{self, future::Immediately as _}, }; @@ -12,7 +12,7 @@ async fn new_channel() { // Set up the environment let app = fixtures::scratch_app().await; - let creator = fixtures::login::create(&app).await; + let creator = fixtures::login::create(&app, &fixtures::now()).await; // Call the endpoint @@ -53,8 +53,8 @@ async fn new_channel() { .expect("creation event published"); assert!(matches!( - event.kind, - event::Kind::ChannelCreated(event) + event, + event::Event::Channel(channel::Event::Created(event)) if event.channel == response_channel )); } @@ -64,7 +64,7 @@ async fn duplicate_name() { // Set up the environment let app = fixtures::scratch_app().await; - let creator = fixtures::login::create(&app).await; + let creator = fixtures::login::create(&app, &fixtures::now()).await; let channel = fixtures::channel::create(&app, &fixtures::now()).await; // Call the endpoint diff --git a/src/channel/routes/test/on_send.rs b/src/channel/routes/test/on_send.rs index 3297093..293cc56 100644 --- a/src/channel/routes/test/on_send.rs +++ b/src/channel/routes/test/on_send.rs @@ -4,8 +4,8 @@ use futures::stream::StreamExt; use crate::{ channel, channel::routes, - event, - message::app::SendError, + event::{self, Sequenced}, + message::{self, app::SendError}, test::fixtures::{self, future::Immediately as _}, }; @@ -14,7 +14,7 @@ async fn messages_in_order() { // Set up the environment let app = fixtures::scratch_app().await; - let sender = fixtures::login::create(&app).await; + let sender = fixtures::login::create(&app, &fixtures::now()).await; let channel = fixtures::channel::create(&app, &fixtures::now()).await; // Call the endpoint (twice) @@ -24,10 +24,8 @@ async fn messages_in_order() { (fixtures::now(), fixtures::message::propose()), ]; - for (sent_at, message) in &requests { - let request = routes::SendRequest { - message: message.clone(), - }; + for (sent_at, body) in &requests { + let request = routes::SendRequest { body: body.clone() }; routes::on_send( State(app.clone()), @@ -53,11 +51,11 @@ async fn messages_in_order() { let events = events.collect::<Vec<_>>().immediately().await; for ((sent_at, message), event) in requests.into_iter().zip(events) { - assert_eq!(*sent_at, event.instant.at); + assert_eq!(*sent_at, event.at()); assert!(matches!( - event.kind, - event::Kind::MessageSent(event) - if event.message.sender == sender + event, + event::Event::Message(message::Event::Sent(event)) + if event.message.sender == sender.id && event.message.body == message )); } @@ -68,14 +66,14 @@ async fn nonexistent_channel() { // Set up the environment let app = fixtures::scratch_app().await; - let login = fixtures::login::create(&app).await; + let login = fixtures::login::create(&app, &fixtures::now()).await; // Call the endpoint let sent_at = fixtures::now(); let channel = channel::Id::generate(); let request = routes::SendRequest { - message: fixtures::message::propose(), + body: fixtures::message::propose(), }; let routes::SendErrorResponse(error) = routes::on_send( State(app), diff --git a/src/channel/snapshot.rs b/src/channel/snapshot.rs index 6462f25..d4d1d27 100644 --- a/src/channel/snapshot.rs +++ b/src/channel/snapshot.rs @@ -1,5 +1,5 @@ use super::{ - event::{Created, Event, Kind}, + event::{Created, Event}, Id, }; @@ -11,9 +11,9 @@ pub struct Channel { impl Channel { fn apply(state: Option<Self>, event: Event) -> Option<Self> { - match (state, event.kind) { - (None, Kind::Created(event)) => Some(event.into()), - (Some(channel), Kind::Deleted(event)) if channel.id == event.channel => None, + match (state, event) { + (None, Event::Created(event)) => Some(event.into()), + (Some(channel), Event::Deleted(event)) if channel.id == event.id => None, (state, event) => panic!("invalid channel event {event:#?} for state {state:#?}"), } } diff --git a/src/db/mod.rs b/src/db/mod.rs index bbaec7d..b9c59ef 100644 --- a/src/db/mod.rs +++ b/src/db/mod.rs @@ -28,6 +28,8 @@ pub async fn prepare(url: &str, backup_url: &str) -> Result<SqlitePool, Error> { if let Err(migrate_error) = sqlx::migrate!().run(&pool).await { if let Err(restore_error) = backup::Backup::from(&backup_pool).to(&pool).backup().await { Err(Error::Restore(restore_error, migrate_error))?; + } else if let Err(drop_error) = Sqlite::drop_database(backup_url).await { + Err(Error::Drop(drop_error, migrate_error))?; } else { Err(migrate_error)?; }; @@ -77,8 +79,12 @@ pub enum Error { /// Failure due to a database backup error. See [`backup::Error`]. #[error(transparent)] Backup(#[from] backup::Error), - #[error("backing out failed migration also failed: {0} ({1})")] + #[error("migration failed: {1}\nrestoring backup failed: {0}")] Restore(backup::Error, sqlx::migrate::MigrateError), + #[error( + "migration failed: {1}\nrestoring from backup succeeded, but deleting backup failed: {0}" + )] + Drop(sqlx::Error, sqlx::migrate::MigrateError), /// Failure due to a database migration error. See /// [`sqlx::migrate::MigrateError`]. #[error(transparent)] diff --git a/src/error.rs b/src/error.rs index 8792a1d..85573d4 100644 --- a/src/error.rs +++ b/src/error.rs @@ -69,3 +69,15 @@ impl IntoResponse for Unauthorized { (StatusCode::UNAUTHORIZED, "unauthorized").into_response() } } + +pub struct NotFound<E>(pub E); + +impl<E> IntoResponse for NotFound<E> +where + E: std::error::Error, +{ + fn into_response(self) -> Response { + let Self(response) = self; + (StatusCode::NOT_FOUND, response.to_string()).into_response() + } +} diff --git a/src/event/app.rs b/src/event/app.rs index 141037d..951ce25 100644 --- a/src/event/app.rs +++ b/src/event/app.rs @@ -9,6 +9,7 @@ use sqlx::sqlite::SqlitePool; use super::{broadcaster::Broadcaster, Event, ResumePoint, Sequence, Sequenced}; use crate::{ channel::{self, repo::Provider as _}, + login::{self, repo::Provider as _}, message::{self, repo::Provider as _}, }; @@ -33,6 +34,14 @@ impl<'a> Events<'a> { let mut tx = self.db.begin().await?; + let logins = tx.logins().replay(resume_at).await?; + let login_events = logins + .iter() + .map(login::History::events) + .kmerge_by(Sequence::merge) + .filter(Sequence::after(resume_at)) + .map(Event::from); + let channels = tx.channels().replay(resume_at).await?; let channel_events = channels .iter() @@ -49,7 +58,8 @@ impl<'a> Events<'a> { .filter(Sequence::after(resume_at)) .map(Event::from); - let replay_events = channel_events + let replay_events = login_events + .merge_by(channel_events, Sequence::merge) .merge_by(message_events, Sequence::merge) .collect::<Vec<_>>(); let resume_live_at = replay_events.last().map(Sequenced::sequence); diff --git a/src/event/mod.rs b/src/event/mod.rs index e748d66..69c7a10 100644 --- a/src/event/mod.rs +++ b/src/event/mod.rs @@ -1,13 +1,14 @@ -use crate::{channel, message}; +use crate::{channel, login, message}; pub mod app; -pub mod broadcaster; +mod broadcaster; mod extract; pub mod repo; mod routes; mod sequence; pub use self::{ + broadcaster::Broadcaster, routes::router, sequence::{Instant, Sequence, Sequenced}, }; @@ -15,63 +16,37 @@ pub use self::{ pub type ResumePoint = Option<Sequence>; #[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] -pub struct Event { - #[serde(flatten)] - pub instant: Instant, - #[serde(flatten)] - pub kind: Kind, +#[serde(tag = "type", rename_all = "snake_case")] +pub enum Event { + Login(login::Event), + Channel(channel::Event), + Message(message::Event), } impl Sequenced for Event { fn instant(&self) -> Instant { - self.instant - } -} - -impl From<channel::Event> for Event { - fn from(event: channel::Event) -> Self { - Self { - instant: event.instant, - kind: event.kind.into(), + match self { + Self::Login(event) => event.instant(), + Self::Channel(event) => event.instant(), + Self::Message(event) => event.instant(), } } } -impl From<message::Event> for Event { - fn from(event: message::Event) -> Self { - Self { - instant: event.instant(), - kind: event.kind.into(), - } +impl From<login::Event> for Event { + fn from(event: login::Event) -> Self { + Self::Login(event) } } -#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] -#[serde(tag = "type", rename_all = "snake_case")] -pub enum Kind { - #[serde(rename = "created")] - ChannelCreated(channel::event::Created), - #[serde(rename = "message")] - MessageSent(message::event::Sent), - MessageDeleted(message::event::Deleted), - #[serde(rename = "deleted")] - ChannelDeleted(channel::event::Deleted), -} - -impl From<channel::event::Kind> for Kind { - fn from(kind: channel::event::Kind) -> Self { - match kind { - channel::event::Kind::Created(created) => Self::ChannelCreated(created), - channel::event::Kind::Deleted(deleted) => Self::ChannelDeleted(deleted), - } +impl From<channel::Event> for Event { + fn from(event: channel::Event) -> Self { + Self::Channel(event) } } -impl From<message::event::Kind> for Kind { - fn from(kind: message::event::Kind) -> Self { - match kind { - message::event::Kind::Sent(created) => Self::MessageSent(created), - message::event::Kind::Deleted(deleted) => Self::MessageDeleted(deleted), - } +impl From<message::Event> for Event { + fn from(event: message::Event) -> Self { + Self::Message(event) } } diff --git a/src/event/routes/test.rs b/src/event/routes/test.rs index ba9953e..209a016 100644 --- a/src/event/routes/test.rs +++ b/src/event/routes/test.rs @@ -15,13 +15,13 @@ async fn includes_historical_message() { // Set up the environment let app = fixtures::scratch_app().await; - let sender = fixtures::login::create(&app).await; + let sender = fixtures::login::create(&app, &fixtures::now()).await; let channel = fixtures::channel::create(&app, &fixtures::now()).await; let message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await; // Call the endpoint - let subscriber_creds = fixtures::login::create_with_password(&app).await; + let subscriber_creds = fixtures::login::create_with_password(&app, &fixtures::now()).await; let subscriber = fixtures::identity::identity(&app, &subscriber_creds, &fixtures::now()).await; let routes::Events(events) = routes::events(State(app), subscriber, None, Query::default()) .await @@ -48,7 +48,7 @@ async fn includes_live_message() { // Call the endpoint - let subscriber_creds = fixtures::login::create_with_password(&app).await; + let subscriber_creds = fixtures::login::create_with_password(&app, &fixtures::now()).await; let subscriber = fixtures::identity::identity(&app, &subscriber_creds, &fixtures::now()).await; let routes::Events(events) = routes::events(State(app.clone()), subscriber, None, Query::default()) @@ -57,7 +57,7 @@ async fn includes_live_message() { // Verify the semantics - let sender = fixtures::login::create(&app).await; + let sender = fixtures::login::create(&app, &fixtures::now()).await; let message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await; let event = events @@ -75,7 +75,7 @@ async fn includes_multiple_channels() { // Set up the environment let app = fixtures::scratch_app().await; - let sender = fixtures::login::create(&app).await; + let sender = fixtures::login::create(&app, &fixtures::now()).await; let channels = [ fixtures::channel::create(&app, &fixtures::now()).await, @@ -94,7 +94,7 @@ async fn includes_multiple_channels() { // Call the endpoint - let subscriber_creds = fixtures::login::create_with_password(&app).await; + let subscriber_creds = fixtures::login::create_with_password(&app, &fixtures::now()).await; let subscriber = fixtures::identity::identity(&app, &subscriber_creds, &fixtures::now()).await; let routes::Events(events) = routes::events(State(app), subscriber, None, Query::default()) .await @@ -122,7 +122,7 @@ async fn sequential_messages() { let app = fixtures::scratch_app().await; let channel = fixtures::channel::create(&app, &fixtures::now()).await; - let sender = fixtures::login::create(&app).await; + let sender = fixtures::login::create(&app, &fixtures::now()).await; let messages = vec![ fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, @@ -132,7 +132,7 @@ async fn sequential_messages() { // Call the endpoint - let subscriber_creds = fixtures::login::create_with_password(&app).await; + let subscriber_creds = fixtures::login::create_with_password(&app, &fixtures::now()).await; let subscriber = fixtures::identity::identity(&app, &subscriber_creds, &fixtures::now()).await; let routes::Events(events) = routes::events(State(app), subscriber, None, Query::default()) .await @@ -166,7 +166,7 @@ async fn resumes_from() { let app = fixtures::scratch_app().await; let channel = fixtures::channel::create(&app, &fixtures::now()).await; - let sender = fixtures::login::create(&app).await; + let sender = fixtures::login::create(&app, &fixtures::now()).await; let initial_message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await; @@ -177,7 +177,7 @@ async fn resumes_from() { // Call the endpoint - let subscriber_creds = fixtures::login::create_with_password(&app).await; + let subscriber_creds = fixtures::login::create_with_password(&app, &fixtures::now()).await; let subscriber = fixtures::identity::identity(&app, &subscriber_creds, &fixtures::now()).await; let resume_at = { @@ -248,13 +248,13 @@ async fn serial_resume() { // Set up the environment let app = fixtures::scratch_app().await; - let sender = fixtures::login::create(&app).await; + let sender = fixtures::login::create(&app, &fixtures::now()).await; let channel_a = fixtures::channel::create(&app, &fixtures::now()).await; let channel_b = fixtures::channel::create(&app, &fixtures::now()).await; // Call the endpoint - let subscriber_creds = fixtures::login::create_with_password(&app).await; + let subscriber_creds = fixtures::login::create_with_password(&app, &fixtures::now()).await; let subscriber = fixtures::identity::identity(&app, &subscriber_creds, &fixtures::now()).await; let resume_at = { @@ -372,11 +372,11 @@ async fn terminates_on_token_expiry() { let app = fixtures::scratch_app().await; let channel = fixtures::channel::create(&app, &fixtures::now()).await; - let sender = fixtures::login::create(&app).await; + let sender = fixtures::login::create(&app, &fixtures::now()).await; // Subscribe via the endpoint - let subscriber_creds = fixtures::login::create_with_password(&app).await; + let subscriber_creds = fixtures::login::create_with_password(&app, &fixtures::now()).await; let subscriber = fixtures::identity::identity(&app, &subscriber_creds, &fixtures::ancient()).await; @@ -417,11 +417,11 @@ async fn terminates_on_logout() { let app = fixtures::scratch_app().await; let channel = fixtures::channel::create(&app, &fixtures::now()).await; - let sender = fixtures::login::create(&app).await; + let sender = fixtures::login::create(&app, &fixtures::now()).await; // Subscribe via the endpoint - let subscriber_creds = fixtures::login::create_with_password(&app).await; + let subscriber_creds = fixtures::login::create_with_password(&app, &fixtures::now()).await; let subscriber_token = fixtures::identity::logged_in(&app, &subscriber_creds, &fixtures::now()).await; let subscriber = diff --git a/src/event/sequence.rs b/src/event/sequence.rs index ceb5bcb..bf6d5b8 100644 --- a/src/event/sequence.rs +++ b/src/event/sequence.rs @@ -72,6 +72,10 @@ impl Sequence { pub trait Sequenced { fn instant(&self) -> Instant; + fn at(&self) -> DateTime { + self.instant().at + } + fn sequence(&self) -> Sequence { self.instant().into() } diff --git a/src/login/app.rs b/src/login/app.rs index 4f60b89..bb1419b 100644 --- a/src/login/app.rs +++ b/src/login/app.rs @@ -1,24 +1,38 @@ use sqlx::sqlite::SqlitePool; use super::{repo::Provider as _, Login, Password}; +use crate::{ + clock::DateTime, + event::{repo::Provider as _, Broadcaster, Event}, +}; pub struct Logins<'a> { db: &'a SqlitePool, + events: &'a Broadcaster, } impl<'a> Logins<'a> { - pub const fn new(db: &'a SqlitePool) -> Self { - Self { db } + pub const fn new(db: &'a SqlitePool, events: &'a Broadcaster) -> Self { + Self { db, events } } - pub async fn create(&self, name: &str, password: &Password) -> Result<Login, CreateError> { + pub async fn create( + &self, + name: &str, + password: &Password, + created_at: &DateTime, + ) -> Result<Login, CreateError> { let password_hash = password.hash()?; let mut tx = self.db.begin().await?; - let login = tx.logins().create(name, &password_hash).await?; + let created = tx.sequence().next(created_at).await?; + let login = tx.logins().create(name, &password_hash, &created).await?; tx.commit().await?; - Ok(login) + self.events + .broadcast(login.events().map(Event::from).collect::<Vec<_>>()); + + Ok(login.as_created()) } } diff --git a/src/login/event.rs b/src/login/event.rs new file mode 100644 index 0000000..b03451a --- /dev/null +++ b/src/login/event.rs @@ -0,0 +1,36 @@ +use super::snapshot::Login; +use crate::event::{Instant, Sequenced}; + +#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] +#[serde(tag = "event", rename_all = "snake_case")] +pub enum Event { + Created(Created), +} + +impl Sequenced for Event { + fn instant(&self) -> Instant { + match self { + Self::Created(created) => created.instant(), + } + } +} + +#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] +pub struct Created { + #[serde(flatten)] + pub instant: Instant, + #[serde(flatten)] + pub login: Login, +} + +impl Sequenced for Created { + fn instant(&self) -> Instant { + self.instant + } +} + +impl From<Created> for Event { + fn from(event: Created) -> Self { + Self::Created(event) + } +} diff --git a/src/login/history.rs b/src/login/history.rs new file mode 100644 index 0000000..add7d1e --- /dev/null +++ b/src/login/history.rs @@ -0,0 +1,47 @@ +use super::{ + event::{Created, Event}, + Id, Login, +}; +use crate::event::{Instant, ResumePoint, Sequence}; + +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct History { + pub login: Login, + pub created: Instant, +} + +// State interface +impl History { + pub fn id(&self) -> &Id { + &self.login.id + } + + // Snapshot of this login as it was when created. (Note to the future: it's okay + // if this returns a redacted or modified version of the login. If we implement + // renames by redacting the original name, then this should return the edited login, not the original, even if that's not how it was "as created.") + #[cfg(test)] + pub fn as_created(&self) -> Login { + self.login.clone() + } + + pub fn as_of(&self, resume_point: impl Into<ResumePoint>) -> Option<Login> { + self.events() + .filter(Sequence::up_to(resume_point.into())) + .collect() + } +} + +// Events interface +impl History { + fn created(&self) -> Event { + Created { + instant: self.created, + login: self.login.clone(), + } + .into() + } + + pub fn events(&self) -> impl Iterator<Item = Event> { + [self.created()].into_iter() + } +} diff --git a/src/login/mod.rs b/src/login/mod.rs index f272f80..98cc3d7 100644 --- a/src/login/mod.rs +++ b/src/login/mod.rs @@ -1,22 +1,14 @@ #[cfg(test)] pub mod app; +pub mod event; pub mod extract; +mod history; mod id; pub mod password; pub mod repo; mod routes; +mod snapshot; -pub use self::{id::Id, password::Password, routes::router}; - -// This also implements FromRequestParts (see `./extract.rs`). As a result, it -// can be used as an extractor for endpoints that want to require login, or for -// endpoints that need to behave differently depending on whether the client is -// or is not logged in. -#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] -pub struct Login { - pub id: Id, - pub name: String, - // The omission of the hashed password is deliberate, to minimize the - // chance that it ends up tangled up in debug output or in some other chunk - // of logic elsewhere. -} +pub use self::{ + event::Event, history::History, id::Id, password::Password, routes::router, snapshot::Login, +}; diff --git a/src/login/repo.rs b/src/login/repo.rs index d1a02c4..6d6510c 100644 --- a/src/login/repo.rs +++ b/src/login/repo.rs @@ -1,6 +1,10 @@ use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction}; -use crate::login::{password::StoredHash, Id, Login}; +use crate::{ + clock::DateTime, + event::{Instant, ResumePoint, Sequence}, + login::{password::StoredHash, History, Id, Login}, +}; pub trait Provider { fn logins(&mut self) -> Logins; @@ -19,28 +23,100 @@ impl<'c> Logins<'c> { &mut self, name: &str, password_hash: &StoredHash, - ) -> Result<Login, sqlx::Error> { + created: &Instant, + ) -> Result<History, sqlx::Error> { let id = Id::generate(); - let login = sqlx::query_as!( - Login, + let login = sqlx::query!( r#" - insert or fail - into login (id, name, password_hash) - values ($1, $2, $3) + insert + into login (id, name, password_hash, created_sequence, created_at) + values ($1, $2, $3, $4, $5) returning id as "id: Id", - name + name, + created_sequence as "created_sequence: Sequence", + created_at as "created_at: DateTime" "#, id, name, password_hash, + created.sequence, + created.at, ) + .map(|row| History { + login: Login { + id: row.id, + name: row.name, + }, + created: Instant { + at: row.created_at, + sequence: row.created_sequence, + }, + }) .fetch_one(&mut *self.0) .await?; Ok(login) } + + pub async fn all(&mut self, resume_at: ResumePoint) -> Result<Vec<History>, sqlx::Error> { + let channels = sqlx::query!( + r#" + select + id as "id: Id", + name, + created_sequence as "created_sequence: Sequence", + created_at as "created_at: DateTime" + from login + where coalesce(created_sequence <= $1, true) + order by created_sequence + "#, + resume_at, + ) + .map(|row| History { + login: Login { + id: row.id, + name: row.name, + }, + created: Instant { + at: row.created_at, + sequence: row.created_sequence, + }, + }) + .fetch_all(&mut *self.0) + .await?; + + Ok(channels) + } + pub async fn replay(&mut self, resume_at: ResumePoint) -> Result<Vec<History>, sqlx::Error> { + let messages = sqlx::query!( + r#" + select + id as "id: Id", + name, + created_sequence as "created_sequence: Sequence", + created_at as "created_at: DateTime" + from login + where coalesce(login.created_sequence > $1, true) + "#, + resume_at, + ) + .map(|row| History { + login: Login { + id: row.id, + name: row.name, + }, + created: Instant { + at: row.created_at, + sequence: row.created_sequence, + }, + }) + .fetch_all(&mut *self.0) + .await?; + + Ok(messages) + } } impl<'t> From<&'t mut SqliteConnection> for Logins<'t> { diff --git a/src/login/routes/test/login.rs b/src/login/routes/test/login.rs index 3c82738..6a3b79c 100644 --- a/src/login/routes/test/login.rs +++ b/src/login/routes/test/login.rs @@ -47,7 +47,7 @@ async fn existing_identity() { // Set up the environment let app = fixtures::scratch_app().await; - let (name, password) = fixtures::login::create_with_password(&app).await; + let (name, password) = fixtures::login::create_with_password(&app, &fixtures::now()).await; // Call the endpoint @@ -84,7 +84,7 @@ async fn authentication_failed() { // Set up the environment let app = fixtures::scratch_app().await; - let login = fixtures::login::create(&app).await; + let login = fixtures::login::create(&app, &fixtures::now()).await; // Call the endpoint @@ -109,7 +109,7 @@ async fn token_expires() { // Set up the environment let app = fixtures::scratch_app().await; - let (name, password) = fixtures::login::create_with_password(&app).await; + let (name, password) = fixtures::login::create_with_password(&app, &fixtures::now()).await; // Call the endpoint diff --git a/src/login/routes/test/logout.rs b/src/login/routes/test/logout.rs index 42b2534..611829e 100644 --- a/src/login/routes/test/logout.rs +++ b/src/login/routes/test/logout.rs @@ -11,7 +11,7 @@ async fn successful() { let app = fixtures::scratch_app().await; let now = fixtures::now(); - let login = fixtures::login::create_with_password(&app).await; + let login = fixtures::login::create_with_password(&app, &fixtures::now()).await; let identity = fixtures::identity::logged_in(&app, &login, &now).await; let secret = fixtures::identity::secret(&identity); diff --git a/src/login/snapshot.rs b/src/login/snapshot.rs new file mode 100644 index 0000000..1379005 --- /dev/null +++ b/src/login/snapshot.rs @@ -0,0 +1,49 @@ +use super::{ + event::{Created, Event}, + Id, +}; + +// This also implements FromRequestParts (see `./extract.rs`). As a result, it +// can be used as an extractor for endpoints that want to require login, or for +// endpoints that need to behave differently depending on whether the client is +// or is not logged in. +#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] +pub struct Login { + pub id: Id, + pub name: String, + // The omission of the hashed password is deliberate, to minimize the + // chance that it ends up tangled up in debug output or in some other chunk + // of logic elsewhere. +} + +impl Login { + // Two reasons for this allow: + // + // 1. This is used to collect streams using a fold, below, which requires a type consistent with the fold, and + // 2. It's also consistent with the other history state machine types. + #[allow(clippy::unnecessary_wraps)] + fn apply(state: Option<Self>, event: Event) -> Option<Self> { + match (state, event) { + (None, Event::Created(event)) => Some(event.into()), + (state, event) => panic!("invalid message event {event:#?} for state {state:#?}"), + } + } +} + +impl FromIterator<Event> for Option<Login> { + fn from_iter<I: IntoIterator<Item = Event>>(events: I) -> Self { + events.into_iter().fold(None, Login::apply) + } +} + +impl From<&Created> for Login { + fn from(event: &Created) -> Self { + event.login.clone() + } +} + +impl From<Created> for Login { + fn from(event: Created) -> Self { + event.login + } +} diff --git a/src/message/app.rs b/src/message/app.rs index 385c92e..3385af2 100644 --- a/src/message/app.rs +++ b/src/message/app.rs @@ -7,7 +7,7 @@ use crate::{ channel::{self, repo::Provider as _}, clock::DateTime, db::NotFound as _, - event::{broadcaster::Broadcaster, repo::Provider as _, Event, Sequence}, + event::{repo::Provider as _, Broadcaster, Event, Sequence}, login::Login, }; diff --git a/src/message/event.rs b/src/message/event.rs index 66db9b0..1cd5847 100644 --- a/src/message/event.rs +++ b/src/message/event.rs @@ -1,29 +1,14 @@ use super::{snapshot::Message, Id}; -use crate::{ - channel::Channel, - event::{Instant, Sequenced}, -}; +use crate::event::{Instant, Sequenced}; #[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] -pub struct Event { - #[serde(flatten)] - pub kind: Kind, -} - -impl Sequenced for Event { - fn instant(&self) -> Instant { - self.kind.instant() - } -} - -#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] -#[serde(tag = "type", rename_all = "snake_case")] -pub enum Kind { +#[serde(tag = "event", rename_all = "snake_case")] +pub enum Event { Sent(Sent), Deleted(Deleted), } -impl Sequenced for Kind { +impl Sequenced for Event { fn instant(&self) -> Instant { match self { Self::Sent(sent) => sent.instant(), @@ -44,7 +29,7 @@ impl Sequenced for Sent { } } -impl From<Sent> for Kind { +impl From<Sent> for Event { fn from(event: Sent) -> Self { Self::Sent(event) } @@ -54,8 +39,7 @@ impl From<Sent> for Kind { pub struct Deleted { #[serde(flatten)] pub instant: Instant, - pub channel: Channel, - pub message: Id, + pub id: Id, } impl Sequenced for Deleted { @@ -64,7 +48,7 @@ impl Sequenced for Deleted { } } -impl From<Deleted> for Kind { +impl From<Deleted> for Event { fn from(event: Deleted) -> Self { Self::Deleted(event) } diff --git a/src/message/history.rs b/src/message/history.rs index f267f4c..09e69b7 100644 --- a/src/message/history.rs +++ b/src/message/history.rs @@ -35,22 +35,19 @@ impl History { // Events interface impl History { fn sent(&self) -> Event { - Event { - kind: Sent { - message: self.message.clone(), - } - .into(), + Sent { + message: self.message.clone(), } + .into() } fn deleted(&self) -> Option<Event> { - self.deleted.map(|instant| Event { - kind: Deleted { + self.deleted.map(|instant| { + Deleted { instant, - channel: self.message.channel.clone(), - message: self.message.id.clone(), + id: self.message.id.clone(), } - .into(), + .into() }) } diff --git a/src/message/repo.rs b/src/message/repo.rs index 5b199a7..71c6d10 100644 --- a/src/message/repo.rs +++ b/src/message/repo.rs @@ -2,7 +2,7 @@ use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction}; use super::{snapshot::Message, History, Id}; use crate::{ - channel::{self, Channel}, + channel, clock::DateTime, event::{Instant, ResumePoint, Sequence}, login::{self, Login}, @@ -38,6 +38,10 @@ impl<'c> Messages<'c> { values ($1, $2, $3, $4, $5, $6) returning id as "id: Id", + channel as "channel: channel::Id", + sender as "sender: login::Id", + sent_at as "sent_at: DateTime", + sent_sequence as "sent_sequence: Sequence", body "#, id, @@ -49,12 +53,12 @@ impl<'c> Messages<'c> { ) .map(|row| History { message: Message { - sent: *sent, - // Use "as created" here as we don't care about providing a perfectly up-to-date - // representation of the channel. The `name` is informational (and the ID, which is - // normative, is fixed over time). - channel: channel.as_created(), - sender: sender.clone(), + sent: Instant { + at: row.sent_at, + sequence: row.sent_sequence, + }, + channel: row.channel, + sender: row.sender, id: row.id, body: row.body, }, @@ -75,20 +79,16 @@ impl<'c> Messages<'c> { let messages = sqlx::query!( r#" select - channel.id as "channel_id: channel::Id", - channel.name as "channel_name", - sender.id as "sender_id: login::Id", - sender.name as "sender_name", - message.id as "id: Id", - message.body, + channel as "channel: channel::Id", + sender as "sender: login::Id", + id as "id: Id", + body, sent_at as "sent_at: DateTime", sent_sequence as "sent_sequence: Sequence" from message - join channel on message.channel = channel.id - join login as sender on message.sender = sender.id - where channel.id = $1 - and coalesce(message.sent_sequence <= $2, true) - order by message.sent_sequence + where channel = $1 + and coalesce(sent_sequence <= $2, true) + order by sent_sequence "#, channel_id, resume_at, @@ -99,14 +99,43 @@ impl<'c> Messages<'c> { at: row.sent_at, sequence: row.sent_sequence, }, - channel: Channel { - id: row.channel_id, - name: row.channel_name, - }, - sender: Login { - id: row.sender_id, - name: row.sender_name, + channel: row.channel, + sender: row.sender, + id: row.id, + body: row.body, + }, + deleted: None, + }) + .fetch_all(&mut *self.0) + .await?; + + Ok(messages) + } + + pub async fn all(&mut self, resume_at: ResumePoint) -> Result<Vec<History>, sqlx::Error> { + let messages = sqlx::query!( + r#" + select + channel as "channel: channel::Id", + sender as "sender: login::Id", + id as "id: Id", + body, + sent_at as "sent_at: DateTime", + sent_sequence as "sent_sequence: Sequence" + from message + where coalesce(sent_sequence <= $2, true) + order by sent_sequence + "#, + resume_at, + ) + .map(|row| History { + message: Message { + sent: Instant { + at: row.sent_at, + sequence: row.sent_sequence, }, + channel: row.channel, + sender: row.sender, id: row.id, body: row.body, }, @@ -122,18 +151,14 @@ impl<'c> Messages<'c> { let message = sqlx::query!( r#" select - channel.id as "channel_id: channel::Id", - channel.name as "channel_name", - sender.id as "sender_id: login::Id", - sender.name as "sender_name", - message.id as "id: Id", - message.body, + channel as "channel: channel::Id", + sender as "sender: login::Id", + id as "id: Id", + body, sent_at as "sent_at: DateTime", sent_sequence as "sent_sequence: Sequence" from message - join channel on message.channel = channel.id - join login as sender on message.sender = sender.id - where message.id = $1 + where id = $1 "#, message, ) @@ -143,14 +168,8 @@ impl<'c> Messages<'c> { at: row.sent_at, sequence: row.sent_sequence, }, - channel: Channel { - id: row.channel_id, - name: row.channel_name, - }, - sender: Login { - id: row.sender_id, - name: row.sender_name, - }, + channel: row.channel, + sender: row.sender, id: row.id, body: row.body, }, @@ -207,17 +226,13 @@ impl<'c> Messages<'c> { let messages = sqlx::query!( r#" select - channel.id as "channel_id: channel::Id", - channel.name as "channel_name", - sender.id as "sender_id: login::Id", - sender.name as "sender_name", - message.id as "id: Id", - message.body, + channel as "channel: channel::Id", + sender as "sender: login::Id", + id as "id: Id", + body, sent_at as "sent_at: DateTime", sent_sequence as "sent_sequence: Sequence" from message - join channel on message.channel = channel.id - join login as sender on message.sender = sender.id where coalesce(message.sent_sequence > $1, true) "#, resume_at, @@ -228,14 +243,8 @@ impl<'c> Messages<'c> { at: row.sent_at, sequence: row.sent_sequence, }, - channel: Channel { - id: row.channel_id, - name: row.channel_name, - }, - sender: Login { - id: row.sender_id, - name: row.sender_name, - }, + channel: row.channel, + sender: row.sender, id: row.id, body: row.body, }, diff --git a/src/message/routes.rs b/src/message/routes.rs index 29fe3d7..e21c674 100644 --- a/src/message/routes.rs +++ b/src/message/routes.rs @@ -9,7 +9,7 @@ use axum::{ use crate::{ app::App, clock::RequestedAt, - error::Internal, + error::{Internal, NotFound}, login::Login, message::{self, app::DeleteError}, }; @@ -38,7 +38,7 @@ impl IntoResponse for ErrorResponse { let Self(error) = self; match error { not_found @ (DeleteError::ChannelNotFound(_) | DeleteError::NotFound(_)) => { - (StatusCode::NOT_FOUND, not_found.to_string()).into_response() + NotFound(not_found).into_response() } other => Internal::from(other).into_response(), } diff --git a/src/message/snapshot.rs b/src/message/snapshot.rs index 522c1aa..0eb37bb 100644 --- a/src/message/snapshot.rs +++ b/src/message/snapshot.rs @@ -1,57 +1,24 @@ use super::{ - event::{Event, Kind, Sent}, + event::{Event, Sent}, Id, }; -use crate::{channel::Channel, event::Instant, login::Login}; +use crate::{channel, event::Instant, login}; #[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] -#[serde(into = "self::serialize::Message")] pub struct Message { - #[serde(skip)] + #[serde(flatten)] pub sent: Instant, - pub channel: Channel, - pub sender: Login, + pub channel: channel::Id, + pub sender: login::Id, pub id: Id, pub body: String, } -mod serialize { - use crate::{channel::Channel, login::Login, message::Id}; - - #[derive(serde::Serialize)] - pub struct Message { - channel: Channel, - sender: Login, - #[allow(clippy::struct_field_names)] - // Deliberately redundant with the module path; this produces a specific serialization. - message: MessageData, - } - - #[derive(serde::Serialize)] - pub struct MessageData { - id: Id, - body: String, - } - - impl From<super::Message> for Message { - fn from(message: super::Message) -> Self { - Self { - channel: message.channel, - sender: message.sender, - message: MessageData { - id: message.id, - body: message.body, - }, - } - } - } -} - impl Message { fn apply(state: Option<Self>, event: Event) -> Option<Self> { - match (state, event.kind) { - (None, Kind::Sent(event)) => Some(event.into()), - (Some(message), Kind::Deleted(event)) if message.id == event.message => None, + match (state, event) { + (None, Event::Sent(event)) => Some(event.into()), + (Some(message), Event::Deleted(event)) if message.id == event.id => None, (state, event) => panic!("invalid message event {event:#?} for state {state:#?}"), } } diff --git a/src/test/fixtures/event.rs b/src/test/fixtures/event.rs index 09f0490..7fe2bf3 100644 --- a/src/test/fixtures/event.rs +++ b/src/test/fixtures/event.rs @@ -1,11 +1,11 @@ use crate::{ - event::{Event, Kind}, - message::Message, + event::Event, + message::{Event::Sent, Message}, }; pub fn message_sent(event: &Event, message: &Message) -> bool { matches!( - &event.kind, - Kind::MessageSent(event) if message == &event.into() + &event, + Event::Message(Sent(event)) if message == &event.into() ) } diff --git a/src/test/fixtures/filter.rs b/src/test/fixtures/filter.rs index 6e62aea..84d27b0 100644 --- a/src/test/fixtures/filter.rs +++ b/src/test/fixtures/filter.rs @@ -1,11 +1,11 @@ use futures::future; -use crate::event::{Event, Kind}; +use crate::{channel::Event::Created, event::Event, message::Event::Sent}; pub fn messages() -> impl FnMut(&Event) -> future::Ready<bool> { - |event| future::ready(matches!(event.kind, Kind::MessageSent(_))) + |event| future::ready(matches!(event, Event::Message(Sent(_)))) } pub fn created() -> impl FnMut(&Event) -> future::Ready<bool> { - |event| future::ready(matches!(event.kind, Kind::ChannelCreated(_))) + |event| future::ready(matches!(event, Event::Channel(Created(_)))) } diff --git a/src/test/fixtures/login.rs b/src/test/fixtures/login.rs index 00c2789..e5ac716 100644 --- a/src/test/fixtures/login.rs +++ b/src/test/fixtures/login.rs @@ -3,23 +3,24 @@ use uuid::Uuid; use crate::{ app::App, + clock::RequestedAt, login::{self, Login, Password}, }; -pub async fn create_with_password(app: &App) -> (String, Password) { +pub async fn create_with_password(app: &App, created_at: &RequestedAt) -> (String, Password) { let (name, password) = propose(); app.logins() - .create(&name, &password) + .create(&name, &password, created_at) .await .expect("should always succeed if the login is actually new"); (name, password) } -pub async fn create(app: &App) -> Login { +pub async fn create(app: &App, created_at: &RequestedAt) -> Login { let (name, password) = propose(); app.logins() - .create(&name, &password) + .create(&name, &password, created_at) .await .expect("should always succeed if the login is actually new") } diff --git a/src/token/app.rs b/src/token/app.rs index 5c4fcd5..b8af637 100644 --- a/src/token/app.rs +++ b/src/token/app.rs @@ -7,23 +7,34 @@ use futures::{ use sqlx::sqlite::SqlitePool; use super::{ - broadcaster::Broadcaster, event, repo::auth::Provider as _, repo::Provider as _, Id, Secret, + repo::auth::Provider as _, repo::Provider as _, Broadcaster, Event as TokenEvent, Id, Secret, }; use crate::{ clock::DateTime, db::NotFound as _, + event::{self, repo::Provider as _, Event as ServiceEvent}, login::{repo::Provider as _, Login, Password}, }; pub struct Tokens<'a> { db: &'a SqlitePool, - tokens: &'a Broadcaster, + events: &'a event::Broadcaster, + token_events: &'a Broadcaster, } impl<'a> Tokens<'a> { - pub const fn new(db: &'a SqlitePool, tokens: &'a Broadcaster) -> Self { - Self { db, tokens } + pub const fn new( + db: &'a SqlitePool, + events: &'a event::Broadcaster, + token_events: &'a Broadcaster, + ) -> Self { + Self { + db, + events, + token_events, + } } + pub async fn login( &self, name: &str, @@ -32,22 +43,30 @@ impl<'a> Tokens<'a> { ) -> Result<Secret, LoginError> { let mut tx = self.db.begin().await?; - let login = if let Some((login, stored_hash)) = tx.auth().for_name(name).await? { + let (login, created) = if let Some((login, stored_hash)) = tx.auth().for_name(name).await? { if stored_hash.verify(password)? { - // Password verified; use the login. - login + // Password verified, proceed with login + (login, false) } else { // Password NOT verified. return Err(LoginError::Rejected); } } else { let password_hash = password.hash()?; - tx.logins().create(name, &password_hash).await? + let created = tx.sequence().next(login_at).await?; + let login = tx.logins().create(name, &password_hash, &created).await?; + + (login, true) }; let token = tx.tokens().issue(&login, login_at).await?; tx.commit().await?; + if created { + self.events + .broadcast(login.events().map(ServiceEvent::from).collect::<Vec<_>>()); + } + Ok(token) } @@ -76,7 +95,7 @@ impl<'a> Tokens<'a> { E: std::fmt::Debug, { // Subscribe, first. - let token_events = self.tokens.subscribe(); + let token_events = self.token_events.subscribe(); // Check that the token is valid at this point in time, second. If it is, then // any future revocations will appear in the subscription. If not, bail now. @@ -102,7 +121,9 @@ impl<'a> Tokens<'a> { // Then construct the guarded stream. First, project both streams into // `GuardedEvent`. let token_events = token_events - .filter(move |event| future::ready(event.token == token)) + .filter(move |event| { + future::ready(matches!(event, TokenEvent::Revoked(id) if id == &token)) + }) .map(|_| GuardedEvent::TokenRevoked); let events = events.map(|event| GuardedEvent::Event(event)); @@ -126,8 +147,8 @@ impl<'a> Tokens<'a> { let tokens = tx.tokens().expire(&expire_at).await?; tx.commit().await?; - for event in tokens.into_iter().map(event::TokenRevoked::from) { - self.tokens.broadcast(event); + for event in tokens.into_iter().map(TokenEvent::Revoked) { + self.token_events.broadcast(event); } Ok(()) @@ -138,8 +159,8 @@ impl<'a> Tokens<'a> { tx.tokens().revoke(token).await?; tx.commit().await?; - self.tokens - .broadcast(event::TokenRevoked::from(token.clone())); + self.token_events + .broadcast(TokenEvent::Revoked(token.clone())); Ok(()) } diff --git a/src/token/broadcaster.rs b/src/token/broadcaster.rs index 8e2e006..de2513a 100644 --- a/src/token/broadcaster.rs +++ b/src/token/broadcaster.rs @@ -1,4 +1,3 @@ -use super::event; use crate::broadcast; -pub type Broadcaster = broadcast::Broadcaster<event::TokenRevoked>; +pub type Broadcaster = broadcast::Broadcaster<super::Event>; diff --git a/src/token/event.rs b/src/token/event.rs index d53d436..51b74d7 100644 --- a/src/token/event.rs +++ b/src/token/event.rs @@ -1,12 +1,6 @@ use crate::token; #[derive(Clone, Debug)] -pub struct TokenRevoked { - pub token: token::Id, -} - -impl From<token::Id> for TokenRevoked { - fn from(token: token::Id) -> Self { - Self { token } - } +pub enum Event { + Revoked(token::Id), } diff --git a/src/token/mod.rs b/src/token/mod.rs index d122611..eccb3cd 100644 --- a/src/token/mod.rs +++ b/src/token/mod.rs @@ -1,9 +1,9 @@ pub mod app; -pub mod broadcaster; +mod broadcaster; mod event; pub mod extract; mod id; mod repo; mod secret; -pub use self::{id::Id, secret::Secret}; +pub use self::{broadcaster::Broadcaster, event::Event, id::Id, secret::Secret}; diff --git a/src/token/repo/auth.rs b/src/token/repo/auth.rs index b299697..ddb5136 100644 --- a/src/token/repo/auth.rs +++ b/src/token/repo/auth.rs @@ -1,6 +1,10 @@ use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction}; -use crate::login::{self, password::StoredHash, Login}; +use crate::{ + clock::DateTime, + event::{Instant, Sequence}, + login::{self, password::StoredHash, History, Login}, +}; pub trait Provider { fn auth(&mut self) -> Auth; @@ -21,25 +25,33 @@ impl<'t> Auth<'t> { pub async fn for_name( &mut self, name: &str, - ) -> Result<Option<(Login, StoredHash)>, sqlx::Error> { + ) -> Result<Option<(History, StoredHash)>, sqlx::Error> { let found = sqlx::query!( r#" select id as "id: login::Id", name, - password_hash as "password_hash: StoredHash" + password_hash as "password_hash: StoredHash", + created_sequence as "created_sequence: Sequence", + created_at as "created_at: DateTime" from login where name = $1 "#, name, ) - .map(|rec| { + .map(|row| { ( - Login { - id: rec.id, - name: rec.name, + History { + login: Login { + id: row.id, + name: row.name, + }, + created: Instant { + at: row.created_at, + sequence: row.created_sequence, + }, }, - rec.password_hash, + row.password_hash, ) }) .fetch_optional(&mut *self.0) diff --git a/src/token/repo/token.rs b/src/token/repo/token.rs index 5f64dac..c592dcd 100644 --- a/src/token/repo/token.rs +++ b/src/token/repo/token.rs @@ -3,7 +3,7 @@ use uuid::Uuid; use crate::{ clock::DateTime, - login::{self, Login}, + login::{self, History, Login}, token::{Id, Secret}, }; @@ -24,11 +24,12 @@ impl<'c> Tokens<'c> { // be used to control expiry, until the token is actually used. pub async fn issue( &mut self, - login: &Login, + login: &History, issued_at: &DateTime, ) -> Result<Secret, sqlx::Error> { let id = Id::generate(); let secret = Uuid::new_v4().to_string(); + let login = login.id(); let secret = sqlx::query_scalar!( r#" @@ -39,7 +40,7 @@ impl<'c> Tokens<'c> { "#, id, secret, - login.id, + login, issued_at, ) .fetch_one(&mut *self.0) @@ -127,7 +128,7 @@ impl<'c> Tokens<'c> { select token.id as "token_id: Id", login.id as "login_id: login::Id", - name as "login_name" + login.name as "login_name" from login join token on login.id = token.login where token.secret = $1 @@ -1,38 +1,60 @@ use axum::{ extract::Path, http::{header, StatusCode}, - response::IntoResponse, + response::{IntoResponse, Response}, routing::get, Router, }; +use mime_guess::Mime; +use rust_embed::EmbeddedFile; #[derive(rust_embed::Embed)] #[folder = "hi-ui/build"] struct Assets; +pub fn router<S>() -> Router<S> +where + S: Clone + Send + Sync + 'static, +{ + Router::new() + .route("/*path", get(asset)) + .route("/", get(root)) +} + +async fn asset(Path(path): Path<String>) -> Result<Asset, NotFound<String>> { + let mime = mime_guess::from_path(&path).first_or_octet_stream(); + + Assets::get(&path) + .map(|file| Asset(mime, file)) + .ok_or(NotFound(format!("not found: {path}"))) +} + async fn root() -> impl IntoResponse { asset(Path(String::from("index.html"))).await } -async fn asset(Path(path): Path<String>) -> impl IntoResponse { - let mime = mime_guess::from_path(&path).first_or_octet_stream(); +struct Asset(Mime, EmbeddedFile); - match Assets::get(&path) { - Some(file) => ( +impl IntoResponse for Asset { + fn into_response(self) -> Response { + let Self(mime, file) = self; + ( StatusCode::OK, [(header::CONTENT_TYPE, mime.as_ref())], file.data, ) - .into_response(), - None => (StatusCode::NOT_FOUND, "").into_response(), + .into_response() } } -pub fn router<S>() -> Router<S> +struct NotFound<E>(pub E); + +impl<E> IntoResponse for NotFound<E> where - S: Clone + Send + Sync + 'static, + E: IntoResponse, { - Router::new() - .route("/*path", get(asset)) - .route("/", get(root)) + fn into_response(self) -> Response { + let Self(response) = self; + (StatusCode::NOT_FOUND, response).into_response() + } } |
