From 66d3fcf2e22f057bacce8d97d43a13c1c5a9ad09 Mon Sep 17 00:00:00 2001 From: Owen Jacobson Date: Tue, 29 Oct 2024 23:29:22 -0400 Subject: Add `change password` UI + API. The protocol here re-checks the caller's password, as a "I left myself logged in" anti-pranking check. --- src/login/repo.rs | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) (limited to 'src/login/repo.rs') diff --git a/src/login/repo.rs b/src/login/repo.rs index 611edd6..a972304 100644 --- a/src/login/repo.rs +++ b/src/login/repo.rs @@ -58,6 +58,29 @@ impl<'c> Logins<'c> { Ok(login) } + pub async fn set_password( + &mut self, + login: &History, + to: &StoredHash, + ) -> Result<(), sqlx::Error> { + let login = login.id(); + + sqlx::query_scalar!( + r#" + update login + set password_hash = $1 + where id = $2 + returning id as "id: Id" + "#, + to, + login, + ) + .fetch_one(&mut *self.0) + .await?; + + Ok(()) + } + pub async fn all(&mut self, resume_at: ResumePoint) -> Result, LoadError> { let logins = sqlx::query!( r#" -- cgit v1.2.3 From 70591c5ac10069a4ae649bd6f79d769da9e32a98 Mon Sep 17 00:00:00 2001 From: Owen Jacobson Date: Wed, 30 Oct 2024 01:25:04 -0400 Subject: Remove `hi-recanonicalize`. This utility was needed to support a database migration with existing data. I have it on good authority that no further databases exist that are in the state that made this tool necessary. --- ...29117e7e70d3df2beaa1b1e2e081b0d362c07ceae8.json | 12 -- ...16293ea1bcc4913987ee751951e9d2f31bf495f305.json | 26 ---- ...1241dc35263ccfee9f3424111e7fa6014071f98a1e.json | 26 ---- ...42bfc6c82b1464afa98845e537e850d05deb328f06.json | 12 -- Cargo.toml | 1 - docs/internal-server-errors.md | 19 --- src/app.rs | 9 +- src/bin/hi-recanonicalize.rs | 9 -- src/bin/hi.rs | 9 -- src/channel/app.rs | 8 - src/channel/repo.rs | 32 ---- src/cli.rs | 170 ++++++++++++++++++++ src/cli/mod.rs | 172 --------------------- src/cli/recanonicalize.rs | 86 ----------- src/login/app.rs | 21 --- src/login/mod.rs | 1 + src/login/repo.rs | 32 ---- src/main.rs | 9 ++ 18 files changed, 183 insertions(+), 471 deletions(-) delete mode 100644 .sqlx/query-31e741181f0d09540063ef29117e7e70d3df2beaa1b1e2e081b0d362c07ceae8.json delete mode 100644 .sqlx/query-642fb12657410a4bee58d316293ea1bcc4913987ee751951e9d2f31bf495f305.json delete mode 100644 .sqlx/query-676a7dda6314cae4d13ff51241dc35263ccfee9f3424111e7fa6014071f98a1e.json delete mode 100644 .sqlx/query-b67d56f20dab413e31a64842bfc6c82b1464afa98845e537e850d05deb328f06.json delete mode 100644 src/bin/hi-recanonicalize.rs delete mode 100644 src/bin/hi.rs create mode 100644 src/cli.rs delete mode 100644 src/cli/mod.rs delete mode 100644 src/cli/recanonicalize.rs create mode 100644 src/main.rs (limited to 'src/login/repo.rs') diff --git a/.sqlx/query-31e741181f0d09540063ef29117e7e70d3df2beaa1b1e2e081b0d362c07ceae8.json b/.sqlx/query-31e741181f0d09540063ef29117e7e70d3df2beaa1b1e2e081b0d362c07ceae8.json deleted file mode 100644 index 1105391..0000000 --- a/.sqlx/query-31e741181f0d09540063ef29117e7e70d3df2beaa1b1e2e081b0d362c07ceae8.json +++ /dev/null @@ -1,12 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n update channel_name\n set canonical_name = $1\n where id = $2\n ", - "describe": { - "columns": [], - "parameters": { - "Right": 2 - }, - "nullable": [] - }, - "hash": "31e741181f0d09540063ef29117e7e70d3df2beaa1b1e2e081b0d362c07ceae8" -} diff --git a/.sqlx/query-642fb12657410a4bee58d316293ea1bcc4913987ee751951e9d2f31bf495f305.json b/.sqlx/query-642fb12657410a4bee58d316293ea1bcc4913987ee751951e9d2f31bf495f305.json deleted file mode 100644 index be5b784..0000000 --- a/.sqlx/query-642fb12657410a4bee58d316293ea1bcc4913987ee751951e9d2f31bf495f305.json +++ /dev/null @@ -1,26 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n select\n id as \"id: Id\",\n display_name as \"display_name: String\"\n from channel_name\n ", - "describe": { - "columns": [ - { - "name": "id: Id", - "ordinal": 0, - "type_info": "Text" - }, - { - "name": "display_name: String", - "ordinal": 1, - "type_info": "Null" - } - ], - "parameters": { - "Right": 0 - }, - "nullable": [ - false, - false - ] - }, - "hash": "642fb12657410a4bee58d316293ea1bcc4913987ee751951e9d2f31bf495f305" -} diff --git a/.sqlx/query-676a7dda6314cae4d13ff51241dc35263ccfee9f3424111e7fa6014071f98a1e.json b/.sqlx/query-676a7dda6314cae4d13ff51241dc35263ccfee9f3424111e7fa6014071f98a1e.json deleted file mode 100644 index fd601e9..0000000 --- a/.sqlx/query-676a7dda6314cae4d13ff51241dc35263ccfee9f3424111e7fa6014071f98a1e.json +++ /dev/null @@ -1,26 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n select\n id as \"id: Id\",\n display_name as \"display_name: String\"\n from login\n ", - "describe": { - "columns": [ - { - "name": "id: Id", - "ordinal": 0, - "type_info": "Text" - }, - { - "name": "display_name: String", - "ordinal": 1, - "type_info": "Text" - } - ], - "parameters": { - "Right": 0 - }, - "nullable": [ - false, - false - ] - }, - "hash": "676a7dda6314cae4d13ff51241dc35263ccfee9f3424111e7fa6014071f98a1e" -} diff --git a/.sqlx/query-b67d56f20dab413e31a64842bfc6c82b1464afa98845e537e850d05deb328f06.json b/.sqlx/query-b67d56f20dab413e31a64842bfc6c82b1464afa98845e537e850d05deb328f06.json deleted file mode 100644 index 677495b..0000000 --- a/.sqlx/query-b67d56f20dab413e31a64842bfc6c82b1464afa98845e537e850d05deb328f06.json +++ /dev/null @@ -1,12 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n update login\n set canonical_name = $1\n where id = $2\n ", - "describe": { - "columns": [], - "parameters": { - "Right": 2 - }, - "nullable": [] - }, - "hash": "b67d56f20dab413e31a64842bfc6c82b1464afa98845e537e850d05deb328f06" -} diff --git a/Cargo.toml b/Cargo.toml index c8b37e1..83c3aa4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,7 +15,6 @@ maintainer-scripts = "debian" assets = [ # Binaries ["target/release/hi", "/usr/bin/hi", "755"], - ["target/release/hi-recanonicalize", "/usr/bin/hi-recanonicalize", "755"], # Configuration ["debian/default", "/etc/default/hi", "644"], diff --git a/docs/internal-server-errors.md b/docs/internal-server-errors.md index 4f679b7..16d61a2 100644 --- a/docs/internal-server-errors.md +++ b/docs/internal-server-errors.md @@ -9,22 +9,3 @@ The server attempted two write transactions at the same time, and encountered [s This error will almost always resolve itself if clients re-try their requests; no further action is needed. This is a known issue. If you are encountering this consistently (or if you can trigger it on demand), let us know. We are aware of sqlite's features for mitigating this issue but have been unsuccessful in applying them; we're working on it, but patches _are_ welcome, if you have the opportunity. - -## stored canonical form […] does not match computed canonical form […] for name […] - -When `hi` applies the `migrations/20241019191531_canonical_names.sql` migration (from commit `3f9648eed48cd8b6cd35d0ae2ee5bbe25fa735ac`), this can leave existing names in a state where the stored canonical form is not the correct canonicalization of the stored display names of channels and logins. `hi` will abort requests when it encounters this situation, to avoid incorrect behaviours such as duplicate channels or duplicate logins. - -As channel and login names may be presented during client startup, this can render the service unusable until repaired. Treat this as an immediate outage if you see it. - -You can verify that login names are unique by running the following commands as the user the `hi` server runs as: - -* `sqlite3 .hi 'select display_name from login'` -* `sqlite3 .hi 'select display_name from channel_name'` - -Substitute `.hi` with the path to your `hi` database if it differs from the default. - -If the names are unique, you can repair the database: - -* Stop the `hi` server. -* Run `hi-recanonicalize`, as the same user the `hi` server runs as, with the same database options. -* Start the `hi` server. diff --git a/src/app.rs b/src/app.rs index bc1daa5..0dbf017 100644 --- a/src/app.rs +++ b/src/app.rs @@ -5,12 +5,14 @@ use crate::{ channel::app::Channels, event::{self, app::Events}, invite::app::Invites, - login::app::Logins, message::app::Messages, setup::app::Setup, token::{self, app::Tokens}, }; +#[cfg(test)] +use crate::login::app::Logins; + #[derive(Clone)] pub struct App { db: SqlitePool, @@ -47,11 +49,6 @@ impl App { Invites::new(&self.db, &self.events) } - #[cfg(not(test))] - pub const fn logins(&self) -> Logins { - Logins::new(&self.db) - } - #[cfg(test)] pub const fn logins(&self) -> Logins { Logins::new(&self.db, &self.events) diff --git a/src/bin/hi-recanonicalize.rs b/src/bin/hi-recanonicalize.rs deleted file mode 100644 index 4081276..0000000 --- a/src/bin/hi-recanonicalize.rs +++ /dev/null @@ -1,9 +0,0 @@ -use clap::Parser; - -use hi::cli; - -#[tokio::main] -async fn main() -> Result<(), cli::recanonicalize::Error> { - let args = cli::recanonicalize::Args::parse(); - args.run().await -} diff --git a/src/bin/hi.rs b/src/bin/hi.rs deleted file mode 100644 index d0830ff..0000000 --- a/src/bin/hi.rs +++ /dev/null @@ -1,9 +0,0 @@ -use clap::Parser; - -use hi::cli; - -#[tokio::main] -async fn main() -> Result<(), cli::Error> { - let args = cli::Args::parse(); - args.run().await -} diff --git a/src/channel/app.rs b/src/channel/app.rs index e32eb6c..21784e9 100644 --- a/src/channel/app.rs +++ b/src/channel/app.rs @@ -137,14 +137,6 @@ impl<'a> Channels<'a> { Ok(()) } - - pub async fn recanonicalize(&self) -> Result<(), sqlx::Error> { - let mut tx = self.db.begin().await?; - tx.channels().recanonicalize().await?; - tx.commit().await?; - - Ok(()) - } } #[derive(Debug, thiserror::Error)] diff --git a/src/channel/repo.rs b/src/channel/repo.rs index a49db52..f47e564 100644 --- a/src/channel/repo.rs +++ b/src/channel/repo.rs @@ -300,38 +300,6 @@ impl<'c> Channels<'c> { Ok(channels) } - - pub async fn recanonicalize(&mut self) -> Result<(), sqlx::Error> { - let channels = sqlx::query!( - r#" - select - id as "id: Id", - display_name as "display_name: String" - from channel_name - "#, - ) - .fetch_all(&mut *self.0) - .await?; - - for channel in channels { - let name = Name::from(channel.display_name); - let canonical_name = name.canonical(); - - sqlx::query!( - r#" - update channel_name - set canonical_name = $1 - where id = $2 - "#, - canonical_name, - channel.id, - ) - .execute(&mut *self.0) - .await?; - } - - Ok(()) - } } #[derive(Debug, thiserror::Error)] diff --git a/src/cli.rs b/src/cli.rs new file mode 100644 index 0000000..0659851 --- /dev/null +++ b/src/cli.rs @@ -0,0 +1,170 @@ +//! The `hi` command-line interface. +//! +//! This module supports running `hi` as a freestanding program, via the +//! [`Args`] struct. + +use std::{future, io}; + +use axum::{ + http::header, + middleware, + response::{IntoResponse, Response}, + Router, +}; +use clap::{CommandFactory, Parser}; +use sqlx::sqlite::SqlitePool; +use tokio::net; + +use crate::{ + app::App, + boot, channel, clock, db, event, expire, invite, login, message, + setup::{self, middleware::setup_required}, + ui, +}; + +/// Command-line entry point for running the `hi` server. +/// +/// This is intended to be used as a Clap [Parser], to capture command-line +/// arguments for the `hi` server: +/// +/// ```no_run +/// # use hi::cli::Error; +/// # +/// # #[tokio::main] +/// # async fn main() -> Result<(), Error> { +/// use clap::Parser; +/// use hi::cli::Args; +/// +/// let args = Args::parse(); +/// args.run().await?; +/// # Ok(()) +/// # } +/// ``` +#[derive(Parser)] +#[command( + version, + about = "Run the `hi` server.", + long_about = r#"Run the `hi` server. + +The database at `--database-url` will be created, or upgraded, automatically."# +)] +pub struct Args { + /// The network address `hi` should listen on + #[arg(short, long, env, default_value = "localhost")] + address: String, + + /// The network port `hi` should listen on + #[arg(short, long, env, default_value_t = 64209)] + port: u16, + + /// Sqlite URL or path for the `hi` database + #[arg(short, long, env, default_value = "sqlite://.hi")] + database_url: String, + + /// Sqlite URL or path for a backup of the `hi` database during upgrades + #[arg(short = 'D', long, env, default_value = "sqlite://.hi.backup")] + backup_database_url: String, +} + +impl Args { + /// Runs the `hi` server, using the parsed configuation in `self`. + /// + /// This will perform the following tasks: + /// + /// * Migrate the `hi` database (at `--database-url`). + /// * Start an HTTP server (on the interface and port controlled by + /// `--address` and `--port`). + /// * Print a status message. + /// * Wait for that server to shut down. + /// + /// # Errors + /// + /// Will return `Err` if the server is unable to start, or terminates + /// prematurely. The specific [`Error`] variant will expose the cause + /// of the failure. + pub async fn run(self) -> Result<(), Error> { + let pool = self.pool().await?; + + let app = App::from(pool); + let app = routers(&app) + .route_layer(middleware::from_fn_with_state( + app.clone(), + expire::middleware, + )) + .route_layer(middleware::from_fn(clock::middleware)) + .route_layer(middleware::map_response(Self::server_info())) + .with_state(app); + + let listener = self.listener().await?; + let started_msg = started_msg(&listener)?; + + let serve = axum::serve(listener, app); + println!("{started_msg}"); + serve.await?; + + Ok(()) + } + + async fn listener(&self) -> io::Result { + let listen_addr = self.listen_addr(); + let listener = tokio::net::TcpListener::bind(listen_addr).await?; + Ok(listener) + } + + fn listen_addr(&self) -> impl net::ToSocketAddrs + '_ { + (self.address.as_str(), self.port) + } + + async fn pool(&self) -> Result { + db::prepare(&self.database_url, &self.backup_database_url).await + } + + fn server_info() -> impl Clone + Fn(Response) -> future::Ready { + let command = Self::command(); + let name = command.get_name(); + let version = command.get_version().unwrap_or("unknown version"); + let version = format!("{name}/{version}"); + move |resp| { + let response = ([(header::SERVER, &version)], resp).into_response(); + future::ready(response) + } + } +} + +fn routers(app: &App) -> Router { + [ + [ + // API endpoints that require setup to function + boot::router(), + channel::router(), + event::router(), + invite::router(), + login::router(), + message::router(), + ] + .into_iter() + .fold(Router::default(), Router::merge) + .route_layer(middleware::from_fn_with_state(app.clone(), setup_required)), + // API endpoints that handle setup + setup::router(), + // The UI (handles setup state itself) + ui::router(app), + ] + .into_iter() + .fold(Router::default(), Router::merge) +} + +fn started_msg(listener: &net::TcpListener) -> io::Result { + let local_addr = listener.local_addr()?; + Ok(format!("listening on http://{local_addr}/")) +} + +/// Errors that can be raised by [`Args::run`]. +#[derive(Debug, thiserror::Error)] +#[error(transparent)] +pub enum Error { + /// Failure due to `io::Error`. See [`io::Error`]. + Io(#[from] io::Error), + /// Failure due to a database initialization error. See [`db::Error`]. + Database(#[from] db::Error), +} diff --git a/src/cli/mod.rs b/src/cli/mod.rs deleted file mode 100644 index c75ce2b..0000000 --- a/src/cli/mod.rs +++ /dev/null @@ -1,172 +0,0 @@ -//! The `hi` command-line interface. -//! -//! This module supports running `hi` as a freestanding program, via the -//! [`Args`] struct. - -use std::{future, io}; - -use axum::{ - http::header, - middleware, - response::{IntoResponse, Response}, - Router, -}; -use clap::{CommandFactory, Parser}; -use sqlx::sqlite::SqlitePool; -use tokio::net; - -use crate::{ - app::App, - boot, channel, clock, db, event, expire, invite, login, message, - setup::{self, middleware::setup_required}, - ui, -}; - -pub mod recanonicalize; - -/// Command-line entry point for running the `hi` server. -/// -/// This is intended to be used as a Clap [Parser], to capture command-line -/// arguments for the `hi` server: -/// -/// ```no_run -/// # use hi::cli::Error; -/// # -/// # #[tokio::main] -/// # async fn main() -> Result<(), Error> { -/// use clap::Parser; -/// use hi::cli::Args; -/// -/// let args = Args::parse(); -/// args.run().await?; -/// # Ok(()) -/// # } -/// ``` -#[derive(Parser)] -#[command( - version, - about = "Run the `hi` server.", - long_about = r#"Run the `hi` server. - -The database at `--database-url` will be created, or upgraded, automatically."# -)] -pub struct Args { - /// The network address `hi` should listen on - #[arg(short, long, env, default_value = "localhost")] - address: String, - - /// The network port `hi` should listen on - #[arg(short, long, env, default_value_t = 64209)] - port: u16, - - /// Sqlite URL or path for the `hi` database - #[arg(short, long, env, default_value = "sqlite://.hi")] - database_url: String, - - /// Sqlite URL or path for a backup of the `hi` database during upgrades - #[arg(short = 'D', long, env, default_value = "sqlite://.hi.backup")] - backup_database_url: String, -} - -impl Args { - /// Runs the `hi` server, using the parsed configuation in `self`. - /// - /// This will perform the following tasks: - /// - /// * Migrate the `hi` database (at `--database-url`). - /// * Start an HTTP server (on the interface and port controlled by - /// `--address` and `--port`). - /// * Print a status message. - /// * Wait for that server to shut down. - /// - /// # Errors - /// - /// Will return `Err` if the server is unable to start, or terminates - /// prematurely. The specific [`Error`] variant will expose the cause - /// of the failure. - pub async fn run(self) -> Result<(), Error> { - let pool = self.pool().await?; - - let app = App::from(pool); - let app = routers(&app) - .route_layer(middleware::from_fn_with_state( - app.clone(), - expire::middleware, - )) - .route_layer(middleware::from_fn(clock::middleware)) - .route_layer(middleware::map_response(Self::server_info())) - .with_state(app); - - let listener = self.listener().await?; - let started_msg = started_msg(&listener)?; - - let serve = axum::serve(listener, app); - println!("{started_msg}"); - serve.await?; - - Ok(()) - } - - async fn listener(&self) -> io::Result { - let listen_addr = self.listen_addr(); - let listener = tokio::net::TcpListener::bind(listen_addr).await?; - Ok(listener) - } - - fn listen_addr(&self) -> impl net::ToSocketAddrs + '_ { - (self.address.as_str(), self.port) - } - - async fn pool(&self) -> Result { - db::prepare(&self.database_url, &self.backup_database_url).await - } - - fn server_info() -> impl Clone + Fn(Response) -> future::Ready { - let command = Self::command(); - let name = command.get_name(); - let version = command.get_version().unwrap_or("unknown version"); - let version = format!("{name}/{version}"); - move |resp| { - let response = ([(header::SERVER, &version)], resp).into_response(); - future::ready(response) - } - } -} - -fn routers(app: &App) -> Router { - [ - [ - // API endpoints that require setup to function - boot::router(), - channel::router(), - event::router(), - invite::router(), - login::router(), - message::router(), - ] - .into_iter() - .fold(Router::default(), Router::merge) - .route_layer(middleware::from_fn_with_state(app.clone(), setup_required)), - // API endpoints that handle setup - setup::router(), - // The UI (handles setup state itself) - ui::router(app), - ] - .into_iter() - .fold(Router::default(), Router::merge) -} - -fn started_msg(listener: &net::TcpListener) -> io::Result { - let local_addr = listener.local_addr()?; - Ok(format!("listening on http://{local_addr}/")) -} - -/// Errors that can be raised by [`Args::run`]. -#[derive(Debug, thiserror::Error)] -#[error(transparent)] -pub enum Error { - /// Failure due to `io::Error`. See [`io::Error`]. - Io(#[from] io::Error), - /// Failure due to a database initialization error. See [`db::Error`]. - Database(#[from] db::Error), -} diff --git a/src/cli/recanonicalize.rs b/src/cli/recanonicalize.rs deleted file mode 100644 index 9db5b77..0000000 --- a/src/cli/recanonicalize.rs +++ /dev/null @@ -1,86 +0,0 @@ -use sqlx::sqlite::SqlitePool; - -use crate::{app::App, db}; - -/// Command-line entry point for repairing canonical names in the `hi` database. -/// This command may be necessary after an upgrade, if the canonical forms of -/// names has changed. It will re-calculate the canonical form of each name in -/// the database, based on its display form, and store the results back to the -/// database. -/// -/// This is intended to be used as a Clap [Parser], to capture command-line -/// arguments for the `hi-recanonicalize` command: -/// -/// ```no_run -/// # use hi::cli::recanonicalize::Error; -/// # -/// # #[tokio::main] -/// # async fn main() -> Result<(), Error> { -/// use clap::Parser; -/// use hi::cli::recanonicalize::Args; -/// -/// let args = Args::parse(); -/// args.run().await?; -/// # Ok(()) -/// # } -/// ``` -#[derive(clap::Parser)] -#[command( - version, - about = "Recanonicalize names in the `hi` database.", - long_about = r#"Recanonicalize names in the `hi` database. - -The `hi` server must not be running while this command is run. - -The database at `--database-url` will also be created, or upgraded, automatically."# -)] -pub struct Args { - /// Sqlite URL or path for the `hi` database - #[arg(short, long, env, default_value = "sqlite://.hi")] - database_url: String, - - /// Sqlite URL or path for a backup of the `hi` database during upgrades - #[arg(short = 'D', long, env, default_value = "sqlite://.hi.backup")] - backup_database_url: String, -} - -impl Args { - /// Recanonicalizes the `hi` database, using the parsed configuation in - /// `self`. - /// - /// This will perform the following tasks: - /// - /// * Migrate the `hi` database (at `--database-url`). - /// * Recanonicalize names in the `login` and `channel` tables. - /// - /// # Errors - /// - /// Will return `Err` if the canonicalization or database upgrade processes - /// fail. The specific [`Error`] variant will expose the cause - /// of the failure. - pub async fn run(self) -> Result<(), Error> { - let pool = self.pool().await?; - - let app = App::from(pool); - app.logins().recanonicalize().await?; - app.channels().recanonicalize().await?; - - Ok(()) - } - - async fn pool(&self) -> Result { - db::prepare(&self.database_url, &self.backup_database_url).await - } -} - -/// Errors that can be raised by [`Args::run`]. -#[derive(Debug, thiserror::Error)] -#[error(transparent)] -pub enum Error { - // /// Failure due to `io::Error`. See [`io::Error`]. - // Io(#[from] io::Error), - /// Failure due to a database initialization error. See [`db::Error`]. - Database(#[from] db::Error), - /// Failure due to a data manipulation error. See [`sqlx::Error`]. - Sqlx(#[from] sqlx::Error), -} diff --git a/src/login/app.rs b/src/login/app.rs index 6da26e9..f458561 100644 --- a/src/login/app.rs +++ b/src/login/app.rs @@ -1,33 +1,21 @@ use sqlx::sqlite::SqlitePool; -use super::repo::Provider as _; - -#[cfg(test)] use super::{ create::{self, Create}, Login, Password, }; -#[cfg(test)] use crate::{clock::DateTime, event::Broadcaster, name::Name}; pub struct Logins<'a> { db: &'a SqlitePool, - #[cfg(test)] events: &'a Broadcaster, } impl<'a> Logins<'a> { - #[cfg(not(test))] - pub const fn new(db: &'a SqlitePool) -> Self { - Self { db } - } - - #[cfg(test)] pub const fn new(db: &'a SqlitePool, events: &'a Broadcaster) -> Self { Self { db, events } } - #[cfg(test)] pub async fn create( &self, name: &Name, @@ -45,17 +33,8 @@ impl<'a> Logins<'a> { Ok(login.as_created()) } - - pub async fn recanonicalize(&self) -> Result<(), sqlx::Error> { - let mut tx = self.db.begin().await?; - tx.logins().recanonicalize().await?; - tx.commit().await?; - - Ok(()) - } } -#[cfg(test)] #[derive(Debug, thiserror::Error)] pub enum CreateError { #[error("invalid login name: {0}")] diff --git a/src/login/mod.rs b/src/login/mod.rs index 5a6d715..006fa0c 100644 --- a/src/login/mod.rs +++ b/src/login/mod.rs @@ -1,3 +1,4 @@ +#[cfg(test)] pub mod app; pub mod create; pub mod event; diff --git a/src/login/repo.rs b/src/login/repo.rs index a972304..9439a25 100644 --- a/src/login/repo.rs +++ b/src/login/repo.rs @@ -143,38 +143,6 @@ impl<'c> Logins<'c> { Ok(logins) } - - pub async fn recanonicalize(&mut self) -> Result<(), sqlx::Error> { - let logins = sqlx::query!( - r#" - select - id as "id: Id", - display_name as "display_name: String" - from login - "#, - ) - .fetch_all(&mut *self.0) - .await?; - - for login in logins { - let name = Name::from(login.display_name); - let canonical_name = name.canonical(); - - sqlx::query!( - r#" - update login - set canonical_name = $1 - where id = $2 - "#, - canonical_name, - login.id, - ) - .execute(&mut *self.0) - .await?; - } - - Ok(()) - } } #[derive(Debug, thiserror::Error)] diff --git a/src/main.rs b/src/main.rs new file mode 100644 index 0000000..d0830ff --- /dev/null +++ b/src/main.rs @@ -0,0 +1,9 @@ +use clap::Parser; + +use hi::cli; + +#[tokio::main] +async fn main() -> Result<(), cli::Error> { + let args = cli::Args::parse(); + args.run().await +} -- cgit v1.2.3 From 50a382528288248381b07c25719cbc9a519b4c81 Mon Sep 17 00:00:00 2001 From: Owen Jacobson Date: Wed, 30 Oct 2024 02:01:31 -0400 Subject: Resume points are no longer optional. This is an inconsequential change for actual clients, since "resume from the beginning" was never a preferred mode of operation, and it simplifies some internals. It should also mean we get better query plans where `coalesce(cond, true)` was previously being used. --- ...1fe70264af3f5b77c6dacd4a61be470f262d62b5d4.json | 56 ++++++++++ ...1314f70d13e8b0ca22b7652f1063ec7796cf307269.json | 62 ----------- ...9da95bc8271dfc8c6ec8ebf174ba8a57111b322291.json | 44 ++++++++ ...27d5ba226bf52349548261393e00e74081cdbe041b.json | 62 +++++++++++ ...1602b8befafe751e9caeb0d5f279731e04c6925f46.json | 44 -------- ...c2e965e2294c1a6932d72a8b04d258d06d0f8938bd.json | 56 ---------- ...4730699683e63ae1ea404418a17c6af60148f03fe8.json | 44 -------- ...932ace995f53efd6c7800a7a1b42daec41d081b3d2.json | 44 ++++++++ ...0599914331682d58ace57214bfa26ccaa089592a24.json | 62 ----------- ...84a27bdaefca99706a0c73c17f19cc537f3f669882.json | 62 +++++++++++ docs/api/events.md | 2 +- src/boot/app.rs | 4 +- src/channel/history.rs | 6 +- src/channel/repo.rs | 11 +- src/channel/routes/channel/test/post.rs | 3 +- src/channel/routes/test.rs | 3 +- src/event/app.rs | 9 +- src/event/mod.rs | 2 - src/event/routes/get.rs | 10 +- src/event/routes/test/channel.rs | 91 +++++++++++----- src/event/routes/test/invite.rs | 26 +++-- src/event/routes/test/message.rs | 115 +++++++++++++++------ src/event/routes/test/resume.rs | 12 ++- src/event/routes/test/setup.rs | 13 ++- src/event/routes/test/token.rs | 19 ++-- src/event/sequence.rs | 9 +- src/login/history.rs | 6 +- src/login/repo.rs | 10 +- src/message/history.rs | 6 +- src/message/repo.rs | 21 ++-- src/test/fixtures/boot.rs | 9 ++ src/test/fixtures/mod.rs | 1 + 32 files changed, 519 insertions(+), 405 deletions(-) create mode 100644 .sqlx/query-093a57206253b4c05c2aa51fe70264af3f5b77c6dacd4a61be470f262d62b5d4.json delete mode 100644 .sqlx/query-24bc0257eff3357322481e1314f70d13e8b0ca22b7652f1063ec7796cf307269.json create mode 100644 .sqlx/query-2c20c29d9adfed6201a6a69da95bc8271dfc8c6ec8ebf174ba8a57111b322291.json create mode 100644 .sqlx/query-9a748f34805bb316452cb527d5ba226bf52349548261393e00e74081cdbe041b.json delete mode 100644 .sqlx/query-9f611a3351f22ed16d67d41602b8befafe751e9caeb0d5f279731e04c6925f46.json delete mode 100644 .sqlx/query-a40496319887752f5845c0c2e965e2294c1a6932d72a8b04d258d06d0f8938bd.json delete mode 100644 .sqlx/query-ac7ab464e44e4412cd83744730699683e63ae1ea404418a17c6af60148f03fe8.json create mode 100644 .sqlx/query-cbf29fae3725bbb3d9e94d932ace995f53efd6c7800a7a1b42daec41d081b3d2.json delete mode 100644 .sqlx/query-fce8f4fbd59a8b3b8531e10599914331682d58ace57214bfa26ccaa089592a24.json create mode 100644 .sqlx/query-ff61ff22108f1e98bbfc9a84a27bdaefca99706a0c73c17f19cc537f3f669882.json create mode 100644 src/test/fixtures/boot.rs (limited to 'src/login/repo.rs') diff --git a/.sqlx/query-093a57206253b4c05c2aa51fe70264af3f5b77c6dacd4a61be470f262d62b5d4.json b/.sqlx/query-093a57206253b4c05c2aa51fe70264af3f5b77c6dacd4a61be470f262d62b5d4.json new file mode 100644 index 0000000..1d8a2e1 --- /dev/null +++ b/.sqlx/query-093a57206253b4c05c2aa51fe70264af3f5b77c6dacd4a61be470f262d62b5d4.json @@ -0,0 +1,56 @@ +{ + "db_name": "SQLite", + "query": "\n select\n id as \"id: Id\",\n name.display_name as \"display_name?: String\",\n name.canonical_name as \"canonical_name?: String\",\n channel.created_at as \"created_at: DateTime\",\n channel.created_sequence as \"created_sequence: Sequence\",\n deleted.deleted_at as \"deleted_at?: DateTime\",\n deleted.deleted_sequence as \"deleted_sequence?: Sequence\"\n from channel\n left join channel_name as name\n using (id)\n left join channel_deleted as deleted\n using (id)\n where channel.created_sequence > $1\n or deleted.deleted_sequence > $1\n ", + "describe": { + "columns": [ + { + "name": "id: Id", + "ordinal": 0, + "type_info": "Text" + }, + { + "name": "display_name?: String", + "ordinal": 1, + "type_info": "Null" + }, + { + "name": "canonical_name?: String", + "ordinal": 2, + "type_info": "Null" + }, + { + "name": "created_at: DateTime", + "ordinal": 3, + "type_info": "Text" + }, + { + "name": "created_sequence: Sequence", + "ordinal": 4, + "type_info": "Integer" + }, + { + "name": "deleted_at?: DateTime", + "ordinal": 5, + "type_info": "Text" + }, + { + "name": "deleted_sequence?: Sequence", + "ordinal": 6, + "type_info": "Integer" + } + ], + "parameters": { + "Right": 1 + }, + "nullable": [ + false, + true, + true, + false, + false, + true, + true + ] + }, + "hash": "093a57206253b4c05c2aa51fe70264af3f5b77c6dacd4a61be470f262d62b5d4" +} diff --git a/.sqlx/query-24bc0257eff3357322481e1314f70d13e8b0ca22b7652f1063ec7796cf307269.json b/.sqlx/query-24bc0257eff3357322481e1314f70d13e8b0ca22b7652f1063ec7796cf307269.json deleted file mode 100644 index 9f09a28..0000000 --- a/.sqlx/query-24bc0257eff3357322481e1314f70d13e8b0ca22b7652f1063ec7796cf307269.json +++ /dev/null @@ -1,62 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n select\n id as \"id: Id\",\n message.channel as \"channel: channel::Id\",\n message.sender as \"sender: login::Id\",\n message.sent_at as \"sent_at: DateTime\",\n message.sent_sequence as \"sent_sequence: Sequence\",\n message.body as \"body: Body\",\n deleted.deleted_at as \"deleted_at: DateTime\",\n deleted.deleted_sequence as \"deleted_sequence: Sequence\"\n from message\n left join message_deleted as deleted\n using (id)\n where coalesce(message.sent_sequence > $1, true)\n ", - "describe": { - "columns": [ - { - "name": "id: Id", - "ordinal": 0, - "type_info": "Text" - }, - { - "name": "channel: channel::Id", - "ordinal": 1, - "type_info": "Text" - }, - { - "name": "sender: login::Id", - "ordinal": 2, - "type_info": "Text" - }, - { - "name": "sent_at: DateTime", - "ordinal": 3, - "type_info": "Text" - }, - { - "name": "sent_sequence: Sequence", - "ordinal": 4, - "type_info": "Integer" - }, - { - "name": "body: Body", - "ordinal": 5, - "type_info": "Text" - }, - { - "name": "deleted_at: DateTime", - "ordinal": 6, - "type_info": "Text" - }, - { - "name": "deleted_sequence: Sequence", - "ordinal": 7, - "type_info": "Integer" - } - ], - "parameters": { - "Right": 1 - }, - "nullable": [ - false, - false, - false, - false, - false, - true, - true, - true - ] - }, - "hash": "24bc0257eff3357322481e1314f70d13e8b0ca22b7652f1063ec7796cf307269" -} diff --git a/.sqlx/query-2c20c29d9adfed6201a6a69da95bc8271dfc8c6ec8ebf174ba8a57111b322291.json b/.sqlx/query-2c20c29d9adfed6201a6a69da95bc8271dfc8c6ec8ebf174ba8a57111b322291.json new file mode 100644 index 0000000..ae546ad --- /dev/null +++ b/.sqlx/query-2c20c29d9adfed6201a6a69da95bc8271dfc8c6ec8ebf174ba8a57111b322291.json @@ -0,0 +1,44 @@ +{ + "db_name": "SQLite", + "query": "\n select\n id as \"id: Id\",\n display_name as \"display_name: String\",\n canonical_name as \"canonical_name: String\",\n created_sequence as \"created_sequence: Sequence\",\n created_at as \"created_at: DateTime\"\n from login\n where login.created_sequence > $1\n ", + "describe": { + "columns": [ + { + "name": "id: Id", + "ordinal": 0, + "type_info": "Text" + }, + { + "name": "display_name: String", + "ordinal": 1, + "type_info": "Text" + }, + { + "name": "canonical_name: String", + "ordinal": 2, + "type_info": "Text" + }, + { + "name": "created_sequence: Sequence", + "ordinal": 3, + "type_info": "Integer" + }, + { + "name": "created_at: DateTime", + "ordinal": 4, + "type_info": "Text" + } + ], + "parameters": { + "Right": 1 + }, + "nullable": [ + false, + false, + false, + false, + false + ] + }, + "hash": "2c20c29d9adfed6201a6a69da95bc8271dfc8c6ec8ebf174ba8a57111b322291" +} diff --git a/.sqlx/query-9a748f34805bb316452cb527d5ba226bf52349548261393e00e74081cdbe041b.json b/.sqlx/query-9a748f34805bb316452cb527d5ba226bf52349548261393e00e74081cdbe041b.json new file mode 100644 index 0000000..5423bfd --- /dev/null +++ b/.sqlx/query-9a748f34805bb316452cb527d5ba226bf52349548261393e00e74081cdbe041b.json @@ -0,0 +1,62 @@ +{ + "db_name": "SQLite", + "query": "\n select\n id as \"id: Id\",\n message.channel as \"channel: channel::Id\",\n message.sender as \"sender: login::Id\",\n message.sent_at as \"sent_at: DateTime\",\n message.sent_sequence as \"sent_sequence: Sequence\",\n message.body as \"body: Body\",\n deleted.deleted_at as \"deleted_at?: DateTime\",\n deleted.deleted_sequence as \"deleted_sequence?: Sequence\"\n from message\n left join message_deleted as deleted\n using (id)\n where message.sent_sequence > $1\n or deleted.deleted_sequence > $1\n ", + "describe": { + "columns": [ + { + "name": "id: Id", + "ordinal": 0, + "type_info": "Text" + }, + { + "name": "channel: channel::Id", + "ordinal": 1, + "type_info": "Text" + }, + { + "name": "sender: login::Id", + "ordinal": 2, + "type_info": "Text" + }, + { + "name": "sent_at: DateTime", + "ordinal": 3, + "type_info": "Text" + }, + { + "name": "sent_sequence: Sequence", + "ordinal": 4, + "type_info": "Integer" + }, + { + "name": "body: Body", + "ordinal": 5, + "type_info": "Text" + }, + { + "name": "deleted_at?: DateTime", + "ordinal": 6, + "type_info": "Text" + }, + { + "name": "deleted_sequence?: Sequence", + "ordinal": 7, + "type_info": "Integer" + } + ], + "parameters": { + "Right": 1 + }, + "nullable": [ + false, + false, + false, + false, + false, + true, + true, + true + ] + }, + "hash": "9a748f34805bb316452cb527d5ba226bf52349548261393e00e74081cdbe041b" +} diff --git a/.sqlx/query-9f611a3351f22ed16d67d41602b8befafe751e9caeb0d5f279731e04c6925f46.json b/.sqlx/query-9f611a3351f22ed16d67d41602b8befafe751e9caeb0d5f279731e04c6925f46.json deleted file mode 100644 index 2d1f49e..0000000 --- a/.sqlx/query-9f611a3351f22ed16d67d41602b8befafe751e9caeb0d5f279731e04c6925f46.json +++ /dev/null @@ -1,44 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n select\n id as \"id: Id\",\n display_name as \"display_name: String\",\n canonical_name as \"canonical_name: String\",\n created_sequence as \"created_sequence: Sequence\",\n created_at as \"created_at: DateTime\"\n from login\n where coalesce(created_sequence <= $1, true)\n order by canonical_name\n ", - "describe": { - "columns": [ - { - "name": "id: Id", - "ordinal": 0, - "type_info": "Text" - }, - { - "name": "display_name: String", - "ordinal": 1, - "type_info": "Text" - }, - { - "name": "canonical_name: String", - "ordinal": 2, - "type_info": "Text" - }, - { - "name": "created_sequence: Sequence", - "ordinal": 3, - "type_info": "Integer" - }, - { - "name": "created_at: DateTime", - "ordinal": 4, - "type_info": "Text" - } - ], - "parameters": { - "Right": 1 - }, - "nullable": [ - false, - false, - false, - false, - false - ] - }, - "hash": "9f611a3351f22ed16d67d41602b8befafe751e9caeb0d5f279731e04c6925f46" -} diff --git a/.sqlx/query-a40496319887752f5845c0c2e965e2294c1a6932d72a8b04d258d06d0f8938bd.json b/.sqlx/query-a40496319887752f5845c0c2e965e2294c1a6932d72a8b04d258d06d0f8938bd.json deleted file mode 100644 index 436e1dd..0000000 --- a/.sqlx/query-a40496319887752f5845c0c2e965e2294c1a6932d72a8b04d258d06d0f8938bd.json +++ /dev/null @@ -1,56 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n select\n id as \"id: Id\",\n name.display_name as \"display_name: String\",\n name.canonical_name as \"canonical_name: String\",\n channel.created_at as \"created_at: DateTime\",\n channel.created_sequence as \"created_sequence: Sequence\",\n deleted.deleted_at as \"deleted_at?: DateTime\",\n deleted.deleted_sequence as \"deleted_sequence?: Sequence\"\n from channel\n left join channel_name as name\n using (id)\n left join channel_deleted as deleted\n using (id)\n where coalesce(channel.created_sequence > $1, true)\n ", - "describe": { - "columns": [ - { - "name": "id: Id", - "ordinal": 0, - "type_info": "Text" - }, - { - "name": "display_name: String", - "ordinal": 1, - "type_info": "Null" - }, - { - "name": "canonical_name: String", - "ordinal": 2, - "type_info": "Null" - }, - { - "name": "created_at: DateTime", - "ordinal": 3, - "type_info": "Text" - }, - { - "name": "created_sequence: Sequence", - "ordinal": 4, - "type_info": "Integer" - }, - { - "name": "deleted_at?: DateTime", - "ordinal": 5, - "type_info": "Text" - }, - { - "name": "deleted_sequence?: Sequence", - "ordinal": 6, - "type_info": "Integer" - } - ], - "parameters": { - "Right": 1 - }, - "nullable": [ - false, - true, - true, - false, - false, - true, - true - ] - }, - "hash": "a40496319887752f5845c0c2e965e2294c1a6932d72a8b04d258d06d0f8938bd" -} diff --git a/.sqlx/query-ac7ab464e44e4412cd83744730699683e63ae1ea404418a17c6af60148f03fe8.json b/.sqlx/query-ac7ab464e44e4412cd83744730699683e63ae1ea404418a17c6af60148f03fe8.json deleted file mode 100644 index 7d9bbbc..0000000 --- a/.sqlx/query-ac7ab464e44e4412cd83744730699683e63ae1ea404418a17c6af60148f03fe8.json +++ /dev/null @@ -1,44 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n select\n id as \"id: Id\",\n display_name as \"display_name: String\",\n canonical_name as \"canonical_name: String\",\n created_sequence as \"created_sequence: Sequence\",\n created_at as \"created_at: DateTime\"\n from login\n where coalesce(login.created_sequence > $1, true)\n ", - "describe": { - "columns": [ - { - "name": "id: Id", - "ordinal": 0, - "type_info": "Text" - }, - { - "name": "display_name: String", - "ordinal": 1, - "type_info": "Text" - }, - { - "name": "canonical_name: String", - "ordinal": 2, - "type_info": "Text" - }, - { - "name": "created_sequence: Sequence", - "ordinal": 3, - "type_info": "Integer" - }, - { - "name": "created_at: DateTime", - "ordinal": 4, - "type_info": "Text" - } - ], - "parameters": { - "Right": 1 - }, - "nullable": [ - false, - false, - false, - false, - false - ] - }, - "hash": "ac7ab464e44e4412cd83744730699683e63ae1ea404418a17c6af60148f03fe8" -} diff --git a/.sqlx/query-cbf29fae3725bbb3d9e94d932ace995f53efd6c7800a7a1b42daec41d081b3d2.json b/.sqlx/query-cbf29fae3725bbb3d9e94d932ace995f53efd6c7800a7a1b42daec41d081b3d2.json new file mode 100644 index 0000000..9c3c10e --- /dev/null +++ b/.sqlx/query-cbf29fae3725bbb3d9e94d932ace995f53efd6c7800a7a1b42daec41d081b3d2.json @@ -0,0 +1,44 @@ +{ + "db_name": "SQLite", + "query": "\n select\n id as \"id: Id\",\n display_name as \"display_name: String\",\n canonical_name as \"canonical_name: String\",\n created_sequence as \"created_sequence: Sequence\",\n created_at as \"created_at: DateTime\"\n from login\n where created_sequence <= $1\n order by canonical_name\n ", + "describe": { + "columns": [ + { + "name": "id: Id", + "ordinal": 0, + "type_info": "Text" + }, + { + "name": "display_name: String", + "ordinal": 1, + "type_info": "Text" + }, + { + "name": "canonical_name: String", + "ordinal": 2, + "type_info": "Text" + }, + { + "name": "created_sequence: Sequence", + "ordinal": 3, + "type_info": "Integer" + }, + { + "name": "created_at: DateTime", + "ordinal": 4, + "type_info": "Text" + } + ], + "parameters": { + "Right": 1 + }, + "nullable": [ + false, + false, + false, + false, + false + ] + }, + "hash": "cbf29fae3725bbb3d9e94d932ace995f53efd6c7800a7a1b42daec41d081b3d2" +} diff --git a/.sqlx/query-fce8f4fbd59a8b3b8531e10599914331682d58ace57214bfa26ccaa089592a24.json b/.sqlx/query-fce8f4fbd59a8b3b8531e10599914331682d58ace57214bfa26ccaa089592a24.json deleted file mode 100644 index 7aab764..0000000 --- a/.sqlx/query-fce8f4fbd59a8b3b8531e10599914331682d58ace57214bfa26ccaa089592a24.json +++ /dev/null @@ -1,62 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n select\n message.channel as \"channel: channel::Id\",\n message.sender as \"sender: login::Id\",\n id as \"id: Id\",\n message.body as \"body: Body\",\n message.sent_at as \"sent_at: DateTime\",\n message.sent_sequence as \"sent_sequence: Sequence\",\n deleted.deleted_at as \"deleted_at: DateTime\",\n deleted.deleted_sequence as \"deleted_sequence: Sequence\"\n from message\n left join message_deleted as deleted\n using (id)\n where coalesce(message.sent_sequence <= $2, true)\n order by message.sent_sequence\n ", - "describe": { - "columns": [ - { - "name": "channel: channel::Id", - "ordinal": 0, - "type_info": "Text" - }, - { - "name": "sender: login::Id", - "ordinal": 1, - "type_info": "Text" - }, - { - "name": "id: Id", - "ordinal": 2, - "type_info": "Text" - }, - { - "name": "body: Body", - "ordinal": 3, - "type_info": "Text" - }, - { - "name": "sent_at: DateTime", - "ordinal": 4, - "type_info": "Text" - }, - { - "name": "sent_sequence: Sequence", - "ordinal": 5, - "type_info": "Integer" - }, - { - "name": "deleted_at: DateTime", - "ordinal": 6, - "type_info": "Text" - }, - { - "name": "deleted_sequence: Sequence", - "ordinal": 7, - "type_info": "Integer" - } - ], - "parameters": { - "Right": 1 - }, - "nullable": [ - false, - false, - false, - true, - false, - false, - true, - true - ] - }, - "hash": "fce8f4fbd59a8b3b8531e10599914331682d58ace57214bfa26ccaa089592a24" -} diff --git a/.sqlx/query-ff61ff22108f1e98bbfc9a84a27bdaefca99706a0c73c17f19cc537f3f669882.json b/.sqlx/query-ff61ff22108f1e98bbfc9a84a27bdaefca99706a0c73c17f19cc537f3f669882.json new file mode 100644 index 0000000..f38f49c --- /dev/null +++ b/.sqlx/query-ff61ff22108f1e98bbfc9a84a27bdaefca99706a0c73c17f19cc537f3f669882.json @@ -0,0 +1,62 @@ +{ + "db_name": "SQLite", + "query": "\n select\n message.channel as \"channel: channel::Id\",\n message.sender as \"sender: login::Id\",\n message.id as \"id: Id\",\n message.body as \"body: Body\",\n message.sent_at as \"sent_at: DateTime\",\n message.sent_sequence as \"sent_sequence: Sequence\",\n deleted.deleted_at as \"deleted_at?: DateTime\",\n deleted.deleted_sequence as \"deleted_sequence?: Sequence\"\n from message\n left join message_deleted as deleted\n using (id)\n where message.sent_sequence <= $1\n order by message.sent_sequence\n ", + "describe": { + "columns": [ + { + "name": "channel: channel::Id", + "ordinal": 0, + "type_info": "Text" + }, + { + "name": "sender: login::Id", + "ordinal": 1, + "type_info": "Text" + }, + { + "name": "id: Id", + "ordinal": 2, + "type_info": "Text" + }, + { + "name": "body: Body", + "ordinal": 3, + "type_info": "Text" + }, + { + "name": "sent_at: DateTime", + "ordinal": 4, + "type_info": "Text" + }, + { + "name": "sent_sequence: Sequence", + "ordinal": 5, + "type_info": "Integer" + }, + { + "name": "deleted_at?: DateTime", + "ordinal": 6, + "type_info": "Text" + }, + { + "name": "deleted_sequence?: Sequence", + "ordinal": 7, + "type_info": "Integer" + } + ], + "parameters": { + "Right": 1 + }, + "nullable": [ + false, + false, + false, + true, + false, + false, + false, + false + ] + }, + "hash": "ff61ff22108f1e98bbfc9a84a27bdaefca99706a0c73c17f19cc537f3f669882" +} diff --git a/docs/api/events.md b/docs/api/events.md index b08e971..b23469c 100644 --- a/docs/api/events.md +++ b/docs/api/events.md @@ -46,7 +46,7 @@ This endpoint is designed for use with the [EventSource] DOM API, and supports s ### Query parameters -This endpoint accepts an optional `resume_point` (integer) query parameter. When provided, the event stream will include events published after that point in time; the value must be the value obtained by calling the [`GET /api/boot`](./boot.md) method. If absent, the returned event stream includes all events. +This endpoint requires a `resume_point` (integer) query parameter. The event stream will collect events published after that point in time. The value must be obtained by calling the [`GET /api/boot`](./boot.md) method. ### Request headers diff --git a/src/boot/app.rs b/src/boot/app.rs index e716b58..909f7d8 100644 --- a/src/boot/app.rs +++ b/src/boot/app.rs @@ -22,9 +22,9 @@ impl<'a> Boot<'a> { let mut tx = self.db.begin().await?; let resume_point = tx.sequence().current().await?; - let logins = tx.logins().all(resume_point.into()).await?; + let logins = tx.logins().all(resume_point).await?; let channels = tx.channels().all(resume_point).await?; - let messages = tx.messages().all(resume_point.into()).await?; + let messages = tx.messages().all(resume_point).await?; tx.commit().await?; diff --git a/src/channel/history.rs b/src/channel/history.rs index dda7bb9..ef2120d 100644 --- a/src/channel/history.rs +++ b/src/channel/history.rs @@ -4,7 +4,7 @@ use super::{ event::{Created, Deleted, Event}, Channel, Id, }; -use crate::event::{Instant, ResumePoint, Sequence}; +use crate::event::{Instant, Sequence}; #[derive(Clone, Debug, Eq, PartialEq)] pub struct History { @@ -28,9 +28,9 @@ impl History { self.channel.clone() } - pub fn as_of(&self, resume_point: impl Into) -> Option { + pub fn as_of(&self, resume_point: Sequence) -> Option { self.events() - .filter(Sequence::up_to(resume_point.into())) + .filter(Sequence::up_to(resume_point)) .collect() } diff --git a/src/channel/repo.rs b/src/channel/repo.rs index f47e564..7206c21 100644 --- a/src/channel/repo.rs +++ b/src/channel/repo.rs @@ -5,7 +5,7 @@ use crate::{ channel::{Channel, History, Id}, clock::DateTime, db::NotFound, - event::{Instant, ResumePoint, Sequence}, + event::{Instant, Sequence}, name::{self, Name}, }; @@ -144,13 +144,13 @@ impl<'c> Channels<'c> { Ok(channels) } - pub async fn replay(&mut self, resume_at: ResumePoint) -> Result, LoadError> { + pub async fn replay(&mut self, resume_at: Sequence) -> Result, LoadError> { let channels = sqlx::query!( r#" select id as "id: Id", - name.display_name as "display_name: String", - name.canonical_name as "canonical_name: String", + name.display_name as "display_name?: String", + name.canonical_name as "canonical_name?: String", channel.created_at as "created_at: DateTime", channel.created_sequence as "created_sequence: Sequence", deleted.deleted_at as "deleted_at?: DateTime", @@ -160,7 +160,8 @@ impl<'c> Channels<'c> { using (id) left join channel_deleted as deleted using (id) - where coalesce(channel.created_sequence > $1, true) + where channel.created_sequence > $1 + or deleted.deleted_sequence > $1 "#, resume_at, ) diff --git a/src/channel/routes/channel/test/post.rs b/src/channel/routes/channel/test/post.rs index 111a703..bc0684b 100644 --- a/src/channel/routes/channel/test/post.rs +++ b/src/channel/routes/channel/test/post.rs @@ -15,6 +15,7 @@ async fn messages_in_order() { let app = fixtures::scratch_app().await; let sender = fixtures::identity::create(&app, &fixtures::now()).await; let channel = fixtures::channel::create(&app, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; // Call the endpoint (twice) @@ -41,7 +42,7 @@ async fn messages_in_order() { let mut events = app .events() - .subscribe(None) + .subscribe(resume_point) .await .expect("subscribing to a valid channel succeeds") .filter_map(fixtures::event::message) diff --git a/src/channel/routes/test.rs b/src/channel/routes/test.rs index f5369fb..cba8f2e 100644 --- a/src/channel/routes/test.rs +++ b/src/channel/routes/test.rs @@ -16,6 +16,7 @@ async fn new_channel() { let app = fixtures::scratch_app().await; let creator = fixtures::identity::create(&app, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; // Call the endpoint @@ -44,7 +45,7 @@ async fn new_channel() { let mut events = app .events() - .subscribe(None) + .subscribe(resume_point) .await .expect("subscribing never fails") .filter_map(fixtures::event::channel) diff --git a/src/event/app.rs b/src/event/app.rs index c754388..b309245 100644 --- a/src/event/app.rs +++ b/src/event/app.rs @@ -6,7 +6,7 @@ use futures::{ use itertools::Itertools as _; use sqlx::sqlite::SqlitePool; -use super::{broadcaster::Broadcaster, Event, ResumePoint, Sequence, Sequenced}; +use super::{broadcaster::Broadcaster, Event, Sequence, Sequenced}; use crate::{ channel::{self, repo::Provider as _}, login::{self, repo::Provider as _}, @@ -26,9 +26,8 @@ impl<'a> Events<'a> { pub async fn subscribe( &self, - resume_at: impl Into, + resume_at: Sequence, ) -> Result + std::fmt::Debug, Error> { - let resume_at = resume_at.into(); // Subscribe before retrieving, to catch messages broadcast while we're // querying the DB. We'll prune out duplicates later. let live_messages = self.events.subscribe(); @@ -63,7 +62,7 @@ impl<'a> Events<'a> { .merge_by(channel_events, Sequence::merge) .merge_by(message_events, Sequence::merge) .collect::>(); - let resume_live_at = replay_events.last().map(Sequenced::sequence); + let resume_live_at = replay_events.last().map_or(resume_at, Sequenced::sequence); let replay = stream::iter(replay_events); @@ -77,7 +76,7 @@ impl<'a> Events<'a> { Ok(replay.chain(live_messages)) } - fn resume(resume_at: ResumePoint) -> impl for<'m> FnMut(&'m Event) -> future::Ready { + fn resume(resume_at: Sequence) -> impl for<'m> FnMut(&'m Event) -> future::Ready { let filter = Sequence::after(resume_at); move |event| future::ready(filter(event)) } diff --git a/src/event/mod.rs b/src/event/mod.rs index 69c7a10..9996916 100644 --- a/src/event/mod.rs +++ b/src/event/mod.rs @@ -13,8 +13,6 @@ pub use self::{ sequence::{Instant, Sequence, Sequenced}, }; -pub type ResumePoint = Option; - #[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] #[serde(tag = "type", rename_all = "snake_case")] pub enum Event { diff --git a/src/event/routes/get.rs b/src/event/routes/get.rs index 22e8762..ceebcc9 100644 --- a/src/event/routes/get.rs +++ b/src/event/routes/get.rs @@ -12,7 +12,7 @@ use futures::stream::{Stream, StreamExt as _}; use crate::{ app::App, error::{Internal, Unauthorized}, - event::{app, extract::LastEventId, Event, ResumePoint, Sequence, Sequenced as _}, + event::{app, extract::LastEventId, Event, Sequence, Sequenced as _}, token::{app::ValidateError, extract::Identity}, }; @@ -22,9 +22,7 @@ pub async fn handler( last_event_id: Option>, Query(query): Query, ) -> Result + std::fmt::Debug>, Error> { - let resume_at = last_event_id - .map(LastEventId::into_inner) - .or(query.resume_point); + let resume_at = last_event_id.map_or(query.resume_point, LastEventId::into_inner); let stream = app.events().subscribe(resume_at).await?; let stream = app.tokens().limit_stream(identity.token, stream).await?; @@ -32,9 +30,9 @@ pub async fn handler( Ok(Response(stream)) } -#[derive(Default, serde::Deserialize)] +#[derive(serde::Deserialize)] pub struct QueryParams { - pub resume_point: ResumePoint, + pub resume_point: Sequence, } #[derive(Debug)] diff --git a/src/event/routes/test/channel.rs b/src/event/routes/test/channel.rs index 6a0a803..0695ab1 100644 --- a/src/event/routes/test/channel.rs +++ b/src/event/routes/test/channel.rs @@ -12,14 +12,19 @@ async fn creating() { // Set up the environment let app = fixtures::scratch_app().await; + let resume_point = fixtures::boot::resume_point(&app).await; // Subscribe let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = - get::handler(State(app.clone()), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); + let get::Response(events) = get::handler( + State(app.clone()), + subscriber, + None, + Query(get::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); // Create a channel @@ -46,6 +51,7 @@ async fn previously_created() { // Set up the environment let app = fixtures::scratch_app().await; + let resume_point = fixtures::boot::resume_point(&app).await; // Create a channel @@ -59,10 +65,14 @@ async fn previously_created() { // Subscribe let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = - get::handler(State(app.clone()), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); + let get::Response(events) = get::handler( + State(app.clone()), + subscriber, + None, + Query(get::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); // Verify channel created event @@ -81,14 +91,19 @@ async fn expiring() { let app = fixtures::scratch_app().await; let channel = fixtures::channel::create(&app, &fixtures::ancient()).await; + let resume_point = fixtures::boot::resume_point(&app).await; // Subscribe let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = - get::handler(State(app.clone()), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); + let get::Response(events) = get::handler( + State(app.clone()), + subscriber, + None, + Query(get::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); // Expire channels @@ -113,6 +128,7 @@ async fn previously_expired() { let app = fixtures::scratch_app().await; let channel = fixtures::channel::create(&app, &fixtures::ancient()).await; + let resume_point = fixtures::boot::resume_point(&app).await; // Expire channels @@ -124,10 +140,14 @@ async fn previously_expired() { // Subscribe let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = - get::handler(State(app.clone()), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); + let get::Response(events) = get::handler( + State(app.clone()), + subscriber, + None, + Query(get::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); // Check for expiry event let _ = events @@ -145,14 +165,19 @@ async fn deleting() { let app = fixtures::scratch_app().await; let channel = fixtures::channel::create(&app, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; // Subscribe let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = - get::handler(State(app.clone()), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); + let get::Response(events) = get::handler( + State(app.clone()), + subscriber, + None, + Query(get::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); // Delete the channel @@ -177,6 +202,7 @@ async fn previously_deleted() { let app = fixtures::scratch_app().await; let channel = fixtures::channel::create(&app, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; // Delete the channel @@ -188,10 +214,14 @@ async fn previously_deleted() { // Subscribe let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = - get::handler(State(app.clone()), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); + let get::Response(events) = get::handler( + State(app.clone()), + subscriber, + None, + Query(get::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); // Check for expiry event let _ = events @@ -209,6 +239,7 @@ async fn previously_purged() { let app = fixtures::scratch_app().await; let channel = fixtures::channel::create(&app, &fixtures::ancient()).await; + let resume_point = fixtures::boot::resume_point(&app).await; // Delete and purge the channel @@ -225,10 +256,14 @@ async fn previously_purged() { // Subscribe let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = - get::handler(State(app.clone()), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); + let get::Response(events) = get::handler( + State(app.clone()), + subscriber, + None, + Query(get::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); // Check for expiry event events diff --git a/src/event/routes/test/invite.rs b/src/event/routes/test/invite.rs index d24f474..73af62d 100644 --- a/src/event/routes/test/invite.rs +++ b/src/event/routes/test/invite.rs @@ -14,14 +14,19 @@ async fn accepting_invite() { let app = fixtures::scratch_app().await; let issuer = fixtures::login::create(&app, &fixtures::now()).await; let invite = fixtures::invite::issue(&app, &issuer, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; // Subscribe let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = - get::handler(State(app.clone()), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); + let get::Response(events) = get::handler( + State(app.clone()), + subscriber, + None, + Query(get::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); // Accept the invite @@ -50,6 +55,7 @@ async fn previously_accepted_invite() { let app = fixtures::scratch_app().await; let issuer = fixtures::login::create(&app, &fixtures::now()).await; let invite = fixtures::invite::issue(&app, &issuer, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; // Accept the invite @@ -63,10 +69,14 @@ async fn previously_accepted_invite() { // Subscribe let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = - get::handler(State(app.clone()), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); + let get::Response(events) = get::handler( + State(app.clone()), + subscriber, + None, + Query(get::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); // Expect a login created event diff --git a/src/event/routes/test/message.rs b/src/event/routes/test/message.rs index a7b25fb..fafaeb3 100644 --- a/src/event/routes/test/message.rs +++ b/src/event/routes/test/message.rs @@ -16,14 +16,19 @@ async fn sending() { let app = fixtures::scratch_app().await; let channel = fixtures::channel::create(&app, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; // Call the endpoint let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = - get::handler(State(app.clone()), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); + let get::Response(events) = get::handler( + State(app.clone()), + subscriber, + None, + Query(get::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); // Send a message @@ -56,6 +61,7 @@ async fn previously_sent() { let app = fixtures::scratch_app().await; let channel = fixtures::channel::create(&app, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; // Send a message @@ -74,10 +80,14 @@ async fn previously_sent() { // Call the endpoint let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = - get::handler(State(app.clone()), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); + let get::Response(events) = get::handler( + State(app.clone()), + subscriber, + None, + Query(get::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); // Verify that an event is delivered @@ -96,6 +106,7 @@ async fn sent_in_multiple_channels() { let app = fixtures::scratch_app().await; let sender = fixtures::login::create(&app, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; let channels = [ fixtures::channel::create(&app, &fixtures::now()).await, @@ -115,9 +126,14 @@ async fn sent_in_multiple_channels() { // Call the endpoint let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = get::handler(State(app), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); + let get::Response(events) = get::handler( + State(app), + subscriber, + None, + Query(get::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); // Verify the structure of the response. @@ -141,6 +157,7 @@ async fn sent_sequentially() { let app = fixtures::scratch_app().await; let channel = fixtures::channel::create(&app, &fixtures::now()).await; let sender = fixtures::login::create(&app, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; let messages = vec![ fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, @@ -151,9 +168,14 @@ async fn sent_sequentially() { // Subscribe let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = get::handler(State(app), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); + let get::Response(events) = get::handler( + State(app), + subscriber, + None, + Query(get::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); // Verify the expected events in the expected order @@ -180,14 +202,19 @@ async fn expiring() { let channel = fixtures::channel::create(&app, &fixtures::ancient()).await; let sender = fixtures::login::create(&app, &fixtures::ancient()).await; let message = fixtures::message::send(&app, &channel, &sender, &fixtures::ancient()).await; + let resume_point = fixtures::boot::resume_point(&app).await; // Subscribe let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = - get::handler(State(app.clone()), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); + let get::Response(events) = get::handler( + State(app.clone()), + subscriber, + None, + Query(get::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); // Expire messages @@ -214,6 +241,7 @@ async fn previously_expired() { let channel = fixtures::channel::create(&app, &fixtures::ancient()).await; let sender = fixtures::login::create(&app, &fixtures::ancient()).await; let message = fixtures::message::send(&app, &channel, &sender, &fixtures::ancient()).await; + let resume_point = fixtures::boot::resume_point(&app).await; // Expire messages @@ -225,10 +253,14 @@ async fn previously_expired() { // Subscribe let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = - get::handler(State(app.clone()), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); + let get::Response(events) = get::handler( + State(app.clone()), + subscriber, + None, + Query(get::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); // Check for expiry event let _ = events @@ -248,14 +280,19 @@ async fn deleting() { let channel = fixtures::channel::create(&app, &fixtures::now()).await; let sender = fixtures::login::create(&app, &fixtures::now()).await; let message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; // Subscribe let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = - get::handler(State(app.clone()), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); + let get::Response(events) = get::handler( + State(app.clone()), + subscriber, + None, + Query(get::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); // Delete the message @@ -282,6 +319,7 @@ async fn previously_deleted() { let channel = fixtures::channel::create(&app, &fixtures::now()).await; let sender = fixtures::login::create(&app, &fixtures::now()).await; let message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; // Delete the message @@ -293,10 +331,14 @@ async fn previously_deleted() { // Subscribe let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = - get::handler(State(app.clone()), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); + let get::Response(events) = get::handler( + State(app.clone()), + subscriber, + None, + Query(get::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); // Check for delete event let _ = events @@ -316,6 +358,7 @@ async fn previously_purged() { let channel = fixtures::channel::create(&app, &fixtures::ancient()).await; let sender = fixtures::login::create(&app, &fixtures::ancient()).await; let message = fixtures::message::send(&app, &channel, &sender, &fixtures::ancient()).await; + let resume_point = fixtures::boot::resume_point(&app).await; // Purge the message @@ -332,10 +375,14 @@ async fn previously_purged() { // Subscribe let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = - get::handler(State(app.clone()), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); + let get::Response(events) = get::handler( + State(app.clone()), + subscriber, + None, + Query(get::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); // Check for delete event diff --git a/src/event/routes/test/resume.rs b/src/event/routes/test/resume.rs index 62b9bad..fabda0c 100644 --- a/src/event/routes/test/resume.rs +++ b/src/event/routes/test/resume.rs @@ -16,6 +16,7 @@ async fn resume() { let app = fixtures::scratch_app().await; let channel = fixtures::channel::create(&app, &fixtures::now()).await; let sender = fixtures::login::create(&app, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; let initial_message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await; @@ -34,7 +35,7 @@ async fn resume() { State(app.clone()), subscriber.clone(), None, - Query::default(), + Query(get::QueryParams { resume_point }), ) .await .expect("subscribe never fails"); @@ -55,7 +56,7 @@ async fn resume() { State(app), subscriber, Some(resume_at.into()), - Query::default(), + Query(get::QueryParams { resume_point }), ) .await .expect("subscribe never fails"); @@ -98,6 +99,7 @@ async fn serial_resume() { let sender = fixtures::login::create(&app, &fixtures::now()).await; let channel_a = fixtures::channel::create(&app, &fixtures::now()).await; let channel_b = fixtures::channel::create(&app, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; // Call the endpoint @@ -115,7 +117,7 @@ async fn serial_resume() { State(app.clone()), subscriber.clone(), None, - Query::default(), + Query(get::QueryParams { resume_point }), ) .await .expect("subscribe never fails"); @@ -156,7 +158,7 @@ async fn serial_resume() { State(app.clone()), subscriber.clone(), Some(resume_at.into()), - Query::default(), + Query(get::QueryParams { resume_point }), ) .await .expect("subscribe never fails"); @@ -197,7 +199,7 @@ async fn serial_resume() { State(app.clone()), subscriber.clone(), Some(resume_at.into()), - Query::default(), + Query(get::QueryParams { resume_point }), ) .await .expect("subscribe never fails"); diff --git a/src/event/routes/test/setup.rs b/src/event/routes/test/setup.rs index 007b03d..26b7ea7 100644 --- a/src/event/routes/test/setup.rs +++ b/src/event/routes/test/setup.rs @@ -15,6 +15,7 @@ async fn previously_completed() { // Set up the environment let app = fixtures::scratch_app().await; + let resume_point = fixtures::boot::resume_point(&app).await; // Complete initial setup @@ -28,10 +29,14 @@ async fn previously_completed() { // Subscribe to events let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = - get::handler(State(app.clone()), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); + let get::Response(events) = get::handler( + State(app.clone()), + subscriber, + None, + Query(get::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); // Expect a login created event diff --git a/src/event/routes/test/token.rs b/src/event/routes/test/token.rs index 16ac7c3..fa76865 100644 --- a/src/event/routes/test/token.rs +++ b/src/event/routes/test/token.rs @@ -14,6 +14,7 @@ async fn terminates_on_token_expiry() { let app = fixtures::scratch_app().await; let channel = fixtures::channel::create(&app, &fixtures::now()).await; let sender = fixtures::login::create(&app, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; // Subscribe via the endpoint @@ -21,10 +22,14 @@ async fn terminates_on_token_expiry() { let subscriber = fixtures::identity::logged_in(&app, &subscriber_creds, &fixtures::ancient()).await; - let get::Response(events) = - get::handler(State(app.clone()), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); + let get::Response(events) = get::handler( + State(app.clone()), + subscriber, + None, + Query(get::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); // Verify the resulting stream's behaviour @@ -56,6 +61,7 @@ async fn terminates_on_logout() { let app = fixtures::scratch_app().await; let channel = fixtures::channel::create(&app, &fixtures::now()).await; let sender = fixtures::login::create(&app, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; // Subscribe via the endpoint @@ -65,7 +71,7 @@ async fn terminates_on_logout() { State(app.clone()), subscriber.clone(), None, - Query::default(), + Query(get::QueryParams { resume_point }), ) .await .expect("subscribe never fails"); @@ -101,6 +107,7 @@ async fn terminates_on_password_change() { let app = fixtures::scratch_app().await; let channel = fixtures::channel::create(&app, &fixtures::now()).await; let sender = fixtures::login::create(&app, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; // Subscribe via the endpoint @@ -112,7 +119,7 @@ async fn terminates_on_password_change() { State(app.clone()), subscriber.clone(), None, - Query::default(), + Query(get::QueryParams { resume_point }), ) .await .expect("subscribe never fails"); diff --git a/src/event/sequence.rs b/src/event/sequence.rs index 9bc399b..77281c2 100644 --- a/src/event/sequence.rs +++ b/src/event/sequence.rs @@ -1,6 +1,5 @@ use std::fmt; -use super::ResumePoint; use crate::clock::DateTime; #[derive(Clone, Copy, Debug, Eq, PartialEq, serde::Serialize)] @@ -51,18 +50,18 @@ impl fmt::Display for Sequence { } impl Sequence { - pub fn up_to(resume_point: ResumePoint) -> impl for<'e> Fn(&'e E) -> bool + pub fn up_to(resume_point: Sequence) -> impl for<'e> Fn(&'e E) -> bool where E: Sequenced, { - move |event| resume_point.map_or(true, |resume_point| event.sequence() <= resume_point) + move |event| event.sequence() <= resume_point } - pub fn after(resume_point: ResumePoint) -> impl for<'e> Fn(&'e E) -> bool + pub fn after(resume_point: Sequence) -> impl for<'e> Fn(&'e E) -> bool where E: Sequenced, { - move |event| resume_point < Some(event.sequence()) + move |event| resume_point < event.sequence() } pub fn start_from(resume_point: Self) -> impl for<'e> Fn(&'e E) -> bool diff --git a/src/login/history.rs b/src/login/history.rs index daad579..8161b0b 100644 --- a/src/login/history.rs +++ b/src/login/history.rs @@ -2,7 +2,7 @@ use super::{ event::{Created, Event}, Id, Login, }; -use crate::event::{Instant, ResumePoint, Sequence}; +use crate::event::{Instant, Sequence}; #[derive(Clone, Debug, Eq, PartialEq)] pub struct History { @@ -24,9 +24,9 @@ impl History { self.login.clone() } - pub fn as_of(&self, resume_point: impl Into) -> Option { + pub fn as_of(&self, resume_point: Sequence) -> Option { self.events() - .filter(Sequence::up_to(resume_point.into())) + .filter(Sequence::up_to(resume_point)) .collect() } diff --git a/src/login/repo.rs b/src/login/repo.rs index 9439a25..1c63a4b 100644 --- a/src/login/repo.rs +++ b/src/login/repo.rs @@ -3,7 +3,7 @@ use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction}; use crate::{ clock::DateTime, - event::{Instant, ResumePoint, Sequence}, + event::{Instant, Sequence}, login::{password::StoredHash, History, Id, Login}, name::{self, Name}, }; @@ -81,7 +81,7 @@ impl<'c> Logins<'c> { Ok(()) } - pub async fn all(&mut self, resume_at: ResumePoint) -> Result, LoadError> { + pub async fn all(&mut self, resume_at: Sequence) -> Result, LoadError> { let logins = sqlx::query!( r#" select @@ -91,7 +91,7 @@ impl<'c> Logins<'c> { created_sequence as "created_sequence: Sequence", created_at as "created_at: DateTime" from login - where coalesce(created_sequence <= $1, true) + where created_sequence <= $1 order by canonical_name "#, resume_at, @@ -113,7 +113,7 @@ impl<'c> Logins<'c> { Ok(logins) } - pub async fn replay(&mut self, resume_at: ResumePoint) -> Result, LoadError> { + pub async fn replay(&mut self, resume_at: Sequence) -> Result, LoadError> { let logins = sqlx::query!( r#" select @@ -123,7 +123,7 @@ impl<'c> Logins<'c> { created_sequence as "created_sequence: Sequence", created_at as "created_at: DateTime" from login - where coalesce(login.created_sequence > $1, true) + where login.created_sequence > $1 "#, resume_at, ) diff --git a/src/message/history.rs b/src/message/history.rs index 67e437a..ed8f5df 100644 --- a/src/message/history.rs +++ b/src/message/history.rs @@ -4,7 +4,7 @@ use super::{ event::{Deleted, Event, Sent}, Id, Message, }; -use crate::event::{Instant, ResumePoint, Sequence}; +use crate::event::{Instant, Sequence}; #[derive(Clone, Debug, Eq, PartialEq)] pub struct History { @@ -27,9 +27,9 @@ impl History { self.message.clone() } - pub fn as_of(&self, resume_point: impl Into) -> Option { + pub fn as_of(&self, resume_point: Sequence) -> Option { self.events() - .filter(Sequence::up_to(resume_point.into())) + .filter(Sequence::up_to(resume_point)) .collect() } diff --git a/src/message/repo.rs b/src/message/repo.rs index c8ceceb..913135c 100644 --- a/src/message/repo.rs +++ b/src/message/repo.rs @@ -4,7 +4,7 @@ use super::{snapshot::Message, Body, History, Id}; use crate::{ channel, clock::DateTime, - event::{Instant, ResumePoint, Sequence}, + event::{Instant, Sequence}, login::{self, Login}, }; @@ -106,22 +106,22 @@ impl<'c> Messages<'c> { Ok(messages) } - pub async fn all(&mut self, resume_at: ResumePoint) -> Result, sqlx::Error> { + pub async fn all(&mut self, resume_at: Sequence) -> Result, sqlx::Error> { let messages = sqlx::query!( r#" select message.channel as "channel: channel::Id", message.sender as "sender: login::Id", - id as "id: Id", + message.id as "id: Id", message.body as "body: Body", message.sent_at as "sent_at: DateTime", message.sent_sequence as "sent_sequence: Sequence", - deleted.deleted_at as "deleted_at: DateTime", - deleted.deleted_sequence as "deleted_sequence: Sequence" + deleted.deleted_at as "deleted_at?: DateTime", + deleted.deleted_sequence as "deleted_sequence?: Sequence" from message left join message_deleted as deleted using (id) - where coalesce(message.sent_sequence <= $2, true) + where message.sent_sequence <= $1 order by message.sent_sequence "#, resume_at, @@ -282,7 +282,7 @@ impl<'c> Messages<'c> { Ok(messages) } - pub async fn replay(&mut self, resume_at: ResumePoint) -> Result, sqlx::Error> { + pub async fn replay(&mut self, resume_at: Sequence) -> Result, sqlx::Error> { let messages = sqlx::query!( r#" select @@ -292,12 +292,13 @@ impl<'c> Messages<'c> { message.sent_at as "sent_at: DateTime", message.sent_sequence as "sent_sequence: Sequence", message.body as "body: Body", - deleted.deleted_at as "deleted_at: DateTime", - deleted.deleted_sequence as "deleted_sequence: Sequence" + deleted.deleted_at as "deleted_at?: DateTime", + deleted.deleted_sequence as "deleted_sequence?: Sequence" from message left join message_deleted as deleted using (id) - where coalesce(message.sent_sequence > $1, true) + where message.sent_sequence > $1 + or deleted.deleted_sequence > $1 "#, resume_at, ) diff --git a/src/test/fixtures/boot.rs b/src/test/fixtures/boot.rs new file mode 100644 index 0000000..120726f --- /dev/null +++ b/src/test/fixtures/boot.rs @@ -0,0 +1,9 @@ +use crate::{app::App, event::Sequence}; + +pub async fn resume_point(app: &App) -> Sequence { + app.boot() + .snapshot() + .await + .expect("boot always succeeds") + .resume_point +} diff --git a/src/test/fixtures/mod.rs b/src/test/fixtures/mod.rs index 2b7b6af..470b31a 100644 --- a/src/test/fixtures/mod.rs +++ b/src/test/fixtures/mod.rs @@ -2,6 +2,7 @@ use chrono::{TimeDelta, Utc}; use crate::{app::App, clock::RequestedAt, db}; +pub mod boot; pub mod channel; pub mod cookie; pub mod event; -- cgit v1.2.3