summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorOwen Jacobson <owen@grimoire.ca>2024-10-05 22:42:43 -0400
committerOwen Jacobson <owen@grimoire.ca>2024-10-05 22:47:12 -0400
commit6a10fcaf64938da52b326ea80013d9f30ed62a6c (patch)
tree08a3860b68391514390f42872ccc1cb4c6e6afd2
parent1fb26ad31d385ddc628e1b73d6a8764981ca6885 (diff)
Separate `/api/boot` into its own module.
-rw-r--r--src/app.rs10
-rw-r--r--src/boot/app.rs54
-rw-r--r--src/boot/mod.rs74
-rw-r--r--src/boot/routes.rs27
-rw-r--r--src/boot/routes/test.rs (renamed from src/login/routes/test/boot.rs)6
-rw-r--r--src/channel/app.rs48
-rw-r--r--src/channel/history.rs8
-rw-r--r--src/channel/repo.rs4
-rw-r--r--src/channel/routes/test/on_create.rs7
-rw-r--r--src/cli.rs3
-rw-r--r--src/event/app.rs7
-rw-r--r--src/event/mod.rs2
-rw-r--r--src/event/routes.rs4
-rw-r--r--src/event/sequence.rs5
-rw-r--r--src/lib.rs1
-rw-r--r--src/login/app.rs13
-rw-r--r--src/login/mod.rs1
-rw-r--r--src/login/routes.rs90
-rw-r--r--src/login/routes/test/mod.rs1
-rw-r--r--src/message/app.rs27
-rw-r--r--src/message/history.rs8
-rw-r--r--src/message/repo.rs9
22 files changed, 209 insertions, 200 deletions
diff --git a/src/app.rs b/src/app.rs
index 186e5f8..6d007a9 100644
--- a/src/app.rs
+++ b/src/app.rs
@@ -1,13 +1,16 @@
use sqlx::sqlite::SqlitePool;
use crate::{
+ boot::app::Boot,
channel::app::Channels,
event::{app::Events, broadcaster::Broadcaster as EventBroadcaster},
- login::app::Logins,
message::app::Messages,
token::{app::Tokens, broadcaster::Broadcaster as TokenBroadcaster},
};
+#[cfg(test)]
+use crate::login::app::Logins;
+
#[derive(Clone)]
pub struct App {
db: SqlitePool,
@@ -24,6 +27,10 @@ impl App {
}
impl App {
+ pub const fn boot(&self) -> Boot {
+ Boot::new(&self.db)
+ }
+
pub const fn channels(&self) -> Channels {
Channels::new(&self.db, &self.events)
}
@@ -32,6 +39,7 @@ impl App {
Events::new(&self.db, &self.events)
}
+ #[cfg(test)]
pub const fn logins(&self) -> Logins {
Logins::new(&self.db)
}
diff --git a/src/boot/app.rs b/src/boot/app.rs
new file mode 100644
index 0000000..fc84b3a
--- /dev/null
+++ b/src/boot/app.rs
@@ -0,0 +1,54 @@
+use sqlx::sqlite::SqlitePool;
+
+use super::{Channel, Snapshot};
+use crate::{
+ channel::repo::Provider as _, event::repo::Provider as _, message::repo::Provider as _,
+};
+
+pub struct Boot<'a> {
+ db: &'a SqlitePool,
+}
+
+impl<'a> Boot<'a> {
+ pub const fn new(db: &'a SqlitePool) -> Self {
+ Self { db }
+ }
+
+ pub async fn snapshot(&self) -> Result<Snapshot, sqlx::Error> {
+ let mut tx = self.db.begin().await?;
+ let resume_point = tx.sequence().current().await?;
+ let channels = tx.channels().all(resume_point.into()).await?;
+
+ let channels = {
+ let mut snapshots = Vec::with_capacity(channels.len());
+
+ let channels = channels.into_iter().filter_map(|channel| {
+ channel
+ .as_of(resume_point)
+ .map(|snapshot| (channel, snapshot))
+ });
+
+ for (channel, snapshot) in channels {
+ let messages = tx
+ .messages()
+ .in_channel(&channel, resume_point.into())
+ .await?;
+
+ let messages = messages
+ .into_iter()
+ .filter_map(|message| message.as_of(resume_point));
+
+ snapshots.push(Channel::new(snapshot, messages));
+ }
+
+ snapshots
+ };
+
+ tx.commit().await?;
+
+ Ok(Snapshot {
+ resume_point,
+ channels,
+ })
+ }
+}
diff --git a/src/boot/mod.rs b/src/boot/mod.rs
new file mode 100644
index 0000000..bd0da0a
--- /dev/null
+++ b/src/boot/mod.rs
@@ -0,0 +1,74 @@
+pub mod app;
+mod routes;
+
+use crate::{
+ channel,
+ event::{Instant, Sequence},
+ login::Login,
+ message,
+};
+
+pub use self::routes::router;
+
+#[derive(serde::Serialize)]
+pub struct Snapshot {
+ pub resume_point: Sequence,
+ pub channels: Vec<Channel>,
+}
+
+#[derive(serde::Serialize)]
+pub struct Channel {
+ pub id: channel::Id,
+ pub name: String,
+ pub messages: Vec<Message>,
+}
+
+impl Channel {
+ fn new(
+ channel: channel::Channel,
+ messages: impl IntoIterator<Item = message::Message>,
+ ) -> Self {
+ // The declarations are like this to guarantee that we aren't omitting any important fields from the corresponding types.
+ let channel::Channel { id, name } = channel;
+
+ Self {
+ id,
+ name,
+ messages: messages.into_iter().map(Message::from).collect(),
+ }
+ }
+}
+
+#[derive(serde::Serialize)]
+pub struct Message {
+ #[serde(flatten)]
+ pub sent: Instant,
+ pub sender: Login,
+ // Named this way for serialization reasons
+ #[allow(clippy::struct_field_names)]
+ pub message: Body,
+}
+
+impl From<message::Message> for Message {
+ fn from(message: message::Message) -> Self {
+ let message::Message {
+ sent,
+ channel: _,
+ sender,
+ id,
+ body,
+ } = message;
+
+ Self {
+ sent,
+ sender,
+ message: Body { id, body },
+ }
+ }
+}
+
+#[derive(serde::Serialize)]
+pub struct Body {
+ id: message::Id,
+ body: String,
+}
diff --git a/src/boot/routes.rs b/src/boot/routes.rs
new file mode 100644
index 0000000..80f70bd
--- /dev/null
+++ b/src/boot/routes.rs
@@ -0,0 +1,27 @@
+use axum::{
+ extract::{Json, State},
+ routing::get,
+ Router,
+};
+
+use super::Snapshot;
+use crate::{app::App, error::Internal, login::Login};
+
+#[cfg(test)]
+mod test;
+
+pub fn router() -> Router<App> {
+ Router::new().route("/api/boot", get(boot))
+}
+
+async fn boot(State(app): State<App>, login: Login) -> Result<Json<Boot>, Internal> {
+ let snapshot = app.boot().snapshot().await?;
+ Ok(Boot { login, snapshot }.into())
+}
+
+#[derive(serde::Serialize)]
+struct Boot {
+ login: Login,
+ #[serde(flatten)]
+ snapshot: Snapshot,
+}
diff --git a/src/login/routes/test/boot.rs b/src/boot/routes/test.rs
index 9655354..5f2ba6f 100644
--- a/src/login/routes/test/boot.rs
+++ b/src/boot/routes/test.rs
@@ -1,12 +1,12 @@
-use axum::extract::State;
+use axum::extract::{Json, State};
-use crate::{login::routes, test::fixtures};
+use crate::{boot::routes, test::fixtures};
#[tokio::test]
async fn returns_identity() {
let app = fixtures::scratch_app().await;
let login = fixtures::login::fictitious();
- let response = routes::boot(State(app), login.clone())
+ let Json(response) = routes::boot(State(app), login.clone())
.await
.expect("boot always succeeds");
diff --git a/src/channel/app.rs b/src/channel/app.rs
index 1b2cc48..a9a9e84 100644
--- a/src/channel/app.rs
+++ b/src/channel/app.rs
@@ -7,7 +7,7 @@ use crate::{
clock::DateTime,
db::NotFound,
event::{broadcaster::Broadcaster, repo::Provider as _, Event, Sequence},
- message::{repo::Provider as _, Message},
+ message::repo::Provider as _,
};
pub struct Channels<'a> {
@@ -36,52 +36,6 @@ impl<'a> Channels<'a> {
Ok(channel.as_created())
}
- pub async fn all(&self, resume_point: Option<Sequence>) -> Result<Vec<Channel>, InternalError> {
- let mut tx = self.db.begin().await?;
- let channels = tx.channels().all(resume_point).await?;
- tx.commit().await?;
-
- let channels = channels
- .into_iter()
- .filter_map(|channel| {
- channel
- .events()
- .filter(Sequence::up_to(resume_point))
- .collect()
- })
- .collect();
-
- Ok(channels)
- }
-
- pub async fn messages(
- &self,
- channel: &Id,
- resume_point: Option<Sequence>,
- ) -> Result<Vec<Message>, Error> {
- let mut tx = self.db.begin().await?;
- let channel = tx
- .channels()
- .by_id(channel)
- .await
- .not_found(|| Error::NotFound(channel.clone()))?;
-
- let messages = tx
- .messages()
- .in_channel(&channel, resume_point)
- .await?
- .into_iter()
- .filter_map(|message| {
- message
- .events()
- .filter(Sequence::up_to(resume_point))
- .collect()
- })
- .collect();
-
- Ok(messages)
- }
-
pub async fn delete(&self, channel: &Id, deleted_at: &DateTime) -> Result<(), Error> {
let mut tx = self.db.begin().await?;
diff --git a/src/channel/history.rs b/src/channel/history.rs
index bd45d8d..383fb7b 100644
--- a/src/channel/history.rs
+++ b/src/channel/history.rs
@@ -2,7 +2,7 @@ use super::{
event::{Created, Deleted, Event},
Channel, Id,
};
-use crate::event::Instant;
+use crate::event::{Instant, ResumePoint, Sequence};
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct History {
@@ -25,6 +25,12 @@ impl History {
pub fn as_created(&self) -> Channel {
self.channel.clone()
}
+
+ pub fn as_of(&self, resume_point: impl Into<ResumePoint>) -> Option<Channel> {
+ self.events()
+ .filter(Sequence::up_to(resume_point.into()))
+ .collect()
+ }
}
// Event factories
diff --git a/src/channel/repo.rs b/src/channel/repo.rs
index 2b48436..2f57581 100644
--- a/src/channel/repo.rs
+++ b/src/channel/repo.rs
@@ -3,7 +3,7 @@ use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction};
use crate::{
channel::{Channel, History, Id},
clock::DateTime,
- event::{Instant, Sequence},
+ event::{Instant, ResumePoint, Sequence},
};
pub trait Provider {
@@ -84,7 +84,7 @@ impl<'c> Channels<'c> {
Ok(channel)
}
- pub async fn all(&mut self, resume_at: Option<Sequence>) -> Result<Vec<History>, sqlx::Error> {
+ pub async fn all(&mut self, resume_at: ResumePoint) -> Result<Vec<History>, sqlx::Error> {
let channels = sqlx::query!(
r#"
select
diff --git a/src/channel/routes/test/on_create.rs b/src/channel/routes/test/on_create.rs
index 5733c9e..ed49017 100644
--- a/src/channel/routes/test/on_create.rs
+++ b/src/channel/routes/test/on_create.rs
@@ -33,8 +33,11 @@ async fn new_channel() {
// Verify the semantics
- let channels = app.channels().all(None).await.expect("always succeeds");
- assert!(channels.contains(&response_channel));
+ let snapshot = app.boot().snapshot().await.expect("boot always succeeds");
+ assert!(snapshot
+ .channels
+ .iter()
+ .any(|channel| channel.name == response_channel.name && channel.id == response_channel.id));
let mut events = app
.events()
diff --git a/src/cli.rs b/src/cli.rs
index 620ba9d..2552f90 100644
--- a/src/cli.rs
+++ b/src/cli.rs
@@ -10,7 +10,7 @@ use clap::Parser;
use sqlx::sqlite::SqlitePool;
use tokio::net;
-use crate::{app::App, channel, clock, db, event, expire, login, message, ui};
+use crate::{app::App, boot, channel, clock, db, event, expire, login, message, ui};
/// Command-line entry point for running the `hi` server.
///
@@ -110,6 +110,7 @@ impl Args {
fn routers() -> Router<App> {
[
+ boot::router(),
channel::router(),
event::router(),
login::router(),
diff --git a/src/event/app.rs b/src/event/app.rs
index d664ec7..141037d 100644
--- a/src/event/app.rs
+++ b/src/event/app.rs
@@ -6,7 +6,7 @@ use futures::{
use itertools::Itertools as _;
use sqlx::sqlite::SqlitePool;
-use super::{broadcaster::Broadcaster, Event, Sequence, Sequenced};
+use super::{broadcaster::Broadcaster, Event, ResumePoint, Sequence, Sequenced};
use crate::{
channel::{self, repo::Provider as _},
message::{self, repo::Provider as _},
@@ -24,8 +24,9 @@ impl<'a> Events<'a> {
pub async fn subscribe(
&self,
- resume_at: Option<Sequence>,
+ resume_at: impl Into<ResumePoint>,
) -> Result<impl Stream<Item = Event> + std::fmt::Debug, sqlx::Error> {
+ let resume_at = resume_at.into();
// Subscribe before retrieving, to catch messages broadcast while we're
// querying the DB. We'll prune out duplicates later.
let live_messages = self.events.subscribe();
@@ -65,7 +66,7 @@ impl<'a> Events<'a> {
Ok(replay.chain(live_messages))
}
- fn resume(resume_at: Option<Sequence>) -> impl for<'m> FnMut(&'m Event) -> future::Ready<bool> {
+ fn resume(resume_at: ResumePoint) -> impl for<'m> FnMut(&'m Event) -> future::Ready<bool> {
let filter = Sequence::after(resume_at);
move |event| future::ready(filter(event))
}
diff --git a/src/event/mod.rs b/src/event/mod.rs
index 1349fe6..e748d66 100644
--- a/src/event/mod.rs
+++ b/src/event/mod.rs
@@ -12,6 +12,8 @@ pub use self::{
sequence::{Instant, Sequence, Sequenced},
};
+pub type ResumePoint = Option<Sequence>;
+
#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
pub struct Event {
#[serde(flatten)]
diff --git a/src/event/routes.rs b/src/event/routes.rs
index 5b9c7e3..de6d248 100644
--- a/src/event/routes.rs
+++ b/src/event/routes.rs
@@ -14,7 +14,7 @@ use super::{extract::LastEventId, Event};
use crate::{
app::App,
error::{Internal, Unauthorized},
- event::{Sequence, Sequenced as _},
+ event::{ResumePoint, Sequence, Sequenced as _},
token::{app::ValidateError, extract::Identity},
};
@@ -27,7 +27,7 @@ pub fn router() -> Router<App> {
#[derive(Default, serde::Deserialize)]
struct EventsQuery {
- resume_point: Option<Sequence>,
+ resume_point: ResumePoint,
}
async fn events(
diff --git a/src/event/sequence.rs b/src/event/sequence.rs
index fbe3711..ceb5bcb 100644
--- a/src/event/sequence.rs
+++ b/src/event/sequence.rs
@@ -1,5 +1,6 @@
use std::fmt;
+use super::ResumePoint;
use crate::clock::DateTime;
#[derive(Clone, Copy, Debug, Eq, PartialEq, serde::Serialize)]
@@ -39,14 +40,14 @@ impl fmt::Display for Sequence {
}
impl Sequence {
- pub fn up_to<E>(resume_point: Option<Self>) -> impl for<'e> Fn(&'e E) -> bool
+ pub fn up_to<E>(resume_point: ResumePoint) -> impl for<'e> Fn(&'e E) -> bool
where
E: Sequenced,
{
move |event| resume_point.map_or(true, |resume_point| event.sequence() <= resume_point)
}
- pub fn after<E>(resume_point: Option<Self>) -> impl for<'e> Fn(&'e E) -> bool
+ pub fn after<E>(resume_point: ResumePoint) -> impl for<'e> Fn(&'e E) -> bool
where
E: Sequenced,
{
diff --git a/src/lib.rs b/src/lib.rs
index 59cf0f2..2673d2d 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -3,6 +3,7 @@
#![warn(clippy::pedantic)]
mod app;
+mod boot;
mod broadcast;
mod channel;
pub mod cli;
diff --git a/src/login/app.rs b/src/login/app.rs
index 15adb31..4f60b89 100644
--- a/src/login/app.rs
+++ b/src/login/app.rs
@@ -1,8 +1,5 @@
use sqlx::sqlite::SqlitePool;
-use crate::event::{repo::Provider as _, Sequence};
-
-#[cfg(test)]
use super::{repo::Provider as _, Login, Password};
pub struct Logins<'a> {
@@ -14,15 +11,6 @@ impl<'a> Logins<'a> {
Self { db }
}
- pub async fn boot_point(&self) -> Result<Sequence, sqlx::Error> {
- let mut tx = self.db.begin().await?;
- let sequence = tx.sequence().current().await?;
- tx.commit().await?;
-
- Ok(sequence)
- }
-
- #[cfg(test)]
pub async fn create(&self, name: &str, password: &Password) -> Result<Login, CreateError> {
let password_hash = password.hash()?;
@@ -34,7 +22,6 @@ impl<'a> Logins<'a> {
}
}
-#[cfg(test)]
#[derive(Debug, thiserror::Error)]
#[error(transparent)]
pub enum CreateError {
diff --git a/src/login/mod.rs b/src/login/mod.rs
index 65e3ada..f272f80 100644
--- a/src/login/mod.rs
+++ b/src/login/mod.rs
@@ -1,3 +1,4 @@
+#[cfg(test)]
pub mod app;
pub mod extract;
mod id;
diff --git a/src/login/routes.rs b/src/login/routes.rs
index b0e3fee..6579ae6 100644
--- a/src/login/routes.rs
+++ b/src/login/routes.rs
@@ -2,19 +2,15 @@ use axum::{
extract::{Json, State},
http::StatusCode,
response::{IntoResponse, Response},
- routing::{get, post},
+ routing::post,
Router,
};
-use futures::stream::{self, StreamExt as _, TryStreamExt as _};
use crate::{
app::App,
- channel::Channel,
clock::RequestedAt,
error::{Internal, Unauthorized},
- event::Instant,
- login::{Login, Password},
- message::{self, Message},
+ login::Password,
token::{app, extract::IdentityToken},
};
@@ -23,92 +19,10 @@ mod test;
pub fn router() -> Router<App> {
Router::new()
- .route("/api/boot", get(boot))
.route("/api/auth/login", post(on_login))
.route("/api/auth/logout", post(on_logout))
}
-async fn boot(State(app): State<App>, login: Login) -> Result<Boot, Internal> {
- let resume_point = app.logins().boot_point().await?;
- let channels = app.channels().all(resume_point.into()).await?;
- let channels = stream::iter(channels)
- .then(|channel| async {
- app.messages()
- .in_channel(&channel.id, resume_point.into())
- .await
- .map(|messages| BootChannel::new(channel, messages))
- })
- .try_collect()
- .await?;
-
- Ok(Boot {
- login,
- resume_point: resume_point.to_string(),
- channels,
- })
-}
-
-#[derive(serde::Serialize)]
-struct Boot {
- login: Login,
- resume_point: String,
- channels: Vec<BootChannel>,
-}
-
-#[derive(serde::Serialize)]
-struct BootChannel {
- #[serde(flatten)]
- channel: Channel,
- messages: Vec<BootMessage>,
-}
-
-impl BootChannel {
- fn new(channel: Channel, messages: impl IntoIterator<Item = Message>) -> Self {
- Self {
- channel,
- messages: messages.into_iter().map(BootMessage::from).collect(),
- }
- }
-}
-
-#[derive(serde::Serialize)]
-struct BootMessage {
- #[serde(flatten)]
- sent: Instant,
- sender: Login,
- message: BootMessageBody,
-}
-
-impl From<Message> for BootMessage {
- fn from(message: Message) -> Self {
- let Message {
- sent,
- channel: _,
- sender,
- id,
- body,
- } = message;
-
- Self {
- sent,
- sender,
- message: BootMessageBody { id, body },
- }
- }
-}
-
-#[derive(serde::Serialize)]
-struct BootMessageBody {
- id: message::Id,
- body: String,
-}
-
-impl IntoResponse for Boot {
- fn into_response(self) -> Response {
- Json(self).into_response()
- }
-}
-
#[derive(serde::Deserialize)]
struct LoginRequest {
name: String,
diff --git a/src/login/routes/test/mod.rs b/src/login/routes/test/mod.rs
index 7693755..90522c4 100644
--- a/src/login/routes/test/mod.rs
+++ b/src/login/routes/test/mod.rs
@@ -1,3 +1,2 @@
-mod boot;
mod login;
mod logout;
diff --git a/src/message/app.rs b/src/message/app.rs
index 1e50a65..385c92e 100644
--- a/src/message/app.rs
+++ b/src/message/app.rs
@@ -44,33 +44,6 @@ impl<'a> Messages<'a> {
Ok(message.as_sent())
}
- pub async fn in_channel(
- &self,
- channel: &channel::Id,
- resume_point: Option<Sequence>,
- ) -> Result<Vec<Message>, DeleteError> {
- let mut tx = self.db.begin().await?;
- let channel = tx
- .channels()
- .by_id(channel)
- .await
- .not_found(|| DeleteError::ChannelNotFound(channel.clone()))?;
- let messages = tx.messages().in_channel(&channel, resume_point).await?;
- tx.commit().await?;
-
- let messages = messages
- .into_iter()
- .filter_map(|message| {
- message
- .events()
- .filter(Sequence::up_to(resume_point))
- .collect()
- })
- .collect();
-
- Ok(messages)
- }
-
pub async fn delete(&self, message: &Id, deleted_at: &DateTime) -> Result<(), DeleteError> {
let mut tx = self.db.begin().await?;
let deleted = tx.sequence().next(deleted_at).await?;
diff --git a/src/message/history.rs b/src/message/history.rs
index c44d954..f267f4c 100644
--- a/src/message/history.rs
+++ b/src/message/history.rs
@@ -2,7 +2,7 @@ use super::{
event::{Deleted, Event, Sent},
Id, Message,
};
-use crate::event::Instant;
+use crate::event::{Instant, ResumePoint, Sequence};
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct History {
@@ -24,6 +24,12 @@ impl History {
pub fn as_sent(&self) -> Message {
self.message.clone()
}
+
+ pub fn as_of(&self, resume_point: impl Into<ResumePoint>) -> Option<Message> {
+ self.events()
+ .filter(Sequence::up_to(resume_point.into()))
+ .collect()
+ }
}
// Events interface
diff --git a/src/message/repo.rs b/src/message/repo.rs
index 2ca409d..5b199a7 100644
--- a/src/message/repo.rs
+++ b/src/message/repo.rs
@@ -4,7 +4,7 @@ use super::{snapshot::Message, History, Id};
use crate::{
channel::{self, Channel},
clock::DateTime,
- event::{Instant, Sequence},
+ event::{Instant, ResumePoint, Sequence},
login::{self, Login},
};
@@ -69,7 +69,7 @@ impl<'c> Messages<'c> {
pub async fn in_channel(
&mut self,
channel: &channel::History,
- resume_at: Option<Sequence>,
+ resume_at: ResumePoint,
) -> Result<Vec<History>, sqlx::Error> {
let channel_id = channel.id();
let messages = sqlx::query!(
@@ -203,10 +203,7 @@ impl<'c> Messages<'c> {
Ok(messages)
}
- pub async fn replay(
- &mut self,
- resume_at: Option<Sequence>,
- ) -> Result<Vec<History>, sqlx::Error> {
+ pub async fn replay(&mut self, resume_at: ResumePoint) -> Result<Vec<History>, sqlx::Error> {
let messages = sqlx::query!(
r#"
select