diff options
| -rw-r--r-- | src/app.rs | 10 | ||||
| -rw-r--r-- | src/boot/app.rs | 54 | ||||
| -rw-r--r-- | src/boot/mod.rs | 74 | ||||
| -rw-r--r-- | src/boot/routes.rs | 27 | ||||
| -rw-r--r-- | src/boot/routes/test.rs (renamed from src/login/routes/test/boot.rs) | 6 | ||||
| -rw-r--r-- | src/channel/app.rs | 48 | ||||
| -rw-r--r-- | src/channel/history.rs | 8 | ||||
| -rw-r--r-- | src/channel/repo.rs | 4 | ||||
| -rw-r--r-- | src/channel/routes/test/on_create.rs | 7 | ||||
| -rw-r--r-- | src/cli.rs | 3 | ||||
| -rw-r--r-- | src/event/app.rs | 7 | ||||
| -rw-r--r-- | src/event/mod.rs | 2 | ||||
| -rw-r--r-- | src/event/routes.rs | 4 | ||||
| -rw-r--r-- | src/event/sequence.rs | 5 | ||||
| -rw-r--r-- | src/lib.rs | 1 | ||||
| -rw-r--r-- | src/login/app.rs | 13 | ||||
| -rw-r--r-- | src/login/mod.rs | 1 | ||||
| -rw-r--r-- | src/login/routes.rs | 90 | ||||
| -rw-r--r-- | src/login/routes/test/mod.rs | 1 | ||||
| -rw-r--r-- | src/message/app.rs | 27 | ||||
| -rw-r--r-- | src/message/history.rs | 8 | ||||
| -rw-r--r-- | src/message/repo.rs | 9 |
22 files changed, 209 insertions, 200 deletions
@@ -1,13 +1,16 @@ use sqlx::sqlite::SqlitePool; use crate::{ + boot::app::Boot, channel::app::Channels, event::{app::Events, broadcaster::Broadcaster as EventBroadcaster}, - login::app::Logins, message::app::Messages, token::{app::Tokens, broadcaster::Broadcaster as TokenBroadcaster}, }; +#[cfg(test)] +use crate::login::app::Logins; + #[derive(Clone)] pub struct App { db: SqlitePool, @@ -24,6 +27,10 @@ impl App { } impl App { + pub const fn boot(&self) -> Boot { + Boot::new(&self.db) + } + pub const fn channels(&self) -> Channels { Channels::new(&self.db, &self.events) } @@ -32,6 +39,7 @@ impl App { Events::new(&self.db, &self.events) } + #[cfg(test)] pub const fn logins(&self) -> Logins { Logins::new(&self.db) } diff --git a/src/boot/app.rs b/src/boot/app.rs new file mode 100644 index 0000000..fc84b3a --- /dev/null +++ b/src/boot/app.rs @@ -0,0 +1,54 @@ +use sqlx::sqlite::SqlitePool; + +use super::{Channel, Snapshot}; +use crate::{ + channel::repo::Provider as _, event::repo::Provider as _, message::repo::Provider as _, +}; + +pub struct Boot<'a> { + db: &'a SqlitePool, +} + +impl<'a> Boot<'a> { + pub const fn new(db: &'a SqlitePool) -> Self { + Self { db } + } + + pub async fn snapshot(&self) -> Result<Snapshot, sqlx::Error> { + let mut tx = self.db.begin().await?; + let resume_point = tx.sequence().current().await?; + let channels = tx.channels().all(resume_point.into()).await?; + + let channels = { + let mut snapshots = Vec::with_capacity(channels.len()); + + let channels = channels.into_iter().filter_map(|channel| { + channel + .as_of(resume_point) + .map(|snapshot| (channel, snapshot)) + }); + + for (channel, snapshot) in channels { + let messages = tx + .messages() + .in_channel(&channel, resume_point.into()) + .await?; + + let messages = messages + .into_iter() + .filter_map(|message| message.as_of(resume_point)); + + snapshots.push(Channel::new(snapshot, messages)); + } + + snapshots + }; + + tx.commit().await?; + + Ok(Snapshot { + resume_point, + channels, + }) + } +} diff --git a/src/boot/mod.rs b/src/boot/mod.rs new file mode 100644 index 0000000..bd0da0a --- /dev/null +++ b/src/boot/mod.rs @@ -0,0 +1,74 @@ +pub mod app; +mod routes; + +use crate::{ + channel, + event::{Instant, Sequence}, + login::Login, + message, +}; + +pub use self::routes::router; + +#[derive(serde::Serialize)] +pub struct Snapshot { + pub resume_point: Sequence, + pub channels: Vec<Channel>, +} + +#[derive(serde::Serialize)] +pub struct Channel { + pub id: channel::Id, + pub name: String, + pub messages: Vec<Message>, +} + +impl Channel { + fn new( + channel: channel::Channel, + messages: impl IntoIterator<Item = message::Message>, + ) -> Self { + // The declarations are like this to guarantee that we aren't omitting any important fields from the corresponding types. + let channel::Channel { id, name } = channel; + + Self { + id, + name, + messages: messages.into_iter().map(Message::from).collect(), + } + } +} + +#[derive(serde::Serialize)] +pub struct Message { + #[serde(flatten)] + pub sent: Instant, + pub sender: Login, + // Named this way for serialization reasons + #[allow(clippy::struct_field_names)] + pub message: Body, +} + +impl From<message::Message> for Message { + fn from(message: message::Message) -> Self { + let message::Message { + sent, + channel: _, + sender, + id, + body, + } = message; + + Self { + sent, + sender, + message: Body { id, body }, + } + } +} + +#[derive(serde::Serialize)] +pub struct Body { + id: message::Id, + body: String, +} diff --git a/src/boot/routes.rs b/src/boot/routes.rs new file mode 100644 index 0000000..80f70bd --- /dev/null +++ b/src/boot/routes.rs @@ -0,0 +1,27 @@ +use axum::{ + extract::{Json, State}, + routing::get, + Router, +}; + +use super::Snapshot; +use crate::{app::App, error::Internal, login::Login}; + +#[cfg(test)] +mod test; + +pub fn router() -> Router<App> { + Router::new().route("/api/boot", get(boot)) +} + +async fn boot(State(app): State<App>, login: Login) -> Result<Json<Boot>, Internal> { + let snapshot = app.boot().snapshot().await?; + Ok(Boot { login, snapshot }.into()) +} + +#[derive(serde::Serialize)] +struct Boot { + login: Login, + #[serde(flatten)] + snapshot: Snapshot, +} diff --git a/src/login/routes/test/boot.rs b/src/boot/routes/test.rs index 9655354..5f2ba6f 100644 --- a/src/login/routes/test/boot.rs +++ b/src/boot/routes/test.rs @@ -1,12 +1,12 @@ -use axum::extract::State; +use axum::extract::{Json, State}; -use crate::{login::routes, test::fixtures}; +use crate::{boot::routes, test::fixtures}; #[tokio::test] async fn returns_identity() { let app = fixtures::scratch_app().await; let login = fixtures::login::fictitious(); - let response = routes::boot(State(app), login.clone()) + let Json(response) = routes::boot(State(app), login.clone()) .await .expect("boot always succeeds"); diff --git a/src/channel/app.rs b/src/channel/app.rs index 1b2cc48..a9a9e84 100644 --- a/src/channel/app.rs +++ b/src/channel/app.rs @@ -7,7 +7,7 @@ use crate::{ clock::DateTime, db::NotFound, event::{broadcaster::Broadcaster, repo::Provider as _, Event, Sequence}, - message::{repo::Provider as _, Message}, + message::repo::Provider as _, }; pub struct Channels<'a> { @@ -36,52 +36,6 @@ impl<'a> Channels<'a> { Ok(channel.as_created()) } - pub async fn all(&self, resume_point: Option<Sequence>) -> Result<Vec<Channel>, InternalError> { - let mut tx = self.db.begin().await?; - let channels = tx.channels().all(resume_point).await?; - tx.commit().await?; - - let channels = channels - .into_iter() - .filter_map(|channel| { - channel - .events() - .filter(Sequence::up_to(resume_point)) - .collect() - }) - .collect(); - - Ok(channels) - } - - pub async fn messages( - &self, - channel: &Id, - resume_point: Option<Sequence>, - ) -> Result<Vec<Message>, Error> { - let mut tx = self.db.begin().await?; - let channel = tx - .channels() - .by_id(channel) - .await - .not_found(|| Error::NotFound(channel.clone()))?; - - let messages = tx - .messages() - .in_channel(&channel, resume_point) - .await? - .into_iter() - .filter_map(|message| { - message - .events() - .filter(Sequence::up_to(resume_point)) - .collect() - }) - .collect(); - - Ok(messages) - } - pub async fn delete(&self, channel: &Id, deleted_at: &DateTime) -> Result<(), Error> { let mut tx = self.db.begin().await?; diff --git a/src/channel/history.rs b/src/channel/history.rs index bd45d8d..383fb7b 100644 --- a/src/channel/history.rs +++ b/src/channel/history.rs @@ -2,7 +2,7 @@ use super::{ event::{Created, Deleted, Event}, Channel, Id, }; -use crate::event::Instant; +use crate::event::{Instant, ResumePoint, Sequence}; #[derive(Clone, Debug, Eq, PartialEq)] pub struct History { @@ -25,6 +25,12 @@ impl History { pub fn as_created(&self) -> Channel { self.channel.clone() } + + pub fn as_of(&self, resume_point: impl Into<ResumePoint>) -> Option<Channel> { + self.events() + .filter(Sequence::up_to(resume_point.into())) + .collect() + } } // Event factories diff --git a/src/channel/repo.rs b/src/channel/repo.rs index 2b48436..2f57581 100644 --- a/src/channel/repo.rs +++ b/src/channel/repo.rs @@ -3,7 +3,7 @@ use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction}; use crate::{ channel::{Channel, History, Id}, clock::DateTime, - event::{Instant, Sequence}, + event::{Instant, ResumePoint, Sequence}, }; pub trait Provider { @@ -84,7 +84,7 @@ impl<'c> Channels<'c> { Ok(channel) } - pub async fn all(&mut self, resume_at: Option<Sequence>) -> Result<Vec<History>, sqlx::Error> { + pub async fn all(&mut self, resume_at: ResumePoint) -> Result<Vec<History>, sqlx::Error> { let channels = sqlx::query!( r#" select diff --git a/src/channel/routes/test/on_create.rs b/src/channel/routes/test/on_create.rs index 5733c9e..ed49017 100644 --- a/src/channel/routes/test/on_create.rs +++ b/src/channel/routes/test/on_create.rs @@ -33,8 +33,11 @@ async fn new_channel() { // Verify the semantics - let channels = app.channels().all(None).await.expect("always succeeds"); - assert!(channels.contains(&response_channel)); + let snapshot = app.boot().snapshot().await.expect("boot always succeeds"); + assert!(snapshot + .channels + .iter() + .any(|channel| channel.name == response_channel.name && channel.id == response_channel.id)); let mut events = app .events() @@ -10,7 +10,7 @@ use clap::Parser; use sqlx::sqlite::SqlitePool; use tokio::net; -use crate::{app::App, channel, clock, db, event, expire, login, message, ui}; +use crate::{app::App, boot, channel, clock, db, event, expire, login, message, ui}; /// Command-line entry point for running the `hi` server. /// @@ -110,6 +110,7 @@ impl Args { fn routers() -> Router<App> { [ + boot::router(), channel::router(), event::router(), login::router(), diff --git a/src/event/app.rs b/src/event/app.rs index d664ec7..141037d 100644 --- a/src/event/app.rs +++ b/src/event/app.rs @@ -6,7 +6,7 @@ use futures::{ use itertools::Itertools as _; use sqlx::sqlite::SqlitePool; -use super::{broadcaster::Broadcaster, Event, Sequence, Sequenced}; +use super::{broadcaster::Broadcaster, Event, ResumePoint, Sequence, Sequenced}; use crate::{ channel::{self, repo::Provider as _}, message::{self, repo::Provider as _}, @@ -24,8 +24,9 @@ impl<'a> Events<'a> { pub async fn subscribe( &self, - resume_at: Option<Sequence>, + resume_at: impl Into<ResumePoint>, ) -> Result<impl Stream<Item = Event> + std::fmt::Debug, sqlx::Error> { + let resume_at = resume_at.into(); // Subscribe before retrieving, to catch messages broadcast while we're // querying the DB. We'll prune out duplicates later. let live_messages = self.events.subscribe(); @@ -65,7 +66,7 @@ impl<'a> Events<'a> { Ok(replay.chain(live_messages)) } - fn resume(resume_at: Option<Sequence>) -> impl for<'m> FnMut(&'m Event) -> future::Ready<bool> { + fn resume(resume_at: ResumePoint) -> impl for<'m> FnMut(&'m Event) -> future::Ready<bool> { let filter = Sequence::after(resume_at); move |event| future::ready(filter(event)) } diff --git a/src/event/mod.rs b/src/event/mod.rs index 1349fe6..e748d66 100644 --- a/src/event/mod.rs +++ b/src/event/mod.rs @@ -12,6 +12,8 @@ pub use self::{ sequence::{Instant, Sequence, Sequenced}, }; +pub type ResumePoint = Option<Sequence>; + #[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] pub struct Event { #[serde(flatten)] diff --git a/src/event/routes.rs b/src/event/routes.rs index 5b9c7e3..de6d248 100644 --- a/src/event/routes.rs +++ b/src/event/routes.rs @@ -14,7 +14,7 @@ use super::{extract::LastEventId, Event}; use crate::{ app::App, error::{Internal, Unauthorized}, - event::{Sequence, Sequenced as _}, + event::{ResumePoint, Sequence, Sequenced as _}, token::{app::ValidateError, extract::Identity}, }; @@ -27,7 +27,7 @@ pub fn router() -> Router<App> { #[derive(Default, serde::Deserialize)] struct EventsQuery { - resume_point: Option<Sequence>, + resume_point: ResumePoint, } async fn events( diff --git a/src/event/sequence.rs b/src/event/sequence.rs index fbe3711..ceb5bcb 100644 --- a/src/event/sequence.rs +++ b/src/event/sequence.rs @@ -1,5 +1,6 @@ use std::fmt; +use super::ResumePoint; use crate::clock::DateTime; #[derive(Clone, Copy, Debug, Eq, PartialEq, serde::Serialize)] @@ -39,14 +40,14 @@ impl fmt::Display for Sequence { } impl Sequence { - pub fn up_to<E>(resume_point: Option<Self>) -> impl for<'e> Fn(&'e E) -> bool + pub fn up_to<E>(resume_point: ResumePoint) -> impl for<'e> Fn(&'e E) -> bool where E: Sequenced, { move |event| resume_point.map_or(true, |resume_point| event.sequence() <= resume_point) } - pub fn after<E>(resume_point: Option<Self>) -> impl for<'e> Fn(&'e E) -> bool + pub fn after<E>(resume_point: ResumePoint) -> impl for<'e> Fn(&'e E) -> bool where E: Sequenced, { @@ -3,6 +3,7 @@ #![warn(clippy::pedantic)] mod app; +mod boot; mod broadcast; mod channel; pub mod cli; diff --git a/src/login/app.rs b/src/login/app.rs index 15adb31..4f60b89 100644 --- a/src/login/app.rs +++ b/src/login/app.rs @@ -1,8 +1,5 @@ use sqlx::sqlite::SqlitePool; -use crate::event::{repo::Provider as _, Sequence}; - -#[cfg(test)] use super::{repo::Provider as _, Login, Password}; pub struct Logins<'a> { @@ -14,15 +11,6 @@ impl<'a> Logins<'a> { Self { db } } - pub async fn boot_point(&self) -> Result<Sequence, sqlx::Error> { - let mut tx = self.db.begin().await?; - let sequence = tx.sequence().current().await?; - tx.commit().await?; - - Ok(sequence) - } - - #[cfg(test)] pub async fn create(&self, name: &str, password: &Password) -> Result<Login, CreateError> { let password_hash = password.hash()?; @@ -34,7 +22,6 @@ impl<'a> Logins<'a> { } } -#[cfg(test)] #[derive(Debug, thiserror::Error)] #[error(transparent)] pub enum CreateError { diff --git a/src/login/mod.rs b/src/login/mod.rs index 65e3ada..f272f80 100644 --- a/src/login/mod.rs +++ b/src/login/mod.rs @@ -1,3 +1,4 @@ +#[cfg(test)] pub mod app; pub mod extract; mod id; diff --git a/src/login/routes.rs b/src/login/routes.rs index b0e3fee..6579ae6 100644 --- a/src/login/routes.rs +++ b/src/login/routes.rs @@ -2,19 +2,15 @@ use axum::{ extract::{Json, State}, http::StatusCode, response::{IntoResponse, Response}, - routing::{get, post}, + routing::post, Router, }; -use futures::stream::{self, StreamExt as _, TryStreamExt as _}; use crate::{ app::App, - channel::Channel, clock::RequestedAt, error::{Internal, Unauthorized}, - event::Instant, - login::{Login, Password}, - message::{self, Message}, + login::Password, token::{app, extract::IdentityToken}, }; @@ -23,92 +19,10 @@ mod test; pub fn router() -> Router<App> { Router::new() - .route("/api/boot", get(boot)) .route("/api/auth/login", post(on_login)) .route("/api/auth/logout", post(on_logout)) } -async fn boot(State(app): State<App>, login: Login) -> Result<Boot, Internal> { - let resume_point = app.logins().boot_point().await?; - let channels = app.channels().all(resume_point.into()).await?; - let channels = stream::iter(channels) - .then(|channel| async { - app.messages() - .in_channel(&channel.id, resume_point.into()) - .await - .map(|messages| BootChannel::new(channel, messages)) - }) - .try_collect() - .await?; - - Ok(Boot { - login, - resume_point: resume_point.to_string(), - channels, - }) -} - -#[derive(serde::Serialize)] -struct Boot { - login: Login, - resume_point: String, - channels: Vec<BootChannel>, -} - -#[derive(serde::Serialize)] -struct BootChannel { - #[serde(flatten)] - channel: Channel, - messages: Vec<BootMessage>, -} - -impl BootChannel { - fn new(channel: Channel, messages: impl IntoIterator<Item = Message>) -> Self { - Self { - channel, - messages: messages.into_iter().map(BootMessage::from).collect(), - } - } -} - -#[derive(serde::Serialize)] -struct BootMessage { - #[serde(flatten)] - sent: Instant, - sender: Login, - message: BootMessageBody, -} - -impl From<Message> for BootMessage { - fn from(message: Message) -> Self { - let Message { - sent, - channel: _, - sender, - id, - body, - } = message; - - Self { - sent, - sender, - message: BootMessageBody { id, body }, - } - } -} - -#[derive(serde::Serialize)] -struct BootMessageBody { - id: message::Id, - body: String, -} - -impl IntoResponse for Boot { - fn into_response(self) -> Response { - Json(self).into_response() - } -} - #[derive(serde::Deserialize)] struct LoginRequest { name: String, diff --git a/src/login/routes/test/mod.rs b/src/login/routes/test/mod.rs index 7693755..90522c4 100644 --- a/src/login/routes/test/mod.rs +++ b/src/login/routes/test/mod.rs @@ -1,3 +1,2 @@ -mod boot; mod login; mod logout; diff --git a/src/message/app.rs b/src/message/app.rs index 1e50a65..385c92e 100644 --- a/src/message/app.rs +++ b/src/message/app.rs @@ -44,33 +44,6 @@ impl<'a> Messages<'a> { Ok(message.as_sent()) } - pub async fn in_channel( - &self, - channel: &channel::Id, - resume_point: Option<Sequence>, - ) -> Result<Vec<Message>, DeleteError> { - let mut tx = self.db.begin().await?; - let channel = tx - .channels() - .by_id(channel) - .await - .not_found(|| DeleteError::ChannelNotFound(channel.clone()))?; - let messages = tx.messages().in_channel(&channel, resume_point).await?; - tx.commit().await?; - - let messages = messages - .into_iter() - .filter_map(|message| { - message - .events() - .filter(Sequence::up_to(resume_point)) - .collect() - }) - .collect(); - - Ok(messages) - } - pub async fn delete(&self, message: &Id, deleted_at: &DateTime) -> Result<(), DeleteError> { let mut tx = self.db.begin().await?; let deleted = tx.sequence().next(deleted_at).await?; diff --git a/src/message/history.rs b/src/message/history.rs index c44d954..f267f4c 100644 --- a/src/message/history.rs +++ b/src/message/history.rs @@ -2,7 +2,7 @@ use super::{ event::{Deleted, Event, Sent}, Id, Message, }; -use crate::event::Instant; +use crate::event::{Instant, ResumePoint, Sequence}; #[derive(Clone, Debug, Eq, PartialEq)] pub struct History { @@ -24,6 +24,12 @@ impl History { pub fn as_sent(&self) -> Message { self.message.clone() } + + pub fn as_of(&self, resume_point: impl Into<ResumePoint>) -> Option<Message> { + self.events() + .filter(Sequence::up_to(resume_point.into())) + .collect() + } } // Events interface diff --git a/src/message/repo.rs b/src/message/repo.rs index 2ca409d..5b199a7 100644 --- a/src/message/repo.rs +++ b/src/message/repo.rs @@ -4,7 +4,7 @@ use super::{snapshot::Message, History, Id}; use crate::{ channel::{self, Channel}, clock::DateTime, - event::{Instant, Sequence}, + event::{Instant, ResumePoint, Sequence}, login::{self, Login}, }; @@ -69,7 +69,7 @@ impl<'c> Messages<'c> { pub async fn in_channel( &mut self, channel: &channel::History, - resume_at: Option<Sequence>, + resume_at: ResumePoint, ) -> Result<Vec<History>, sqlx::Error> { let channel_id = channel.id(); let messages = sqlx::query!( @@ -203,10 +203,7 @@ impl<'c> Messages<'c> { Ok(messages) } - pub async fn replay( - &mut self, - resume_at: Option<Sequence>, - ) -> Result<Vec<History>, sqlx::Error> { + pub async fn replay(&mut self, resume_at: ResumePoint) -> Result<Vec<History>, sqlx::Error> { let messages = sqlx::query!( r#" select |
