From da485e523913df28def6335be0836b1fc437617f Mon Sep 17 00:00:00 2001 From: Owen Jacobson Date: Tue, 29 Oct 2024 19:32:30 -0400 Subject: Restrict login names. There's no good reason to use an empty string as your login name, or to use one so long as to annoy others. Names beginning or ending with whitespace, or containing runs of whitespace, are also a technical problem, so they're also prohibited. This change does not implement [UTS #39], as I haven't yet fully understood how to do so. [UTS #39]: https://www.unicode.org/reports/tr39/ --- src/test/fixtures/login.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) (limited to 'src/test/fixtures') diff --git a/src/test/fixtures/login.rs b/src/test/fixtures/login.rs index e308289..86e3e39 100644 --- a/src/test/fixtures/login.rs +++ b/src/test/fixtures/login.rs @@ -1,4 +1,4 @@ -use faker_rand::en_us::internet; +use faker_rand::{en_us::internet, lorem::Paragraphs}; use uuid::Uuid; use crate::{ @@ -38,6 +38,10 @@ pub fn propose() -> (Name, Password) { (propose_name(), propose_password()) } +pub fn propose_invalid_name() -> Name { + rand::random::().to_string().into() +} + fn propose_name() -> Name { rand::random::().to_string().into() } -- cgit v1.2.3 From 9010c7feeca8f4e7e501ad474911deaaf7a1a367 Mon Sep 17 00:00:00 2001 From: Owen Jacobson Date: Tue, 29 Oct 2024 20:44:03 -0400 Subject: Restrict channel names, too. Thankfully, channel creation only happens in one place, so we don't need a state machine for this. --- docs/api/channels-messages.md | 14 +++++++++++++- src/channel/app.rs | 8 +++++++- src/channel/mod.rs | 1 + src/channel/routes/post.rs | 3 +++ src/channel/routes/test.rs | 24 ++++++++++++++++++++++++ src/channel/validate.rs | 23 +++++++++++++++++++++++ src/test/fixtures/channel.rs | 5 +++++ 7 files changed, 76 insertions(+), 2 deletions(-) create mode 100644 src/channel/validate.rs (limited to 'src/test/fixtures') diff --git a/docs/api/channels-messages.md b/docs/api/channels-messages.md index 9854d22..6391b5a 100644 --- a/docs/api/channels-messages.md +++ b/docs/api/channels-messages.md @@ -64,6 +64,14 @@ The request must have the following fields: |:-------|:-------|:--| | `name` | string | The channel's name. | +The proposed `name` must be valid. The precise definition of valid is still up in the air, but, at minimum: + +* It must be non-empty. +* It must not be "too long." (Currently, 64 characters is too long.) +* It must begin with a printing character. +* It must end with a printing character. +* It must not contain runs of multiple whitespace characters. + ### Success This endpoint will respond with a status of `202 Accepted` when successful. The body of the response will be a JSON object describing the new channel: @@ -86,7 +94,11 @@ The returned name may not be identical to the name requested, as the name will b When completed, the service will emit a [channel created](events.md#channel-created) event with the channel's ID. -### Duplicate channel name +### Name not valid + +This endpoint will respond with a status of `400 Bad Request` if the proposed `name` is not valid. + +### Channel name in use This endpoint will respond with a status of `409 Conflict` if a channel with the requested name already exists. diff --git a/src/channel/app.rs b/src/channel/app.rs index 8359277..9a19b16 100644 --- a/src/channel/app.rs +++ b/src/channel/app.rs @@ -4,7 +4,7 @@ use sqlx::sqlite::SqlitePool; use super::{ repo::{LoadError, Provider as _}, - Channel, Id, + validate, Channel, Id, }; use crate::{ clock::DateTime, @@ -25,6 +25,10 @@ impl<'a> Channels<'a> { } pub async fn create(&self, name: &Name, created_at: &DateTime) -> Result { + if !validate::name(name) { + return Err(CreateError::InvalidName(name.clone())); + } + let mut tx = self.db.begin().await?; let created = tx.sequence().next(created_at).await?; let channel = tx @@ -149,6 +153,8 @@ impl<'a> Channels<'a> { pub enum CreateError { #[error("channel named {0} already exists")] DuplicateName(Name), + #[error("invalid channel name: {0}")] + InvalidName(Name), #[error(transparent)] Database(#[from] sqlx::Error), #[error(transparent)] diff --git a/src/channel/mod.rs b/src/channel/mod.rs index eb8200b..d5ba828 100644 --- a/src/channel/mod.rs +++ b/src/channel/mod.rs @@ -5,5 +5,6 @@ mod id; pub mod repo; mod routes; mod snapshot; +mod validate; pub use self::{event::Event, history::History, id::Id, routes::router, snapshot::Channel}; diff --git a/src/channel/routes/post.rs b/src/channel/routes/post.rs index 810445c..2cf1cc0 100644 --- a/src/channel/routes/post.rs +++ b/src/channel/routes/post.rs @@ -54,6 +54,9 @@ impl IntoResponse for Error { app::CreateError::DuplicateName(_) => { (StatusCode::CONFLICT, error.to_string()).into_response() } + app::CreateError::InvalidName(_) => { + (StatusCode::BAD_REQUEST, error.to_string()).into_response() + } other => Internal::from(other).into_response(), } } diff --git a/src/channel/routes/test.rs b/src/channel/routes/test.rs index 10b1e8d..f5369fb 100644 --- a/src/channel/routes/test.rs +++ b/src/channel/routes/test.rs @@ -115,6 +115,30 @@ async fn conflicting_canonical_name() { )); } +#[tokio::test] +async fn invalid_name() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let creator = fixtures::identity::create(&app, &fixtures::now()).await; + + // Call the endpoint + + let name = fixtures::channel::propose_invalid_name(); + let request = post::Request { name: name.clone() }; + let post::Error(error) = + post::handler(State(app.clone()), creator, fixtures::now(), Json(request)) + .await + .expect_err("invalid channel name should fail the request"); + + // Verify the structure of the response + + assert!(matches!( + error, + app::CreateError::InvalidName(error_name) if name == error_name + )); +} + #[tokio::test] async fn name_reusable_after_delete() { // Set up the environment diff --git a/src/channel/validate.rs b/src/channel/validate.rs new file mode 100644 index 0000000..0c97293 --- /dev/null +++ b/src/channel/validate.rs @@ -0,0 +1,23 @@ +use unicode_segmentation::UnicodeSegmentation as _; + +use crate::name::Name; + +// Picked out of a hat. The power of two is not meaningful. +const NAME_TOO_LONG: usize = 64; + +pub fn name(name: &Name) -> bool { + let display = name.display(); + + [ + display.graphemes(true).count() < NAME_TOO_LONG, + display.chars().all(|ch| !ch.is_control()), + display.chars().next().is_some_and(|c| !c.is_whitespace()), + display.chars().last().is_some_and(|c| !c.is_whitespace()), + display + .chars() + .zip(display.chars().skip(1)) + .all(|(a, b)| !(a.is_whitespace() && b.is_whitespace())), + ] + .into_iter() + .all(|value| value) +} diff --git a/src/test/fixtures/channel.rs b/src/test/fixtures/channel.rs index 0c6480b..98048f2 100644 --- a/src/test/fixtures/channel.rs +++ b/src/test/fixtures/channel.rs @@ -1,6 +1,7 @@ use faker_rand::{ en_us::{addresses::CityName, names::FullName}, faker_impl_from_templates, + lorem::Paragraphs, }; use rand; @@ -23,6 +24,10 @@ pub fn propose() -> Name { rand::random::().to_string().into() } +pub fn propose_invalid_name() -> Name { + rand::random::().to_string().into() +} + struct NameTemplate(String); faker_impl_from_templates! { NameTemplate; "{} {}", CityName, FullName; -- cgit v1.2.3 From 66d3fcf2e22f057bacce8d97d43a13c1c5a9ad09 Mon Sep 17 00:00:00 2001 From: Owen Jacobson Date: Tue, 29 Oct 2024 23:29:22 -0400 Subject: Add `change password` UI + API. The protocol here re-checks the caller's password, as a "I left myself logged in" anti-pranking check. --- ...fd391b5812dc2d1d4a52ea3072b5dd52d71637b33d.json | 20 +++++++ ...32d8eb2d9c2bb4de2fbbf0b2280973966ef02f72b1.json | 50 ++++++++++++++++ ...c233ed32638e853e3b8b8f8de26b53f90c98b6ce11.json | 20 +++++++ docs/api/authentication.md | 47 +++++++++++++++ src/event/routes/test/token.rs | 49 ++++++++++++++++ src/login/repo.rs | 23 ++++++++ src/login/routes/mod.rs | 2 + src/login/routes/password/mod.rs | 4 ++ src/login/routes/password/post.rs | 54 +++++++++++++++++ src/login/routes/password/test.rs | 68 ++++++++++++++++++++++ src/test/fixtures/identity.rs | 10 +++- src/token/app.rs | 43 +++++++++++++- src/token/repo/auth.rs | 29 +++++++++ src/token/repo/token.rs | 18 ++++++ src/token/secret.rs | 2 +- src/ui/routes/me.rs | 32 ++++++++++ src/ui/routes/mod.rs | 2 + ui/lib/apiServer.js | 4 ++ ui/lib/components/CurrentUser.svelte | 26 +++++++++ ui/lib/components/LogOut.svelte | 26 --------- ui/routes/(app)/+layout.svelte | 4 -- ui/routes/(app)/me/+page.svelte | 41 +++++++++++++ ui/routes/+layout.svelte | 8 +-- 23 files changed, 542 insertions(+), 40 deletions(-) create mode 100644 .sqlx/query-0b1543ec93e02c48c5cbaafd391b5812dc2d1d4a52ea3072b5dd52d71637b33d.json create mode 100644 .sqlx/query-0f3bfb1ad8fad5213f733b32d8eb2d9c2bb4de2fbbf0b2280973966ef02f72b1.json create mode 100644 .sqlx/query-c2b0ff7e2f27b6970a16fbc233ed32638e853e3b8b8f8de26b53f90c98b6ce11.json create mode 100644 src/login/routes/password/mod.rs create mode 100644 src/login/routes/password/post.rs create mode 100644 src/login/routes/password/test.rs create mode 100644 src/ui/routes/me.rs create mode 100644 ui/lib/components/CurrentUser.svelte delete mode 100644 ui/lib/components/LogOut.svelte create mode 100644 ui/routes/(app)/me/+page.svelte (limited to 'src/test/fixtures') diff --git a/.sqlx/query-0b1543ec93e02c48c5cbaafd391b5812dc2d1d4a52ea3072b5dd52d71637b33d.json b/.sqlx/query-0b1543ec93e02c48c5cbaafd391b5812dc2d1d4a52ea3072b5dd52d71637b33d.json new file mode 100644 index 0000000..937b07e --- /dev/null +++ b/.sqlx/query-0b1543ec93e02c48c5cbaafd391b5812dc2d1d4a52ea3072b5dd52d71637b33d.json @@ -0,0 +1,20 @@ +{ + "db_name": "SQLite", + "query": "\n delete\n from token\n where login = $1\n returning id as \"id: Id\"\n ", + "describe": { + "columns": [ + { + "name": "id: Id", + "ordinal": 0, + "type_info": "Text" + } + ], + "parameters": { + "Right": 1 + }, + "nullable": [ + false + ] + }, + "hash": "0b1543ec93e02c48c5cbaafd391b5812dc2d1d4a52ea3072b5dd52d71637b33d" +} diff --git a/.sqlx/query-0f3bfb1ad8fad5213f733b32d8eb2d9c2bb4de2fbbf0b2280973966ef02f72b1.json b/.sqlx/query-0f3bfb1ad8fad5213f733b32d8eb2d9c2bb4de2fbbf0b2280973966ef02f72b1.json new file mode 100644 index 0000000..ffd81dc --- /dev/null +++ b/.sqlx/query-0f3bfb1ad8fad5213f733b32d8eb2d9c2bb4de2fbbf0b2280973966ef02f72b1.json @@ -0,0 +1,50 @@ +{ + "db_name": "SQLite", + "query": "\n select\n id as \"id: login::Id\",\n display_name as \"display_name: String\",\n canonical_name as \"canonical_name: String\",\n created_sequence as \"created_sequence: Sequence\",\n created_at as \"created_at: DateTime\",\n password_hash as \"password_hash: StoredHash\"\n from login\n where id = $1\n ", + "describe": { + "columns": [ + { + "name": "id: login::Id", + "ordinal": 0, + "type_info": "Text" + }, + { + "name": "display_name: String", + "ordinal": 1, + "type_info": "Text" + }, + { + "name": "canonical_name: String", + "ordinal": 2, + "type_info": "Text" + }, + { + "name": "created_sequence: Sequence", + "ordinal": 3, + "type_info": "Integer" + }, + { + "name": "created_at: DateTime", + "ordinal": 4, + "type_info": "Text" + }, + { + "name": "password_hash: StoredHash", + "ordinal": 5, + "type_info": "Text" + } + ], + "parameters": { + "Right": 1 + }, + "nullable": [ + false, + false, + false, + false, + false, + false + ] + }, + "hash": "0f3bfb1ad8fad5213f733b32d8eb2d9c2bb4de2fbbf0b2280973966ef02f72b1" +} diff --git a/.sqlx/query-c2b0ff7e2f27b6970a16fbc233ed32638e853e3b8b8f8de26b53f90c98b6ce11.json b/.sqlx/query-c2b0ff7e2f27b6970a16fbc233ed32638e853e3b8b8f8de26b53f90c98b6ce11.json new file mode 100644 index 0000000..4c99c42 --- /dev/null +++ b/.sqlx/query-c2b0ff7e2f27b6970a16fbc233ed32638e853e3b8b8f8de26b53f90c98b6ce11.json @@ -0,0 +1,20 @@ +{ + "db_name": "SQLite", + "query": "\n update login\n set password_hash = $1\n where id = $2\n returning id as \"id: Id\"\n ", + "describe": { + "columns": [ + { + "name": "id: Id", + "ordinal": 0, + "type_info": "Text" + } + ], + "parameters": { + "Right": 2 + }, + "nullable": [ + false + ] + }, + "hash": "c2b0ff7e2f27b6970a16fbc233ed32638e853e3b8b8f8de26b53f90c98b6ce11" +} diff --git a/docs/api/authentication.md b/docs/api/authentication.md index 135e91b..93a8e52 100644 --- a/docs/api/authentication.md +++ b/docs/api/authentication.md @@ -113,3 +113,50 @@ The request must be an empty JSON object. This endpoint will respond with a status of `204 No Content` when successful. The response will include a `Set-Cookie` header that clears the `identity` cookie. Regardless of whether the client clears the cookie, the service also invalidates the token. + + +## `POST /api/password` + +Changes the current login's password, and invalidate all outstanding identity tokens. + +### Request + +```json +{ + "password": "my-old-password", + "to": "my-new-password" +} +``` + +The request must have the following fields: + +| Field | Type | Description | +|:-----------|:-------|:--| +| `password` | string | The login's _current_ password, in plain text. | +| `to` | string | The login's _new_ password, in plain text. | + +### Success + +This endpoint will respond with a status of `200 Okay` when successful. The body of the response will be a JSON object describing the authenticated login: + +```json +{ + "id": "Labcd1234", + "name": "Andrea" +} +``` + +The response will include the following fields: + +| Field | Type | Description | +|:------------|:-------|:--| +| `id` | string | The authenticated login's ID. | +| `name` | string | The authenticated login's name. | + +The response will include a `Set-Cookie` header for the `identity` cookie, providing the client with a newly-minted identity token associated with the login identified in the request. This token's value must be kept confidential. All previously-created identity cookies will cease to be valid. + +The cookie will expire if it is not used regularly. + +### Authentication failure + +This endpoint will respond with a status of `400 Bad Request` if the `password` does not match the login's current password. diff --git a/src/event/routes/test/token.rs b/src/event/routes/test/token.rs index 2039d9b..16ac7c3 100644 --- a/src/event/routes/test/token.rs +++ b/src/event/routes/test/token.rs @@ -93,3 +93,52 @@ async fn terminates_on_logout() { .expect_none("end of stream") .await; } + +#[tokio::test] +async fn terminates_on_password_change() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let channel = fixtures::channel::create(&app, &fixtures::now()).await; + let sender = fixtures::login::create(&app, &fixtures::now()).await; + + // Subscribe via the endpoint + + let creds = fixtures::login::create_with_password(&app, &fixtures::now()).await; + let cookie = fixtures::cookie::logged_in(&app, &creds, &fixtures::now()).await; + let subscriber = fixtures::identity::from_cookie(&app, &cookie, &fixtures::now()).await; + + let get::Response(events) = get::handler( + State(app.clone()), + subscriber.clone(), + None, + Query::default(), + ) + .await + .expect("subscribe never fails"); + + // Verify the resulting stream's behaviour + + let (_, password) = creds; + let to = fixtures::login::propose_password(); + app.tokens() + .change_password(&subscriber.login, &password, &to, &fixtures::now()) + .await + .expect("expiring tokens succeeds"); + + // These should not be delivered. + + let messages = [ + fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, + ]; + + events + .filter_map(fixtures::event::message) + .filter_map(fixtures::event::message::sent) + .filter(|event| future::ready(messages.iter().any(|message| &event.message == message))) + .next() + .expect_none("end of stream") + .await; +} diff --git a/src/login/repo.rs b/src/login/repo.rs index 611edd6..a972304 100644 --- a/src/login/repo.rs +++ b/src/login/repo.rs @@ -58,6 +58,29 @@ impl<'c> Logins<'c> { Ok(login) } + pub async fn set_password( + &mut self, + login: &History, + to: &StoredHash, + ) -> Result<(), sqlx::Error> { + let login = login.id(); + + sqlx::query_scalar!( + r#" + update login + set password_hash = $1 + where id = $2 + returning id as "id: Id" + "#, + to, + login, + ) + .fetch_one(&mut *self.0) + .await?; + + Ok(()) + } + pub async fn all(&mut self, resume_at: ResumePoint) -> Result, LoadError> { let logins = sqlx::query!( r#" diff --git a/src/login/routes/mod.rs b/src/login/routes/mod.rs index 8cb8852..bbd0c3f 100644 --- a/src/login/routes/mod.rs +++ b/src/login/routes/mod.rs @@ -4,9 +4,11 @@ use crate::app::App; mod login; mod logout; +mod password; pub fn router() -> Router { Router::new() + .route("/api/password", post(password::post::handler)) .route("/api/auth/login", post(login::post::handler)) .route("/api/auth/logout", post(logout::post::handler)) } diff --git a/src/login/routes/password/mod.rs b/src/login/routes/password/mod.rs new file mode 100644 index 0000000..36b384e --- /dev/null +++ b/src/login/routes/password/mod.rs @@ -0,0 +1,4 @@ +pub mod post; + +#[cfg(test)] +mod test; diff --git a/src/login/routes/password/post.rs b/src/login/routes/password/post.rs new file mode 100644 index 0000000..4723754 --- /dev/null +++ b/src/login/routes/password/post.rs @@ -0,0 +1,54 @@ +use axum::{ + extract::{Json, State}, + http::StatusCode, + response::{IntoResponse, Response}, +}; + +use crate::{ + app::App, + clock::RequestedAt, + error::Internal, + login::{Login, Password}, + token::{ + app, + extract::{Identity, IdentityCookie}, + }, +}; + +pub async fn handler( + State(app): State, + RequestedAt(now): RequestedAt, + identity: Identity, + cookie: IdentityCookie, + Json(request): Json, +) -> Result<(IdentityCookie, Json), Error> { + let (login, secret) = app + .tokens() + .change_password(&identity.login, &request.password, &request.to, &now) + .await + .map_err(Error)?; + let cookie = cookie.set(secret); + Ok((cookie, Json(login))) +} + +#[derive(serde::Deserialize)] +pub struct Request { + pub password: Password, + pub to: Password, +} + +#[derive(Debug, thiserror::Error)] +#[error(transparent)] +pub struct Error(#[from] pub app::LoginError); + +impl IntoResponse for Error { + fn into_response(self) -> Response { + let Self(error) = self; + match error { + app::LoginError::Rejected => { + (StatusCode::BAD_REQUEST, "invalid name or password").into_response() + } + other => Internal::from(other).into_response(), + } + } +} diff --git a/src/login/routes/password/test.rs b/src/login/routes/password/test.rs new file mode 100644 index 0000000..c1974bf --- /dev/null +++ b/src/login/routes/password/test.rs @@ -0,0 +1,68 @@ +use axum::extract::{Json, State}; + +use super::post; +use crate::{ + test::fixtures, + token::app::{LoginError, ValidateError}, +}; + +#[tokio::test] +async fn password_change() { + // Set up the environment + let app = fixtures::scratch_app().await; + let creds = fixtures::login::create_with_password(&app, &fixtures::now()).await; + let cookie = fixtures::cookie::logged_in(&app, &creds, &fixtures::now()).await; + let identity = fixtures::identity::from_cookie(&app, &cookie, &fixtures::now()).await; + + // Call the endpoint + let (name, password) = creds; + let to = fixtures::login::propose_password(); + let request = post::Request { + password: password.clone(), + to: to.clone(), + }; + let (new_cookie, Json(response)) = post::handler( + State(app.clone()), + fixtures::now(), + identity.clone(), + cookie.clone(), + Json(request), + ) + .await + .expect("changing passwords succeeds"); + + // Verify that we have a new session + assert_ne!(cookie.secret(), new_cookie.secret()); + + // Verify that we're still ourselves + assert_eq!(identity.login, response); + + // Verify that our original token is no longer valid + let validate_err = app + .tokens() + .validate( + &cookie + .secret() + .expect("original identity cookie has a secret"), + &fixtures::now(), + ) + .await + .expect_err("validating the original identity secret should fail"); + assert!(matches!(validate_err, ValidateError::InvalidToken)); + + // Verify that our original password is no longer valid + let login_err = app + .tokens() + .login(&name, &password, &fixtures::now()) + .await + .expect_err("logging in with the original password should fail"); + assert!(matches!(login_err, LoginError::Rejected)); + + // Verify that our new password is valid + let (login, _) = app + .tokens() + .login(&name, &to, &fixtures::now()) + .await + .expect("logging in with the new password should succeed"); + assert_eq!(identity.login, login); +} diff --git a/src/test/fixtures/identity.rs b/src/test/fixtures/identity.rs index e438f2b..ffc44c6 100644 --- a/src/test/fixtures/identity.rs +++ b/src/test/fixtures/identity.rs @@ -15,11 +15,15 @@ pub async fn create(app: &App, created_at: &RequestedAt) -> Identity { logged_in(app, &credentials, created_at).await } -pub async fn from_cookie(app: &App, token: &IdentityCookie, issued_at: &RequestedAt) -> Identity { - let secret = token.secret().expect("identity token has a secret"); +pub async fn from_cookie( + app: &App, + cookie: &IdentityCookie, + validated_at: &RequestedAt, +) -> Identity { + let secret = cookie.secret().expect("identity token has a secret"); let (token, login) = app .tokens() - .validate(&secret, issued_at) + .validate(&secret, validated_at) .await .expect("always validates newly-issued secret"); diff --git a/src/token/app.rs b/src/token/app.rs index c19d6a0..5c0aeb0 100644 --- a/src/token/app.rs +++ b/src/token/app.rs @@ -13,7 +13,7 @@ use super::{ use crate::{ clock::DateTime, db::NotFound as _, - login::{Login, Password}, + login::{repo::Provider as _, Login, Password}, name::{self, Name}, }; @@ -61,6 +61,47 @@ impl<'a> Tokens<'a> { Ok((snapshot, token)) } + pub async fn change_password( + &self, + login: &Login, + password: &Password, + to: &Password, + changed_at: &DateTime, + ) -> Result<(Login, Secret), LoginError> { + let mut tx = self.db.begin().await?; + let (login, stored_hash) = tx + .auth() + .for_login(login) + .await + .optional()? + .ok_or(LoginError::Rejected)?; + // Split the transaction here to avoid holding the tx open (potentially blocking + // other writes) while we do the fairly expensive task of verifying the + // password. It's okay if the token issuance transaction happens some notional + // amount of time after retrieving the login, as inserting the token will fail + // if the account is deleted during that time. + tx.commit().await?; + + if !stored_hash.verify(password)? { + return Err(LoginError::Rejected); + } + + let snapshot = login.as_snapshot().ok_or(LoginError::Rejected)?; + let to_hash = to.hash()?; + + let mut tx = self.db.begin().await?; + let tokens = tx.tokens().revoke_all(&login).await?; + tx.logins().set_password(&login, &to_hash).await?; + let secret = tx.tokens().issue(&login, changed_at).await?; + tx.commit().await?; + + for event in tokens.into_iter().map(TokenEvent::Revoked) { + self.token_events.broadcast(event); + } + + Ok((snapshot, secret)) + } + pub async fn validate( &self, secret: &Secret, diff --git a/src/token/repo/auth.rs b/src/token/repo/auth.rs index bdc4c33..b51db8c 100644 --- a/src/token/repo/auth.rs +++ b/src/token/repo/auth.rs @@ -50,6 +50,35 @@ impl<'t> Auth<'t> { Ok((login, row.password_hash)) } + + pub async fn for_login(&mut self, login: &Login) -> Result<(History, StoredHash), LoadError> { + let row = sqlx::query!( + r#" + select + id as "id: login::Id", + display_name as "display_name: String", + canonical_name as "canonical_name: String", + created_sequence as "created_sequence: Sequence", + created_at as "created_at: DateTime", + password_hash as "password_hash: StoredHash" + from login + where id = $1 + "#, + login.id, + ) + .fetch_one(&mut *self.0) + .await?; + + let login = History { + login: Login { + id: row.id, + name: Name::new(row.display_name, row.canonical_name)?, + }, + created: Instant::new(row.created_at, row.created_sequence), + }; + + Ok((login, row.password_hash)) + } } #[derive(Debug, thiserror::Error)] diff --git a/src/token/repo/token.rs b/src/token/repo/token.rs index 35ea385..33b89d5 100644 --- a/src/token/repo/token.rs +++ b/src/token/repo/token.rs @@ -84,6 +84,24 @@ impl<'c> Tokens<'c> { Ok(()) } + // Revoke tokens for a login + pub async fn revoke_all(&mut self, login: &login::History) -> Result, sqlx::Error> { + let login = login.id(); + let tokens = sqlx::query_scalar!( + r#" + delete + from token + where login = $1 + returning id as "id: Id" + "#, + login, + ) + .fetch_all(&mut *self.0) + .await?; + + Ok(tokens) + } + // Expire and delete all tokens that haven't been used more recently than // `expire_at`. pub async fn expire(&mut self, expire_at: &DateTime) -> Result, sqlx::Error> { diff --git a/src/token/secret.rs b/src/token/secret.rs index 28c93bb..8f70646 100644 --- a/src/token/secret.rs +++ b/src/token/secret.rs @@ -1,6 +1,6 @@ use std::fmt; -#[derive(sqlx::Type)] +#[derive(PartialEq, Eq, sqlx::Type)] #[sqlx(transparent)] pub struct Secret(String); diff --git a/src/ui/routes/me.rs b/src/ui/routes/me.rs new file mode 100644 index 0000000..f1f118f --- /dev/null +++ b/src/ui/routes/me.rs @@ -0,0 +1,32 @@ +pub mod get { + use axum::response::{self, IntoResponse, Redirect}; + + use crate::{ + error::Internal, + token::extract::Identity, + ui::assets::{Asset, Assets}, + }; + + pub async fn handler(identity: Option) -> Result { + let _ = identity.ok_or(Error::NotLoggedIn)?; + + Assets::index().map_err(Error::Internal) + } + + #[derive(Debug, thiserror::Error)] + pub enum Error { + #[error("not logged in")] + NotLoggedIn, + #[error("{0}")] + Internal(Internal), + } + + impl IntoResponse for Error { + fn into_response(self) -> response::Response { + match self { + Self::NotLoggedIn => Redirect::temporary("/login").into_response(), + Self::Internal(error) => error.into_response(), + } + } + } +} diff --git a/src/ui/routes/mod.rs b/src/ui/routes/mod.rs index 72d9a4a..48b3f90 100644 --- a/src/ui/routes/mod.rs +++ b/src/ui/routes/mod.rs @@ -6,6 +6,7 @@ mod ch; mod get; mod invite; mod login; +mod me; mod path; mod setup; @@ -16,6 +17,7 @@ pub fn router(app: &App) -> Router { .route("/setup", get(setup::get::handler)), Router::new() .route("/", get(get::handler)) + .route("/me", get(me::get::handler)) .route("/login", get(login::get::handler)) .route("/ch/:channel", get(ch::channel::get::handler)) .route("/invite/:invite", get(invite::invite::get::handler)) diff --git a/ui/lib/apiServer.js b/ui/lib/apiServer.js index db554e2..19dcf60 100644 --- a/ui/lib/apiServer.js +++ b/ui/lib/apiServer.js @@ -30,6 +30,10 @@ export async function logOut() { return apiServer.post('/auth/logout', {}); } +export async function changePassword(password, to) { + return apiServer.post('/password', { password, to }); +} + export async function createChannel(name) { return apiServer.post('/channels', { name }); } diff --git a/ui/lib/components/CurrentUser.svelte b/ui/lib/components/CurrentUser.svelte new file mode 100644 index 0000000..4b1b974 --- /dev/null +++ b/ui/lib/components/CurrentUser.svelte @@ -0,0 +1,26 @@ + + +
+ {#if $currentUser} + @{$currentUser.username} + {/if} + +
+ + diff --git a/ui/lib/components/LogOut.svelte b/ui/lib/components/LogOut.svelte deleted file mode 100644 index ba0861a..0000000 --- a/ui/lib/components/LogOut.svelte +++ /dev/null @@ -1,26 +0,0 @@ - - -
- {#if $currentUser} - @{$currentUser.username} - {/if} - -
- - diff --git a/ui/routes/(app)/+layout.svelte b/ui/routes/(app)/+layout.svelte index 9abaaf4..08c6694 100644 --- a/ui/routes/(app)/+layout.svelte +++ b/ui/routes/(app)/+layout.svelte @@ -86,8 +86,4 @@ max-height: 100%; overflow: scroll; } - #interface .active-channel { - border: 1px solid grey; - border-radius: 1.25rem; - } diff --git a/ui/routes/(app)/me/+page.svelte b/ui/routes/(app)/me/+page.svelte new file mode 100644 index 0000000..fb612b8 --- /dev/null +++ b/ui/routes/(app)/me/+page.svelte @@ -0,0 +1,41 @@ + + +
+ + + + + + + +
+ + diff --git a/ui/routes/+layout.svelte b/ui/routes/+layout.svelte index fdd3883..711b8bd 100644 --- a/ui/routes/+layout.svelte +++ b/ui/routes/+layout.svelte @@ -3,18 +3,16 @@ import "../app.css"; import { currentUser } from '$lib/store'; - import LogOut from '$lib/components/LogOut.svelte'; - import Invite from '$lib/components/Invite.svelte'; + import CurrentUser from '$lib/components/CurrentUser.svelte';
🌳 - understory + understory {#if $currentUser} - - + {/if} -- cgit v1.2.3 From 50a382528288248381b07c25719cbc9a519b4c81 Mon Sep 17 00:00:00 2001 From: Owen Jacobson Date: Wed, 30 Oct 2024 02:01:31 -0400 Subject: Resume points are no longer optional. This is an inconsequential change for actual clients, since "resume from the beginning" was never a preferred mode of operation, and it simplifies some internals. It should also mean we get better query plans where `coalesce(cond, true)` was previously being used. --- ...1fe70264af3f5b77c6dacd4a61be470f262d62b5d4.json | 56 ++++++++++ ...1314f70d13e8b0ca22b7652f1063ec7796cf307269.json | 62 ----------- ...9da95bc8271dfc8c6ec8ebf174ba8a57111b322291.json | 44 ++++++++ ...27d5ba226bf52349548261393e00e74081cdbe041b.json | 62 +++++++++++ ...1602b8befafe751e9caeb0d5f279731e04c6925f46.json | 44 -------- ...c2e965e2294c1a6932d72a8b04d258d06d0f8938bd.json | 56 ---------- ...4730699683e63ae1ea404418a17c6af60148f03fe8.json | 44 -------- ...932ace995f53efd6c7800a7a1b42daec41d081b3d2.json | 44 ++++++++ ...0599914331682d58ace57214bfa26ccaa089592a24.json | 62 ----------- ...84a27bdaefca99706a0c73c17f19cc537f3f669882.json | 62 +++++++++++ docs/api/events.md | 2 +- src/boot/app.rs | 4 +- src/channel/history.rs | 6 +- src/channel/repo.rs | 11 +- src/channel/routes/channel/test/post.rs | 3 +- src/channel/routes/test.rs | 3 +- src/event/app.rs | 9 +- src/event/mod.rs | 2 - src/event/routes/get.rs | 10 +- src/event/routes/test/channel.rs | 91 +++++++++++----- src/event/routes/test/invite.rs | 26 +++-- src/event/routes/test/message.rs | 115 +++++++++++++++------ src/event/routes/test/resume.rs | 12 ++- src/event/routes/test/setup.rs | 13 ++- src/event/routes/test/token.rs | 19 ++-- src/event/sequence.rs | 9 +- src/login/history.rs | 6 +- src/login/repo.rs | 10 +- src/message/history.rs | 6 +- src/message/repo.rs | 21 ++-- src/test/fixtures/boot.rs | 9 ++ src/test/fixtures/mod.rs | 1 + 32 files changed, 519 insertions(+), 405 deletions(-) create mode 100644 .sqlx/query-093a57206253b4c05c2aa51fe70264af3f5b77c6dacd4a61be470f262d62b5d4.json delete mode 100644 .sqlx/query-24bc0257eff3357322481e1314f70d13e8b0ca22b7652f1063ec7796cf307269.json create mode 100644 .sqlx/query-2c20c29d9adfed6201a6a69da95bc8271dfc8c6ec8ebf174ba8a57111b322291.json create mode 100644 .sqlx/query-9a748f34805bb316452cb527d5ba226bf52349548261393e00e74081cdbe041b.json delete mode 100644 .sqlx/query-9f611a3351f22ed16d67d41602b8befafe751e9caeb0d5f279731e04c6925f46.json delete mode 100644 .sqlx/query-a40496319887752f5845c0c2e965e2294c1a6932d72a8b04d258d06d0f8938bd.json delete mode 100644 .sqlx/query-ac7ab464e44e4412cd83744730699683e63ae1ea404418a17c6af60148f03fe8.json create mode 100644 .sqlx/query-cbf29fae3725bbb3d9e94d932ace995f53efd6c7800a7a1b42daec41d081b3d2.json delete mode 100644 .sqlx/query-fce8f4fbd59a8b3b8531e10599914331682d58ace57214bfa26ccaa089592a24.json create mode 100644 .sqlx/query-ff61ff22108f1e98bbfc9a84a27bdaefca99706a0c73c17f19cc537f3f669882.json create mode 100644 src/test/fixtures/boot.rs (limited to 'src/test/fixtures') diff --git a/.sqlx/query-093a57206253b4c05c2aa51fe70264af3f5b77c6dacd4a61be470f262d62b5d4.json b/.sqlx/query-093a57206253b4c05c2aa51fe70264af3f5b77c6dacd4a61be470f262d62b5d4.json new file mode 100644 index 0000000..1d8a2e1 --- /dev/null +++ b/.sqlx/query-093a57206253b4c05c2aa51fe70264af3f5b77c6dacd4a61be470f262d62b5d4.json @@ -0,0 +1,56 @@ +{ + "db_name": "SQLite", + "query": "\n select\n id as \"id: Id\",\n name.display_name as \"display_name?: String\",\n name.canonical_name as \"canonical_name?: String\",\n channel.created_at as \"created_at: DateTime\",\n channel.created_sequence as \"created_sequence: Sequence\",\n deleted.deleted_at as \"deleted_at?: DateTime\",\n deleted.deleted_sequence as \"deleted_sequence?: Sequence\"\n from channel\n left join channel_name as name\n using (id)\n left join channel_deleted as deleted\n using (id)\n where channel.created_sequence > $1\n or deleted.deleted_sequence > $1\n ", + "describe": { + "columns": [ + { + "name": "id: Id", + "ordinal": 0, + "type_info": "Text" + }, + { + "name": "display_name?: String", + "ordinal": 1, + "type_info": "Null" + }, + { + "name": "canonical_name?: String", + "ordinal": 2, + "type_info": "Null" + }, + { + "name": "created_at: DateTime", + "ordinal": 3, + "type_info": "Text" + }, + { + "name": "created_sequence: Sequence", + "ordinal": 4, + "type_info": "Integer" + }, + { + "name": "deleted_at?: DateTime", + "ordinal": 5, + "type_info": "Text" + }, + { + "name": "deleted_sequence?: Sequence", + "ordinal": 6, + "type_info": "Integer" + } + ], + "parameters": { + "Right": 1 + }, + "nullable": [ + false, + true, + true, + false, + false, + true, + true + ] + }, + "hash": "093a57206253b4c05c2aa51fe70264af3f5b77c6dacd4a61be470f262d62b5d4" +} diff --git a/.sqlx/query-24bc0257eff3357322481e1314f70d13e8b0ca22b7652f1063ec7796cf307269.json b/.sqlx/query-24bc0257eff3357322481e1314f70d13e8b0ca22b7652f1063ec7796cf307269.json deleted file mode 100644 index 9f09a28..0000000 --- a/.sqlx/query-24bc0257eff3357322481e1314f70d13e8b0ca22b7652f1063ec7796cf307269.json +++ /dev/null @@ -1,62 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n select\n id as \"id: Id\",\n message.channel as \"channel: channel::Id\",\n message.sender as \"sender: login::Id\",\n message.sent_at as \"sent_at: DateTime\",\n message.sent_sequence as \"sent_sequence: Sequence\",\n message.body as \"body: Body\",\n deleted.deleted_at as \"deleted_at: DateTime\",\n deleted.deleted_sequence as \"deleted_sequence: Sequence\"\n from message\n left join message_deleted as deleted\n using (id)\n where coalesce(message.sent_sequence > $1, true)\n ", - "describe": { - "columns": [ - { - "name": "id: Id", - "ordinal": 0, - "type_info": "Text" - }, - { - "name": "channel: channel::Id", - "ordinal": 1, - "type_info": "Text" - }, - { - "name": "sender: login::Id", - "ordinal": 2, - "type_info": "Text" - }, - { - "name": "sent_at: DateTime", - "ordinal": 3, - "type_info": "Text" - }, - { - "name": "sent_sequence: Sequence", - "ordinal": 4, - "type_info": "Integer" - }, - { - "name": "body: Body", - "ordinal": 5, - "type_info": "Text" - }, - { - "name": "deleted_at: DateTime", - "ordinal": 6, - "type_info": "Text" - }, - { - "name": "deleted_sequence: Sequence", - "ordinal": 7, - "type_info": "Integer" - } - ], - "parameters": { - "Right": 1 - }, - "nullable": [ - false, - false, - false, - false, - false, - true, - true, - true - ] - }, - "hash": "24bc0257eff3357322481e1314f70d13e8b0ca22b7652f1063ec7796cf307269" -} diff --git a/.sqlx/query-2c20c29d9adfed6201a6a69da95bc8271dfc8c6ec8ebf174ba8a57111b322291.json b/.sqlx/query-2c20c29d9adfed6201a6a69da95bc8271dfc8c6ec8ebf174ba8a57111b322291.json new file mode 100644 index 0000000..ae546ad --- /dev/null +++ b/.sqlx/query-2c20c29d9adfed6201a6a69da95bc8271dfc8c6ec8ebf174ba8a57111b322291.json @@ -0,0 +1,44 @@ +{ + "db_name": "SQLite", + "query": "\n select\n id as \"id: Id\",\n display_name as \"display_name: String\",\n canonical_name as \"canonical_name: String\",\n created_sequence as \"created_sequence: Sequence\",\n created_at as \"created_at: DateTime\"\n from login\n where login.created_sequence > $1\n ", + "describe": { + "columns": [ + { + "name": "id: Id", + "ordinal": 0, + "type_info": "Text" + }, + { + "name": "display_name: String", + "ordinal": 1, + "type_info": "Text" + }, + { + "name": "canonical_name: String", + "ordinal": 2, + "type_info": "Text" + }, + { + "name": "created_sequence: Sequence", + "ordinal": 3, + "type_info": "Integer" + }, + { + "name": "created_at: DateTime", + "ordinal": 4, + "type_info": "Text" + } + ], + "parameters": { + "Right": 1 + }, + "nullable": [ + false, + false, + false, + false, + false + ] + }, + "hash": "2c20c29d9adfed6201a6a69da95bc8271dfc8c6ec8ebf174ba8a57111b322291" +} diff --git a/.sqlx/query-9a748f34805bb316452cb527d5ba226bf52349548261393e00e74081cdbe041b.json b/.sqlx/query-9a748f34805bb316452cb527d5ba226bf52349548261393e00e74081cdbe041b.json new file mode 100644 index 0000000..5423bfd --- /dev/null +++ b/.sqlx/query-9a748f34805bb316452cb527d5ba226bf52349548261393e00e74081cdbe041b.json @@ -0,0 +1,62 @@ +{ + "db_name": "SQLite", + "query": "\n select\n id as \"id: Id\",\n message.channel as \"channel: channel::Id\",\n message.sender as \"sender: login::Id\",\n message.sent_at as \"sent_at: DateTime\",\n message.sent_sequence as \"sent_sequence: Sequence\",\n message.body as \"body: Body\",\n deleted.deleted_at as \"deleted_at?: DateTime\",\n deleted.deleted_sequence as \"deleted_sequence?: Sequence\"\n from message\n left join message_deleted as deleted\n using (id)\n where message.sent_sequence > $1\n or deleted.deleted_sequence > $1\n ", + "describe": { + "columns": [ + { + "name": "id: Id", + "ordinal": 0, + "type_info": "Text" + }, + { + "name": "channel: channel::Id", + "ordinal": 1, + "type_info": "Text" + }, + { + "name": "sender: login::Id", + "ordinal": 2, + "type_info": "Text" + }, + { + "name": "sent_at: DateTime", + "ordinal": 3, + "type_info": "Text" + }, + { + "name": "sent_sequence: Sequence", + "ordinal": 4, + "type_info": "Integer" + }, + { + "name": "body: Body", + "ordinal": 5, + "type_info": "Text" + }, + { + "name": "deleted_at?: DateTime", + "ordinal": 6, + "type_info": "Text" + }, + { + "name": "deleted_sequence?: Sequence", + "ordinal": 7, + "type_info": "Integer" + } + ], + "parameters": { + "Right": 1 + }, + "nullable": [ + false, + false, + false, + false, + false, + true, + true, + true + ] + }, + "hash": "9a748f34805bb316452cb527d5ba226bf52349548261393e00e74081cdbe041b" +} diff --git a/.sqlx/query-9f611a3351f22ed16d67d41602b8befafe751e9caeb0d5f279731e04c6925f46.json b/.sqlx/query-9f611a3351f22ed16d67d41602b8befafe751e9caeb0d5f279731e04c6925f46.json deleted file mode 100644 index 2d1f49e..0000000 --- a/.sqlx/query-9f611a3351f22ed16d67d41602b8befafe751e9caeb0d5f279731e04c6925f46.json +++ /dev/null @@ -1,44 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n select\n id as \"id: Id\",\n display_name as \"display_name: String\",\n canonical_name as \"canonical_name: String\",\n created_sequence as \"created_sequence: Sequence\",\n created_at as \"created_at: DateTime\"\n from login\n where coalesce(created_sequence <= $1, true)\n order by canonical_name\n ", - "describe": { - "columns": [ - { - "name": "id: Id", - "ordinal": 0, - "type_info": "Text" - }, - { - "name": "display_name: String", - "ordinal": 1, - "type_info": "Text" - }, - { - "name": "canonical_name: String", - "ordinal": 2, - "type_info": "Text" - }, - { - "name": "created_sequence: Sequence", - "ordinal": 3, - "type_info": "Integer" - }, - { - "name": "created_at: DateTime", - "ordinal": 4, - "type_info": "Text" - } - ], - "parameters": { - "Right": 1 - }, - "nullable": [ - false, - false, - false, - false, - false - ] - }, - "hash": "9f611a3351f22ed16d67d41602b8befafe751e9caeb0d5f279731e04c6925f46" -} diff --git a/.sqlx/query-a40496319887752f5845c0c2e965e2294c1a6932d72a8b04d258d06d0f8938bd.json b/.sqlx/query-a40496319887752f5845c0c2e965e2294c1a6932d72a8b04d258d06d0f8938bd.json deleted file mode 100644 index 436e1dd..0000000 --- a/.sqlx/query-a40496319887752f5845c0c2e965e2294c1a6932d72a8b04d258d06d0f8938bd.json +++ /dev/null @@ -1,56 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n select\n id as \"id: Id\",\n name.display_name as \"display_name: String\",\n name.canonical_name as \"canonical_name: String\",\n channel.created_at as \"created_at: DateTime\",\n channel.created_sequence as \"created_sequence: Sequence\",\n deleted.deleted_at as \"deleted_at?: DateTime\",\n deleted.deleted_sequence as \"deleted_sequence?: Sequence\"\n from channel\n left join channel_name as name\n using (id)\n left join channel_deleted as deleted\n using (id)\n where coalesce(channel.created_sequence > $1, true)\n ", - "describe": { - "columns": [ - { - "name": "id: Id", - "ordinal": 0, - "type_info": "Text" - }, - { - "name": "display_name: String", - "ordinal": 1, - "type_info": "Null" - }, - { - "name": "canonical_name: String", - "ordinal": 2, - "type_info": "Null" - }, - { - "name": "created_at: DateTime", - "ordinal": 3, - "type_info": "Text" - }, - { - "name": "created_sequence: Sequence", - "ordinal": 4, - "type_info": "Integer" - }, - { - "name": "deleted_at?: DateTime", - "ordinal": 5, - "type_info": "Text" - }, - { - "name": "deleted_sequence?: Sequence", - "ordinal": 6, - "type_info": "Integer" - } - ], - "parameters": { - "Right": 1 - }, - "nullable": [ - false, - true, - true, - false, - false, - true, - true - ] - }, - "hash": "a40496319887752f5845c0c2e965e2294c1a6932d72a8b04d258d06d0f8938bd" -} diff --git a/.sqlx/query-ac7ab464e44e4412cd83744730699683e63ae1ea404418a17c6af60148f03fe8.json b/.sqlx/query-ac7ab464e44e4412cd83744730699683e63ae1ea404418a17c6af60148f03fe8.json deleted file mode 100644 index 7d9bbbc..0000000 --- a/.sqlx/query-ac7ab464e44e4412cd83744730699683e63ae1ea404418a17c6af60148f03fe8.json +++ /dev/null @@ -1,44 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n select\n id as \"id: Id\",\n display_name as \"display_name: String\",\n canonical_name as \"canonical_name: String\",\n created_sequence as \"created_sequence: Sequence\",\n created_at as \"created_at: DateTime\"\n from login\n where coalesce(login.created_sequence > $1, true)\n ", - "describe": { - "columns": [ - { - "name": "id: Id", - "ordinal": 0, - "type_info": "Text" - }, - { - "name": "display_name: String", - "ordinal": 1, - "type_info": "Text" - }, - { - "name": "canonical_name: String", - "ordinal": 2, - "type_info": "Text" - }, - { - "name": "created_sequence: Sequence", - "ordinal": 3, - "type_info": "Integer" - }, - { - "name": "created_at: DateTime", - "ordinal": 4, - "type_info": "Text" - } - ], - "parameters": { - "Right": 1 - }, - "nullable": [ - false, - false, - false, - false, - false - ] - }, - "hash": "ac7ab464e44e4412cd83744730699683e63ae1ea404418a17c6af60148f03fe8" -} diff --git a/.sqlx/query-cbf29fae3725bbb3d9e94d932ace995f53efd6c7800a7a1b42daec41d081b3d2.json b/.sqlx/query-cbf29fae3725bbb3d9e94d932ace995f53efd6c7800a7a1b42daec41d081b3d2.json new file mode 100644 index 0000000..9c3c10e --- /dev/null +++ b/.sqlx/query-cbf29fae3725bbb3d9e94d932ace995f53efd6c7800a7a1b42daec41d081b3d2.json @@ -0,0 +1,44 @@ +{ + "db_name": "SQLite", + "query": "\n select\n id as \"id: Id\",\n display_name as \"display_name: String\",\n canonical_name as \"canonical_name: String\",\n created_sequence as \"created_sequence: Sequence\",\n created_at as \"created_at: DateTime\"\n from login\n where created_sequence <= $1\n order by canonical_name\n ", + "describe": { + "columns": [ + { + "name": "id: Id", + "ordinal": 0, + "type_info": "Text" + }, + { + "name": "display_name: String", + "ordinal": 1, + "type_info": "Text" + }, + { + "name": "canonical_name: String", + "ordinal": 2, + "type_info": "Text" + }, + { + "name": "created_sequence: Sequence", + "ordinal": 3, + "type_info": "Integer" + }, + { + "name": "created_at: DateTime", + "ordinal": 4, + "type_info": "Text" + } + ], + "parameters": { + "Right": 1 + }, + "nullable": [ + false, + false, + false, + false, + false + ] + }, + "hash": "cbf29fae3725bbb3d9e94d932ace995f53efd6c7800a7a1b42daec41d081b3d2" +} diff --git a/.sqlx/query-fce8f4fbd59a8b3b8531e10599914331682d58ace57214bfa26ccaa089592a24.json b/.sqlx/query-fce8f4fbd59a8b3b8531e10599914331682d58ace57214bfa26ccaa089592a24.json deleted file mode 100644 index 7aab764..0000000 --- a/.sqlx/query-fce8f4fbd59a8b3b8531e10599914331682d58ace57214bfa26ccaa089592a24.json +++ /dev/null @@ -1,62 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n select\n message.channel as \"channel: channel::Id\",\n message.sender as \"sender: login::Id\",\n id as \"id: Id\",\n message.body as \"body: Body\",\n message.sent_at as \"sent_at: DateTime\",\n message.sent_sequence as \"sent_sequence: Sequence\",\n deleted.deleted_at as \"deleted_at: DateTime\",\n deleted.deleted_sequence as \"deleted_sequence: Sequence\"\n from message\n left join message_deleted as deleted\n using (id)\n where coalesce(message.sent_sequence <= $2, true)\n order by message.sent_sequence\n ", - "describe": { - "columns": [ - { - "name": "channel: channel::Id", - "ordinal": 0, - "type_info": "Text" - }, - { - "name": "sender: login::Id", - "ordinal": 1, - "type_info": "Text" - }, - { - "name": "id: Id", - "ordinal": 2, - "type_info": "Text" - }, - { - "name": "body: Body", - "ordinal": 3, - "type_info": "Text" - }, - { - "name": "sent_at: DateTime", - "ordinal": 4, - "type_info": "Text" - }, - { - "name": "sent_sequence: Sequence", - "ordinal": 5, - "type_info": "Integer" - }, - { - "name": "deleted_at: DateTime", - "ordinal": 6, - "type_info": "Text" - }, - { - "name": "deleted_sequence: Sequence", - "ordinal": 7, - "type_info": "Integer" - } - ], - "parameters": { - "Right": 1 - }, - "nullable": [ - false, - false, - false, - true, - false, - false, - true, - true - ] - }, - "hash": "fce8f4fbd59a8b3b8531e10599914331682d58ace57214bfa26ccaa089592a24" -} diff --git a/.sqlx/query-ff61ff22108f1e98bbfc9a84a27bdaefca99706a0c73c17f19cc537f3f669882.json b/.sqlx/query-ff61ff22108f1e98bbfc9a84a27bdaefca99706a0c73c17f19cc537f3f669882.json new file mode 100644 index 0000000..f38f49c --- /dev/null +++ b/.sqlx/query-ff61ff22108f1e98bbfc9a84a27bdaefca99706a0c73c17f19cc537f3f669882.json @@ -0,0 +1,62 @@ +{ + "db_name": "SQLite", + "query": "\n select\n message.channel as \"channel: channel::Id\",\n message.sender as \"sender: login::Id\",\n message.id as \"id: Id\",\n message.body as \"body: Body\",\n message.sent_at as \"sent_at: DateTime\",\n message.sent_sequence as \"sent_sequence: Sequence\",\n deleted.deleted_at as \"deleted_at?: DateTime\",\n deleted.deleted_sequence as \"deleted_sequence?: Sequence\"\n from message\n left join message_deleted as deleted\n using (id)\n where message.sent_sequence <= $1\n order by message.sent_sequence\n ", + "describe": { + "columns": [ + { + "name": "channel: channel::Id", + "ordinal": 0, + "type_info": "Text" + }, + { + "name": "sender: login::Id", + "ordinal": 1, + "type_info": "Text" + }, + { + "name": "id: Id", + "ordinal": 2, + "type_info": "Text" + }, + { + "name": "body: Body", + "ordinal": 3, + "type_info": "Text" + }, + { + "name": "sent_at: DateTime", + "ordinal": 4, + "type_info": "Text" + }, + { + "name": "sent_sequence: Sequence", + "ordinal": 5, + "type_info": "Integer" + }, + { + "name": "deleted_at?: DateTime", + "ordinal": 6, + "type_info": "Text" + }, + { + "name": "deleted_sequence?: Sequence", + "ordinal": 7, + "type_info": "Integer" + } + ], + "parameters": { + "Right": 1 + }, + "nullable": [ + false, + false, + false, + true, + false, + false, + false, + false + ] + }, + "hash": "ff61ff22108f1e98bbfc9a84a27bdaefca99706a0c73c17f19cc537f3f669882" +} diff --git a/docs/api/events.md b/docs/api/events.md index b08e971..b23469c 100644 --- a/docs/api/events.md +++ b/docs/api/events.md @@ -46,7 +46,7 @@ This endpoint is designed for use with the [EventSource] DOM API, and supports s ### Query parameters -This endpoint accepts an optional `resume_point` (integer) query parameter. When provided, the event stream will include events published after that point in time; the value must be the value obtained by calling the [`GET /api/boot`](./boot.md) method. If absent, the returned event stream includes all events. +This endpoint requires a `resume_point` (integer) query parameter. The event stream will collect events published after that point in time. The value must be obtained by calling the [`GET /api/boot`](./boot.md) method. ### Request headers diff --git a/src/boot/app.rs b/src/boot/app.rs index e716b58..909f7d8 100644 --- a/src/boot/app.rs +++ b/src/boot/app.rs @@ -22,9 +22,9 @@ impl<'a> Boot<'a> { let mut tx = self.db.begin().await?; let resume_point = tx.sequence().current().await?; - let logins = tx.logins().all(resume_point.into()).await?; + let logins = tx.logins().all(resume_point).await?; let channels = tx.channels().all(resume_point).await?; - let messages = tx.messages().all(resume_point.into()).await?; + let messages = tx.messages().all(resume_point).await?; tx.commit().await?; diff --git a/src/channel/history.rs b/src/channel/history.rs index dda7bb9..ef2120d 100644 --- a/src/channel/history.rs +++ b/src/channel/history.rs @@ -4,7 +4,7 @@ use super::{ event::{Created, Deleted, Event}, Channel, Id, }; -use crate::event::{Instant, ResumePoint, Sequence}; +use crate::event::{Instant, Sequence}; #[derive(Clone, Debug, Eq, PartialEq)] pub struct History { @@ -28,9 +28,9 @@ impl History { self.channel.clone() } - pub fn as_of(&self, resume_point: impl Into) -> Option { + pub fn as_of(&self, resume_point: Sequence) -> Option { self.events() - .filter(Sequence::up_to(resume_point.into())) + .filter(Sequence::up_to(resume_point)) .collect() } diff --git a/src/channel/repo.rs b/src/channel/repo.rs index f47e564..7206c21 100644 --- a/src/channel/repo.rs +++ b/src/channel/repo.rs @@ -5,7 +5,7 @@ use crate::{ channel::{Channel, History, Id}, clock::DateTime, db::NotFound, - event::{Instant, ResumePoint, Sequence}, + event::{Instant, Sequence}, name::{self, Name}, }; @@ -144,13 +144,13 @@ impl<'c> Channels<'c> { Ok(channels) } - pub async fn replay(&mut self, resume_at: ResumePoint) -> Result, LoadError> { + pub async fn replay(&mut self, resume_at: Sequence) -> Result, LoadError> { let channels = sqlx::query!( r#" select id as "id: Id", - name.display_name as "display_name: String", - name.canonical_name as "canonical_name: String", + name.display_name as "display_name?: String", + name.canonical_name as "canonical_name?: String", channel.created_at as "created_at: DateTime", channel.created_sequence as "created_sequence: Sequence", deleted.deleted_at as "deleted_at?: DateTime", @@ -160,7 +160,8 @@ impl<'c> Channels<'c> { using (id) left join channel_deleted as deleted using (id) - where coalesce(channel.created_sequence > $1, true) + where channel.created_sequence > $1 + or deleted.deleted_sequence > $1 "#, resume_at, ) diff --git a/src/channel/routes/channel/test/post.rs b/src/channel/routes/channel/test/post.rs index 111a703..bc0684b 100644 --- a/src/channel/routes/channel/test/post.rs +++ b/src/channel/routes/channel/test/post.rs @@ -15,6 +15,7 @@ async fn messages_in_order() { let app = fixtures::scratch_app().await; let sender = fixtures::identity::create(&app, &fixtures::now()).await; let channel = fixtures::channel::create(&app, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; // Call the endpoint (twice) @@ -41,7 +42,7 @@ async fn messages_in_order() { let mut events = app .events() - .subscribe(None) + .subscribe(resume_point) .await .expect("subscribing to a valid channel succeeds") .filter_map(fixtures::event::message) diff --git a/src/channel/routes/test.rs b/src/channel/routes/test.rs index f5369fb..cba8f2e 100644 --- a/src/channel/routes/test.rs +++ b/src/channel/routes/test.rs @@ -16,6 +16,7 @@ async fn new_channel() { let app = fixtures::scratch_app().await; let creator = fixtures::identity::create(&app, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; // Call the endpoint @@ -44,7 +45,7 @@ async fn new_channel() { let mut events = app .events() - .subscribe(None) + .subscribe(resume_point) .await .expect("subscribing never fails") .filter_map(fixtures::event::channel) diff --git a/src/event/app.rs b/src/event/app.rs index c754388..b309245 100644 --- a/src/event/app.rs +++ b/src/event/app.rs @@ -6,7 +6,7 @@ use futures::{ use itertools::Itertools as _; use sqlx::sqlite::SqlitePool; -use super::{broadcaster::Broadcaster, Event, ResumePoint, Sequence, Sequenced}; +use super::{broadcaster::Broadcaster, Event, Sequence, Sequenced}; use crate::{ channel::{self, repo::Provider as _}, login::{self, repo::Provider as _}, @@ -26,9 +26,8 @@ impl<'a> Events<'a> { pub async fn subscribe( &self, - resume_at: impl Into, + resume_at: Sequence, ) -> Result + std::fmt::Debug, Error> { - let resume_at = resume_at.into(); // Subscribe before retrieving, to catch messages broadcast while we're // querying the DB. We'll prune out duplicates later. let live_messages = self.events.subscribe(); @@ -63,7 +62,7 @@ impl<'a> Events<'a> { .merge_by(channel_events, Sequence::merge) .merge_by(message_events, Sequence::merge) .collect::>(); - let resume_live_at = replay_events.last().map(Sequenced::sequence); + let resume_live_at = replay_events.last().map_or(resume_at, Sequenced::sequence); let replay = stream::iter(replay_events); @@ -77,7 +76,7 @@ impl<'a> Events<'a> { Ok(replay.chain(live_messages)) } - fn resume(resume_at: ResumePoint) -> impl for<'m> FnMut(&'m Event) -> future::Ready { + fn resume(resume_at: Sequence) -> impl for<'m> FnMut(&'m Event) -> future::Ready { let filter = Sequence::after(resume_at); move |event| future::ready(filter(event)) } diff --git a/src/event/mod.rs b/src/event/mod.rs index 69c7a10..9996916 100644 --- a/src/event/mod.rs +++ b/src/event/mod.rs @@ -13,8 +13,6 @@ pub use self::{ sequence::{Instant, Sequence, Sequenced}, }; -pub type ResumePoint = Option; - #[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] #[serde(tag = "type", rename_all = "snake_case")] pub enum Event { diff --git a/src/event/routes/get.rs b/src/event/routes/get.rs index 22e8762..ceebcc9 100644 --- a/src/event/routes/get.rs +++ b/src/event/routes/get.rs @@ -12,7 +12,7 @@ use futures::stream::{Stream, StreamExt as _}; use crate::{ app::App, error::{Internal, Unauthorized}, - event::{app, extract::LastEventId, Event, ResumePoint, Sequence, Sequenced as _}, + event::{app, extract::LastEventId, Event, Sequence, Sequenced as _}, token::{app::ValidateError, extract::Identity}, }; @@ -22,9 +22,7 @@ pub async fn handler( last_event_id: Option>, Query(query): Query, ) -> Result + std::fmt::Debug>, Error> { - let resume_at = last_event_id - .map(LastEventId::into_inner) - .or(query.resume_point); + let resume_at = last_event_id.map_or(query.resume_point, LastEventId::into_inner); let stream = app.events().subscribe(resume_at).await?; let stream = app.tokens().limit_stream(identity.token, stream).await?; @@ -32,9 +30,9 @@ pub async fn handler( Ok(Response(stream)) } -#[derive(Default, serde::Deserialize)] +#[derive(serde::Deserialize)] pub struct QueryParams { - pub resume_point: ResumePoint, + pub resume_point: Sequence, } #[derive(Debug)] diff --git a/src/event/routes/test/channel.rs b/src/event/routes/test/channel.rs index 6a0a803..0695ab1 100644 --- a/src/event/routes/test/channel.rs +++ b/src/event/routes/test/channel.rs @@ -12,14 +12,19 @@ async fn creating() { // Set up the environment let app = fixtures::scratch_app().await; + let resume_point = fixtures::boot::resume_point(&app).await; // Subscribe let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = - get::handler(State(app.clone()), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); + let get::Response(events) = get::handler( + State(app.clone()), + subscriber, + None, + Query(get::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); // Create a channel @@ -46,6 +51,7 @@ async fn previously_created() { // Set up the environment let app = fixtures::scratch_app().await; + let resume_point = fixtures::boot::resume_point(&app).await; // Create a channel @@ -59,10 +65,14 @@ async fn previously_created() { // Subscribe let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = - get::handler(State(app.clone()), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); + let get::Response(events) = get::handler( + State(app.clone()), + subscriber, + None, + Query(get::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); // Verify channel created event @@ -81,14 +91,19 @@ async fn expiring() { let app = fixtures::scratch_app().await; let channel = fixtures::channel::create(&app, &fixtures::ancient()).await; + let resume_point = fixtures::boot::resume_point(&app).await; // Subscribe let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = - get::handler(State(app.clone()), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); + let get::Response(events) = get::handler( + State(app.clone()), + subscriber, + None, + Query(get::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); // Expire channels @@ -113,6 +128,7 @@ async fn previously_expired() { let app = fixtures::scratch_app().await; let channel = fixtures::channel::create(&app, &fixtures::ancient()).await; + let resume_point = fixtures::boot::resume_point(&app).await; // Expire channels @@ -124,10 +140,14 @@ async fn previously_expired() { // Subscribe let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = - get::handler(State(app.clone()), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); + let get::Response(events) = get::handler( + State(app.clone()), + subscriber, + None, + Query(get::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); // Check for expiry event let _ = events @@ -145,14 +165,19 @@ async fn deleting() { let app = fixtures::scratch_app().await; let channel = fixtures::channel::create(&app, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; // Subscribe let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = - get::handler(State(app.clone()), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); + let get::Response(events) = get::handler( + State(app.clone()), + subscriber, + None, + Query(get::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); // Delete the channel @@ -177,6 +202,7 @@ async fn previously_deleted() { let app = fixtures::scratch_app().await; let channel = fixtures::channel::create(&app, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; // Delete the channel @@ -188,10 +214,14 @@ async fn previously_deleted() { // Subscribe let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = - get::handler(State(app.clone()), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); + let get::Response(events) = get::handler( + State(app.clone()), + subscriber, + None, + Query(get::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); // Check for expiry event let _ = events @@ -209,6 +239,7 @@ async fn previously_purged() { let app = fixtures::scratch_app().await; let channel = fixtures::channel::create(&app, &fixtures::ancient()).await; + let resume_point = fixtures::boot::resume_point(&app).await; // Delete and purge the channel @@ -225,10 +256,14 @@ async fn previously_purged() { // Subscribe let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = - get::handler(State(app.clone()), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); + let get::Response(events) = get::handler( + State(app.clone()), + subscriber, + None, + Query(get::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); // Check for expiry event events diff --git a/src/event/routes/test/invite.rs b/src/event/routes/test/invite.rs index d24f474..73af62d 100644 --- a/src/event/routes/test/invite.rs +++ b/src/event/routes/test/invite.rs @@ -14,14 +14,19 @@ async fn accepting_invite() { let app = fixtures::scratch_app().await; let issuer = fixtures::login::create(&app, &fixtures::now()).await; let invite = fixtures::invite::issue(&app, &issuer, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; // Subscribe let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = - get::handler(State(app.clone()), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); + let get::Response(events) = get::handler( + State(app.clone()), + subscriber, + None, + Query(get::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); // Accept the invite @@ -50,6 +55,7 @@ async fn previously_accepted_invite() { let app = fixtures::scratch_app().await; let issuer = fixtures::login::create(&app, &fixtures::now()).await; let invite = fixtures::invite::issue(&app, &issuer, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; // Accept the invite @@ -63,10 +69,14 @@ async fn previously_accepted_invite() { // Subscribe let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = - get::handler(State(app.clone()), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); + let get::Response(events) = get::handler( + State(app.clone()), + subscriber, + None, + Query(get::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); // Expect a login created event diff --git a/src/event/routes/test/message.rs b/src/event/routes/test/message.rs index a7b25fb..fafaeb3 100644 --- a/src/event/routes/test/message.rs +++ b/src/event/routes/test/message.rs @@ -16,14 +16,19 @@ async fn sending() { let app = fixtures::scratch_app().await; let channel = fixtures::channel::create(&app, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; // Call the endpoint let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = - get::handler(State(app.clone()), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); + let get::Response(events) = get::handler( + State(app.clone()), + subscriber, + None, + Query(get::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); // Send a message @@ -56,6 +61,7 @@ async fn previously_sent() { let app = fixtures::scratch_app().await; let channel = fixtures::channel::create(&app, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; // Send a message @@ -74,10 +80,14 @@ async fn previously_sent() { // Call the endpoint let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = - get::handler(State(app.clone()), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); + let get::Response(events) = get::handler( + State(app.clone()), + subscriber, + None, + Query(get::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); // Verify that an event is delivered @@ -96,6 +106,7 @@ async fn sent_in_multiple_channels() { let app = fixtures::scratch_app().await; let sender = fixtures::login::create(&app, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; let channels = [ fixtures::channel::create(&app, &fixtures::now()).await, @@ -115,9 +126,14 @@ async fn sent_in_multiple_channels() { // Call the endpoint let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = get::handler(State(app), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); + let get::Response(events) = get::handler( + State(app), + subscriber, + None, + Query(get::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); // Verify the structure of the response. @@ -141,6 +157,7 @@ async fn sent_sequentially() { let app = fixtures::scratch_app().await; let channel = fixtures::channel::create(&app, &fixtures::now()).await; let sender = fixtures::login::create(&app, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; let messages = vec![ fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, @@ -151,9 +168,14 @@ async fn sent_sequentially() { // Subscribe let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = get::handler(State(app), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); + let get::Response(events) = get::handler( + State(app), + subscriber, + None, + Query(get::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); // Verify the expected events in the expected order @@ -180,14 +202,19 @@ async fn expiring() { let channel = fixtures::channel::create(&app, &fixtures::ancient()).await; let sender = fixtures::login::create(&app, &fixtures::ancient()).await; let message = fixtures::message::send(&app, &channel, &sender, &fixtures::ancient()).await; + let resume_point = fixtures::boot::resume_point(&app).await; // Subscribe let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = - get::handler(State(app.clone()), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); + let get::Response(events) = get::handler( + State(app.clone()), + subscriber, + None, + Query(get::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); // Expire messages @@ -214,6 +241,7 @@ async fn previously_expired() { let channel = fixtures::channel::create(&app, &fixtures::ancient()).await; let sender = fixtures::login::create(&app, &fixtures::ancient()).await; let message = fixtures::message::send(&app, &channel, &sender, &fixtures::ancient()).await; + let resume_point = fixtures::boot::resume_point(&app).await; // Expire messages @@ -225,10 +253,14 @@ async fn previously_expired() { // Subscribe let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = - get::handler(State(app.clone()), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); + let get::Response(events) = get::handler( + State(app.clone()), + subscriber, + None, + Query(get::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); // Check for expiry event let _ = events @@ -248,14 +280,19 @@ async fn deleting() { let channel = fixtures::channel::create(&app, &fixtures::now()).await; let sender = fixtures::login::create(&app, &fixtures::now()).await; let message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; // Subscribe let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = - get::handler(State(app.clone()), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); + let get::Response(events) = get::handler( + State(app.clone()), + subscriber, + None, + Query(get::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); // Delete the message @@ -282,6 +319,7 @@ async fn previously_deleted() { let channel = fixtures::channel::create(&app, &fixtures::now()).await; let sender = fixtures::login::create(&app, &fixtures::now()).await; let message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; // Delete the message @@ -293,10 +331,14 @@ async fn previously_deleted() { // Subscribe let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = - get::handler(State(app.clone()), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); + let get::Response(events) = get::handler( + State(app.clone()), + subscriber, + None, + Query(get::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); // Check for delete event let _ = events @@ -316,6 +358,7 @@ async fn previously_purged() { let channel = fixtures::channel::create(&app, &fixtures::ancient()).await; let sender = fixtures::login::create(&app, &fixtures::ancient()).await; let message = fixtures::message::send(&app, &channel, &sender, &fixtures::ancient()).await; + let resume_point = fixtures::boot::resume_point(&app).await; // Purge the message @@ -332,10 +375,14 @@ async fn previously_purged() { // Subscribe let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = - get::handler(State(app.clone()), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); + let get::Response(events) = get::handler( + State(app.clone()), + subscriber, + None, + Query(get::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); // Check for delete event diff --git a/src/event/routes/test/resume.rs b/src/event/routes/test/resume.rs index 62b9bad..fabda0c 100644 --- a/src/event/routes/test/resume.rs +++ b/src/event/routes/test/resume.rs @@ -16,6 +16,7 @@ async fn resume() { let app = fixtures::scratch_app().await; let channel = fixtures::channel::create(&app, &fixtures::now()).await; let sender = fixtures::login::create(&app, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; let initial_message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await; @@ -34,7 +35,7 @@ async fn resume() { State(app.clone()), subscriber.clone(), None, - Query::default(), + Query(get::QueryParams { resume_point }), ) .await .expect("subscribe never fails"); @@ -55,7 +56,7 @@ async fn resume() { State(app), subscriber, Some(resume_at.into()), - Query::default(), + Query(get::QueryParams { resume_point }), ) .await .expect("subscribe never fails"); @@ -98,6 +99,7 @@ async fn serial_resume() { let sender = fixtures::login::create(&app, &fixtures::now()).await; let channel_a = fixtures::channel::create(&app, &fixtures::now()).await; let channel_b = fixtures::channel::create(&app, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; // Call the endpoint @@ -115,7 +117,7 @@ async fn serial_resume() { State(app.clone()), subscriber.clone(), None, - Query::default(), + Query(get::QueryParams { resume_point }), ) .await .expect("subscribe never fails"); @@ -156,7 +158,7 @@ async fn serial_resume() { State(app.clone()), subscriber.clone(), Some(resume_at.into()), - Query::default(), + Query(get::QueryParams { resume_point }), ) .await .expect("subscribe never fails"); @@ -197,7 +199,7 @@ async fn serial_resume() { State(app.clone()), subscriber.clone(), Some(resume_at.into()), - Query::default(), + Query(get::QueryParams { resume_point }), ) .await .expect("subscribe never fails"); diff --git a/src/event/routes/test/setup.rs b/src/event/routes/test/setup.rs index 007b03d..26b7ea7 100644 --- a/src/event/routes/test/setup.rs +++ b/src/event/routes/test/setup.rs @@ -15,6 +15,7 @@ async fn previously_completed() { // Set up the environment let app = fixtures::scratch_app().await; + let resume_point = fixtures::boot::resume_point(&app).await; // Complete initial setup @@ -28,10 +29,14 @@ async fn previously_completed() { // Subscribe to events let subscriber = fixtures::identity::create(&app, &fixtures::now()).await; - let get::Response(events) = - get::handler(State(app.clone()), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); + let get::Response(events) = get::handler( + State(app.clone()), + subscriber, + None, + Query(get::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); // Expect a login created event diff --git a/src/event/routes/test/token.rs b/src/event/routes/test/token.rs index 16ac7c3..fa76865 100644 --- a/src/event/routes/test/token.rs +++ b/src/event/routes/test/token.rs @@ -14,6 +14,7 @@ async fn terminates_on_token_expiry() { let app = fixtures::scratch_app().await; let channel = fixtures::channel::create(&app, &fixtures::now()).await; let sender = fixtures::login::create(&app, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; // Subscribe via the endpoint @@ -21,10 +22,14 @@ async fn terminates_on_token_expiry() { let subscriber = fixtures::identity::logged_in(&app, &subscriber_creds, &fixtures::ancient()).await; - let get::Response(events) = - get::handler(State(app.clone()), subscriber, None, Query::default()) - .await - .expect("subscribe never fails"); + let get::Response(events) = get::handler( + State(app.clone()), + subscriber, + None, + Query(get::QueryParams { resume_point }), + ) + .await + .expect("subscribe never fails"); // Verify the resulting stream's behaviour @@ -56,6 +61,7 @@ async fn terminates_on_logout() { let app = fixtures::scratch_app().await; let channel = fixtures::channel::create(&app, &fixtures::now()).await; let sender = fixtures::login::create(&app, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; // Subscribe via the endpoint @@ -65,7 +71,7 @@ async fn terminates_on_logout() { State(app.clone()), subscriber.clone(), None, - Query::default(), + Query(get::QueryParams { resume_point }), ) .await .expect("subscribe never fails"); @@ -101,6 +107,7 @@ async fn terminates_on_password_change() { let app = fixtures::scratch_app().await; let channel = fixtures::channel::create(&app, &fixtures::now()).await; let sender = fixtures::login::create(&app, &fixtures::now()).await; + let resume_point = fixtures::boot::resume_point(&app).await; // Subscribe via the endpoint @@ -112,7 +119,7 @@ async fn terminates_on_password_change() { State(app.clone()), subscriber.clone(), None, - Query::default(), + Query(get::QueryParams { resume_point }), ) .await .expect("subscribe never fails"); diff --git a/src/event/sequence.rs b/src/event/sequence.rs index 9bc399b..77281c2 100644 --- a/src/event/sequence.rs +++ b/src/event/sequence.rs @@ -1,6 +1,5 @@ use std::fmt; -use super::ResumePoint; use crate::clock::DateTime; #[derive(Clone, Copy, Debug, Eq, PartialEq, serde::Serialize)] @@ -51,18 +50,18 @@ impl fmt::Display for Sequence { } impl Sequence { - pub fn up_to(resume_point: ResumePoint) -> impl for<'e> Fn(&'e E) -> bool + pub fn up_to(resume_point: Sequence) -> impl for<'e> Fn(&'e E) -> bool where E: Sequenced, { - move |event| resume_point.map_or(true, |resume_point| event.sequence() <= resume_point) + move |event| event.sequence() <= resume_point } - pub fn after(resume_point: ResumePoint) -> impl for<'e> Fn(&'e E) -> bool + pub fn after(resume_point: Sequence) -> impl for<'e> Fn(&'e E) -> bool where E: Sequenced, { - move |event| resume_point < Some(event.sequence()) + move |event| resume_point < event.sequence() } pub fn start_from(resume_point: Self) -> impl for<'e> Fn(&'e E) -> bool diff --git a/src/login/history.rs b/src/login/history.rs index daad579..8161b0b 100644 --- a/src/login/history.rs +++ b/src/login/history.rs @@ -2,7 +2,7 @@ use super::{ event::{Created, Event}, Id, Login, }; -use crate::event::{Instant, ResumePoint, Sequence}; +use crate::event::{Instant, Sequence}; #[derive(Clone, Debug, Eq, PartialEq)] pub struct History { @@ -24,9 +24,9 @@ impl History { self.login.clone() } - pub fn as_of(&self, resume_point: impl Into) -> Option { + pub fn as_of(&self, resume_point: Sequence) -> Option { self.events() - .filter(Sequence::up_to(resume_point.into())) + .filter(Sequence::up_to(resume_point)) .collect() } diff --git a/src/login/repo.rs b/src/login/repo.rs index 9439a25..1c63a4b 100644 --- a/src/login/repo.rs +++ b/src/login/repo.rs @@ -3,7 +3,7 @@ use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction}; use crate::{ clock::DateTime, - event::{Instant, ResumePoint, Sequence}, + event::{Instant, Sequence}, login::{password::StoredHash, History, Id, Login}, name::{self, Name}, }; @@ -81,7 +81,7 @@ impl<'c> Logins<'c> { Ok(()) } - pub async fn all(&mut self, resume_at: ResumePoint) -> Result, LoadError> { + pub async fn all(&mut self, resume_at: Sequence) -> Result, LoadError> { let logins = sqlx::query!( r#" select @@ -91,7 +91,7 @@ impl<'c> Logins<'c> { created_sequence as "created_sequence: Sequence", created_at as "created_at: DateTime" from login - where coalesce(created_sequence <= $1, true) + where created_sequence <= $1 order by canonical_name "#, resume_at, @@ -113,7 +113,7 @@ impl<'c> Logins<'c> { Ok(logins) } - pub async fn replay(&mut self, resume_at: ResumePoint) -> Result, LoadError> { + pub async fn replay(&mut self, resume_at: Sequence) -> Result, LoadError> { let logins = sqlx::query!( r#" select @@ -123,7 +123,7 @@ impl<'c> Logins<'c> { created_sequence as "created_sequence: Sequence", created_at as "created_at: DateTime" from login - where coalesce(login.created_sequence > $1, true) + where login.created_sequence > $1 "#, resume_at, ) diff --git a/src/message/history.rs b/src/message/history.rs index 67e437a..ed8f5df 100644 --- a/src/message/history.rs +++ b/src/message/history.rs @@ -4,7 +4,7 @@ use super::{ event::{Deleted, Event, Sent}, Id, Message, }; -use crate::event::{Instant, ResumePoint, Sequence}; +use crate::event::{Instant, Sequence}; #[derive(Clone, Debug, Eq, PartialEq)] pub struct History { @@ -27,9 +27,9 @@ impl History { self.message.clone() } - pub fn as_of(&self, resume_point: impl Into) -> Option { + pub fn as_of(&self, resume_point: Sequence) -> Option { self.events() - .filter(Sequence::up_to(resume_point.into())) + .filter(Sequence::up_to(resume_point)) .collect() } diff --git a/src/message/repo.rs b/src/message/repo.rs index c8ceceb..913135c 100644 --- a/src/message/repo.rs +++ b/src/message/repo.rs @@ -4,7 +4,7 @@ use super::{snapshot::Message, Body, History, Id}; use crate::{ channel, clock::DateTime, - event::{Instant, ResumePoint, Sequence}, + event::{Instant, Sequence}, login::{self, Login}, }; @@ -106,22 +106,22 @@ impl<'c> Messages<'c> { Ok(messages) } - pub async fn all(&mut self, resume_at: ResumePoint) -> Result, sqlx::Error> { + pub async fn all(&mut self, resume_at: Sequence) -> Result, sqlx::Error> { let messages = sqlx::query!( r#" select message.channel as "channel: channel::Id", message.sender as "sender: login::Id", - id as "id: Id", + message.id as "id: Id", message.body as "body: Body", message.sent_at as "sent_at: DateTime", message.sent_sequence as "sent_sequence: Sequence", - deleted.deleted_at as "deleted_at: DateTime", - deleted.deleted_sequence as "deleted_sequence: Sequence" + deleted.deleted_at as "deleted_at?: DateTime", + deleted.deleted_sequence as "deleted_sequence?: Sequence" from message left join message_deleted as deleted using (id) - where coalesce(message.sent_sequence <= $2, true) + where message.sent_sequence <= $1 order by message.sent_sequence "#, resume_at, @@ -282,7 +282,7 @@ impl<'c> Messages<'c> { Ok(messages) } - pub async fn replay(&mut self, resume_at: ResumePoint) -> Result, sqlx::Error> { + pub async fn replay(&mut self, resume_at: Sequence) -> Result, sqlx::Error> { let messages = sqlx::query!( r#" select @@ -292,12 +292,13 @@ impl<'c> Messages<'c> { message.sent_at as "sent_at: DateTime", message.sent_sequence as "sent_sequence: Sequence", message.body as "body: Body", - deleted.deleted_at as "deleted_at: DateTime", - deleted.deleted_sequence as "deleted_sequence: Sequence" + deleted.deleted_at as "deleted_at?: DateTime", + deleted.deleted_sequence as "deleted_sequence?: Sequence" from message left join message_deleted as deleted using (id) - where coalesce(message.sent_sequence > $1, true) + where message.sent_sequence > $1 + or deleted.deleted_sequence > $1 "#, resume_at, ) diff --git a/src/test/fixtures/boot.rs b/src/test/fixtures/boot.rs new file mode 100644 index 0000000..120726f --- /dev/null +++ b/src/test/fixtures/boot.rs @@ -0,0 +1,9 @@ +use crate::{app::App, event::Sequence}; + +pub async fn resume_point(app: &App) -> Sequence { + app.boot() + .snapshot() + .await + .expect("boot always succeeds") + .resume_point +} diff --git a/src/test/fixtures/mod.rs b/src/test/fixtures/mod.rs index 2b7b6af..470b31a 100644 --- a/src/test/fixtures/mod.rs +++ b/src/test/fixtures/mod.rs @@ -2,6 +2,7 @@ use chrono::{TimeDelta, Utc}; use crate::{app::App, clock::RequestedAt, db}; +pub mod boot; pub mod channel; pub mod cookie; pub mod event; -- cgit v1.2.3