From 9010c7feeca8f4e7e501ad474911deaaf7a1a367 Mon Sep 17 00:00:00 2001 From: Owen Jacobson Date: Tue, 29 Oct 2024 20:44:03 -0400 Subject: Restrict channel names, too. Thankfully, channel creation only happens in one place, so we don't need a state machine for this. --- src/channel/app.rs | 8 +++++++- src/channel/mod.rs | 1 + src/channel/routes/post.rs | 3 +++ src/channel/routes/test.rs | 24 ++++++++++++++++++++++++ src/channel/validate.rs | 23 +++++++++++++++++++++++ 5 files changed, 58 insertions(+), 1 deletion(-) create mode 100644 src/channel/validate.rs (limited to 'src/channel') diff --git a/src/channel/app.rs b/src/channel/app.rs index 8359277..9a19b16 100644 --- a/src/channel/app.rs +++ b/src/channel/app.rs @@ -4,7 +4,7 @@ use sqlx::sqlite::SqlitePool; use super::{ repo::{LoadError, Provider as _}, - Channel, Id, + validate, Channel, Id, }; use crate::{ clock::DateTime, @@ -25,6 +25,10 @@ impl<'a> Channels<'a> { } pub async fn create(&self, name: &Name, created_at: &DateTime) -> Result { + if !validate::name(name) { + return Err(CreateError::InvalidName(name.clone())); + } + let mut tx = self.db.begin().await?; let created = tx.sequence().next(created_at).await?; let channel = tx @@ -149,6 +153,8 @@ impl<'a> Channels<'a> { pub enum CreateError { #[error("channel named {0} already exists")] DuplicateName(Name), + #[error("invalid channel name: {0}")] + InvalidName(Name), #[error(transparent)] Database(#[from] sqlx::Error), #[error(transparent)] diff --git a/src/channel/mod.rs b/src/channel/mod.rs index eb8200b..d5ba828 100644 --- a/src/channel/mod.rs +++ b/src/channel/mod.rs @@ -5,5 +5,6 @@ mod id; pub mod repo; mod routes; mod snapshot; +mod validate; pub use self::{event::Event, history::History, id::Id, routes::router, snapshot::Channel}; diff --git a/src/channel/routes/post.rs b/src/channel/routes/post.rs index 810445c..2cf1cc0 100644 --- a/src/channel/routes/post.rs +++ b/src/channel/routes/post.rs @@ -54,6 +54,9 @@ impl IntoResponse for Error { app::CreateError::DuplicateName(_) => { (StatusCode::CONFLICT, error.to_string()).into_response() } + app::CreateError::InvalidName(_) => { + (StatusCode::BAD_REQUEST, error.to_string()).into_response() + } other => Internal::from(other).into_response(), } } diff --git a/src/channel/routes/test.rs b/src/channel/routes/test.rs index 10b1e8d..f5369fb 100644 --- a/src/channel/routes/test.rs +++ b/src/channel/routes/test.rs @@ -115,6 +115,30 @@ async fn conflicting_canonical_name() { )); } +#[tokio::test] +async fn invalid_name() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let creator = fixtures::identity::create(&app, &fixtures::now()).await; + + // Call the endpoint + + let name = fixtures::channel::propose_invalid_name(); + let request = post::Request { name: name.clone() }; + let post::Error(error) = + post::handler(State(app.clone()), creator, fixtures::now(), Json(request)) + .await + .expect_err("invalid channel name should fail the request"); + + // Verify the structure of the response + + assert!(matches!( + error, + app::CreateError::InvalidName(error_name) if name == error_name + )); +} + #[tokio::test] async fn name_reusable_after_delete() { // Set up the environment diff --git a/src/channel/validate.rs b/src/channel/validate.rs new file mode 100644 index 0000000..0c97293 --- /dev/null +++ b/src/channel/validate.rs @@ -0,0 +1,23 @@ +use unicode_segmentation::UnicodeSegmentation as _; + +use crate::name::Name; + +// Picked out of a hat. The power of two is not meaningful. +const NAME_TOO_LONG: usize = 64; + +pub fn name(name: &Name) -> bool { + let display = name.display(); + + [ + display.graphemes(true).count() < NAME_TOO_LONG, + display.chars().all(|ch| !ch.is_control()), + display.chars().next().is_some_and(|c| !c.is_whitespace()), + display.chars().last().is_some_and(|c| !c.is_whitespace()), + display + .chars() + .zip(display.chars().skip(1)) + .all(|(a, b)| !(a.is_whitespace() && b.is_whitespace())), + ] + .into_iter() + .all(|value| value) +} -- cgit v1.2.3 From 36e659e971d091cfcbe370f5e45a0d01102d2e83 Mon Sep 17 00:00:00 2001 From: Owen Jacobson Date: Wed, 30 Oct 2024 01:07:12 -0400 Subject: Prevent deletion of non-empty channels. --- docs/api/channels-messages.md | 5 +++- src/channel/app.rs | 49 ++++++++++++++++++++++--------- src/channel/routes/channel/delete.rs | 9 ++++-- src/channel/routes/channel/test/delete.rs | 34 ++++++++++++++++++--- 4 files changed, 76 insertions(+), 21 deletions(-) (limited to 'src/channel') diff --git a/docs/api/channels-messages.md b/docs/api/channels-messages.md index d87a01c..2aa8ac5 100644 --- a/docs/api/channels-messages.md +++ b/docs/api/channels-messages.md @@ -164,7 +164,7 @@ This endpoint will respond with a status of `404 Not Found` if the channel ID is Deletes a channel. -Deleting a channel prevents it from receiving any further messages, and deletes the messages it contains at that point. +Deleting a channel prevents it from receiving any further messages. The channel must be empty; to delete a channel with messages in it, delete the messages first (or wait for them to expire). This endpoint requires the following path parameter: @@ -190,6 +190,9 @@ The response will have the following fields: When completed, the service will emit a [message deleted](events.md#message-deleted) event for each message in the channel, followed by a [channel deleted](events.md#channel-deleted) event with the channel's ID. +### Channel not empty + +This endpoint will respond with a status of `409 Conflict` if the channel contains messages. ### Invalid channel ID diff --git a/src/channel/app.rs b/src/channel/app.rs index 9a19b16..e32eb6c 100644 --- a/src/channel/app.rs +++ b/src/channel/app.rs @@ -10,7 +10,7 @@ use crate::{ clock::DateTime, db::{Duplicate as _, NotFound as _}, event::{repo::Provider as _, Broadcaster, Event, Sequence}, - message::repo::Provider as _, + message::{self, repo::Provider as _}, name::{self, Name}, }; @@ -48,38 +48,36 @@ impl<'a> Channels<'a> { // it exists in the specific moment when you call it. pub async fn get(&self, channel: &Id) -> Result { let not_found = || Error::NotFound(channel.clone()); + let deleted = || Error::Deleted(channel.clone()); let mut tx = self.db.begin().await?; let channel = tx.channels().by_id(channel).await.not_found(not_found)?; tx.commit().await?; - channel.as_snapshot().ok_or_else(not_found) + channel.as_snapshot().ok_or_else(deleted) } - pub async fn delete(&self, channel: &Id, deleted_at: &DateTime) -> Result<(), Error> { + pub async fn delete(&self, channel: &Id, deleted_at: &DateTime) -> Result<(), DeleteError> { let mut tx = self.db.begin().await?; let channel = tx .channels() .by_id(channel) .await - .not_found(|| Error::NotFound(channel.clone()))?; + .not_found(|| DeleteError::NotFound(channel.clone()))?; channel .as_snapshot() - .ok_or_else(|| Error::Deleted(channel.id().clone()))?; + .ok_or_else(|| DeleteError::Deleted(channel.id().clone()))?; let mut events = Vec::new(); let messages = tx.messages().live(&channel).await?; - for message in messages { - let deleted = tx.sequence().next(deleted_at).await?; - let message = tx.messages().delete(&message, &deleted).await?; - events.extend( - message - .events() - .filter(Sequence::start_from(deleted.sequence)) - .map(Event::from), - ); + let has_messages = messages + .iter() + .map(message::History::as_snapshot) + .any(|message| message.is_some()); + if has_messages { + return Err(DeleteError::NotEmpty(channel.id().clone())); } let deleted = tx.sequence().next(deleted_at).await?; @@ -191,6 +189,29 @@ impl From for Error { } } +#[derive(Debug, thiserror::Error)] +pub enum DeleteError { + #[error("channel {0} not found")] + NotFound(Id), + #[error("channel {0} deleted")] + Deleted(Id), + #[error("channel {0} not empty")] + NotEmpty(Id), + #[error(transparent)] + Database(#[from] sqlx::Error), + #[error(transparent)] + Name(#[from] name::Error), +} + +impl From for DeleteError { + fn from(error: LoadError) -> Self { + match error { + LoadError::Database(error) => error.into(), + LoadError::Name(error) => error.into(), + } + } +} + #[derive(Debug, thiserror::Error)] pub enum ExpireError { #[error(transparent)] diff --git a/src/channel/routes/channel/delete.rs b/src/channel/routes/channel/delete.rs index 2d2b5f1..9c093c1 100644 --- a/src/channel/routes/channel/delete.rs +++ b/src/channel/routes/channel/delete.rs @@ -36,14 +36,19 @@ impl IntoResponse for Response { #[derive(Debug, thiserror::Error)] #[error(transparent)] -pub struct Error(#[from] pub app::Error); +pub struct Error(#[from] pub app::DeleteError); impl IntoResponse for Error { fn into_response(self) -> response::Response { let Self(error) = self; #[allow(clippy::match_wildcard_for_single_variants)] match error { - app::Error::NotFound(_) | app::Error::Deleted(_) => NotFound(error).into_response(), + app::DeleteError::NotFound(_) | app::DeleteError::Deleted(_) => { + NotFound(error).into_response() + } + app::DeleteError::NotEmpty(_) => { + (StatusCode::CONFLICT, error.to_string()).into_response() + } other => Internal::from(other).into_response(), } } diff --git a/src/channel/routes/channel/test/delete.rs b/src/channel/routes/channel/test/delete.rs index 0371b0a..77a0b03 100644 --- a/src/channel/routes/channel/test/delete.rs +++ b/src/channel/routes/channel/test/delete.rs @@ -55,7 +55,7 @@ pub async fn invalid_channel_id() { // Verify the response - assert!(matches!(error, app::Error::NotFound(id) if id == channel)); + assert!(matches!(error, app::DeleteError::NotFound(id) if id == channel)); } #[tokio::test] @@ -84,7 +84,7 @@ pub async fn channel_deleted() { // Verify the response - assert!(matches!(error, app::Error::Deleted(id) if id == channel.id)); + assert!(matches!(error, app::DeleteError::Deleted(id) if id == channel.id)); } #[tokio::test] @@ -113,7 +113,7 @@ pub async fn channel_expired() { // Verify the response - assert!(matches!(error, app::Error::Deleted(id) if id == channel.id)); + assert!(matches!(error, app::DeleteError::Deleted(id) if id == channel.id)); } #[tokio::test] @@ -147,5 +147,31 @@ pub async fn channel_purged() { // Verify the response - assert!(matches!(error, app::Error::NotFound(id) if id == channel.id)); + assert!(matches!(error, app::DeleteError::NotFound(id) if id == channel.id)); +} + +#[tokio::test] +pub async fn channel_not_empty() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let channel = fixtures::channel::create(&app, &fixtures::now()).await; + let sender = fixtures::login::create(&app, &fixtures::now()).await; + fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await; + + // Send the request + + let deleter = fixtures::identity::create(&app, &fixtures::now()).await; + let delete::Error(error) = delete::handler( + State(app.clone()), + Path(channel.id.clone()), + fixtures::now(), + deleter, + ) + .await + .expect_err("deleting a channel with messages fails"); + + // Verify the response + + assert!(matches!(error, app::DeleteError::NotEmpty(id) if id == channel.id)); } -- cgit v1.2.3 From ffba911ba5240f67cc616b2cc2eaf7c730ebbde8 Mon Sep 17 00:00:00 2001 From: Owen Jacobson Date: Wed, 30 Oct 2024 01:14:02 -0400 Subject: Avoid hard-coding the assumption that delete comes-after create. I mean, it always does, but I'd rather get a panic during message/channel reconstruction than wrong results if that assumption is ever violated inadvertently. --- src/channel/history.rs | 6 +++++- src/message/history.rs | 6 +++++- 2 files changed, 10 insertions(+), 2 deletions(-) (limited to 'src/channel') diff --git a/src/channel/history.rs b/src/channel/history.rs index 4b9fcc7..dda7bb9 100644 --- a/src/channel/history.rs +++ b/src/channel/history.rs @@ -1,3 +1,5 @@ +use itertools::Itertools as _; + use super::{ event::{Created, Deleted, Event}, Channel, Id, @@ -41,7 +43,9 @@ impl History { // Event factories impl History { pub fn events(&self) -> impl Iterator { - [self.created()].into_iter().chain(self.deleted()) + [self.created()] + .into_iter() + .merge_by(self.deleted(), Sequence::merge) } fn created(&self) -> Event { diff --git a/src/message/history.rs b/src/message/history.rs index 0424d0d..67e437a 100644 --- a/src/message/history.rs +++ b/src/message/history.rs @@ -1,3 +1,5 @@ +use itertools::Itertools as _; + use super::{ event::{Deleted, Event, Sent}, Id, Message, @@ -57,6 +59,8 @@ impl History { } pub fn events(&self) -> impl Iterator { - [self.sent()].into_iter().chain(self.deleted()) + [self.sent()] + .into_iter() + .merge_by(self.deleted(), Sequence::merge) } } -- cgit v1.2.3 From 70591c5ac10069a4ae649bd6f79d769da9e32a98 Mon Sep 17 00:00:00 2001 From: Owen Jacobson Date: Wed, 30 Oct 2024 01:25:04 -0400 Subject: Remove `hi-recanonicalize`. This utility was needed to support a database migration with existing data. I have it on good authority that no further databases exist that are in the state that made this tool necessary. --- ...29117e7e70d3df2beaa1b1e2e081b0d362c07ceae8.json | 12 -- ...16293ea1bcc4913987ee751951e9d2f31bf495f305.json | 26 ---- ...1241dc35263ccfee9f3424111e7fa6014071f98a1e.json | 26 ---- ...42bfc6c82b1464afa98845e537e850d05deb328f06.json | 12 -- Cargo.toml | 1 - docs/internal-server-errors.md | 19 --- src/app.rs | 9 +- src/bin/hi-recanonicalize.rs | 9 -- src/bin/hi.rs | 9 -- src/channel/app.rs | 8 - src/channel/repo.rs | 32 ---- src/cli.rs | 170 ++++++++++++++++++++ src/cli/mod.rs | 172 --------------------- src/cli/recanonicalize.rs | 86 ----------- src/login/app.rs | 21 --- src/login/mod.rs | 1 + src/login/repo.rs | 32 ---- src/main.rs | 9 ++ 18 files changed, 183 insertions(+), 471 deletions(-) delete mode 100644 .sqlx/query-31e741181f0d09540063ef29117e7e70d3df2beaa1b1e2e081b0d362c07ceae8.json delete mode 100644 .sqlx/query-642fb12657410a4bee58d316293ea1bcc4913987ee751951e9d2f31bf495f305.json delete mode 100644 .sqlx/query-676a7dda6314cae4d13ff51241dc35263ccfee9f3424111e7fa6014071f98a1e.json delete mode 100644 .sqlx/query-b67d56f20dab413e31a64842bfc6c82b1464afa98845e537e850d05deb328f06.json delete mode 100644 src/bin/hi-recanonicalize.rs delete mode 100644 src/bin/hi.rs create mode 100644 src/cli.rs delete mode 100644 src/cli/mod.rs delete mode 100644 src/cli/recanonicalize.rs create mode 100644 src/main.rs (limited to 'src/channel') diff --git a/.sqlx/query-31e741181f0d09540063ef29117e7e70d3df2beaa1b1e2e081b0d362c07ceae8.json b/.sqlx/query-31e741181f0d09540063ef29117e7e70d3df2beaa1b1e2e081b0d362c07ceae8.json deleted file mode 100644 index 1105391..0000000 --- a/.sqlx/query-31e741181f0d09540063ef29117e7e70d3df2beaa1b1e2e081b0d362c07ceae8.json +++ /dev/null @@ -1,12 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n update channel_name\n set canonical_name = $1\n where id = $2\n ", - "describe": { - "columns": [], - "parameters": { - "Right": 2 - }, - "nullable": [] - }, - "hash": "31e741181f0d09540063ef29117e7e70d3df2beaa1b1e2e081b0d362c07ceae8" -} diff --git a/.sqlx/query-642fb12657410a4bee58d316293ea1bcc4913987ee751951e9d2f31bf495f305.json b/.sqlx/query-642fb12657410a4bee58d316293ea1bcc4913987ee751951e9d2f31bf495f305.json deleted file mode 100644 index be5b784..0000000 --- a/.sqlx/query-642fb12657410a4bee58d316293ea1bcc4913987ee751951e9d2f31bf495f305.json +++ /dev/null @@ -1,26 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n select\n id as \"id: Id\",\n display_name as \"display_name: String\"\n from channel_name\n ", - "describe": { - "columns": [ - { - "name": "id: Id", - "ordinal": 0, - "type_info": "Text" - }, - { - "name": "display_name: String", - "ordinal": 1, - "type_info": "Null" - } - ], - "parameters": { - "Right": 0 - }, - "nullable": [ - false, - false - ] - }, - "hash": "642fb12657410a4bee58d316293ea1bcc4913987ee751951e9d2f31bf495f305" -} diff --git a/.sqlx/query-676a7dda6314cae4d13ff51241dc35263ccfee9f3424111e7fa6014071f98a1e.json b/.sqlx/query-676a7dda6314cae4d13ff51241dc35263ccfee9f3424111e7fa6014071f98a1e.json deleted file mode 100644 index fd601e9..0000000 --- a/.sqlx/query-676a7dda6314cae4d13ff51241dc35263ccfee9f3424111e7fa6014071f98a1e.json +++ /dev/null @@ -1,26 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n select\n id as \"id: Id\",\n display_name as \"display_name: String\"\n from login\n ", - "describe": { - "columns": [ - { - "name": "id: Id", - "ordinal": 0, - "type_info": "Text" - }, - { - "name": "display_name: String", - "ordinal": 1, - "type_info": "Text" - } - ], - "parameters": { - "Right": 0 - }, - "nullable": [ - false, - false - ] - }, - "hash": "676a7dda6314cae4d13ff51241dc35263ccfee9f3424111e7fa6014071f98a1e" -} diff --git a/.sqlx/query-b67d56f20dab413e31a64842bfc6c82b1464afa98845e537e850d05deb328f06.json b/.sqlx/query-b67d56f20dab413e31a64842bfc6c82b1464afa98845e537e850d05deb328f06.json deleted file mode 100644 index 677495b..0000000 --- a/.sqlx/query-b67d56f20dab413e31a64842bfc6c82b1464afa98845e537e850d05deb328f06.json +++ /dev/null @@ -1,12 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n update login\n set canonical_name = $1\n where id = $2\n ", - "describe": { - "columns": [], - "parameters": { - "Right": 2 - }, - "nullable": [] - }, - "hash": "b67d56f20dab413e31a64842bfc6c82b1464afa98845e537e850d05deb328f06" -} diff --git a/Cargo.toml b/Cargo.toml index c8b37e1..83c3aa4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,7 +15,6 @@ maintainer-scripts = "debian" assets = [ # Binaries ["target/release/hi", "/usr/bin/hi", "755"], - ["target/release/hi-recanonicalize", "/usr/bin/hi-recanonicalize", "755"], # Configuration ["debian/default", "/etc/default/hi", "644"], diff --git a/docs/internal-server-errors.md b/docs/internal-server-errors.md index 4f679b7..16d61a2 100644 --- a/docs/internal-server-errors.md +++ b/docs/internal-server-errors.md @@ -9,22 +9,3 @@ The server attempted two write transactions at the same time, and encountered [s This error will almost always resolve itself if clients re-try their requests; no further action is needed. This is a known issue. If you are encountering this consistently (or if you can trigger it on demand), let us know. We are aware of sqlite's features for mitigating this issue but have been unsuccessful in applying them; we're working on it, but patches _are_ welcome, if you have the opportunity. - -## stored canonical form […] does not match computed canonical form […] for name […] - -When `hi` applies the `migrations/20241019191531_canonical_names.sql` migration (from commit `3f9648eed48cd8b6cd35d0ae2ee5bbe25fa735ac`), this can leave existing names in a state where the stored canonical form is not the correct canonicalization of the stored display names of channels and logins. `hi` will abort requests when it encounters this situation, to avoid incorrect behaviours such as duplicate channels or duplicate logins. - -As channel and login names may be presented during client startup, this can render the service unusable until repaired. Treat this as an immediate outage if you see it. - -You can verify that login names are unique by running the following commands as the user the `hi` server runs as: - -* `sqlite3 .hi 'select display_name from login'` -* `sqlite3 .hi 'select display_name from channel_name'` - -Substitute `.hi` with the path to your `hi` database if it differs from the default. - -If the names are unique, you can repair the database: - -* Stop the `hi` server. -* Run `hi-recanonicalize`, as the same user the `hi` server runs as, with the same database options. -* Start the `hi` server. diff --git a/src/app.rs b/src/app.rs index bc1daa5..0dbf017 100644 --- a/src/app.rs +++ b/src/app.rs @@ -5,12 +5,14 @@ use crate::{ channel::app::Channels, event::{self, app::Events}, invite::app::Invites, - login::app::Logins, message::app::Messages, setup::app::Setup, token::{self, app::Tokens}, }; +#[cfg(test)] +use crate::login::app::Logins; + #[derive(Clone)] pub struct App { db: SqlitePool, @@ -47,11 +49,6 @@ impl App { Invites::new(&self.db, &self.events) } - #[cfg(not(test))] - pub const fn logins(&self) -> Logins { - Logins::new(&self.db) - } - #[cfg(test)] pub const fn logins(&self) -> Logins { Logins::new(&self.db, &self.events) diff --git a/src/bin/hi-recanonicalize.rs b/src/bin/hi-recanonicalize.rs deleted file mode 100644 index 4081276..0000000 --- a/src/bin/hi-recanonicalize.rs +++ /dev/null @@ -1,9 +0,0 @@ -use clap::Parser; - -use hi::cli; - -#[tokio::main] -async fn main() -> Result<(), cli::recanonicalize::Error> { - let args = cli::recanonicalize::Args::parse(); - args.run().await -} diff --git a/src/bin/hi.rs b/src/bin/hi.rs deleted file mode 100644 index d0830ff..0000000 --- a/src/bin/hi.rs +++ /dev/null @@ -1,9 +0,0 @@ -use clap::Parser; - -use hi::cli; - -#[tokio::main] -async fn main() -> Result<(), cli::Error> { - let args = cli::Args::parse(); - args.run().await -} diff --git a/src/channel/app.rs b/src/channel/app.rs index e32eb6c..21784e9 100644 --- a/src/channel/app.rs +++ b/src/channel/app.rs @@ -137,14 +137,6 @@ impl<'a> Channels<'a> { Ok(()) } - - pub async fn recanonicalize(&self) -> Result<(), sqlx::Error> { - let mut tx = self.db.begin().await?; - tx.channels().recanonicalize().await?; - tx.commit().await?; - - Ok(()) - } } #[derive(Debug, thiserror::Error)] diff --git a/src/channel/repo.rs b/src/channel/repo.rs index a49db52..f47e564 100644 --- a/src/channel/repo.rs +++ b/src/channel/repo.rs @@ -300,38 +300,6 @@ impl<'c> Channels<'c> { Ok(channels) } - - pub async fn recanonicalize(&mut self) -> Result<(), sqlx::Error> { - let channels = sqlx::query!( - r#" - select - id as "id: Id", - display_name as "display_name: String" - from channel_name - "#, - ) - .fetch_all(&mut *self.0) - .await?; - - for channel in channels { - let name = Name::from(channel.display_name); - let canonical_name = name.canonical(); - - sqlx::query!( - r#" - update channel_name - set canonical_name = $1 - where id = $2 - "#, - canonical_name, - channel.id, - ) - .execute(&mut *self.0) - .await?; - } - - Ok(()) - } } #[derive(Debug, thiserror::Error)] diff --git a/src/cli.rs b/src/cli.rs new file mode 100644 index 0000000..0659851 --- /dev/null +++ b/src/cli.rs @@ -0,0 +1,170 @@ +//! The `hi` command-line interface. +//! +//! This module supports running `hi` as a freestanding program, via the +//! [`Args`] struct. + +use std::{future, io}; + +use axum::{ + http::header, + middleware, + response::{IntoResponse, Response}, + Router, +}; +use clap::{CommandFactory, Parser}; +use sqlx::sqlite::SqlitePool; +use tokio::net; + +use crate::{ + app::App, + boot, channel, clock, db, event, expire, invite, login, message, + setup::{self, middleware::setup_required}, + ui, +}; + +/// Command-line entry point for running the `hi` server. +/// +/// This is intended to be used as a Clap [Parser], to capture command-line +/// arguments for the `hi` server: +/// +/// ```no_run +/// # use hi::cli::Error; +/// # +/// # #[tokio::main] +/// # async fn main() -> Result<(), Error> { +/// use clap::Parser; +/// use hi::cli::Args; +/// +/// let args = Args::parse(); +/// args.run().await?; +/// # Ok(()) +/// # } +/// ``` +#[derive(Parser)] +#[command( + version, + about = "Run the `hi` server.", + long_about = r#"Run the `hi` server. + +The database at `--database-url` will be created, or upgraded, automatically."# +)] +pub struct Args { + /// The network address `hi` should listen on + #[arg(short, long, env, default_value = "localhost")] + address: String, + + /// The network port `hi` should listen on + #[arg(short, long, env, default_value_t = 64209)] + port: u16, + + /// Sqlite URL or path for the `hi` database + #[arg(short, long, env, default_value = "sqlite://.hi")] + database_url: String, + + /// Sqlite URL or path for a backup of the `hi` database during upgrades + #[arg(short = 'D', long, env, default_value = "sqlite://.hi.backup")] + backup_database_url: String, +} + +impl Args { + /// Runs the `hi` server, using the parsed configuation in `self`. + /// + /// This will perform the following tasks: + /// + /// * Migrate the `hi` database (at `--database-url`). + /// * Start an HTTP server (on the interface and port controlled by + /// `--address` and `--port`). + /// * Print a status message. + /// * Wait for that server to shut down. + /// + /// # Errors + /// + /// Will return `Err` if the server is unable to start, or terminates + /// prematurely. The specific [`Error`] variant will expose the cause + /// of the failure. + pub async fn run(self) -> Result<(), Error> { + let pool = self.pool().await?; + + let app = App::from(pool); + let app = routers(&app) + .route_layer(middleware::from_fn_with_state( + app.clone(), + expire::middleware, + )) + .route_layer(middleware::from_fn(clock::middleware)) + .route_layer(middleware::map_response(Self::server_info())) + .with_state(app); + + let listener = self.listener().await?; + let started_msg = started_msg(&listener)?; + + let serve = axum::serve(listener, app); + println!("{started_msg}"); + serve.await?; + + Ok(()) + } + + async fn listener(&self) -> io::Result { + let listen_addr = self.listen_addr(); + let listener = tokio::net::TcpListener::bind(listen_addr).await?; + Ok(listener) + } + + fn listen_addr(&self) -> impl net::ToSocketAddrs + '_ { + (self.address.as_str(), self.port) + } + + async fn pool(&self) -> Result { + db::prepare(&self.database_url, &self.backup_database_url).await + } + + fn server_info() -> impl Clone + Fn(Response) -> future::Ready { + let command = Self::command(); + let name = command.get_name(); + let version = command.get_version().unwrap_or("unknown version"); + let version = format!("{name}/{version}"); + move |resp| { + let response = ([(header::SERVER, &version)], resp).into_response(); + future::ready(response) + } + } +} + +fn routers(app: &App) -> Router { + [ + [ + // API endpoints that require setup to function + boot::router(), + channel::router(), + event::router(), + invite::router(), + login::router(), + message::router(), + ] + .into_iter() + .fold(Router::default(), Router::merge) + .route_layer(middleware::from_fn_with_state(app.clone(), setup_required)), + // API endpoints that handle setup + setup::router(), + // The UI (handles setup state itself) + ui::router(app), + ] + .into_iter() + .fold(Router::default(), Router::merge) +} + +fn started_msg(listener: &net::TcpListener) -> io::Result { + let local_addr = listener.local_addr()?; + Ok(format!("listening on http://{local_addr}/")) +} + +/// Errors that can be raised by [`Args::run`]. +#[derive(Debug, thiserror::Error)] +#[error(transparent)] +pub enum Error { + /// Failure due to `io::Error`. See [`io::Error`]. + Io(#[from] io::Error), + /// Failure due to a database initialization error. See [`db::Error`]. + Database(#[from] db::Error), +} diff --git a/src/cli/mod.rs b/src/cli/mod.rs deleted file mode 100644 index c75ce2b..0000000 --- a/src/cli/mod.rs +++ /dev/null @@ -1,172 +0,0 @@ -//! The `hi` command-line interface. -//! -//! This module supports running `hi` as a freestanding program, via the -//! [`Args`] struct. - -use std::{future, io}; - -use axum::{ - http::header, - middleware, - response::{IntoResponse, Response}, - Router, -}; -use clap::{CommandFactory, Parser}; -use sqlx::sqlite::SqlitePool; -use tokio::net; - -use crate::{ - app::App, - boot, channel, clock, db, event, expire, invite, login, message, - setup::{self, middleware::setup_required}, - ui, -}; - -pub mod recanonicalize; - -/// Command-line entry point for running the `hi` server. -/// -/// This is intended to be used as a Clap [Parser], to capture command-line -/// arguments for the `hi` server: -/// -/// ```no_run -/// # use hi::cli::Error; -/// # -/// # #[tokio::main] -/// # async fn main() -> Result<(), Error> { -/// use clap::Parser; -/// use hi::cli::Args; -/// -/// let args = Args::parse(); -/// args.run().await?; -/// # Ok(()) -/// # } -/// ``` -#[derive(Parser)] -#[command( - version, - about = "Run the `hi` server.", - long_about = r#"Run the `hi` server. - -The database at `--database-url` will be created, or upgraded, automatically."# -)] -pub struct Args { - /// The network address `hi` should listen on - #[arg(short, long, env, default_value = "localhost")] - address: String, - - /// The network port `hi` should listen on - #[arg(short, long, env, default_value_t = 64209)] - port: u16, - - /// Sqlite URL or path for the `hi` database - #[arg(short, long, env, default_value = "sqlite://.hi")] - database_url: String, - - /// Sqlite URL or path for a backup of the `hi` database during upgrades - #[arg(short = 'D', long, env, default_value = "sqlite://.hi.backup")] - backup_database_url: String, -} - -impl Args { - /// Runs the `hi` server, using the parsed configuation in `self`. - /// - /// This will perform the following tasks: - /// - /// * Migrate the `hi` database (at `--database-url`). - /// * Start an HTTP server (on the interface and port controlled by - /// `--address` and `--port`). - /// * Print a status message. - /// * Wait for that server to shut down. - /// - /// # Errors - /// - /// Will return `Err` if the server is unable to start, or terminates - /// prematurely. The specific [`Error`] variant will expose the cause - /// of the failure. - pub async fn run(self) -> Result<(), Error> { - let pool = self.pool().await?; - - let app = App::from(pool); - let app = routers(&app) - .route_layer(middleware::from_fn_with_state( - app.clone(), - expire::middleware, - )) - .route_layer(middleware::from_fn(clock::middleware)) - .route_layer(middleware::map_response(Self::server_info())) - .with_state(app); - - let listener = self.listener().await?; - let started_msg = started_msg(&listener)?; - - let serve = axum::serve(listener, app); - println!("{started_msg}"); - serve.await?; - - Ok(()) - } - - async fn listener(&self) -> io::Result { - let listen_addr = self.listen_addr(); - let listener = tokio::net::TcpListener::bind(listen_addr).await?; - Ok(listener) - } - - fn listen_addr(&self) -> impl net::ToSocketAddrs + '_ { - (self.address.as_str(), self.port) - } - - async fn pool(&self) -> Result { - db::prepare(&self.database_url, &self.backup_database_url).await - } - - fn server_info() -> impl Clone + Fn(Response) -> future::Ready { - let command = Self::command(); - let name = command.get_name(); - let version = command.get_version().unwrap_or("unknown version"); - let version = format!("{name}/{version}"); - move |resp| { - let response = ([(header::SERVER, &version)], resp).into_response(); - future::ready(response) - } - } -} - -fn routers(app: &App) -> Router { - [ - [ - // API endpoints that require setup to function - boot::router(), - channel::router(), - event::router(), - invite::router(), - login::router(), - message::router(), - ] - .into_iter() - .fold(Router::default(), Router::merge) - .route_layer(middleware::from_fn_with_state(app.clone(), setup_required)), - // API endpoints that handle setup - setup::router(), - // The UI (handles setup state itself) - ui::router(app), - ] - .into_iter() - .fold(Router::default(), Router::merge) -} - -fn started_msg(listener: &net::TcpListener) -> io::Result { - let local_addr = listener.local_addr()?; - Ok(format!("listening on http://{local_addr}/")) -} - -/// Errors that can be raised by [`Args::run`]. -#[derive(Debug, thiserror::Error)] -#[error(transparent)] -pub enum Error { - /// Failure due to `io::Error`. See [`io::Error`]. - Io(#[from] io::Error), - /// Failure due to a database initialization error. See [`db::Error`]. - Database(#[from] db::Error), -} diff --git a/src/cli/recanonicalize.rs b/src/cli/recanonicalize.rs deleted file mode 100644 index 9db5b77..0000000 --- a/src/cli/recanonicalize.rs +++ /dev/null @@ -1,86 +0,0 @@ -use sqlx::sqlite::SqlitePool; - -use crate::{app::App, db}; - -/// Command-line entry point for repairing canonical names in the `hi` database. -/// This command may be necessary after an upgrade, if the canonical forms of -/// names has changed. It will re-calculate the canonical form of each name in -/// the database, based on its display form, and store the results back to the -/// database. -/// -/// This is intended to be used as a Clap [Parser], to capture command-line -/// arguments for the `hi-recanonicalize` command: -/// -/// ```no_run -/// # use hi::cli::recanonicalize::Error; -/// # -/// # #[tokio::main] -/// # async fn main() -> Result<(), Error> { -/// use clap::Parser; -/// use hi::cli::recanonicalize::Args; -/// -/// let args = Args::parse(); -/// args.run().await?; -/// # Ok(()) -/// # } -/// ``` -#[derive(clap::Parser)] -#[command( - version, - about = "Recanonicalize names in the `hi` database.", - long_about = r#"Recanonicalize names in the `hi` database. - -The `hi` server must not be running while this command is run. - -The database at `--database-url` will also be created, or upgraded, automatically."# -)] -pub struct Args { - /// Sqlite URL or path for the `hi` database - #[arg(short, long, env, default_value = "sqlite://.hi")] - database_url: String, - - /// Sqlite URL or path for a backup of the `hi` database during upgrades - #[arg(short = 'D', long, env, default_value = "sqlite://.hi.backup")] - backup_database_url: String, -} - -impl Args { - /// Recanonicalizes the `hi` database, using the parsed configuation in - /// `self`. - /// - /// This will perform the following tasks: - /// - /// * Migrate the `hi` database (at `--database-url`). - /// * Recanonicalize names in the `login` and `channel` tables. - /// - /// # Errors - /// - /// Will return `Err` if the canonicalization or database upgrade processes - /// fail. The specific [`Error`] variant will expose the cause - /// of the failure. - pub async fn run(self) -> Result<(), Error> { - let pool = self.pool().await?; - - let app = App::from(pool); - app.logins().recanonicalize().await?; - app.channels().recanonicalize().await?; - - Ok(()) - } - - async fn pool(&self) -> Result { - db::prepare(&self.database_url, &self.backup_database_url).await - } -} - -/// Errors that can be raised by [`Args::run`]. -#[derive(Debug, thiserror::Error)] -#[error(transparent)] -pub enum Error { - // /// Failure due to `io::Error`. See [`io::Error`]. - // Io(#[from] io::Error), - /// Failure due to a database initialization error. See [`db::Error`]. - Database(#[from] db::Error), - /// Failure due to a data manipulation error. See [`sqlx::Error`]. - Sqlx(#[from] sqlx::Error), -} diff --git a/src/login/app.rs b/src/login/app.rs index 6da26e9..f458561 100644 --- a/src/login/app.rs +++ b/src/login/app.rs @@ -1,33 +1,21 @@ use sqlx::sqlite::SqlitePool; -use super::repo::Provider as _; - -#[cfg(test)] use super::{ create::{self, Create}, Login, Password, }; -#[cfg(test)] use crate::{clock::DateTime, event::Broadcaster, name::Name}; pub struct Logins<'a> { db: &'a SqlitePool, - #[cfg(test)] events: &'a Broadcaster, } impl<'a> Logins<'a> { - #[cfg(not(test))] - pub const fn new(db: &'a SqlitePool) -> Self { - Self { db } - } - - #[cfg(test)] pub const fn new(db: &'a SqlitePool, events: &'a Broadcaster) -> Self { Self { db, events } } - #[cfg(test)] pub async fn create( &self, name: &Name, @@ -45,17 +33,8 @@ impl<'a> Logins<'a> { Ok(login.as_created()) } - - pub async fn recanonicalize(&self) -> Result<(), sqlx::Error> { - let mut tx = self.db.begin().await?; - tx.logins().recanonicalize().await?; - tx.commit().await?; - - Ok(()) - } } -#[cfg(test)] #[derive(Debug, thiserror::Error)] pub enum CreateError { #[error("invalid login name: {0}")] diff --git a/src/login/mod.rs b/src/login/mod.rs index 5a6d715..006fa0c 100644 --- a/src/login/mod.rs +++ b/src/login/mod.rs @@ -1,3 +1,4 @@ +#[cfg(test)] pub mod app; pub mod create; pub mod event; diff --git a/src/login/repo.rs b/src/login/repo.rs index a972304..9439a25 100644 --- a/src/login/repo.rs +++ b/src/login/repo.rs @@ -143,38 +143,6 @@ impl<'c> Logins<'c> { Ok(logins) } - - pub async fn recanonicalize(&mut self) -> Result<(), sqlx::Error> { - let logins = sqlx::query!( - r#" - select - id as "id: Id", - display_name as "display_name: String" - from login - "#, - ) - .fetch_all(&mut *self.0) - .await?; - - for login in logins { - let name = Name::from(login.display_name); - let canonical_name = name.canonical(); - - sqlx::query!( - r#" - update login - set canonical_name = $1 - where id = $2 - "#, - canonical_name, - login.id, - ) - .execute(&mut *self.0) - .await?; - } - - Ok(()) - } } #[derive(Debug, thiserror::Error)] diff --git a/src/main.rs b/src/main.rs new file mode 100644 index 0000000..d0830ff --- /dev/null +++ b/src/main.rs @@ -0,0 +1,9 @@ +use clap::Parser; + +use hi::cli; + +#[tokio::main] +async fn main() -> Result<(), cli::Error> { + let args = cli::Args::parse(); + args.run().await +} -- cgit v1.2.3 From 50a382528288248381b07c25719cbc9a519b4c81 Mon Sep 17 00:00:00 2001 From: Owen Jacobson Date: Wed, 30 Oct 2024 02:01:31 -0400 Subject: Resume points are no longer optional. This is an inconsequential change for actual clients, since "resume from the beginning" was never a preferred mode of operation, and it simplifies some internals. It should also mean we get better query plans where `coalesce(cond, true)` was previously being used. --- ...1fe70264af3f5b77c6dacd4a61be470f262d62b5d4.json | 56 ++++++++++ ...1314f70d13e8b0ca22b7652f1063ec7796cf307269.json | 62 ----------- ...9da95bc8271dfc8c6ec8ebf174ba8a57111b322291.json | 44 ++++++++ ...27d5ba226bf52349548261393e00e74081cdbe041b.json | 62 +++++++++++ ...1602b8befafe751e9caeb0d5f279731e04c6925f46.json | 44 -------- ...c2e965e2294c1a6932d72a8b04d258d06d0f8938bd.json | 56 ---------- ...4730699683e63ae1ea404418a17c6af60148f03fe8.json | 44 -------- ...932ace995f53efd6c7800a7a1b42daec41d081b3d2.json | 44 ++++++++ ...0599914331682d58ace57214bfa26ccaa089592a24.json | 62 ----------- ...84a27bdaefca99706a0c73c17f19cc537f3f669882.json | 62 +++++++++++ docs/api/events.md | 2 +- src/boot/app.rs | 4 +- src/channel/history.rs | 6 +- src/channel/repo.rs | 11 +- src/channel/routes/channel/test/post.rs | 3 +- src/channel/routes/test.rs | 3 +- src/event/app.rs | 9 +- src/event/mod.rs | 2 - src/event/routes/get.rs | 10 +- src/event/routes/test/channel.rs | 91 +++++++++++----- src/event/routes/test/invite.rs | 26 +++-- src/event/routes/test/message.rs | 115 +++++++++++++++------ src/event/routes/test/resume.rs | 12 ++- src/event/routes/test/setup.rs | 13 ++- src/event/routes/test/token.rs | 19 ++-- src/event/sequence.rs | 9 +- src/login/history.rs | 6 +- src/login/repo.rs | 10 +- src/message/history.rs | 6 +- src/message/repo.rs | 21 ++-- src/test/fixtures/boot.rs | 9 ++ src/test/fixtures/mod.rs | 1 + 32 files changed, 519 insertions(+), 405 deletions(-) create mode 100644 .sqlx/query-093a57206253b4c05c2aa51fe70264af3f5b77c6dacd4a61be470f262d62b5d4.json delete mode 100644 .sqlx/query-24bc0257eff3357322481e1314f70d13e8b0ca22b7652f1063ec7796cf307269.json create mode 100644 .sqlx/query-2c20c29d9adfed6201a6a69da95bc8271dfc8c6ec8ebf174ba8a57111b322291.json create mode 100644 .sqlx/query-9a748f34805bb316452cb527d5ba226bf52349548261393e00e74081cdbe041b.json delete mode 100644 .sqlx/query-9f611a3351f22ed16d67d41602b8befafe751e9caeb0d5f279731e04c6925f46.json delete mode 100644 .sqlx/query-a40496319887752f5845c0c2e965e2294c1a6932d72a8b04d258d06d0f8938bd.json delete mode 100644 .sqlx/query-ac7ab464e44e4412cd83744730699683e63ae1ea404418a17c6af60148f03fe8.json create mode 100644 .sqlx/query-cbf29fae3725bbb3d9e94d932ace995f53efd6c7800a7a1b42daec41d081b3d2.json delete mode 100644 .sqlx/query-fce8f4fbd59a8b3b8531e10599914331682d58ace57214bfa26ccaa089592a24.json create mode 100644 .sqlx/query-ff61ff22108f1e98bbfc9a84a27bdaefca99706a0c73c17f19cc537f3f669882.json create mode 100644 src/test/fixtures/boot.rs (limited to 'src/channel') diff --git a/.sqlx/query-093a57206253b4c05c2aa51fe70264af3f5b77c6dacd4a61be470f262d62b5d4.json b/.sqlx/query-093a57206253b4c05c2aa51fe70264af3f5b77c6dacd4a61be470f262d62b5d4.json new file mode 100644 index 0000000..1d8a2e1 --- /dev/null +++ b/.sqlx/query-093a57206253b4c05c2aa51fe70264af3f5b77c6dacd4a61be470f262d62b5d4.json @@ -0,0 +1,56 @@ +{ + "db_name": "SQLite", + "query": "\n select\n id as \"id: Id\",\n name.display_name as \"display_name?: String\",\n name.canonical_name as \"canonical_name?: String\",\n channel.created_at as \"created_at: DateTime\",\n channel.created_sequence as \"created_sequence: Sequence\",\n deleted.deleted_at as \"deleted_at?: DateTime\",\n deleted.deleted_sequence as \"deleted_sequence?: Sequence\"\n from channel\n left join channel_name as name\n using (id)\n left join channel_deleted as deleted\n using (id)\n where channel.created_sequence > $1\n or deleted.deleted_sequence > $1\n ", + "describe": { + "columns": [ + { + "name": "id: Id", + "ordinal": 0, + "type_info": "Text" + }, + { + "name": "display_name?: String", + "ordinal": 1, + "type_info": "Null" + }, + { + "name": "canonical_name?: String", + "ordinal": 2, + "type_info": "Null" + }, + { + "name": "created_at: DateTime", + "ordinal": 3, + "type_info": "Text" + }, + { + "name": "created_sequence: Sequence", + "ordinal": 4, + "type_info": "Integer" + }, + { + "name": "deleted_at?: DateTime", + "ordinal": 5, + "type_info": "Text" + }, + { + "name": "deleted_sequence?: Sequence", + "ordinal": 6, + "type_info": "Integer" + } + ], + "parameters": { + "Right": 1 + }, + "nullable": [ + false, + true, + true, + false, + false, + true, + true + ] + }, + "hash": "093a57206253b4c05c2aa51fe70264af3f5b77c6dacd4a61be470f262d62b5d4" +} diff --git a/.sqlx/query-24bc0257eff3357322481e1314f70d13e8b0ca22b7652f1063ec7796cf307269.json b/.sqlx/query-24bc0257eff3357322481e1314f70d13e8b0ca22b7652f1063ec7796cf307269.json deleted file mode 100644 index 9f09a28..0000000 --- a/.sqlx/query-24bc0257eff3357322481e1314f70d13e8b0ca22b7652f1063ec7796cf307269.json +++ /dev/null @@ -1,62 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n select\n id as \"id: Id\",\n message.channel as \"channel: channel::Id\",\n message.sender as \"sender: login::Id\",\n message.sent_at as \"sent_at: DateTime\",\n message.sent_sequence as \"sent_sequence: Sequence\",\n message.body as \"body: Body\",\n deleted.deleted_at as \"deleted_at: DateTime\",\n deleted.deleted_sequence as \"deleted_sequence: Sequence\"\n from message\n left join message_deleted as deleted\n using (id)\n where coalesce(message.sent_sequence > $1, true)\n ", - "describe": { - "columns": [ - { - "name": "id: Id", - "ordinal": 0, - "type_info": "Text" - }, - { - "name": "channel: channel::Id", - "ordinal": 1, - "type_info": "Text" - }, - { - "name": "sender: login::Id", - "ordinal": 2, - "type_info": "Text" - }, - { - "name": "sent_at: DateTime", - "ordinal": 3, - "type_info": "Text" - }, - { - "name": "sent_sequence: Sequence", - "ordinal": 4, - "type_info": "Integer" - }, - { - "name": "body: Body", - "ordinal": 5, - "type_info": "Text" - }, - { - "name": "deleted_at: DateTime", - "ordinal": 6, - "type_info": "Text" - }, - { - "name": "deleted_sequence: Sequence", - "ordinal": 7, - "type_info": "Integer" - } - ], - "parameters": { - "Right": 1 - }, - "nullable": [ - false, - false, - false, - false, - false, - true, - true, - true - ] - }, - "hash": "24bc0257eff3357322481e1314f70d13e8b0ca22b7652f1063ec7796cf307269" -} diff --git a/.sqlx/query-2c20c29d9adfed6201a6a69da95bc8271dfc8c6ec8ebf174ba8a57111b322291.json b/.sqlx/query-2c20c29d9adfed6201a6a69da95bc8271dfc8c6ec8ebf174ba8a57111b322291.json new file mode 100644 index 0000000..ae546ad --- /dev/null +++ b/.sqlx/query-2c20c29d9adfed6201a6a69da95bc8271dfc8c6ec8ebf174ba8a57111b322291.json @@ -0,0 +1,44 @@ +{ + "db_name": "SQLite", + "query": "\n select\n id as \"id: Id\",\n display_name as \"display_name: String\",\n canonical_name as \"canonical_name: String\",\n created_sequence as \"created_sequence: Sequence\",\n created_at as \"created_at: DateTime\"\n from login\n where login.created_sequence > $1\n ", + "describe": { + "columns": [ + { + "name": "id: Id", + "ordinal": 0, + "type_info": "Text" + }, + { + "name": "display_name: String", + "ordinal": 1, + "type_info": "Text" + }, + { + "name": "canonical_name: String", + "ordinal": 2, + "type_info": "Text" + }, + { + "name": "created_sequence: Sequence", + "ordinal": 3, + "type_info": "Integer" + }, + { + "name": "created_at: DateTime", + "ordinal": 4, + "type_info": "Text" + } + ], + "parameters": { + "Right": 1 + }, + "nullable": [ + false, + false, + false, + false, + false + ] + }, + "hash": "2c20c29d9adfed6201a6a69da95bc8271dfc8c6ec8ebf174ba8a57111b322291" +} diff --git a/.sqlx/query-9a748f34805bb316452cb527d5ba226bf52349548261393e00e74081cdbe041b.json b/.sqlx/query-9a748f34805bb316452cb527d5ba226bf52349548261393e00e74081cdbe041b.json new file mode 100644 index 0000000..5423bfd --- /dev/null +++ b/.sqlx/query-9a748f34805bb316452cb527d5ba226bf52349548261393e00e74081cdbe041b.json @@ -0,0 +1,62 @@ +{ + "db_name": "SQLite", + "query": "\n select\n id as \"id: Id\",\n message.channel as \"channel: channel::Id\",\n message.sender as \"sender: login::Id\",\n message.sent_at as \"sent_at: DateTime\",\n message.sent_sequence as \"sent_sequence: Sequence\",\n message.body as \"body: Body\",\n deleted.deleted_at as \"deleted_at?: DateTime\",\n deleted.deleted_sequence as \"deleted_sequence?: Sequence\"\n from message\n left join message_deleted as deleted\n using (id)\n where message.sent_sequence > $1\n or deleted.deleted_sequence > $1\n ", + "describe": { + "columns": [ + { + "name": "id: Id", + "ordinal": 0, + "type_info": "Text" + }, + { + "name": "channel: channel::Id", + "ordinal": 1, + "type_info": "Text" + }, + { + "name": "sender: login::Id", + "ordinal": 2, + "type_info": "Text" + }, + { + "name": "sent_at: DateTime", + "ordinal": 3, + "type_info": "Text" + }, + { + "name": "sent_sequence: Sequence", + "ordinal": 4, + "type_info": "Integer" + }, + { + "name": "body: Body", + "ordinal": 5, + "type_info": "Text" + }, + { + "name": "deleted_at?: DateTime", + "ordinal": 6, + "type_info": "Text" + }, + { + "name": "deleted_sequence?: Sequence", + "ordinal": 7, + "type_info": "Integer" + } + ], + "parameters": { + "Right": 1 + }, + "nullable": [ + false, + false, + false, + false, + false, + true, + true, + true + ] + }, + "hash": "9a748f34805bb316452cb527d5ba226bf52349548261393e00e74081cdbe041b" +} diff --git a/.sqlx/query-9f611a3351f22ed16d67d41602b8befafe751e9caeb0d5f279731e04c6925f46.json b/.sqlx/query-9f611a3351f22ed16d67d41602b8befafe751e9caeb0d5f279731e04c6925f46.json deleted file mode 100644 index 2d1f49e..0000000 --- a/.sqlx/query-9f611a3351f22ed16d67d41602b8befafe751e9caeb0d5f279731e04c6925f46.json +++ /dev/null @@ -1,44 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n select\n id as \"id: Id\",\n display_name as \"display_name: String\",\n canonical_name as \"canonical_name: String\",\n created_sequence as \"created_sequence: Sequence\",\n created_at as \"created_at: DateTime\"\n from login\n where coalesce(created_sequence <= $1, true)\n order by canonical_name\n ", - "describe": { - "columns": [ - { - "name": "id: Id", - "ordinal": 0, - "type_info": "Text" - }, - { - "name": "display_name: String", - "ordinal": 1, - "type_info": "Text" - }, - { - "name": "canonical_name: String", - "ordinal": 2, - "type_info": "Text" - }, - { - "name": "created_sequence: Sequence", - "ordinal": 3, - "type_info": "Integer" - }, - { - "name": "created_at: DateTime", - "ordinal": 4, - "type_info": "Text" - } - ], - "parameters": { - "Right": 1 - }, - "nullable": [ - false, - false, - false, - false, - false - ] - }, - "hash": "9f611a3351f22ed16d67d41602b8befafe751e9caeb0d5f279731e04c6925f46" -} diff --git a/.sqlx/query-a40496319887752f5845c0c2e965e2294c1a6932d72a8b04d258d06d0f8938bd.json b/.sqlx/query-a40496319887752f5845c0c2e965e2294c1a6932d72a8b04d258d06d0f8938bd.json deleted file mode 100644 index 436e1dd..0000000 --- a/.sqlx/query-a40496319887752f5845c0c2e965e2294c1a6932d72a8b04d258d06d0f8938bd.json +++ /dev/null @@ -1,56 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n select\n id as \"id: Id\",\n name.display_name as \"display_name: String\",\n name.canonical_name as \"canonical_name: String\",\n channel.created_at as \"created_at: DateTime\",\n channel.created_sequence as \"created_sequence: Sequence\",\n deleted.deleted_at as \"deleted_at?: DateTime\",\n deleted.deleted_sequence as \"deleted_sequence?: Sequence\"\n from channel\n left join channel_name as name\n using (id)\n left join channel_deleted as deleted\n using (id)\n where coalesce(channel.created_sequence > $1, true)\n ", - "describe": { - "columns": [ - { - "name": "id: Id", - "ordinal": 0, - "type_info": "Text" - }, - { - "name": "display_name: String", - "ordinal": 1, - "type_info": "Null" - }, - { - "name": "canonical_name: String", - "ordinal": 2, - "type_info": "Null" - }, - { - "name": "created_at: DateTime", - "ordinal": 3, - "type_info": "Text" - }, - { - "name": "created_sequence: Sequence", - "ordinal": 4, - "type_info": "Integer" - }, - { - "name": "deleted_at?: DateTime", - "ordinal": 5, - "type_info": "Text" - }, - { - "name": "deleted_sequence?: Sequence", - "ordinal": 6, - "type_info": "Integer" - } - ], - "parameters": { - "Right": 1 - }, - "nullable": [ - false, - true, - true, - false, - false, - true, - true - ] - }, - "hash": "a40496319887752f5845c0c2e965e2294c1a6932d72a8b04d258d06d0f8938bd" -} diff --git a/.sqlx/query-ac7ab464e44e4412cd83744730699683e63ae1ea404418a17c6af60148f03fe8.json b/.sqlx/query-ac7ab464e44e4412cd83744730699683e63ae1ea404418a17c6af60148f03fe8.json deleted file mode 100644 index 7d9bbbc..0000000 --- a/.sqlx/query-ac7ab464e44e4412cd83744730699683e63ae1ea404418a17c6af60148f03fe8.json +++ /dev/null @@ -1,44 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n select\n id as \"id: Id\",\n display_name as \"display_name: String\",\n canonical_name as \"canonical_name: String\",\n created_sequence as \"created_sequence: Sequence\",\n created_at as \"created_at: DateTime\"\n from login\n where coalesce(login.created_sequence > $1, true)\n ", - "describe": { - "columns": [ - { - "name": "id: Id", - "ordinal": 0, - "type_info": "Text" - }, - { - "name": "display_name: String", - "ordinal": 1, - "type_info": "Text" - }, - { - "name": "canonical_name: String", - "ordinal": 2, - "type_info": "Text" - }, - { - "name": "created_sequence: Sequence", - "ordinal": 3, - "type_info": "Integer" - }, - { - "name": "created_at: DateTime", - "ordinal": 4, - "type_info": "Text" - } - ], - "parameters": { - "Right": 1 - }, - "nullable": [ - false, - false, - false, - false, - false - ] - }, - "hash": "ac7ab464e44e4412cd83744730699683e63ae1ea404418a17c6af60148f03fe8" -} diff --git a/.sqlx/query-cbf29fae3725bbb3d9e94d932ace995f53efd6c7800a7a1b42daec41d081b3d2.json b/.sqlx/query-cbf29fae3725bbb3d9e94d932ace995f53efd6c7800a7a1b42daec41d081b3d2.json new file mode 100644 index 0000000..9c3c10e --- /dev/null +++ b/.sqlx/query-cbf29fae3725bbb3d9e94d932ace995f53efd6c7800a7a1b42daec41d081b3d2.json @@ -0,0 +1,44 @@ +{ + "db_name": "SQLite", + "query": "\n select\n id as \"id: Id\",\n display_name as \"display_name: String\",\n canonical_name as \"canonical_name: String\",\n created_sequence as \"created_sequence: Sequence\",\n created_at as \"created_at: DateTime\"\n from login\n where created_sequence <= $1\n order by canonical_name\n ", + "describe": { + "columns": [ + { + "name": "id: Id", + "ordinal": 0, + "type_info": "Text" + }, + { + "name": "display_name: String", + "ordinal": 1, + "type_info": "Text" + }, + { + "name": "canonical_name: String", + "ordinal": 2, + "type_info": "Text" + }, + { + "name": "created_sequence: Sequence", + "ordinal": 3, + "type_info": "Integer" + }, + { + "name": "created_at: DateTime", + "ordinal": 4, + "type_info": "Text" + } + ], + "parameters": { + "Right": 1 + }, + "nullable": [ + false, + false, + false, + false, + false + ] + }, + "hash": "cbf29fae3725bbb3d9e94d932ace995f53efd6c7800a7a1b42daec41d081b3d2" +} diff --git a/.sqlx/query-fce8f4fbd59a8b3b8531e10599914331682d58ace57214bfa26ccaa089592a24.json b/.sqlx/query-fce8f4fbd59a8b3b8531e10599914331682d58ace57214bfa26ccaa089592a24.json deleted file mode 100644 index 7aab764..0000000 --- a/.sqlx/query-fce8f4fbd59a8b3b8531e10599914331682d58ace57214bfa26ccaa089592a24.json +++ /dev/null @@ -1,62 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n select\n message.channel as \"channel: channel::Id\",\n message.sender as \"sender: login::Id\",\n id as \"id: Id\",\n message.body as \"body: Body\",\n message.sent_at as \"sent_at: DateTime\",\n message.sent_sequence as \"sent_sequence: Sequence\",\n deleted.deleted_at as \"deleted_at: DateTime\",\n deleted.deleted_sequence as \"deleted_sequence: Sequence\"\n from message\n left join message_deleted as deleted\n using (id)\n where coalesce(message.sent_sequence <= $2, true)\n order by message.sent_sequence\n ", - "describe": { - "columns": [ - { - "name": "channel: channel::Id", - "ordinal": 0, - "type_info": "Text" - }, - { - "name": "sender: login::Id", - "ordinal": 1, - "type_info": "Text" - }, - { - "name": "id: Id", - "ordinal": 2, - "type_info": "Text" - }, - { - "name": "body: Body", - "ordinal": 3, - "type_info": "Text" - }, - { - "name": "sent_at: DateTime", - "ordinal": 4, - "type_info": "Text" - }, - { - "name": "sent_sequence: Sequence", - "ordinal": 5, - "type_info": "Integer" - }, - { - "name": "deleted_at: DateTime", - "ordinal": 6, - "type_info": "Text" - }, - { - "name": "deleted_sequence: Sequence", - "ordinal": 7, - "type_info": "Integer" - } - ], - "parameters": { - "Right": 1 - }, - "nullable": [ - false, - false, - false, - true, - false, - false, - true, - true - ] - }, - "hash": "fce8f4fbd59a8b3b8531e10599914331682d58ace57214bfa26ccaa089592a24" -} diff --git a/.sqlx/query-ff61ff22108f1e98bbfc9a84a27bdaefca99706a0c73c17f19cc537f3f669882.json b/.sqlx/query-ff61ff22108f1e98bbfc9a84a27bdaefca99706a0c73c17f19cc537f3f669882.json new file mode 100644 index 0000000..f38f49c --- /dev/null +++ b/.sqlx/query-ff61ff22108f1e98bbfc9a84a27bdaefca99706a0c73c17f19cc537f3f669882.json @@ -0,0 +1,62 @@ +{ + "db_name": "SQLite", + "query": "\n select\n message.channel as \"channel: channel::Id\",\n message.sender as \"sender: login::Id\",\n message.id as \"id: Id\",\n message.body as \"body: Body\",\n message.sent_at as \"sent_at: DateTime\",\n message.sent_sequence as \"sent_sequence: Sequence\",\n deleted.deleted_at as \"deleted_at?: DateTime\",\n deleted.deleted_sequence as \"deleted_sequence?: Sequence\"\n from message\n left join message_deleted as deleted\n using (id)\n where message.sent_sequence <= $1\n order by message.sent_sequence\n ", + "describe": { + "columns": [ + { + "name": "channel: channel::Id", + "ordinal": 0, + "type_info": "Text" + }, + { + "name": "sender: login::Id", + "ordinal": 1, + "type_info": "Text" + }, + { + "name": "id: Id", + "ordinal": 2, + "type_info": "Text" + }, + { + "name": "body: Body", + "ordinal": 3, + "type_info": "Text" + }, + { + "name": "sent_at: DateTime", + "ordinal": 4, + "type_info": "Text" + }, + { + "name": "sent_sequence: Sequence", + "ordinal": 5, + "type_info": "Integer" + }, + { + "name": "deleted_at?: DateTime", + "ordinal": 6, + "type_info": "Text" + }, + { + "name": "deleted_sequence?: Sequence", + "ordinal": 7, + "type_info": "Integer" + } + ], + "parameters": { + "Right": 1 + }, + "nullable": [ + false, + false, + false, + true, + false, + false, + false, + false + ] + }, + "hash": "ff61ff22108f1e98bbfc9a84a27bdaefca99706a0c73c17f19cc537f3f669882" +} diff --git a/docs/api/events.md b/docs/api/events.md index b08e971..b23469c 100644 --- a/docs/api/events.md +++ b/docs/api/events.md @@ -46,7 +46,7 @@ This endpoint is designed for use with the [EventSource] DOM API, and supports s ### Query parameters -This endpoint accepts an optional `resume_point` (integer) query parameter. When provided, the event stream will include events published after that point in time; the value must be the value obtained by calling the [`GET /api/boot`](./boot.md) method. If absent, the returned event stream includes all events. +This endpoint requires a `resume_point` (integer) query parameter. The event stream will collect events published after that point in time. The value must be obtained by calling the [`GET /api/boot`](./boot.md) method. ### Request headers diff --git a/src/boot/app.rs b/src/boot/app.rs index e716b58..909f7d8 100644 --- a/src/boot/app.rs +++ b/src/boot/app.rs @@ -22,9 +22,9 @@ impl<'a> Boot<'a> { let mut tx = self.db.begin().await?; let resume_point = tx.sequence().current().await?; - let logins = tx.logins().all(resume_point.into()).await?; + let logins = tx.logins().all(resume_point).await?; let channels = tx.channels().all(resume_point).await?; - let messages = tx.messages().all(resume_point.into()).await?; + let messages = tx.messages().all(resume_point).await?; tx.commit().await?; diff --git a/src/channel/history.rs b/src/channel/history.rs index dda7bb9..ef2120d 100644 --- a/src/channel/history.rs +++ b/src/channel/history.rs @@ -4,7 +4,7 @@ use super::{ event::{Created, Deleted, Event}, Channel, Id, }; -use crate::event::{Instant, ResumePoint, Sequence}; +use crate::event::{Instant, Sequence}; #[derive(Clone, Debug, Eq, PartialEq)] pub struct History { @@ -28,9 +28,9 @@ impl History { self.channel.clone() } - pub fn as_of(&self, resume_point: impl Into) -> Option { + pub fn as_of(&self, resume_point: Sequence) -> Option { self.events() - .filter(Sequence::up_to(resume_point.into())) + .filter(Sequence::up_to(resume_point)) .collect() } diff --git a/src/channel/repo.rs b/src/channel/repo.rs index f47e564..7206c21 100644 --- a/src/channel/repo.rs +++ b/src/channel/repo.rs @@ -5,7 +5,7 @@ use crate::{ channel::{Channel, History, Id}, clock::DateTime, db::NotFound, - event::{Instant, ResumePoint, Sequence}, + event::{Instant, Sequence}, name::{self, Name}, }; @@ -144,13 +144,13 @@ impl<'c> Channels<'c> { Ok(channels) } - pub async fn replay(&mut self, resume_at: ResumePoint) -> Result, LoadError> { + pub async fn replay(&mut self, resume_at: Sequence) -> Result, LoadError> { let channels = sqlx::query!( r#" select id as "id: Id", - name.display_name as "display_name: String", - name.canonical_name as "canonical_name: String", + name.display_name as "display_name?: String", + name.canonical_name as "canonical_name?: String", channel.created_at as "created_at: DateTime", channel.created_sequence as "created_sequence: Sequence", deleted.deleted_at as "deleted_at?: DateTime", @@ -160,7 +160,8 @@ impl<'c> Channels<'c> { using (id) left join channel_deleted as deleted using (id) - where coalesce(channel.created_sequence > $1, true) + where channel.created_sequence > $1 + or deleted.deleted_sequence > $1 "#, resume_at, ) diff --git a/src/channel/routes/channel/test/post.rs b/src/channel/routes/channel/test/post.rs index 111a703..bc0684b 100644 --- a/src/channel/routes/channel/test/post.rs +++ b/src/channel/routes/channel/test/post.rs @@ -15,6 +15,7 @@ async fn messages_in_order() { let app = fixtures::scratch_app().await; let sender = fixtures::identity::create(&app, &fixtures::now()).await; let channel = fixtures::channel::create(&app, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; // Call the endpoint (twice) @@ -41,7 +42,7 @@ async fn messages_in_order() { let mut events = app .events() - .subscribe(None) + .subscribe(resume_point) .await .expect("subscribing to a valid channel succeeds") .filter_map(fixtures::event::message) diff --git a/src/channel/routes/test.rs b/src/channel/routes/test.rs index f5369fb..cba8f2e 100644 --- a/src/channel/routes/test.rs +++ b/src/channel/routes/test.rs @@ -16,6 +16,7 @@ async fn new_channel() { let app = fixtures::scratch_app().await; let creator = fixtures::identity::create(&app, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; // Call the endpoint @@ -44,7 +45,7 @@ async fn new_channel() { let mut events = app .events() - .subscribe(None) + .subscribe(resume_point) .await .expect("subscribing never fails") .filter_map(fixtures::event::channel) diff --git a/src/event/app.rs b/src/event/app.rs index c754388..b309245 100644 --- a/src/event/app.rs +++ b/src/event/app.rs @@ -6,7 +6,7 @@ use futures::{ use itertools::Itertools as _; use sqlx::sqlite::SqlitePool; -use super::{broadcaster::Broadcaster, Event, ResumePoint, Sequence, Sequenced}; +use super::{broadcaster::Broadcaster, Event, Sequence, Sequenced}; use crate::{ channel::{self, repo::Provider as _}, login::{self, repo::Provider as _}, @@ -26,9 +26,8 @@ impl<'a> Events<'a> { pub async fn subscribe( &self, - resume_at: impl Into, + resume_at: Sequence, ) -> Result + std::fmt::Debug, Error> { - let resume_at = resume_at.into(); // Subscribe before retrieving, to catch messages broadcast while we're // querying the DB. We'll prune out duplicates later. let live_messages = self.events.subscribe(); @@ -63,7 +62,7 @@ impl<'a> Events<'a> { .merge_by(channel_events, Sequence::merge) .merge_by(message_events, Sequence::merge) .collect::>(); - let resume_live_at = replay_events.last().map(Sequenced::sequence); + let resume_live_at = replay_events.last().map_or(resume_at, Sequenced::sequence); let replay = stream::iter(replay_events); @@ -77,7 +76,7 @@ impl<'a> Events<'a> { Ok(replay.chain(live_messages)) } - fn resume(resume_at: ResumePoint) -> impl for<'m> FnMut(&'m Event) -> future::Ready { + fn resume(resume_at: Sequence) -> impl for<'m> FnMut(&'m Event) -> future::Ready { let filter = Sequence::after(resume_at); move |event| future::ready(filter(event)) } diff --git a/src/event/mod.rs b/src/event/mod.rs index 69c7a10..9996916 100644 --- a/src/event/mod.rs +++ b/src/event/mod.rs @@ -13,8 +13,6 @@ pub use self::{ sequence::{Instant, Sequence, Sequenced}, }; -pub type ResumePoint = Option; - #[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] #[serde(tag = "type", rename_all = "snake_case")] pub enum Event { diff --git a/src/event/routes/get.rs b/src/event/routes/get.rs index 22e8762..ceebcc9 100644 --- a/src/event/routes/get.rs +++ b/src/event/routes/get.rs @@ -12,7 +12,7 @@ use futures::stream::{Stream, StreamExt as _}; use crate::{ app::App, error::{Internal, Unauthorized}, - event::{app, extract::LastEventId, Event, ResumePoint, Sequence, Sequenced as _}, + event::{app, extract::LastEventId, Event, Sequence, Sequenced as _}, token::{app::ValidateError, extract::Identity}, }; @@ -22,9 +22,7 @@ pub async fn handler( last_event_id: Option>, Query(query): Query, ) -> Result + std::fmt::Debug>, Error> { - let resume_at = last_event_id - .map(LastEventId::into_inner) - .or(query.resume_point); + let resume_at = last_event_id.map_or(query.resume_point, LastEventId::into_inner); let stream = app.events().subscribe(resume_at).await?; let stream = app.tokens().limit_stream(identity.token, stream).await?; @@ -32,9 +30,9 @@ pub async fn handler( Ok(Response(stream)) } -#[derive(Default, serde::Deserialize)] +#[derive(serde::Deserialize)] pub struct QueryParams { - pub resume_point: ResumePoint, + pub resume_point: Sequence, } #[derive(Debug)] diff --git a/src/event/routes/test/channel.rs b/src/event/routes/test/channel.rs index 6a0a803..0695ab1 100644 --- a/src/event/routes/test/channel.rs +++ b/src/event/routes/test/channel.rs @@ -12,14 +12,19 @@ async fn creating() { // Set up the environment let app = fixtures::scratch_app().await; + let resume_point = fixtures::boot::resume_point(&app).await; // Subscribe let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = - get::handler(State(app.clone()), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); + let get::Response(events) = get::handler( + State(app.clone()), + subscriber, + None, + Query(get::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); // Create a channel @@ -46,6 +51,7 @@ async fn previously_created() { // Set up the environment let app = fixtures::scratch_app().await; + let resume_point = fixtures::boot::resume_point(&app).await; // Create a channel @@ -59,10 +65,14 @@ async fn previously_created() { // Subscribe let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = - get::handler(State(app.clone()), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); + let get::Response(events) = get::handler( + State(app.clone()), + subscriber, + None, + Query(get::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); // Verify channel created event @@ -81,14 +91,19 @@ async fn expiring() { let app = fixtures::scratch_app().await; let channel = fixtures::channel::create(&app, &fixtures::ancient()).await; + let resume_point = fixtures::boot::resume_point(&app).await; // Subscribe let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = - get::handler(State(app.clone()), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); + let get::Response(events) = get::handler( + State(app.clone()), + subscriber, + None, + Query(get::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); // Expire channels @@ -113,6 +128,7 @@ async fn previously_expired() { let app = fixtures::scratch_app().await; let channel = fixtures::channel::create(&app, &fixtures::ancient()).await; + let resume_point = fixtures::boot::resume_point(&app).await; // Expire channels @@ -124,10 +140,14 @@ async fn previously_expired() { // Subscribe let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = - get::handler(State(app.clone()), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); + let get::Response(events) = get::handler( + State(app.clone()), + subscriber, + None, + Query(get::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); // Check for expiry event let _ = events @@ -145,14 +165,19 @@ async fn deleting() { let app = fixtures::scratch_app().await; let channel = fixtures::channel::create(&app, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; // Subscribe let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = - get::handler(State(app.clone()), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); + let get::Response(events) = get::handler( + State(app.clone()), + subscriber, + None, + Query(get::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); // Delete the channel @@ -177,6 +202,7 @@ async fn previously_deleted() { let app = fixtures::scratch_app().await; let channel = fixtures::channel::create(&app, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; // Delete the channel @@ -188,10 +214,14 @@ async fn previously_deleted() { // Subscribe let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = - get::handler(State(app.clone()), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); + let get::Response(events) = get::handler( + State(app.clone()), + subscriber, + None, + Query(get::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); // Check for expiry event let _ = events @@ -209,6 +239,7 @@ async fn previously_purged() { let app = fixtures::scratch_app().await; let channel = fixtures::channel::create(&app, &fixtures::ancient()).await; + let resume_point = fixtures::boot::resume_point(&app).await; // Delete and purge the channel @@ -225,10 +256,14 @@ async fn previously_purged() { // Subscribe let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = - get::handler(State(app.clone()), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); + let get::Response(events) = get::handler( + State(app.clone()), + subscriber, + None, + Query(get::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); // Check for expiry event events diff --git a/src/event/routes/test/invite.rs b/src/event/routes/test/invite.rs index d24f474..73af62d 100644 --- a/src/event/routes/test/invite.rs +++ b/src/event/routes/test/invite.rs @@ -14,14 +14,19 @@ async fn accepting_invite() { let app = fixtures::scratch_app().await; let issuer = fixtures::login::create(&app, &fixtures::now()).await; let invite = fixtures::invite::issue(&app, &issuer, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; // Subscribe let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = - get::handler(State(app.clone()), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); + let get::Response(events) = get::handler( + State(app.clone()), + subscriber, + None, + Query(get::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); // Accept the invite @@ -50,6 +55,7 @@ async fn previously_accepted_invite() { let app = fixtures::scratch_app().await; let issuer = fixtures::login::create(&app, &fixtures::now()).await; let invite = fixtures::invite::issue(&app, &issuer, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; // Accept the invite @@ -63,10 +69,14 @@ async fn previously_accepted_invite() { // Subscribe let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = - get::handler(State(app.clone()), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); + let get::Response(events) = get::handler( + State(app.clone()), + subscriber, + None, + Query(get::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); // Expect a login created event diff --git a/src/event/routes/test/message.rs b/src/event/routes/test/message.rs index a7b25fb..fafaeb3 100644 --- a/src/event/routes/test/message.rs +++ b/src/event/routes/test/message.rs @@ -16,14 +16,19 @@ async fn sending() { let app = fixtures::scratch_app().await; let channel = fixtures::channel::create(&app, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; // Call the endpoint let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = - get::handler(State(app.clone()), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); + let get::Response(events) = get::handler( + State(app.clone()), + subscriber, + None, + Query(get::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); // Send a message @@ -56,6 +61,7 @@ async fn previously_sent() { let app = fixtures::scratch_app().await; let channel = fixtures::channel::create(&app, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; // Send a message @@ -74,10 +80,14 @@ async fn previously_sent() { // Call the endpoint let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = - get::handler(State(app.clone()), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); + let get::Response(events) = get::handler( + State(app.clone()), + subscriber, + None, + Query(get::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); // Verify that an event is delivered @@ -96,6 +106,7 @@ async fn sent_in_multiple_channels() { let app = fixtures::scratch_app().await; let sender = fixtures::login::create(&app, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; let channels = [ fixtures::channel::create(&app, &fixtures::now()).await, @@ -115,9 +126,14 @@ async fn sent_in_multiple_channels() { // Call the endpoint let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = get::handler(State(app), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); + let get::Response(events) = get::handler( + State(app), + subscriber, + None, + Query(get::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); // Verify the structure of the response. @@ -141,6 +157,7 @@ async fn sent_sequentially() { let app = fixtures::scratch_app().await; let channel = fixtures::channel::create(&app, &fixtures::now()).await; let sender = fixtures::login::create(&app, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; let messages = vec![ fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, @@ -151,9 +168,14 @@ async fn sent_sequentially() { // Subscribe let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = get::handler(State(app), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); + let get::Response(events) = get::handler( + State(app), + subscriber, + None, + Query(get::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); // Verify the expected events in the expected order @@ -180,14 +202,19 @@ async fn expiring() { let channel = fixtures::channel::create(&app, &fixtures::ancient()).await; let sender = fixtures::login::create(&app, &fixtures::ancient()).await; let message = fixtures::message::send(&app, &channel, &sender, &fixtures::ancient()).await; + let resume_point = fixtures::boot::resume_point(&app).await; // Subscribe let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = - get::handler(State(app.clone()), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); + let get::Response(events) = get::handler( + State(app.clone()), + subscriber, + None, + Query(get::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); // Expire messages @@ -214,6 +241,7 @@ async fn previously_expired() { let channel = fixtures::channel::create(&app, &fixtures::ancient()).await; let sender = fixtures::login::create(&app, &fixtures::ancient()).await; let message = fixtures::message::send(&app, &channel, &sender, &fixtures::ancient()).await; + let resume_point = fixtures::boot::resume_point(&app).await; // Expire messages @@ -225,10 +253,14 @@ async fn previously_expired() { // Subscribe let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = - get::handler(State(app.clone()), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); + let get::Response(events) = get::handler( + State(app.clone()), + subscriber, + None, + Query(get::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); // Check for expiry event let _ = events @@ -248,14 +280,19 @@ async fn deleting() { let channel = fixtures::channel::create(&app, &fixtures::now()).await; let sender = fixtures::login::create(&app, &fixtures::now()).await; let message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; // Subscribe let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = - get::handler(State(app.clone()), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); + let get::Response(events) = get::handler( + State(app.clone()), + subscriber, + None, + Query(get::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); // Delete the message @@ -282,6 +319,7 @@ async fn previously_deleted() { let channel = fixtures::channel::create(&app, &fixtures::now()).await; let sender = fixtures::login::create(&app, &fixtures::now()).await; let message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; // Delete the message @@ -293,10 +331,14 @@ async fn previously_deleted() { // Subscribe let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = - get::handler(State(app.clone()), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); + let get::Response(events) = get::handler( + State(app.clone()), + subscriber, + None, + Query(get::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); // Check for delete event let _ = events @@ -316,6 +358,7 @@ async fn previously_purged() { let channel = fixtures::channel::create(&app, &fixtures::ancient()).await; let sender = fixtures::login::create(&app, &fixtures::ancient()).await; let message = fixtures::message::send(&app, &channel, &sender, &fixtures::ancient()).await; + let resume_point = fixtures::boot::resume_point(&app).await; // Purge the message @@ -332,10 +375,14 @@ async fn previously_purged() { // Subscribe let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = - get::handler(State(app.clone()), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); + let get::Response(events) = get::handler( + State(app.clone()), + subscriber, + None, + Query(get::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); // Check for delete event diff --git a/src/event/routes/test/resume.rs b/src/event/routes/test/resume.rs index 62b9bad..fabda0c 100644 --- a/src/event/routes/test/resume.rs +++ b/src/event/routes/test/resume.rs @@ -16,6 +16,7 @@ async fn resume() { let app = fixtures::scratch_app().await; let channel = fixtures::channel::create(&app, &fixtures::now()).await; let sender = fixtures::login::create(&app, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; let initial_message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await; @@ -34,7 +35,7 @@ async fn resume() { State(app.clone()), subscriber.clone(), None, - Query::default(), + Query(get::QueryParams { resume_point }), ) .await .expect("subscribe never fails"); @@ -55,7 +56,7 @@ async fn resume() { State(app), subscriber, Some(resume_at.into()), - Query::default(), + Query(get::QueryParams { resume_point }), ) .await .expect("subscribe never fails"); @@ -98,6 +99,7 @@ async fn serial_resume() { let sender = fixtures::login::create(&app, &fixtures::now()).await; let channel_a = fixtures::channel::create(&app, &fixtures::now()).await; let channel_b = fixtures::channel::create(&app, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; // Call the endpoint @@ -115,7 +117,7 @@ async fn serial_resume() { State(app.clone()), subscriber.clone(), None, - Query::default(), + Query(get::QueryParams { resume_point }), ) .await .expect("subscribe never fails"); @@ -156,7 +158,7 @@ async fn serial_resume() { State(app.clone()), subscriber.clone(), Some(resume_at.into()), - Query::default(), + Query(get::QueryParams { resume_point }), ) .await .expect("subscribe never fails"); @@ -197,7 +199,7 @@ async fn serial_resume() { State(app.clone()), subscriber.clone(), Some(resume_at.into()), - Query::default(), + Query(get::QueryParams { resume_point }), ) .await .expect("subscribe never fails"); diff --git a/src/event/routes/test/setup.rs b/src/event/routes/test/setup.rs index 007b03d..26b7ea7 100644 --- a/src/event/routes/test/setup.rs +++ b/src/event/routes/test/setup.rs @@ -15,6 +15,7 @@ async fn previously_completed() { // Set up the environment let app = fixtures::scratch_app().await; + let resume_point = fixtures::boot::resume_point(&app).await; // Complete initial setup @@ -28,10 +29,14 @@ async fn previously_completed() { // Subscribe to events let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = - get::handler(State(app.clone()), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); + let get::Response(events) = get::handler( + State(app.clone()), + subscriber, + None, + Query(get::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); // Expect a login created event diff --git a/src/event/routes/test/token.rs b/src/event/routes/test/token.rs index 16ac7c3..fa76865 100644 --- a/src/event/routes/test/token.rs +++ b/src/event/routes/test/token.rs @@ -14,6 +14,7 @@ async fn terminates_on_token_expiry() { let app = fixtures::scratch_app().await; let channel = fixtures::channel::create(&app, &fixtures::now()).await; let sender = fixtures::login::create(&app, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; // Subscribe via the endpoint @@ -21,10 +22,14 @@ async fn terminates_on_token_expiry() { let subscriber = fixtures::identity::logged_in(&app, &subscriber_creds, &fixtures::ancient()).await; - let get::Response(events) = - get::handler(State(app.clone()), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); + let get::Response(events) = get::handler( + State(app.clone()), + subscriber, + None, + Query(get::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); // Verify the resulting stream's behaviour @@ -56,6 +61,7 @@ async fn terminates_on_logout() { let app = fixtures::scratch_app().await; let channel = fixtures::channel::create(&app, &fixtures::now()).await; let sender = fixtures::login::create(&app, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; // Subscribe via the endpoint @@ -65,7 +71,7 @@ async fn terminates_on_logout() { State(app.clone()), subscriber.clone(), None, - Query::default(), + Query(get::QueryParams { resume_point }), ) .await .expect("subscribe never fails"); @@ -101,6 +107,7 @@ async fn terminates_on_password_change() { let app = fixtures::scratch_app().await; let channel = fixtures::channel::create(&app, &fixtures::now()).await; let sender = fixtures::login::create(&app, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; // Subscribe via the endpoint @@ -112,7 +119,7 @@ async fn terminates_on_password_change() { State(app.clone()), subscriber.clone(), None, - Query::default(), + Query(get::QueryParams { resume_point }), ) .await .expect("subscribe never fails"); diff --git a/src/event/sequence.rs b/src/event/sequence.rs index 9bc399b..77281c2 100644 --- a/src/event/sequence.rs +++ b/src/event/sequence.rs @@ -1,6 +1,5 @@ use std::fmt; -use super::ResumePoint; use crate::clock::DateTime; #[derive(Clone, Copy, Debug, Eq, PartialEq, serde::Serialize)] @@ -51,18 +50,18 @@ impl fmt::Display for Sequence { } impl Sequence { - pub fn up_to(resume_point: ResumePoint) -> impl for<'e> Fn(&'e E) -> bool + pub fn up_to(resume_point: Sequence) -> impl for<'e> Fn(&'e E) -> bool where E: Sequenced, { - move |event| resume_point.map_or(true, |resume_point| event.sequence() <= resume_point) + move |event| event.sequence() <= resume_point } - pub fn after(resume_point: ResumePoint) -> impl for<'e> Fn(&'e E) -> bool + pub fn after(resume_point: Sequence) -> impl for<'e> Fn(&'e E) -> bool where E: Sequenced, { - move |event| resume_point < Some(event.sequence()) + move |event| resume_point < event.sequence() } pub fn start_from(resume_point: Self) -> impl for<'e> Fn(&'e E) -> bool diff --git a/src/login/history.rs b/src/login/history.rs index daad579..8161b0b 100644 --- a/src/login/history.rs +++ b/src/login/history.rs @@ -2,7 +2,7 @@ use super::{ event::{Created, Event}, Id, Login, }; -use crate::event::{Instant, ResumePoint, Sequence}; +use crate::event::{Instant, Sequence}; #[derive(Clone, Debug, Eq, PartialEq)] pub struct History { @@ -24,9 +24,9 @@ impl History { self.login.clone() } - pub fn as_of(&self, resume_point: impl Into) -> Option { + pub fn as_of(&self, resume_point: Sequence) -> Option { self.events() - .filter(Sequence::up_to(resume_point.into())) + .filter(Sequence::up_to(resume_point)) .collect() } diff --git a/src/login/repo.rs b/src/login/repo.rs index 9439a25..1c63a4b 100644 --- a/src/login/repo.rs +++ b/src/login/repo.rs @@ -3,7 +3,7 @@ use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction}; use crate::{ clock::DateTime, - event::{Instant, ResumePoint, Sequence}, + event::{Instant, Sequence}, login::{password::StoredHash, History, Id, Login}, name::{self, Name}, }; @@ -81,7 +81,7 @@ impl<'c> Logins<'c> { Ok(()) } - pub async fn all(&mut self, resume_at: ResumePoint) -> Result, LoadError> { + pub async fn all(&mut self, resume_at: Sequence) -> Result, LoadError> { let logins = sqlx::query!( r#" select @@ -91,7 +91,7 @@ impl<'c> Logins<'c> { created_sequence as "created_sequence: Sequence", created_at as "created_at: DateTime" from login - where coalesce(created_sequence <= $1, true) + where created_sequence <= $1 order by canonical_name "#, resume_at, @@ -113,7 +113,7 @@ impl<'c> Logins<'c> { Ok(logins) } - pub async fn replay(&mut self, resume_at: ResumePoint) -> Result, LoadError> { + pub async fn replay(&mut self, resume_at: Sequence) -> Result, LoadError> { let logins = sqlx::query!( r#" select @@ -123,7 +123,7 @@ impl<'c> Logins<'c> { created_sequence as "created_sequence: Sequence", created_at as "created_at: DateTime" from login - where coalesce(login.created_sequence > $1, true) + where login.created_sequence > $1 "#, resume_at, ) diff --git a/src/message/history.rs b/src/message/history.rs index 67e437a..ed8f5df 100644 --- a/src/message/history.rs +++ b/src/message/history.rs @@ -4,7 +4,7 @@ use super::{ event::{Deleted, Event, Sent}, Id, Message, }; -use crate::event::{Instant, ResumePoint, Sequence}; +use crate::event::{Instant, Sequence}; #[derive(Clone, Debug, Eq, PartialEq)] pub struct History { @@ -27,9 +27,9 @@ impl History { self.message.clone() } - pub fn as_of(&self, resume_point: impl Into) -> Option { + pub fn as_of(&self, resume_point: Sequence) -> Option { self.events() - .filter(Sequence::up_to(resume_point.into())) + .filter(Sequence::up_to(resume_point)) .collect() } diff --git a/src/message/repo.rs b/src/message/repo.rs index c8ceceb..913135c 100644 --- a/src/message/repo.rs +++ b/src/message/repo.rs @@ -4,7 +4,7 @@ use super::{snapshot::Message, Body, History, Id}; use crate::{ channel, clock::DateTime, - event::{Instant, ResumePoint, Sequence}, + event::{Instant, Sequence}, login::{self, Login}, }; @@ -106,22 +106,22 @@ impl<'c> Messages<'c> { Ok(messages) } - pub async fn all(&mut self, resume_at: ResumePoint) -> Result, sqlx::Error> { + pub async fn all(&mut self, resume_at: Sequence) -> Result, sqlx::Error> { let messages = sqlx::query!( r#" select message.channel as "channel: channel::Id", message.sender as "sender: login::Id", - id as "id: Id", + message.id as "id: Id", message.body as "body: Body", message.sent_at as "sent_at: DateTime", message.sent_sequence as "sent_sequence: Sequence", - deleted.deleted_at as "deleted_at: DateTime", - deleted.deleted_sequence as "deleted_sequence: Sequence" + deleted.deleted_at as "deleted_at?: DateTime", + deleted.deleted_sequence as "deleted_sequence?: Sequence" from message left join message_deleted as deleted using (id) - where coalesce(message.sent_sequence <= $2, true) + where message.sent_sequence <= $1 order by message.sent_sequence "#, resume_at, @@ -282,7 +282,7 @@ impl<'c> Messages<'c> { Ok(messages) } - pub async fn replay(&mut self, resume_at: ResumePoint) -> Result, sqlx::Error> { + pub async fn replay(&mut self, resume_at: Sequence) -> Result, sqlx::Error> { let messages = sqlx::query!( r#" select @@ -292,12 +292,13 @@ impl<'c> Messages<'c> { message.sent_at as "sent_at: DateTime", message.sent_sequence as "sent_sequence: Sequence", message.body as "body: Body", - deleted.deleted_at as "deleted_at: DateTime", - deleted.deleted_sequence as "deleted_sequence: Sequence" + deleted.deleted_at as "deleted_at?: DateTime", + deleted.deleted_sequence as "deleted_sequence?: Sequence" from message left join message_deleted as deleted using (id) - where coalesce(message.sent_sequence > $1, true) + where message.sent_sequence > $1 + or deleted.deleted_sequence > $1 "#, resume_at, ) diff --git a/src/test/fixtures/boot.rs b/src/test/fixtures/boot.rs new file mode 100644 index 0000000..120726f --- /dev/null +++ b/src/test/fixtures/boot.rs @@ -0,0 +1,9 @@ +use crate::{app::App, event::Sequence}; + +pub async fn resume_point(app: &App) -> Sequence { + app.boot() + .snapshot() + .await + .expect("boot always succeeds") + .resume_point +} diff --git a/src/test/fixtures/mod.rs b/src/test/fixtures/mod.rs index 2b7b6af..470b31a 100644 --- a/src/test/fixtures/mod.rs +++ b/src/test/fixtures/mod.rs @@ -2,6 +2,7 @@ use chrono::{TimeDelta, Utc}; use crate::{app::App, clock::RequestedAt, db}; +pub mod boot; pub mod channel; pub mod cookie; pub mod event; -- cgit v1.2.3 From 06c839436900ce07ec5c53175b01f3c5011e507c Mon Sep 17 00:00:00 2001 From: Owen Jacobson Date: Wed, 30 Oct 2024 11:32:52 -0400 Subject: Track an index-friendly sequence range for both channels and messages. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This is meant to limit the amount of messages that event replay needs to examine. Previously, the query required a table scan; table scans on `message` can be quite large, and should really be avoided. The new schema allows replays to be carried out using an index scan. The premise of this change is that, for each (channel, message), there is a range of event sequence numbers that the (channel, message) may appear in. We'll notate that as `[start, end]` in the general case, but they are: * for active channels, `[ch.created_sequence, ch.created_sequence]`. * for deleted channels, `[ch.created_sequence, ch_del.deleted_sequence]`. * for active messages, `[mg.sent_sequence, mg.sent_sequence]`. * for deleted messages, `[mg.sent_seqeunce, mg_del.deleted_sequence]`. (The two "active" ranges may grow in future releases, to support things like channel renames and message editing. That won't change the logic, but those two things will need to update the new `last_sequence` field.) There are two families of operations that need to retrieve based on these ranges: * Boot creates a snapshot as of a specific `resume_at` sequence number, and thus must include any record whose `start` falls on or before `resume_at`. We can't exclude records whose `end` is also before it, as their terminal state may be one that is included in boot (eg. active channels). * Event replay needs to include any events that fall after the same `resume_at`, and thus must include events from any record whose `end` falls strictly after `resume_at`. We can't exclude records whose `start` is also strictly after `resume_at`, as we'd omit them from replay, inappropriately, if we did. This gives three interesting cases: 1. Record fully after `resume_at`: event sequence --increasing--> x-a … x … x+k … resume_at start end This record should be omitted by boot, but included for event replay. 2. Record fully before `resume_at`: event sequence --increasing--> x … x+k … x+a start end resume_at This record should be omitted for event replay, but included for boot. 3. Record overlapping `resume_at`: event sequence --increasing--> x … x+a … x+k start resume_at end This record needs to be included for both boot and event replay. However, the bounds of that range were previously stored in two tables (`channel` and `channel_deleted`, or `message` and `message_deleted`, respectively), which sqlite (indeed most SQL implementations) cannot index. This forced a table scan, leading to the program considering every possible (channel, message) during event replay. This commit adds a `last_sequence` field to channels and messages, which is set to the above values as channels and messages are operated on. This field is indexed, and queries can use it to rapidly identify relevant rows for event replay, cutting down the amount of reading needed to generate events on resume. --- ...1fe70264af3f5b77c6dacd4a61be470f262d62b5d4.json | 56 -------- ...32389259b91bbffa182e32b224635031eead2fa82d.json | 50 -------- ...ae5559dda60c4b2a9c6746376a3552ce73b7d8ea38.json | 20 +++ ...9075e71bd7e30dc93d32e1f273c878f18f2984860f.json | 62 +++++++++ ...3a7c0c7e8cf92e5ee6e3900fe9f94c75b5f8c9bfc4.json | 20 +++ ...41e4322f8026f9e2515b6bacaed81f6248c52a198a.json | 50 ++++++++ ...27d5ba226bf52349548261393e00e74081cdbe041b.json | 62 --------- ...e4a2679aaa4c3f28aa5436a9af067a754e46af5589.json | 56 ++++++++ ...ad72529de7030a4863d3bf479bb19a6a2a76d1590b.json | 12 ++ ...ae5ca13b72436960622863420d3f1d73a422fe5b42.json | 12 -- ...50121451dbcf01e8bec8a987b58c699b27b5d737af.json | 12 -- ...0241030152013_channel_message_last_event_id.sql | 141 +++++++++++++++++++++ src/channel/repo.rs | 21 ++- src/message/repo.rs | 16 ++- 14 files changed, 387 insertions(+), 203 deletions(-) delete mode 100644 .sqlx/query-093a57206253b4c05c2aa51fe70264af3f5b77c6dacd4a61be470f262d62b5d4.json delete mode 100644 .sqlx/query-0f0e4a6ac32b39f3bd7f4832389259b91bbffa182e32b224635031eead2fa82d.json create mode 100644 .sqlx/query-4e39f27605dec811824fddae5559dda60c4b2a9c6746376a3552ce73b7d8ea38.json create mode 100644 .sqlx/query-53b1f14d450a99f486bfd79075e71bd7e30dc93d32e1f273c878f18f2984860f.json create mode 100644 .sqlx/query-64fb9bad4505c144578e393a7c0c7e8cf92e5ee6e3900fe9f94c75b5f8c9bfc4.json create mode 100644 .sqlx/query-72441293731853e9f0cc1141e4322f8026f9e2515b6bacaed81f6248c52a198a.json delete mode 100644 .sqlx/query-9a748f34805bb316452cb527d5ba226bf52349548261393e00e74081cdbe041b.json create mode 100644 .sqlx/query-c44dbcc7f4c0257a991e1ae4a2679aaa4c3f28aa5436a9af067a754e46af5589.json create mode 100644 .sqlx/query-ca9146e92c3b3e724f4b58ad72529de7030a4863d3bf479bb19a6a2a76d1590b.json delete mode 100644 .sqlx/query-d1c869c323d1ab45216279ae5ca13b72436960622863420d3f1d73a422fe5b42.json delete mode 100644 .sqlx/query-e718f4064cbb3d1b27049450121451dbcf01e8bec8a987b58c699b27b5d737af.json create mode 100644 migrations/20241030152013_channel_message_last_event_id.sql (limited to 'src/channel') diff --git a/.sqlx/query-093a57206253b4c05c2aa51fe70264af3f5b77c6dacd4a61be470f262d62b5d4.json b/.sqlx/query-093a57206253b4c05c2aa51fe70264af3f5b77c6dacd4a61be470f262d62b5d4.json deleted file mode 100644 index 1d8a2e1..0000000 --- a/.sqlx/query-093a57206253b4c05c2aa51fe70264af3f5b77c6dacd4a61be470f262d62b5d4.json +++ /dev/null @@ -1,56 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n select\n id as \"id: Id\",\n name.display_name as \"display_name?: String\",\n name.canonical_name as \"canonical_name?: String\",\n channel.created_at as \"created_at: DateTime\",\n channel.created_sequence as \"created_sequence: Sequence\",\n deleted.deleted_at as \"deleted_at?: DateTime\",\n deleted.deleted_sequence as \"deleted_sequence?: Sequence\"\n from channel\n left join channel_name as name\n using (id)\n left join channel_deleted as deleted\n using (id)\n where channel.created_sequence > $1\n or deleted.deleted_sequence > $1\n ", - "describe": { - "columns": [ - { - "name": "id: Id", - "ordinal": 0, - "type_info": "Text" - }, - { - "name": "display_name?: String", - "ordinal": 1, - "type_info": "Null" - }, - { - "name": "canonical_name?: String", - "ordinal": 2, - "type_info": "Null" - }, - { - "name": "created_at: DateTime", - "ordinal": 3, - "type_info": "Text" - }, - { - "name": "created_sequence: Sequence", - "ordinal": 4, - "type_info": "Integer" - }, - { - "name": "deleted_at?: DateTime", - "ordinal": 5, - "type_info": "Text" - }, - { - "name": "deleted_sequence?: Sequence", - "ordinal": 6, - "type_info": "Integer" - } - ], - "parameters": { - "Right": 1 - }, - "nullable": [ - false, - true, - true, - false, - false, - true, - true - ] - }, - "hash": "093a57206253b4c05c2aa51fe70264af3f5b77c6dacd4a61be470f262d62b5d4" -} diff --git a/.sqlx/query-0f0e4a6ac32b39f3bd7f4832389259b91bbffa182e32b224635031eead2fa82d.json b/.sqlx/query-0f0e4a6ac32b39f3bd7f4832389259b91bbffa182e32b224635031eead2fa82d.json deleted file mode 100644 index fd5a165..0000000 --- a/.sqlx/query-0f0e4a6ac32b39f3bd7f4832389259b91bbffa182e32b224635031eead2fa82d.json +++ /dev/null @@ -1,50 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n insert into message\n (id, channel, sender, sent_at, sent_sequence, body)\n values ($1, $2, $3, $4, $5, $6)\n returning\n id as \"id: Id\",\n channel as \"channel: channel::Id\",\n sender as \"sender: login::Id\",\n sent_at as \"sent_at: DateTime\",\n sent_sequence as \"sent_sequence: Sequence\",\n body as \"body: Body\"\n ", - "describe": { - "columns": [ - { - "name": "id: Id", - "ordinal": 0, - "type_info": "Text" - }, - { - "name": "channel: channel::Id", - "ordinal": 1, - "type_info": "Text" - }, - { - "name": "sender: login::Id", - "ordinal": 2, - "type_info": "Text" - }, - { - "name": "sent_at: DateTime", - "ordinal": 3, - "type_info": "Text" - }, - { - "name": "sent_sequence: Sequence", - "ordinal": 4, - "type_info": "Integer" - }, - { - "name": "body: Body", - "ordinal": 5, - "type_info": "Text" - } - ], - "parameters": { - "Right": 6 - }, - "nullable": [ - false, - false, - false, - false, - false, - true - ] - }, - "hash": "0f0e4a6ac32b39f3bd7f4832389259b91bbffa182e32b224635031eead2fa82d" -} diff --git a/.sqlx/query-4e39f27605dec811824fddae5559dda60c4b2a9c6746376a3552ce73b7d8ea38.json b/.sqlx/query-4e39f27605dec811824fddae5559dda60c4b2a9c6746376a3552ce73b7d8ea38.json new file mode 100644 index 0000000..902b216 --- /dev/null +++ b/.sqlx/query-4e39f27605dec811824fddae5559dda60c4b2a9c6746376a3552ce73b7d8ea38.json @@ -0,0 +1,20 @@ +{ + "db_name": "SQLite", + "query": "\n update channel\n set last_sequence = max(last_sequence, $1)\n where id = $2\n returning id as \"id: Id\"\n ", + "describe": { + "columns": [ + { + "name": "id: Id", + "ordinal": 0, + "type_info": "Text" + } + ], + "parameters": { + "Right": 2 + }, + "nullable": [ + false + ] + }, + "hash": "4e39f27605dec811824fddae5559dda60c4b2a9c6746376a3552ce73b7d8ea38" +} diff --git a/.sqlx/query-53b1f14d450a99f486bfd79075e71bd7e30dc93d32e1f273c878f18f2984860f.json b/.sqlx/query-53b1f14d450a99f486bfd79075e71bd7e30dc93d32e1f273c878f18f2984860f.json new file mode 100644 index 0000000..7ec6aac --- /dev/null +++ b/.sqlx/query-53b1f14d450a99f486bfd79075e71bd7e30dc93d32e1f273c878f18f2984860f.json @@ -0,0 +1,62 @@ +{ + "db_name": "SQLite", + "query": "\n select\n id as \"id: Id\",\n message.channel as \"channel: channel::Id\",\n message.sender as \"sender: login::Id\",\n message.sent_at as \"sent_at: DateTime\",\n message.sent_sequence as \"sent_sequence: Sequence\",\n message.body as \"body: Body\",\n deleted.deleted_at as \"deleted_at?: DateTime\",\n deleted.deleted_sequence as \"deleted_sequence?: Sequence\"\n from message\n left join message_deleted as deleted\n using (id)\n where message.last_sequence > $1\n ", + "describe": { + "columns": [ + { + "name": "id: Id", + "ordinal": 0, + "type_info": "Text" + }, + { + "name": "channel: channel::Id", + "ordinal": 1, + "type_info": "Text" + }, + { + "name": "sender: login::Id", + "ordinal": 2, + "type_info": "Text" + }, + { + "name": "sent_at: DateTime", + "ordinal": 3, + "type_info": "Text" + }, + { + "name": "sent_sequence: Sequence", + "ordinal": 4, + "type_info": "Integer" + }, + { + "name": "body: Body", + "ordinal": 5, + "type_info": "Text" + }, + { + "name": "deleted_at?: DateTime", + "ordinal": 6, + "type_info": "Text" + }, + { + "name": "deleted_sequence?: Sequence", + "ordinal": 7, + "type_info": "Integer" + } + ], + "parameters": { + "Right": 1 + }, + "nullable": [ + false, + false, + false, + false, + false, + true, + false, + false + ] + }, + "hash": "53b1f14d450a99f486bfd79075e71bd7e30dc93d32e1f273c878f18f2984860f" +} diff --git a/.sqlx/query-64fb9bad4505c144578e393a7c0c7e8cf92e5ee6e3900fe9f94c75b5f8c9bfc4.json b/.sqlx/query-64fb9bad4505c144578e393a7c0c7e8cf92e5ee6e3900fe9f94c75b5f8c9bfc4.json new file mode 100644 index 0000000..5179e74 --- /dev/null +++ b/.sqlx/query-64fb9bad4505c144578e393a7c0c7e8cf92e5ee6e3900fe9f94c75b5f8c9bfc4.json @@ -0,0 +1,20 @@ +{ + "db_name": "SQLite", + "query": "\n update message\n set body = '', last_sequence = max(last_sequence, $1)\n where id = $2\n returning id as \"id: Id\"\n ", + "describe": { + "columns": [ + { + "name": "id: Id", + "ordinal": 0, + "type_info": "Text" + } + ], + "parameters": { + "Right": 2 + }, + "nullable": [ + false + ] + }, + "hash": "64fb9bad4505c144578e393a7c0c7e8cf92e5ee6e3900fe9f94c75b5f8c9bfc4" +} diff --git a/.sqlx/query-72441293731853e9f0cc1141e4322f8026f9e2515b6bacaed81f6248c52a198a.json b/.sqlx/query-72441293731853e9f0cc1141e4322f8026f9e2515b6bacaed81f6248c52a198a.json new file mode 100644 index 0000000..eb30352 --- /dev/null +++ b/.sqlx/query-72441293731853e9f0cc1141e4322f8026f9e2515b6bacaed81f6248c52a198a.json @@ -0,0 +1,50 @@ +{ + "db_name": "SQLite", + "query": "\n insert into message\n (id, channel, sender, sent_at, sent_sequence, body, last_sequence)\n values ($1, $2, $3, $4, $5, $6, $7)\n returning\n id as \"id: Id\",\n channel as \"channel: channel::Id\",\n sender as \"sender: login::Id\",\n sent_at as \"sent_at: DateTime\",\n sent_sequence as \"sent_sequence: Sequence\",\n body as \"body: Body\"\n ", + "describe": { + "columns": [ + { + "name": "id: Id", + "ordinal": 0, + "type_info": "Text" + }, + { + "name": "channel: channel::Id", + "ordinal": 1, + "type_info": "Text" + }, + { + "name": "sender: login::Id", + "ordinal": 2, + "type_info": "Text" + }, + { + "name": "sent_at: DateTime", + "ordinal": 3, + "type_info": "Text" + }, + { + "name": "sent_sequence: Sequence", + "ordinal": 4, + "type_info": "Integer" + }, + { + "name": "body: Body", + "ordinal": 5, + "type_info": "Text" + } + ], + "parameters": { + "Right": 7 + }, + "nullable": [ + false, + false, + false, + false, + false, + true + ] + }, + "hash": "72441293731853e9f0cc1141e4322f8026f9e2515b6bacaed81f6248c52a198a" +} diff --git a/.sqlx/query-9a748f34805bb316452cb527d5ba226bf52349548261393e00e74081cdbe041b.json b/.sqlx/query-9a748f34805bb316452cb527d5ba226bf52349548261393e00e74081cdbe041b.json deleted file mode 100644 index 5423bfd..0000000 --- a/.sqlx/query-9a748f34805bb316452cb527d5ba226bf52349548261393e00e74081cdbe041b.json +++ /dev/null @@ -1,62 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n select\n id as \"id: Id\",\n message.channel as \"channel: channel::Id\",\n message.sender as \"sender: login::Id\",\n message.sent_at as \"sent_at: DateTime\",\n message.sent_sequence as \"sent_sequence: Sequence\",\n message.body as \"body: Body\",\n deleted.deleted_at as \"deleted_at?: DateTime\",\n deleted.deleted_sequence as \"deleted_sequence?: Sequence\"\n from message\n left join message_deleted as deleted\n using (id)\n where message.sent_sequence > $1\n or deleted.deleted_sequence > $1\n ", - "describe": { - "columns": [ - { - "name": "id: Id", - "ordinal": 0, - "type_info": "Text" - }, - { - "name": "channel: channel::Id", - "ordinal": 1, - "type_info": "Text" - }, - { - "name": "sender: login::Id", - "ordinal": 2, - "type_info": "Text" - }, - { - "name": "sent_at: DateTime", - "ordinal": 3, - "type_info": "Text" - }, - { - "name": "sent_sequence: Sequence", - "ordinal": 4, - "type_info": "Integer" - }, - { - "name": "body: Body", - "ordinal": 5, - "type_info": "Text" - }, - { - "name": "deleted_at?: DateTime", - "ordinal": 6, - "type_info": "Text" - }, - { - "name": "deleted_sequence?: Sequence", - "ordinal": 7, - "type_info": "Integer" - } - ], - "parameters": { - "Right": 1 - }, - "nullable": [ - false, - false, - false, - false, - false, - true, - true, - true - ] - }, - "hash": "9a748f34805bb316452cb527d5ba226bf52349548261393e00e74081cdbe041b" -} diff --git a/.sqlx/query-c44dbcc7f4c0257a991e1ae4a2679aaa4c3f28aa5436a9af067a754e46af5589.json b/.sqlx/query-c44dbcc7f4c0257a991e1ae4a2679aaa4c3f28aa5436a9af067a754e46af5589.json new file mode 100644 index 0000000..37d685a --- /dev/null +++ b/.sqlx/query-c44dbcc7f4c0257a991e1ae4a2679aaa4c3f28aa5436a9af067a754e46af5589.json @@ -0,0 +1,56 @@ +{ + "db_name": "SQLite", + "query": "\n select\n id as \"id: Id\",\n name.display_name as \"display_name?: String\",\n name.canonical_name as \"canonical_name?: String\",\n channel.created_at as \"created_at: DateTime\",\n channel.created_sequence as \"created_sequence: Sequence\",\n deleted.deleted_at as \"deleted_at?: DateTime\",\n deleted.deleted_sequence as \"deleted_sequence?: Sequence\"\n from channel\n left join channel_name as name\n using (id)\n left join channel_deleted as deleted\n using (id)\n where channel.last_sequence > $1\n ", + "describe": { + "columns": [ + { + "name": "id: Id", + "ordinal": 0, + "type_info": "Text" + }, + { + "name": "display_name?: String", + "ordinal": 1, + "type_info": "Null" + }, + { + "name": "canonical_name?: String", + "ordinal": 2, + "type_info": "Null" + }, + { + "name": "created_at: DateTime", + "ordinal": 3, + "type_info": "Text" + }, + { + "name": "created_sequence: Sequence", + "ordinal": 4, + "type_info": "Integer" + }, + { + "name": "deleted_at?: DateTime", + "ordinal": 5, + "type_info": "Text" + }, + { + "name": "deleted_sequence?: Sequence", + "ordinal": 6, + "type_info": "Integer" + } + ], + "parameters": { + "Right": 1 + }, + "nullable": [ + false, + false, + false, + false, + false, + false, + false + ] + }, + "hash": "c44dbcc7f4c0257a991e1ae4a2679aaa4c3f28aa5436a9af067a754e46af5589" +} diff --git a/.sqlx/query-ca9146e92c3b3e724f4b58ad72529de7030a4863d3bf479bb19a6a2a76d1590b.json b/.sqlx/query-ca9146e92c3b3e724f4b58ad72529de7030a4863d3bf479bb19a6a2a76d1590b.json new file mode 100644 index 0000000..0118249 --- /dev/null +++ b/.sqlx/query-ca9146e92c3b3e724f4b58ad72529de7030a4863d3bf479bb19a6a2a76d1590b.json @@ -0,0 +1,12 @@ +{ + "db_name": "SQLite", + "query": "\n insert\n into channel (id, created_at, created_sequence, last_sequence)\n values ($1, $2, $3, $4)\n ", + "describe": { + "columns": [], + "parameters": { + "Right": 4 + }, + "nullable": [] + }, + "hash": "ca9146e92c3b3e724f4b58ad72529de7030a4863d3bf479bb19a6a2a76d1590b" +} diff --git a/.sqlx/query-d1c869c323d1ab45216279ae5ca13b72436960622863420d3f1d73a422fe5b42.json b/.sqlx/query-d1c869c323d1ab45216279ae5ca13b72436960622863420d3f1d73a422fe5b42.json deleted file mode 100644 index 658728c..0000000 --- a/.sqlx/query-d1c869c323d1ab45216279ae5ca13b72436960622863420d3f1d73a422fe5b42.json +++ /dev/null @@ -1,12 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n insert\n into channel (id, created_at, created_sequence)\n values ($1, $2, $3)\n ", - "describe": { - "columns": [], - "parameters": { - "Right": 3 - }, - "nullable": [] - }, - "hash": "d1c869c323d1ab45216279ae5ca13b72436960622863420d3f1d73a422fe5b42" -} diff --git a/.sqlx/query-e718f4064cbb3d1b27049450121451dbcf01e8bec8a987b58c699b27b5d737af.json b/.sqlx/query-e718f4064cbb3d1b27049450121451dbcf01e8bec8a987b58c699b27b5d737af.json deleted file mode 100644 index 0c21ec1..0000000 --- a/.sqlx/query-e718f4064cbb3d1b27049450121451dbcf01e8bec8a987b58c699b27b5d737af.json +++ /dev/null @@ -1,12 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n update message\n set body = ''\n where id = $1\n ", - "describe": { - "columns": [], - "parameters": { - "Right": 1 - }, - "nullable": [] - }, - "hash": "e718f4064cbb3d1b27049450121451dbcf01e8bec8a987b58c699b27b5d737af" -} diff --git a/migrations/20241030152013_channel_message_last_event_id.sql b/migrations/20241030152013_channel_message_last_event_id.sql new file mode 100644 index 0000000..dd6e66b --- /dev/null +++ b/migrations/20241030152013_channel_message_last_event_id.sql @@ -0,0 +1,141 @@ +alter table channel +rename to old_channel; +alter table channel_name +rename to old_channel_name; +alter table channel_deleted +rename to old_channel_deleted; +alter table message +rename to old_message; +alter table message_deleted +rename to old_message_deleted; + +create table channel ( + id text + not null + primary key, + created_sequence bigint + unique + not null, + created_at text + not null, + last_sequence bigint + not null +); + +insert into channel (id, created_sequence, created_at, last_sequence) +select + ch.id, + ch.created_sequence, + ch.created_at, + max(ch.created_sequence, coalesce(del.deleted_sequence, ch.created_sequence)) as last_seqeuence +from old_channel as ch +left join old_channel_deleted as del + using (id); + +create table channel_name ( + id text + not null + primary key + references channel (id), + display_name + not null, + canonical_name + not null + unique +); + +insert into channel_name (id, display_name, canonical_name) +select id, display_name, canonical_name +from old_channel_name; + +create table channel_deleted ( + id text + not null + primary key + references channel (id), + deleted_sequence bigint + unique + not null, + deleted_at text + not null +); + +insert into channel_deleted (id, deleted_sequence, deleted_at) +select id, deleted_sequence, deleted_at +from old_channel_deleted; + +create table message ( + id text + not null + primary key, + channel text + not null + references channel (id), + sender text + not null + references login (id), + sent_sequence bigint + unique + not null, + sent_at text + not null, + body text + null, + last_sequence bigint + not null +); + +insert into message (id, channel, sender, sent_sequence, sent_at, body, last_sequence) +select + msg.id, + msg.channel, + msg.sender, + msg.sent_sequence, + msg.sent_at, + msg.body, + max(msg.sent_sequence, coalesce(del.deleted_sequence, msg.sent_sequence)) as last_sequence +from + old_message as msg + left join old_message_deleted as del + using (id); + +create table message_deleted ( + id text + not null + primary key + references message (id), + deleted_sequence bigint + unique + not null, + deleted_at text + not null +); + +insert into message_deleted (id, deleted_sequence, deleted_at) +select id, deleted_sequence, deleted_at +from old_message_deleted; + +drop table old_message_deleted; +drop table old_message; +drop table old_channel_deleted; +drop table old_channel_name; +drop table old_channel; + +-- recreate existing indices +create index message_sent_at +on message (sent_at); +create index message_deleted_deleted_at +on message_deleted (deleted_at); +create index message_channel +on message(channel); +create index channel_created_sequence +on channel (created_sequence); +create index channel_created_at +on channel (created_at); + +-- new indices +create index channel_last_sequence +on channel (last_sequence); + +create index message_last_sequence +on message (last_sequence); diff --git a/src/channel/repo.rs b/src/channel/repo.rs index 7206c21..6612151 100644 --- a/src/channel/repo.rs +++ b/src/channel/repo.rs @@ -32,12 +32,13 @@ impl<'c> Channels<'c> { sqlx::query!( r#" insert - into channel (id, created_at, created_sequence) - values ($1, $2, $3) + into channel (id, created_at, created_sequence, last_sequence) + values ($1, $2, $3, $4) "#, id, created.at, created.sequence, + created.sequence, ) .execute(&mut *self.0) .await?; @@ -160,8 +161,7 @@ impl<'c> Channels<'c> { using (id) left join channel_deleted as deleted using (id) - where channel.created_sequence > $1 - or deleted.deleted_sequence > $1 + where channel.last_sequence > $1 "#, resume_at, ) @@ -190,6 +190,19 @@ impl<'c> Channels<'c> { deleted: &Instant, ) -> Result { let id = channel.id(); + sqlx::query!( + r#" + update channel + set last_sequence = max(last_sequence, $1) + where id = $2 + returning id as "id: Id" + "#, + deleted.sequence, + id, + ) + .fetch_one(&mut *self.0) + .await?; + sqlx::query!( r#" insert into channel_deleted (id, deleted_at, deleted_sequence) diff --git a/src/message/repo.rs b/src/message/repo.rs index 913135c..14f8eaf 100644 --- a/src/message/repo.rs +++ b/src/message/repo.rs @@ -34,8 +34,8 @@ impl<'c> Messages<'c> { let message = sqlx::query!( r#" insert into message - (id, channel, sender, sent_at, sent_sequence, body) - values ($1, $2, $3, $4, $5, $6) + (id, channel, sender, sent_at, sent_sequence, body, last_sequence) + values ($1, $2, $3, $4, $5, $6, $7) returning id as "id: Id", channel as "channel: channel::Id", @@ -50,6 +50,7 @@ impl<'c> Messages<'c> { sent.at, sent.sequence, body, + sent.sequence, ) .map(|row| History { message: Message { @@ -205,12 +206,14 @@ impl<'c> Messages<'c> { sqlx::query!( r#" update message - set body = '' - where id = $1 + set body = '', last_sequence = max(last_sequence, $1) + where id = $2 + returning id as "id: Id" "#, + deleted.sequence, id, ) - .execute(&mut *self.0) + .fetch_one(&mut *self.0) .await?; let message = self.by_id(id).await?; @@ -297,8 +300,7 @@ impl<'c> Messages<'c> { from message left join message_deleted as deleted using (id) - where message.sent_sequence > $1 - or deleted.deleted_sequence > $1 + where message.last_sequence > $1 "#, resume_at, ) -- cgit v1.2.3