diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/app.rs | 10 | ||||
| -rw-r--r-- | src/boot/app.rs | 54 | ||||
| -rw-r--r-- | src/boot/mod.rs | 74 | ||||
| -rw-r--r-- | src/boot/routes.rs | 27 | ||||
| -rw-r--r-- | src/boot/routes/test.rs (renamed from src/login/routes/test/boot.rs) | 6 | ||||
| -rw-r--r-- | src/channel/app.rs | 48 | ||||
| -rw-r--r-- | src/channel/history.rs | 8 | ||||
| -rw-r--r-- | src/channel/repo.rs | 4 | ||||
| -rw-r--r-- | src/channel/routes.rs | 89 | ||||
| -rw-r--r-- | src/channel/routes/test/list.rs | 65 | ||||
| -rw-r--r-- | src/channel/routes/test/mod.rs | 1 | ||||
| -rw-r--r-- | src/channel/routes/test/on_create.rs | 7 | ||||
| -rw-r--r-- | src/cli.rs | 3 | ||||
| -rw-r--r-- | src/event/app.rs | 7 | ||||
| -rw-r--r-- | src/event/mod.rs | 2 | ||||
| -rw-r--r-- | src/event/routes.rs | 4 | ||||
| -rw-r--r-- | src/event/sequence.rs | 5 | ||||
| -rw-r--r-- | src/lib.rs | 1 | ||||
| -rw-r--r-- | src/login/app.rs | 13 | ||||
| -rw-r--r-- | src/login/mod.rs | 1 | ||||
| -rw-r--r-- | src/login/routes.rs | 25 | ||||
| -rw-r--r-- | src/login/routes/test/mod.rs | 1 | ||||
| -rw-r--r-- | src/message/history.rs | 8 | ||||
| -rw-r--r-- | src/message/repo.rs | 9 |
24 files changed, 211 insertions, 261 deletions
@@ -1,13 +1,16 @@ use sqlx::sqlite::SqlitePool; use crate::{ + boot::app::Boot, channel::app::Channels, event::{app::Events, broadcaster::Broadcaster as EventBroadcaster}, - login::app::Logins, message::app::Messages, token::{app::Tokens, broadcaster::Broadcaster as TokenBroadcaster}, }; +#[cfg(test)] +use crate::login::app::Logins; + #[derive(Clone)] pub struct App { db: SqlitePool, @@ -24,6 +27,10 @@ impl App { } impl App { + pub const fn boot(&self) -> Boot { + Boot::new(&self.db) + } + pub const fn channels(&self) -> Channels { Channels::new(&self.db, &self.events) } @@ -32,6 +39,7 @@ impl App { Events::new(&self.db, &self.events) } + #[cfg(test)] pub const fn logins(&self) -> Logins { Logins::new(&self.db) } diff --git a/src/boot/app.rs b/src/boot/app.rs new file mode 100644 index 0000000..fc84b3a --- /dev/null +++ b/src/boot/app.rs @@ -0,0 +1,54 @@ +use sqlx::sqlite::SqlitePool; + +use super::{Channel, Snapshot}; +use crate::{ + channel::repo::Provider as _, event::repo::Provider as _, message::repo::Provider as _, +}; + +pub struct Boot<'a> { + db: &'a SqlitePool, +} + +impl<'a> Boot<'a> { + pub const fn new(db: &'a SqlitePool) -> Self { + Self { db } + } + + pub async fn snapshot(&self) -> Result<Snapshot, sqlx::Error> { + let mut tx = self.db.begin().await?; + let resume_point = tx.sequence().current().await?; + let channels = tx.channels().all(resume_point.into()).await?; + + let channels = { + let mut snapshots = Vec::with_capacity(channels.len()); + + let channels = channels.into_iter().filter_map(|channel| { + channel + .as_of(resume_point) + .map(|snapshot| (channel, snapshot)) + }); + + for (channel, snapshot) in channels { + let messages = tx + .messages() + .in_channel(&channel, resume_point.into()) + .await?; + + let messages = messages + .into_iter() + .filter_map(|message| message.as_of(resume_point)); + + snapshots.push(Channel::new(snapshot, messages)); + } + + snapshots + }; + + tx.commit().await?; + + Ok(Snapshot { + resume_point, + channels, + }) + } +} diff --git a/src/boot/mod.rs b/src/boot/mod.rs new file mode 100644 index 0000000..bd0da0a --- /dev/null +++ b/src/boot/mod.rs @@ -0,0 +1,74 @@ +pub mod app; +mod routes; + +use crate::{ + channel, + event::{Instant, Sequence}, + login::Login, + message, +}; + +pub use self::routes::router; + +#[derive(serde::Serialize)] +pub struct Snapshot { + pub resume_point: Sequence, + pub channels: Vec<Channel>, +} + +#[derive(serde::Serialize)] +pub struct Channel { + pub id: channel::Id, + pub name: String, + pub messages: Vec<Message>, +} + +impl Channel { + fn new( + channel: channel::Channel, + messages: impl IntoIterator<Item = message::Message>, + ) -> Self { + // The declarations are like this to guarantee that we aren't omitting any important fields from the corresponding types. + let channel::Channel { id, name } = channel; + + Self { + id, + name, + messages: messages.into_iter().map(Message::from).collect(), + } + } +} + +#[derive(serde::Serialize)] +pub struct Message { + #[serde(flatten)] + pub sent: Instant, + pub sender: Login, + // Named this way for serialization reasons + #[allow(clippy::struct_field_names)] + pub message: Body, +} + +impl From<message::Message> for Message { + fn from(message: message::Message) -> Self { + let message::Message { + sent, + channel: _, + sender, + id, + body, + } = message; + + Self { + sent, + sender, + message: Body { id, body }, + } + } +} + +#[derive(serde::Serialize)] +pub struct Body { + id: message::Id, + body: String, +} diff --git a/src/boot/routes.rs b/src/boot/routes.rs new file mode 100644 index 0000000..80f70bd --- /dev/null +++ b/src/boot/routes.rs @@ -0,0 +1,27 @@ +use axum::{ + extract::{Json, State}, + routing::get, + Router, +}; + +use super::Snapshot; +use crate::{app::App, error::Internal, login::Login}; + +#[cfg(test)] +mod test; + +pub fn router() -> Router<App> { + Router::new().route("/api/boot", get(boot)) +} + +async fn boot(State(app): State<App>, login: Login) -> Result<Json<Boot>, Internal> { + let snapshot = app.boot().snapshot().await?; + Ok(Boot { login, snapshot }.into()) +} + +#[derive(serde::Serialize)] +struct Boot { + login: Login, + #[serde(flatten)] + snapshot: Snapshot, +} diff --git a/src/login/routes/test/boot.rs b/src/boot/routes/test.rs index 9655354..5f2ba6f 100644 --- a/src/login/routes/test/boot.rs +++ b/src/boot/routes/test.rs @@ -1,12 +1,12 @@ -use axum::extract::State; +use axum::extract::{Json, State}; -use crate::{login::routes, test::fixtures}; +use crate::{boot::routes, test::fixtures}; #[tokio::test] async fn returns_identity() { let app = fixtures::scratch_app().await; let login = fixtures::login::fictitious(); - let response = routes::boot(State(app), login.clone()) + let Json(response) = routes::boot(State(app), login.clone()) .await .expect("boot always succeeds"); diff --git a/src/channel/app.rs b/src/channel/app.rs index 1b2cc48..a9a9e84 100644 --- a/src/channel/app.rs +++ b/src/channel/app.rs @@ -7,7 +7,7 @@ use crate::{ clock::DateTime, db::NotFound, event::{broadcaster::Broadcaster, repo::Provider as _, Event, Sequence}, - message::{repo::Provider as _, Message}, + message::repo::Provider as _, }; pub struct Channels<'a> { @@ -36,52 +36,6 @@ impl<'a> Channels<'a> { Ok(channel.as_created()) } - pub async fn all(&self, resume_point: Option<Sequence>) -> Result<Vec<Channel>, InternalError> { - let mut tx = self.db.begin().await?; - let channels = tx.channels().all(resume_point).await?; - tx.commit().await?; - - let channels = channels - .into_iter() - .filter_map(|channel| { - channel - .events() - .filter(Sequence::up_to(resume_point)) - .collect() - }) - .collect(); - - Ok(channels) - } - - pub async fn messages( - &self, - channel: &Id, - resume_point: Option<Sequence>, - ) -> Result<Vec<Message>, Error> { - let mut tx = self.db.begin().await?; - let channel = tx - .channels() - .by_id(channel) - .await - .not_found(|| Error::NotFound(channel.clone()))?; - - let messages = tx - .messages() - .in_channel(&channel, resume_point) - .await? - .into_iter() - .filter_map(|message| { - message - .events() - .filter(Sequence::up_to(resume_point)) - .collect() - }) - .collect(); - - Ok(messages) - } - pub async fn delete(&self, channel: &Id, deleted_at: &DateTime) -> Result<(), Error> { let mut tx = self.db.begin().await?; diff --git a/src/channel/history.rs b/src/channel/history.rs index bd45d8d..383fb7b 100644 --- a/src/channel/history.rs +++ b/src/channel/history.rs @@ -2,7 +2,7 @@ use super::{ event::{Created, Deleted, Event}, Channel, Id, }; -use crate::event::Instant; +use crate::event::{Instant, ResumePoint, Sequence}; #[derive(Clone, Debug, Eq, PartialEq)] pub struct History { @@ -25,6 +25,12 @@ impl History { pub fn as_created(&self) -> Channel { self.channel.clone() } + + pub fn as_of(&self, resume_point: impl Into<ResumePoint>) -> Option<Channel> { + self.events() + .filter(Sequence::up_to(resume_point.into())) + .collect() + } } // Event factories diff --git a/src/channel/repo.rs b/src/channel/repo.rs index 2b48436..2f57581 100644 --- a/src/channel/repo.rs +++ b/src/channel/repo.rs @@ -3,7 +3,7 @@ use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction}; use crate::{ channel::{Channel, History, Id}, clock::DateTime, - event::{Instant, Sequence}, + event::{Instant, ResumePoint, Sequence}, }; pub trait Provider { @@ -84,7 +84,7 @@ impl<'c> Channels<'c> { Ok(channel) } - pub async fn all(&mut self, resume_at: Option<Sequence>) -> Result<Vec<History>, sqlx::Error> { + pub async fn all(&mut self, resume_at: ResumePoint) -> Result<Vec<History>, sqlx::Error> { let channels = sqlx::query!( r#" select diff --git a/src/channel/routes.rs b/src/channel/routes.rs index 23c0602..5d67af8 100644 --- a/src/channel/routes.rs +++ b/src/channel/routes.rs @@ -2,56 +2,21 @@ use axum::{ extract::{Json, Path, State}, http::StatusCode, response::{IntoResponse, Response}, - routing::{delete, get, post}, + routing::{delete, post}, Router, }; -use axum_extra::extract::Query; use super::{app, Channel, Id}; -use crate::{ - app::App, - clock::RequestedAt, - error::Internal, - event::{Instant, Sequence}, - login::Login, - message::{self, app::SendError}, -}; +use crate::{app::App, clock::RequestedAt, error::Internal, login::Login, message::app::SendError}; #[cfg(test)] mod test; pub fn router() -> Router<App> { Router::new() - .route("/api/channels", get(list)) .route("/api/channels", post(on_create)) .route("/api/channels/:channel", post(on_send)) .route("/api/channels/:channel", delete(on_delete)) - .route("/api/channels/:channel/messages", get(messages)) -} - -#[derive(Default, serde::Deserialize)] -struct ResumeQuery { - resume_point: Option<Sequence>, -} - -async fn list( - State(app): State<App>, - _: Login, - Query(query): Query<ResumeQuery>, -) -> Result<Channels, Internal> { - let channels = app.channels().all(query.resume_point).await?; - let response = Channels(channels); - - Ok(response) -} - -struct Channels(Vec<Channel>); - -impl IntoResponse for Channels { - fn into_response(self) -> Response { - let Self(channels) = self; - Json(channels).into_response() - } } #[derive(Clone, serde::Deserialize)] @@ -150,53 +115,3 @@ impl IntoResponse for ErrorResponse { } } } - -async fn messages( - State(app): State<App>, - Path(channel): Path<Id>, - _: Login, - Query(query): Query<ResumeQuery>, -) -> Result<Messages, ErrorResponse> { - let messages = app - .channels() - .messages(&channel, query.resume_point) - .await?; - let response = Messages( - messages - .into_iter() - .map(|message| MessageView { - sent: message.sent, - sender: message.sender, - message: MessageInner { - id: message.id, - body: message.body, - }, - }) - .collect(), - ); - - Ok(response) -} - -struct Messages(Vec<MessageView>); - -#[derive(serde::Serialize)] -struct MessageView { - #[serde(flatten)] - sent: Instant, - sender: Login, - message: MessageInner, -} - -#[derive(serde::Serialize)] -struct MessageInner { - id: message::Id, - body: String, -} - -impl IntoResponse for Messages { - fn into_response(self) -> Response { - let Self(messages) = self; - Json(messages).into_response() - } -} diff --git a/src/channel/routes/test/list.rs b/src/channel/routes/test/list.rs deleted file mode 100644 index f15a53c..0000000 --- a/src/channel/routes/test/list.rs +++ /dev/null @@ -1,65 +0,0 @@ -use axum::extract::State; -use axum_extra::extract::Query; - -use crate::{channel::routes, test::fixtures}; - -#[tokio::test] -async fn empty_list() { - // Set up the environment - - let app = fixtures::scratch_app().await; - let viewer = fixtures::login::create(&app).await; - - // Call the endpoint - - let routes::Channels(channels) = routes::list(State(app), viewer, Query::default()) - .await - .expect("always succeeds"); - - // Verify the semantics - - assert!(channels.is_empty()); -} - -#[tokio::test] -async fn one_channel() { - // Set up the environment - - let app = fixtures::scratch_app().await; - let viewer = fixtures::login::create(&app).await; - let channel = fixtures::channel::create(&app, &fixtures::now()).await; - - // Call the endpoint - - let routes::Channels(channels) = routes::list(State(app), viewer, Query::default()) - .await - .expect("always succeeds"); - - // Verify the semantics - - assert!(channels.contains(&channel)); -} - -#[tokio::test] -async fn multiple_channels() { - // Set up the environment - - let app = fixtures::scratch_app().await; - let viewer = fixtures::login::create(&app).await; - let channels = vec![ - fixtures::channel::create(&app, &fixtures::now()).await, - fixtures::channel::create(&app, &fixtures::now()).await, - ]; - - // Call the endpoint - - let routes::Channels(response_channels) = routes::list(State(app), viewer, Query::default()) - .await - .expect("always succeeds"); - - // Verify the semantics - - assert!(channels - .into_iter() - .all(|channel| response_channels.contains(&channel))); -} diff --git a/src/channel/routes/test/mod.rs b/src/channel/routes/test/mod.rs index ab663eb..3e5aa17 100644 --- a/src/channel/routes/test/mod.rs +++ b/src/channel/routes/test/mod.rs @@ -1,3 +1,2 @@ -mod list; mod on_create; mod on_send; diff --git a/src/channel/routes/test/on_create.rs b/src/channel/routes/test/on_create.rs index 5733c9e..ed49017 100644 --- a/src/channel/routes/test/on_create.rs +++ b/src/channel/routes/test/on_create.rs @@ -33,8 +33,11 @@ async fn new_channel() { // Verify the semantics - let channels = app.channels().all(None).await.expect("always succeeds"); - assert!(channels.contains(&response_channel)); + let snapshot = app.boot().snapshot().await.expect("boot always succeeds"); + assert!(snapshot + .channels + .iter() + .any(|channel| channel.name == response_channel.name && channel.id == response_channel.id)); let mut events = app .events() @@ -10,7 +10,7 @@ use clap::Parser; use sqlx::sqlite::SqlitePool; use tokio::net; -use crate::{app::App, channel, clock, db, event, expire, login, message, ui}; +use crate::{app::App, boot, channel, clock, db, event, expire, login, message, ui}; /// Command-line entry point for running the `hi` server. /// @@ -110,6 +110,7 @@ impl Args { fn routers() -> Router<App> { [ + boot::router(), channel::router(), event::router(), login::router(), diff --git a/src/event/app.rs b/src/event/app.rs index d664ec7..141037d 100644 --- a/src/event/app.rs +++ b/src/event/app.rs @@ -6,7 +6,7 @@ use futures::{ use itertools::Itertools as _; use sqlx::sqlite::SqlitePool; -use super::{broadcaster::Broadcaster, Event, Sequence, Sequenced}; +use super::{broadcaster::Broadcaster, Event, ResumePoint, Sequence, Sequenced}; use crate::{ channel::{self, repo::Provider as _}, message::{self, repo::Provider as _}, @@ -24,8 +24,9 @@ impl<'a> Events<'a> { pub async fn subscribe( &self, - resume_at: Option<Sequence>, + resume_at: impl Into<ResumePoint>, ) -> Result<impl Stream<Item = Event> + std::fmt::Debug, sqlx::Error> { + let resume_at = resume_at.into(); // Subscribe before retrieving, to catch messages broadcast while we're // querying the DB. We'll prune out duplicates later. let live_messages = self.events.subscribe(); @@ -65,7 +66,7 @@ impl<'a> Events<'a> { Ok(replay.chain(live_messages)) } - fn resume(resume_at: Option<Sequence>) -> impl for<'m> FnMut(&'m Event) -> future::Ready<bool> { + fn resume(resume_at: ResumePoint) -> impl for<'m> FnMut(&'m Event) -> future::Ready<bool> { let filter = Sequence::after(resume_at); move |event| future::ready(filter(event)) } diff --git a/src/event/mod.rs b/src/event/mod.rs index 1349fe6..e748d66 100644 --- a/src/event/mod.rs +++ b/src/event/mod.rs @@ -12,6 +12,8 @@ pub use self::{ sequence::{Instant, Sequence, Sequenced}, }; +pub type ResumePoint = Option<Sequence>; + #[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] pub struct Event { #[serde(flatten)] diff --git a/src/event/routes.rs b/src/event/routes.rs index 5b9c7e3..de6d248 100644 --- a/src/event/routes.rs +++ b/src/event/routes.rs @@ -14,7 +14,7 @@ use super::{extract::LastEventId, Event}; use crate::{ app::App, error::{Internal, Unauthorized}, - event::{Sequence, Sequenced as _}, + event::{ResumePoint, Sequence, Sequenced as _}, token::{app::ValidateError, extract::Identity}, }; @@ -27,7 +27,7 @@ pub fn router() -> Router<App> { #[derive(Default, serde::Deserialize)] struct EventsQuery { - resume_point: Option<Sequence>, + resume_point: ResumePoint, } async fn events( diff --git a/src/event/sequence.rs b/src/event/sequence.rs index fbe3711..ceb5bcb 100644 --- a/src/event/sequence.rs +++ b/src/event/sequence.rs @@ -1,5 +1,6 @@ use std::fmt; +use super::ResumePoint; use crate::clock::DateTime; #[derive(Clone, Copy, Debug, Eq, PartialEq, serde::Serialize)] @@ -39,14 +40,14 @@ impl fmt::Display for Sequence { } impl Sequence { - pub fn up_to<E>(resume_point: Option<Self>) -> impl for<'e> Fn(&'e E) -> bool + pub fn up_to<E>(resume_point: ResumePoint) -> impl for<'e> Fn(&'e E) -> bool where E: Sequenced, { move |event| resume_point.map_or(true, |resume_point| event.sequence() <= resume_point) } - pub fn after<E>(resume_point: Option<Self>) -> impl for<'e> Fn(&'e E) -> bool + pub fn after<E>(resume_point: ResumePoint) -> impl for<'e> Fn(&'e E) -> bool where E: Sequenced, { @@ -3,6 +3,7 @@ #![warn(clippy::pedantic)] mod app; +mod boot; mod broadcast; mod channel; pub mod cli; diff --git a/src/login/app.rs b/src/login/app.rs index 15adb31..4f60b89 100644 --- a/src/login/app.rs +++ b/src/login/app.rs @@ -1,8 +1,5 @@ use sqlx::sqlite::SqlitePool; -use crate::event::{repo::Provider as _, Sequence}; - -#[cfg(test)] use super::{repo::Provider as _, Login, Password}; pub struct Logins<'a> { @@ -14,15 +11,6 @@ impl<'a> Logins<'a> { Self { db } } - pub async fn boot_point(&self) -> Result<Sequence, sqlx::Error> { - let mut tx = self.db.begin().await?; - let sequence = tx.sequence().current().await?; - tx.commit().await?; - - Ok(sequence) - } - - #[cfg(test)] pub async fn create(&self, name: &str, password: &Password) -> Result<Login, CreateError> { let password_hash = password.hash()?; @@ -34,7 +22,6 @@ impl<'a> Logins<'a> { } } -#[cfg(test)] #[derive(Debug, thiserror::Error)] #[error(transparent)] pub enum CreateError { diff --git a/src/login/mod.rs b/src/login/mod.rs index 65e3ada..f272f80 100644 --- a/src/login/mod.rs +++ b/src/login/mod.rs @@ -1,3 +1,4 @@ +#[cfg(test)] pub mod app; pub mod extract; mod id; diff --git a/src/login/routes.rs b/src/login/routes.rs index 0874cc3..6579ae6 100644 --- a/src/login/routes.rs +++ b/src/login/routes.rs @@ -2,7 +2,7 @@ use axum::{ extract::{Json, State}, http::StatusCode, response::{IntoResponse, Response}, - routing::{get, post}, + routing::post, Router, }; @@ -10,7 +10,7 @@ use crate::{ app::App, clock::RequestedAt, error::{Internal, Unauthorized}, - login::{Login, Password}, + login::Password, token::{app, extract::IdentityToken}, }; @@ -19,31 +19,10 @@ mod test; pub fn router() -> Router<App> { Router::new() - .route("/api/boot", get(boot)) .route("/api/auth/login", post(on_login)) .route("/api/auth/logout", post(on_logout)) } -async fn boot(State(app): State<App>, login: Login) -> Result<Boot, Internal> { - let resume_point = app.logins().boot_point().await?; - Ok(Boot { - login, - resume_point: resume_point.to_string(), - }) -} - -#[derive(serde::Serialize)] -struct Boot { - login: Login, - resume_point: String, -} - -impl IntoResponse for Boot { - fn into_response(self) -> Response { - Json(self).into_response() - } -} - #[derive(serde::Deserialize)] struct LoginRequest { name: String, diff --git a/src/login/routes/test/mod.rs b/src/login/routes/test/mod.rs index 7693755..90522c4 100644 --- a/src/login/routes/test/mod.rs +++ b/src/login/routes/test/mod.rs @@ -1,3 +1,2 @@ -mod boot; mod login; mod logout; diff --git a/src/message/history.rs b/src/message/history.rs index c44d954..f267f4c 100644 --- a/src/message/history.rs +++ b/src/message/history.rs @@ -2,7 +2,7 @@ use super::{ event::{Deleted, Event, Sent}, Id, Message, }; -use crate::event::Instant; +use crate::event::{Instant, ResumePoint, Sequence}; #[derive(Clone, Debug, Eq, PartialEq)] pub struct History { @@ -24,6 +24,12 @@ impl History { pub fn as_sent(&self) -> Message { self.message.clone() } + + pub fn as_of(&self, resume_point: impl Into<ResumePoint>) -> Option<Message> { + self.events() + .filter(Sequence::up_to(resume_point.into())) + .collect() + } } // Events interface diff --git a/src/message/repo.rs b/src/message/repo.rs index 2ca409d..5b199a7 100644 --- a/src/message/repo.rs +++ b/src/message/repo.rs @@ -4,7 +4,7 @@ use super::{snapshot::Message, History, Id}; use crate::{ channel::{self, Channel}, clock::DateTime, - event::{Instant, Sequence}, + event::{Instant, ResumePoint, Sequence}, login::{self, Login}, }; @@ -69,7 +69,7 @@ impl<'c> Messages<'c> { pub async fn in_channel( &mut self, channel: &channel::History, - resume_at: Option<Sequence>, + resume_at: ResumePoint, ) -> Result<Vec<History>, sqlx::Error> { let channel_id = channel.id(); let messages = sqlx::query!( @@ -203,10 +203,7 @@ impl<'c> Messages<'c> { Ok(messages) } - pub async fn replay( - &mut self, - resume_at: Option<Sequence>, - ) -> Result<Vec<History>, sqlx::Error> { + pub async fn replay(&mut self, resume_at: ResumePoint) -> Result<Vec<History>, sqlx::Error> { let messages = sqlx::query!( r#" select |
