From 357116366c1307bedaac6a3dfe9c5ed8e0e0c210 Mon Sep 17 00:00:00 2001 From: Owen Jacobson Date: Wed, 2 Oct 2024 00:41:25 -0400 Subject: First pass on reorganizing the backend. This is primarily renames and repackagings. --- src/cli.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'src/cli.rs') diff --git a/src/cli.rs b/src/cli.rs index 132baf8..ee95ea6 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -10,7 +10,7 @@ use clap::Parser; use sqlx::sqlite::SqlitePool; use tokio::net; -use crate::{app::App, channel, clock, events, expire, login, repo::pool}; +use crate::{app::App, channel, clock, event, expire, login, repo::pool}; /// Command-line entry point for running the `hi` server. /// @@ -105,7 +105,7 @@ impl Args { } fn routers() -> Router { - [channel::router(), events::router(), login::router()] + [channel::router(), event::router(), login::router()] .into_iter() .fold(Router::default(), Router::merge) } -- cgit v1.2.3 From 6f07e6869bbf62903ac83c9bc061e7bde997e6a8 Mon Sep 17 00:00:00 2001 From: Owen Jacobson Date: Wed, 2 Oct 2024 01:10:09 -0400 Subject: Retire top-level `repo`. This helped me discover an organizational scheme I like more. --- src/channel/app.rs | 6 +- src/channel/mod.rs | 1 + src/channel/repo.rs | 165 +++++++++++++++++++++++++++++++++++++++++++++ src/cli.rs | 4 +- src/db.rs | 42 ++++++++++++ src/event/app.rs | 6 +- src/event/repo/mod.rs | 3 + src/event/repo/sequence.rs | 44 ++++++++++++ src/lib.rs | 2 +- src/login/app.rs | 2 +- src/repo/channel.rs | 165 --------------------------------------------- src/repo/error.rs | 23 ------- src/repo/mod.rs | 4 -- src/repo/pool.rs | 18 ----- src/repo/sequence.rs | 44 ------------ src/test/fixtures/mod.rs | 4 +- src/token/app.rs | 2 +- 17 files changed, 267 insertions(+), 268 deletions(-) create mode 100644 src/channel/repo.rs create mode 100644 src/db.rs create mode 100644 src/event/repo/sequence.rs delete mode 100644 src/repo/channel.rs delete mode 100644 src/repo/error.rs delete mode 100644 src/repo/mod.rs delete mode 100644 src/repo/pool.rs delete mode 100644 src/repo/sequence.rs (limited to 'src/cli.rs') diff --git a/src/channel/app.rs b/src/channel/app.rs index 1422651..ef0a63f 100644 --- a/src/channel/app.rs +++ b/src/channel/app.rs @@ -2,11 +2,9 @@ use chrono::TimeDelta; use sqlx::sqlite::SqlitePool; use crate::{ - channel::Channel, + channel::{repo::Provider as _, Channel}, clock::DateTime, - event::Sequence, - event::{broadcaster::Broadcaster, types::ChannelEvent}, - repo::{channel::Provider as _, sequence::Provider as _}, + event::{broadcaster::Broadcaster, repo::Provider as _, types::ChannelEvent, Sequence}, }; pub struct Channels<'a> { diff --git a/src/channel/mod.rs b/src/channel/mod.rs index 02d0ed4..2672084 100644 --- a/src/channel/mod.rs +++ b/src/channel/mod.rs @@ -2,6 +2,7 @@ use crate::{clock::DateTime, event::Sequence}; pub mod app; mod id; +pub mod repo; mod routes; pub use self::{id::Id, routes::router}; diff --git a/src/channel/repo.rs b/src/channel/repo.rs new file mode 100644 index 0000000..18cd81f --- /dev/null +++ b/src/channel/repo.rs @@ -0,0 +1,165 @@ +use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction}; + +use crate::{ + channel::{Channel, Id}, + clock::DateTime, + event::{types, Sequence}, +}; + +pub trait Provider { + fn channels(&mut self) -> Channels; +} + +impl<'c> Provider for Transaction<'c, Sqlite> { + fn channels(&mut self) -> Channels { + Channels(self) + } +} + +pub struct Channels<'t>(&'t mut SqliteConnection); + +impl<'c> Channels<'c> { + pub async fn create( + &mut self, + name: &str, + created_at: &DateTime, + created_sequence: Sequence, + ) -> Result { + let id = Id::generate(); + let channel = sqlx::query_as!( + Channel, + r#" + insert + into channel (id, name, created_at, created_sequence) + values ($1, $2, $3, $4) + returning + id as "id: Id", + name, + created_at as "created_at: DateTime", + created_sequence as "created_sequence: Sequence" + "#, + id, + name, + created_at, + created_sequence, + ) + .fetch_one(&mut *self.0) + .await?; + + Ok(channel) + } + + pub async fn by_id(&mut self, channel: &Id) -> Result { + let channel = sqlx::query_as!( + Channel, + r#" + select + id as "id: Id", + name, + created_at as "created_at: DateTime", + created_sequence as "created_sequence: Sequence" + from channel + where id = $1 + "#, + channel, + ) + .fetch_one(&mut *self.0) + .await?; + + Ok(channel) + } + + pub async fn all( + &mut self, + resume_point: Option, + ) -> Result, sqlx::Error> { + let channels = sqlx::query_as!( + Channel, + r#" + select + id as "id: Id", + name, + created_at as "created_at: DateTime", + created_sequence as "created_sequence: Sequence" + from channel + where coalesce(created_sequence <= $1, true) + order by channel.name + "#, + resume_point, + ) + .fetch_all(&mut *self.0) + .await?; + + Ok(channels) + } + + pub async fn replay( + &mut self, + resume_at: Option, + ) -> Result, sqlx::Error> { + let channels = sqlx::query_as!( + Channel, + r#" + select + id as "id: Id", + name, + created_at as "created_at: DateTime", + created_sequence as "created_sequence: Sequence" + from channel + where coalesce(created_sequence > $1, true) + "#, + resume_at, + ) + .fetch_all(&mut *self.0) + .await?; + + Ok(channels) + } + + pub async fn delete( + &mut self, + channel: &Channel, + deleted_at: &DateTime, + deleted_sequence: Sequence, + ) -> Result { + let channel = channel.id.clone(); + sqlx::query_scalar!( + r#" + delete from channel + where id = $1 + returning 1 as "row: i64" + "#, + channel, + ) + .fetch_one(&mut *self.0) + .await?; + + Ok(types::ChannelEvent { + sequence: deleted_sequence, + at: *deleted_at, + data: types::DeletedEvent { channel }.into(), + }) + } + + pub async fn expired(&mut self, expired_at: &DateTime) -> Result, sqlx::Error> { + let channels = sqlx::query_as!( + Channel, + r#" + select + channel.id as "id: Id", + channel.name, + channel.created_at as "created_at: DateTime", + channel.created_sequence as "created_sequence: Sequence" + from channel + left join message + where created_at < $1 + and message.id is null + "#, + expired_at, + ) + .fetch_all(&mut *self.0) + .await?; + + Ok(channels) + } +} diff --git a/src/cli.rs b/src/cli.rs index ee95ea6..893fae2 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -10,7 +10,7 @@ use clap::Parser; use sqlx::sqlite::SqlitePool; use tokio::net; -use crate::{app::App, channel, clock, event, expire, login, repo::pool}; +use crate::{app::App, channel, clock, db, event, expire, login}; /// Command-line entry point for running the `hi` server. /// @@ -100,7 +100,7 @@ impl Args { } async fn pool(&self) -> sqlx::Result { - pool::prepare(&self.database_url).await + db::prepare(&self.database_url).await } } diff --git a/src/db.rs b/src/db.rs new file mode 100644 index 0000000..93a1169 --- /dev/null +++ b/src/db.rs @@ -0,0 +1,42 @@ +use std::str::FromStr; + +use sqlx::sqlite::{SqliteConnectOptions, SqlitePool, SqlitePoolOptions}; + +pub async fn prepare(url: &str) -> sqlx::Result { + let pool = create(url).await?; + sqlx::migrate!().run(&pool).await?; + Ok(pool) +} + +async fn create(database_url: &str) -> sqlx::Result { + let options = SqliteConnectOptions::from_str(database_url)? + .create_if_missing(true) + .optimize_on_close(true, /* analysis_limit */ None); + + let pool = SqlitePoolOptions::new().connect_with(options).await?; + Ok(pool) +} + +pub trait NotFound { + type Ok; + fn not_found(self, map: F) -> Result + where + E: From, + F: FnOnce() -> E; +} + +impl NotFound for Result { + type Ok = T; + + fn not_found(self, map: F) -> Result + where + E: From, + F: FnOnce() -> E, + { + match self { + Err(sqlx::Error::RowNotFound) => Err(map()), + Err(other) => Err(other.into()), + Ok(value) => Ok(value), + } + } +} diff --git a/src/event/app.rs b/src/event/app.rs index b5f2ecc..3d35f1a 100644 --- a/src/event/app.rs +++ b/src/event/app.rs @@ -12,11 +12,11 @@ use super::{ types::{self, ChannelEvent}, }; use crate::{ - channel, + channel::{self, repo::Provider as _}, clock::DateTime, - event::Sequence, + db::NotFound as _, + event::{repo::Provider as _, Sequence}, login::Login, - repo::{channel::Provider as _, error::NotFound as _, sequence::Provider as _}, }; pub struct Events<'a> { diff --git a/src/event/repo/mod.rs b/src/event/repo/mod.rs index e216a50..cee840c 100644 --- a/src/event/repo/mod.rs +++ b/src/event/repo/mod.rs @@ -1 +1,4 @@ pub mod message; +mod sequence; + +pub use self::sequence::Provider; diff --git a/src/event/repo/sequence.rs b/src/event/repo/sequence.rs new file mode 100644 index 0000000..c985869 --- /dev/null +++ b/src/event/repo/sequence.rs @@ -0,0 +1,44 @@ +use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction}; + +use crate::event::Sequence; + +pub trait Provider { + fn sequence(&mut self) -> Sequences; +} + +impl<'c> Provider for Transaction<'c, Sqlite> { + fn sequence(&mut self) -> Sequences { + Sequences(self) + } +} + +pub struct Sequences<'t>(&'t mut SqliteConnection); + +impl<'c> Sequences<'c> { + pub async fn next(&mut self) -> Result { + let next = sqlx::query_scalar!( + r#" + update event_sequence + set last_value = last_value + 1 + returning last_value as "next_value: Sequence" + "#, + ) + .fetch_one(&mut *self.0) + .await?; + + Ok(next) + } + + pub async fn current(&mut self) -> Result { + let next = sqlx::query_scalar!( + r#" + select last_value as "last_value: Sequence" + from event_sequence + "#, + ) + .fetch_one(&mut *self.0) + .await?; + + Ok(next) + } +} diff --git a/src/lib.rs b/src/lib.rs index bbcb314..8ec13da 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -7,13 +7,13 @@ mod broadcast; mod channel; pub mod cli; mod clock; +mod db; mod error; mod event; mod expire; mod id; mod login; mod message; -mod repo; #[cfg(test)] mod test; mod token; diff --git a/src/login/app.rs b/src/login/app.rs index 69c1055..15adb31 100644 --- a/src/login/app.rs +++ b/src/login/app.rs @@ -1,6 +1,6 @@ use sqlx::sqlite::SqlitePool; -use crate::{event::Sequence, repo::sequence::Provider as _}; +use crate::event::{repo::Provider as _, Sequence}; #[cfg(test)] use super::{repo::Provider as _, Login, Password}; diff --git a/src/repo/channel.rs b/src/repo/channel.rs deleted file mode 100644 index 18cd81f..0000000 --- a/src/repo/channel.rs +++ /dev/null @@ -1,165 +0,0 @@ -use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction}; - -use crate::{ - channel::{Channel, Id}, - clock::DateTime, - event::{types, Sequence}, -}; - -pub trait Provider { - fn channels(&mut self) -> Channels; -} - -impl<'c> Provider for Transaction<'c, Sqlite> { - fn channels(&mut self) -> Channels { - Channels(self) - } -} - -pub struct Channels<'t>(&'t mut SqliteConnection); - -impl<'c> Channels<'c> { - pub async fn create( - &mut self, - name: &str, - created_at: &DateTime, - created_sequence: Sequence, - ) -> Result { - let id = Id::generate(); - let channel = sqlx::query_as!( - Channel, - r#" - insert - into channel (id, name, created_at, created_sequence) - values ($1, $2, $3, $4) - returning - id as "id: Id", - name, - created_at as "created_at: DateTime", - created_sequence as "created_sequence: Sequence" - "#, - id, - name, - created_at, - created_sequence, - ) - .fetch_one(&mut *self.0) - .await?; - - Ok(channel) - } - - pub async fn by_id(&mut self, channel: &Id) -> Result { - let channel = sqlx::query_as!( - Channel, - r#" - select - id as "id: Id", - name, - created_at as "created_at: DateTime", - created_sequence as "created_sequence: Sequence" - from channel - where id = $1 - "#, - channel, - ) - .fetch_one(&mut *self.0) - .await?; - - Ok(channel) - } - - pub async fn all( - &mut self, - resume_point: Option, - ) -> Result, sqlx::Error> { - let channels = sqlx::query_as!( - Channel, - r#" - select - id as "id: Id", - name, - created_at as "created_at: DateTime", - created_sequence as "created_sequence: Sequence" - from channel - where coalesce(created_sequence <= $1, true) - order by channel.name - "#, - resume_point, - ) - .fetch_all(&mut *self.0) - .await?; - - Ok(channels) - } - - pub async fn replay( - &mut self, - resume_at: Option, - ) -> Result, sqlx::Error> { - let channels = sqlx::query_as!( - Channel, - r#" - select - id as "id: Id", - name, - created_at as "created_at: DateTime", - created_sequence as "created_sequence: Sequence" - from channel - where coalesce(created_sequence > $1, true) - "#, - resume_at, - ) - .fetch_all(&mut *self.0) - .await?; - - Ok(channels) - } - - pub async fn delete( - &mut self, - channel: &Channel, - deleted_at: &DateTime, - deleted_sequence: Sequence, - ) -> Result { - let channel = channel.id.clone(); - sqlx::query_scalar!( - r#" - delete from channel - where id = $1 - returning 1 as "row: i64" - "#, - channel, - ) - .fetch_one(&mut *self.0) - .await?; - - Ok(types::ChannelEvent { - sequence: deleted_sequence, - at: *deleted_at, - data: types::DeletedEvent { channel }.into(), - }) - } - - pub async fn expired(&mut self, expired_at: &DateTime) -> Result, sqlx::Error> { - let channels = sqlx::query_as!( - Channel, - r#" - select - channel.id as "id: Id", - channel.name, - channel.created_at as "created_at: DateTime", - channel.created_sequence as "created_sequence: Sequence" - from channel - left join message - where created_at < $1 - and message.id is null - "#, - expired_at, - ) - .fetch_all(&mut *self.0) - .await?; - - Ok(channels) - } -} diff --git a/src/repo/error.rs b/src/repo/error.rs deleted file mode 100644 index a5961e2..0000000 --- a/src/repo/error.rs +++ /dev/null @@ -1,23 +0,0 @@ -pub trait NotFound { - type Ok; - fn not_found(self, map: F) -> Result - where - E: From, - F: FnOnce() -> E; -} - -impl NotFound for Result { - type Ok = T; - - fn not_found(self, map: F) -> Result - where - E: From, - F: FnOnce() -> E, - { - match self { - Err(sqlx::Error::RowNotFound) => Err(map()), - Err(other) => Err(other.into()), - Ok(value) => Ok(value), - } - } -} diff --git a/src/repo/mod.rs b/src/repo/mod.rs deleted file mode 100644 index 7abd46b..0000000 --- a/src/repo/mod.rs +++ /dev/null @@ -1,4 +0,0 @@ -pub mod channel; -pub mod error; -pub mod pool; -pub mod sequence; diff --git a/src/repo/pool.rs b/src/repo/pool.rs deleted file mode 100644 index b4aa6fc..0000000 --- a/src/repo/pool.rs +++ /dev/null @@ -1,18 +0,0 @@ -use std::str::FromStr; - -use sqlx::sqlite::{SqliteConnectOptions, SqlitePool, SqlitePoolOptions}; - -pub async fn prepare(url: &str) -> sqlx::Result { - let pool = create(url).await?; - sqlx::migrate!().run(&pool).await?; - Ok(pool) -} - -async fn create(database_url: &str) -> sqlx::Result { - let options = SqliteConnectOptions::from_str(database_url)? - .create_if_missing(true) - .optimize_on_close(true, /* analysis_limit */ None); - - let pool = SqlitePoolOptions::new().connect_with(options).await?; - Ok(pool) -} diff --git a/src/repo/sequence.rs b/src/repo/sequence.rs deleted file mode 100644 index c985869..0000000 --- a/src/repo/sequence.rs +++ /dev/null @@ -1,44 +0,0 @@ -use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction}; - -use crate::event::Sequence; - -pub trait Provider { - fn sequence(&mut self) -> Sequences; -} - -impl<'c> Provider for Transaction<'c, Sqlite> { - fn sequence(&mut self) -> Sequences { - Sequences(self) - } -} - -pub struct Sequences<'t>(&'t mut SqliteConnection); - -impl<'c> Sequences<'c> { - pub async fn next(&mut self) -> Result { - let next = sqlx::query_scalar!( - r#" - update event_sequence - set last_value = last_value + 1 - returning last_value as "next_value: Sequence" - "#, - ) - .fetch_one(&mut *self.0) - .await?; - - Ok(next) - } - - pub async fn current(&mut self) -> Result { - let next = sqlx::query_scalar!( - r#" - select last_value as "last_value: Sequence" - from event_sequence - "#, - ) - .fetch_one(&mut *self.0) - .await?; - - Ok(next) - } -} diff --git a/src/test/fixtures/mod.rs b/src/test/fixtures/mod.rs index d1dd0c3..76467ab 100644 --- a/src/test/fixtures/mod.rs +++ b/src/test/fixtures/mod.rs @@ -1,6 +1,6 @@ use chrono::{TimeDelta, Utc}; -use crate::{app::App, clock::RequestedAt, repo::pool}; +use crate::{app::App, clock::RequestedAt, db}; pub mod channel; pub mod filter; @@ -10,7 +10,7 @@ pub mod login; pub mod message; pub async fn scratch_app() -> App { - let pool = pool::prepare("sqlite::memory:") + let pool = db::prepare("sqlite::memory:") .await .expect("setting up in-memory sqlite database"); App::from(pool) diff --git a/src/token/app.rs b/src/token/app.rs index 1477a9f..030ec69 100644 --- a/src/token/app.rs +++ b/src/token/app.rs @@ -11,8 +11,8 @@ use super::{ }; use crate::{ clock::DateTime, + db::NotFound as _, login::{repo::Provider as _, Login, Password}, - repo::error::NotFound as _, }; pub struct Tokens<'a> { -- cgit v1.2.3 From 0a5599c60d20ccc2223779eeba5dc91a95ea0fe5 Mon Sep 17 00:00:00 2001 From: Owen Jacobson Date: Thu, 3 Oct 2024 20:17:07 -0400 Subject: Add endpoints for deleting channels and messages. It is deliberate that the expire() functions do not use them. To avoid races, the transactions must be committed before events get sent, in both cases, which makes them structurally pretty different. --- ...4a808afb115f6e47d545dfbeb18f9c54e6eb15eb80.json | 20 ++++++ ...0b1a8433a8ca334f1d666b104823e3fb0c08efb2cc.json | 32 ---------- ...9cedc6bee1750d28a6176980ed7040b8a3301fc7e5.json | 62 +++++++++++++++++++ ...99e837106c799e84015425286b79f42e4001d8a4c7.json | 62 ------------------- ...ad2d2dec42949522f182a61bfb249f13ee78564179.json | 20 ++++++ docs/api.md | 28 +++++++++ src/channel/app.rs | 72 ++++++++++++++++++---- src/channel/routes.rs | 61 ++++++++++++------ src/channel/routes/test/on_send.rs | 6 +- src/cli.rs | 13 ++-- src/event/app.rs | 1 + src/event/broadcaster.rs | 2 +- src/message/app.rs | 61 +++++++++++++----- src/message/mod.rs | 3 +- src/message/repo.rs | 46 +++++++------- src/message/routes.rs | 46 ++++++++++++++ 16 files changed, 363 insertions(+), 172 deletions(-) create mode 100644 .sqlx/query-46403b84bfc79a53aec36b4a808afb115f6e47d545dfbeb18f9c54e6eb15eb80.json delete mode 100644 .sqlx/query-4d4dce1b034f4a540f49490b1a8433a8ca334f1d666b104823e3fb0c08efb2cc.json create mode 100644 .sqlx/query-6fc4be85527af518da17c49cedc6bee1750d28a6176980ed7040b8a3301fc7e5.json delete mode 100644 .sqlx/query-e93702ad922c7ce802499e99e837106c799e84015425286b79f42e4001d8a4c7.json create mode 100644 .sqlx/query-f3a338b9e4a65856decd79ad2d2dec42949522f182a61bfb249f13ee78564179.json create mode 100644 src/message/routes.rs (limited to 'src/cli.rs') diff --git a/.sqlx/query-46403b84bfc79a53aec36b4a808afb115f6e47d545dfbeb18f9c54e6eb15eb80.json b/.sqlx/query-46403b84bfc79a53aec36b4a808afb115f6e47d545dfbeb18f9c54e6eb15eb80.json new file mode 100644 index 0000000..ee0f235 --- /dev/null +++ b/.sqlx/query-46403b84bfc79a53aec36b4a808afb115f6e47d545dfbeb18f9c54e6eb15eb80.json @@ -0,0 +1,20 @@ +{ + "db_name": "SQLite", + "query": "\n select\n message.id as \"id: Id\"\n from message\n join channel on message.channel = channel.id\n where channel.id = $1\n order by message.sent_sequence\n ", + "describe": { + "columns": [ + { + "name": "id: Id", + "ordinal": 0, + "type_info": "Text" + } + ], + "parameters": { + "Right": 1 + }, + "nullable": [ + false + ] + }, + "hash": "46403b84bfc79a53aec36b4a808afb115f6e47d545dfbeb18f9c54e6eb15eb80" +} diff --git a/.sqlx/query-4d4dce1b034f4a540f49490b1a8433a8ca334f1d666b104823e3fb0c08efb2cc.json b/.sqlx/query-4d4dce1b034f4a540f49490b1a8433a8ca334f1d666b104823e3fb0c08efb2cc.json deleted file mode 100644 index fb5f94b..0000000 --- a/.sqlx/query-4d4dce1b034f4a540f49490b1a8433a8ca334f1d666b104823e3fb0c08efb2cc.json +++ /dev/null @@ -1,32 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n select\n channel.id as \"channel_id: channel::Id\",\n channel.name as \"channel_name\",\n message.id as \"message: Id\"\n from message\n join channel on message.channel = channel.id\n where sent_at < $1\n ", - "describe": { - "columns": [ - { - "name": "channel_id: channel::Id", - "ordinal": 0, - "type_info": "Text" - }, - { - "name": "channel_name", - "ordinal": 1, - "type_info": "Text" - }, - { - "name": "message: Id", - "ordinal": 2, - "type_info": "Text" - } - ], - "parameters": { - "Right": 1 - }, - "nullable": [ - false, - false, - false - ] - }, - "hash": "4d4dce1b034f4a540f49490b1a8433a8ca334f1d666b104823e3fb0c08efb2cc" -} diff --git a/.sqlx/query-6fc4be85527af518da17c49cedc6bee1750d28a6176980ed7040b8a3301fc7e5.json b/.sqlx/query-6fc4be85527af518da17c49cedc6bee1750d28a6176980ed7040b8a3301fc7e5.json new file mode 100644 index 0000000..257e1f6 --- /dev/null +++ b/.sqlx/query-6fc4be85527af518da17c49cedc6bee1750d28a6176980ed7040b8a3301fc7e5.json @@ -0,0 +1,62 @@ +{ + "db_name": "SQLite", + "query": "\n select\n channel.id as \"channel_id: channel::Id\",\n channel.name as \"channel_name\",\n sender.id as \"sender_id: login::Id\",\n sender.name as \"sender_name\",\n message.id as \"id: Id\",\n message.body,\n sent_at as \"sent_at: DateTime\",\n sent_sequence as \"sent_sequence: Sequence\"\n from message\n join channel on message.channel = channel.id\n join login as sender on message.sender = sender.id\n where message.id = $1\n ", + "describe": { + "columns": [ + { + "name": "channel_id: channel::Id", + "ordinal": 0, + "type_info": "Text" + }, + { + "name": "channel_name", + "ordinal": 1, + "type_info": "Text" + }, + { + "name": "sender_id: login::Id", + "ordinal": 2, + "type_info": "Text" + }, + { + "name": "sender_name", + "ordinal": 3, + "type_info": "Text" + }, + { + "name": "id: Id", + "ordinal": 4, + "type_info": "Text" + }, + { + "name": "body", + "ordinal": 5, + "type_info": "Text" + }, + { + "name": "sent_at: DateTime", + "ordinal": 6, + "type_info": "Text" + }, + { + "name": "sent_sequence: Sequence", + "ordinal": 7, + "type_info": "Integer" + } + ], + "parameters": { + "Right": 1 + }, + "nullable": [ + false, + false, + false, + false, + false, + false, + false, + false + ] + }, + "hash": "6fc4be85527af518da17c49cedc6bee1750d28a6176980ed7040b8a3301fc7e5" +} diff --git a/.sqlx/query-e93702ad922c7ce802499e99e837106c799e84015425286b79f42e4001d8a4c7.json b/.sqlx/query-e93702ad922c7ce802499e99e837106c799e84015425286b79f42e4001d8a4c7.json deleted file mode 100644 index 288a657..0000000 --- a/.sqlx/query-e93702ad922c7ce802499e99e837106c799e84015425286b79f42e4001d8a4c7.json +++ /dev/null @@ -1,62 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n select\n channel.id as \"channel_id: channel::Id\",\n channel.name as \"channel_name\",\n sender.id as \"sender_id: login::Id\",\n sender.name as \"sender_name\",\n message.id as \"id: Id\",\n message.body,\n sent_at as \"sent_at: DateTime\",\n sent_sequence as \"sent_sequence: Sequence\"\n from message\n join channel on message.channel = channel.id\n join login as sender on message.sender = sender.id\n where message.id = $1\n and message.channel = $2\n ", - "describe": { - "columns": [ - { - "name": "channel_id: channel::Id", - "ordinal": 0, - "type_info": "Text" - }, - { - "name": "channel_name", - "ordinal": 1, - "type_info": "Text" - }, - { - "name": "sender_id: login::Id", - "ordinal": 2, - "type_info": "Text" - }, - { - "name": "sender_name", - "ordinal": 3, - "type_info": "Text" - }, - { - "name": "id: Id", - "ordinal": 4, - "type_info": "Text" - }, - { - "name": "body", - "ordinal": 5, - "type_info": "Text" - }, - { - "name": "sent_at: DateTime", - "ordinal": 6, - "type_info": "Text" - }, - { - "name": "sent_sequence: Sequence", - "ordinal": 7, - "type_info": "Integer" - } - ], - "parameters": { - "Right": 2 - }, - "nullable": [ - false, - false, - false, - false, - false, - false, - false, - false - ] - }, - "hash": "e93702ad922c7ce802499e99e837106c799e84015425286b79f42e4001d8a4c7" -} diff --git a/.sqlx/query-f3a338b9e4a65856decd79ad2d2dec42949522f182a61bfb249f13ee78564179.json b/.sqlx/query-f3a338b9e4a65856decd79ad2d2dec42949522f182a61bfb249f13ee78564179.json new file mode 100644 index 0000000..92a64a3 --- /dev/null +++ b/.sqlx/query-f3a338b9e4a65856decd79ad2d2dec42949522f182a61bfb249f13ee78564179.json @@ -0,0 +1,20 @@ +{ + "db_name": "SQLite", + "query": "\n select\n id as \"message: Id\"\n from message\n where sent_at < $1\n ", + "describe": { + "columns": [ + { + "name": "message: Id", + "ordinal": 0, + "type_info": "Text" + } + ], + "parameters": { + "Right": 1 + }, + "nullable": [ + false + ] + }, + "hash": "f3a338b9e4a65856decd79ad2d2dec42949522f182a61bfb249f13ee78564179" +} diff --git a/docs/api.md b/docs/api.md index 5adf28d..ef211bc 100644 --- a/docs/api.md +++ b/docs/api.md @@ -151,6 +151,34 @@ Once the message is accepted, this will return a 202 Accepted response. The mess If the channel ID is not valid, this will return a 404 Not Found response. +### `DELETE /api/channels/:channel` + +Deletes a channel (and all messages in it). + +The `:channel` placeholder must be a channel ID, as returned by `GET /api/channels` or `POST /api/channels`. + +#### On success + +This will return a 202 Accepted response on success, and delete the channel. + +#### Invalid channel ID + +If the channel ID is not valid, this will return a 404 Not Found response. + +### `DELETE /api/messages/:message` + +Deletes a message. + +The `:message` placeholder must be a message ID, as returned from the event stream or from a list of messages. + +#### On success + +This will return a 202 Accepted response on success, and delete the message. + +#### Invalid message ID + +If the message ID is not valid, this will return a 404 Not Found response. + ### `GET /api/events` Subscribes to events. This endpoint returns an `application/event-stream` response, and is intended for use with the `EventSource` browser API. Events will be delivered on this stream as they occur, and the request will remain open to deliver events. diff --git a/src/channel/app.rs b/src/channel/app.rs index 6ce826b..24be2ff 100644 --- a/src/channel/app.rs +++ b/src/channel/app.rs @@ -2,10 +2,12 @@ use chrono::TimeDelta; use itertools::Itertools; use sqlx::sqlite::SqlitePool; +use super::{repo::Provider as _, Channel, Id}; use crate::{ - channel::{repo::Provider as _, Channel}, clock::DateTime, - event::{broadcaster::Broadcaster, repo::Provider as _, Sequence}, + db::NotFound, + event::{broadcaster::Broadcaster, repo::Provider as _, Event, Sequence, Sequenced}, + message::repo::Provider as _, }; pub struct Channels<'a> { @@ -28,9 +30,8 @@ impl<'a> Channels<'a> { .map_err(|err| CreateError::from_duplicate_name(err, name))?; tx.commit().await?; - for event in channel.events() { - self.events.broadcast(event); - } + self.events + .broadcast(channel.events().map(Event::from).collect::>()); Ok(channel.snapshot()) } @@ -53,6 +54,46 @@ impl<'a> Channels<'a> { Ok(channels) } + pub async fn delete(&self, channel: &Id, deleted_at: &DateTime) -> Result<(), DeleteError> { + let mut tx = self.db.begin().await?; + + let channel = tx + .channels() + .by_id(channel) + .await + .not_found(|| DeleteError::NotFound(channel.clone()))? + .snapshot(); + + let mut events = Vec::new(); + + let messages = tx.messages().in_channel(&channel).await?; + for message in messages { + let deleted = tx.sequence().next(deleted_at).await?; + let message = tx.messages().delete(&message, &deleted).await?; + events.extend( + message + .events() + .filter(Sequence::start_from(deleted.sequence)) + .map(Event::from), + ); + } + + let deleted = tx.sequence().next(deleted_at).await?; + let channel = tx.channels().delete(&channel.id, &deleted).await?; + events.extend( + channel + .events() + .filter(Sequence::start_from(deleted.sequence)) + .map(Event::from), + ); + + tx.commit().await?; + + self.events.broadcast(events); + + Ok(()) + } + pub async fn expire(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> { // Somewhat arbitrarily, expire after 90 days. let expire_at = relative_to.to_owned() - TimeDelta::days(90); @@ -73,12 +114,13 @@ impl<'a> Channels<'a> { tx.commit().await?; - for event in events - .into_iter() - .kmerge_by(|a, b| a.instant.sequence < b.instant.sequence) - { - self.events.broadcast(event); - } + self.events.broadcast( + events + .into_iter() + .kmerge_by(|a, b| a.sequence() < b.sequence()) + .map(Event::from) + .collect::>(), + ); Ok(()) } @@ -92,6 +134,14 @@ pub enum CreateError { DatabaseError(#[from] sqlx::Error), } +#[derive(Debug, thiserror::Error)] +pub enum DeleteError { + #[error("channel {0} not found")] + NotFound(Id), + #[error(transparent)] + DatabaseError(#[from] sqlx::Error), +} + impl CreateError { fn from_duplicate_name(error: sqlx::Error, name: &str) -> Self { if let Some(error) = error.as_database_error() { diff --git a/src/channel/routes.rs b/src/channel/routes.rs index 5bb1ee9..bce634e 100644 --- a/src/channel/routes.rs +++ b/src/channel/routes.rs @@ -2,20 +2,18 @@ use axum::{ extract::{Json, Path, State}, http::StatusCode, response::{IntoResponse, Response}, - routing::{get, post}, + routing::{delete, get, post}, Router, }; use axum_extra::extract::Query; -use super::app; +use super::{ + app::{self, DeleteError}, + Channel, Id, +}; use crate::{ - app::App, - channel::{self, Channel}, - clock::RequestedAt, - error::Internal, - event::Sequence, - login::Login, - message::app::Error as MessageError, + app::App, clock::RequestedAt, error::Internal, event::Sequence, login::Login, + message::app::SendError, }; #[cfg(test)] @@ -26,6 +24,7 @@ pub fn router() -> Router { .route("/api/channels", get(list)) .route("/api/channels", post(on_create)) .route("/api/channels/:channel", post(on_send)) + .route("/api/channels/:channel", delete(on_delete)) } #[derive(Default, serde::Deserialize)] @@ -95,28 +94,54 @@ struct SendRequest { async fn on_send( State(app): State, - Path(channel): Path, + Path(channel): Path, RequestedAt(sent_at): RequestedAt, login: Login, Json(request): Json, -) -> Result { +) -> Result { app.messages() .send(&channel, &login, &sent_at, &request.message) - .await - // Could impl `From` here, but it's more code and this is used once. - .map_err(ErrorResponse)?; + .await?; Ok(StatusCode::ACCEPTED) } -#[derive(Debug)] -struct ErrorResponse(MessageError); +#[derive(Debug, thiserror::Error)] +#[error(transparent)] +struct SendErrorResponse(#[from] SendError); + +impl IntoResponse for SendErrorResponse { + fn into_response(self) -> Response { + let Self(error) = self; + match error { + not_found @ SendError::ChannelNotFound(_) => { + (StatusCode::NOT_FOUND, not_found.to_string()).into_response() + } + other => Internal::from(other).into_response(), + } + } +} + +async fn on_delete( + State(app): State, + Path(channel): Path, + RequestedAt(deleted_at): RequestedAt, + _: Login, +) -> Result { + app.channels().delete(&channel, &deleted_at).await?; + + Ok(StatusCode::ACCEPTED) +} + +#[derive(Debug, thiserror::Error)] +#[error(transparent)] +struct DeleteErrorResponse(#[from] DeleteError); -impl IntoResponse for ErrorResponse { +impl IntoResponse for DeleteErrorResponse { fn into_response(self) -> Response { let Self(error) = self; match error { - not_found @ MessageError::ChannelNotFound(_) => { + not_found @ DeleteError::NotFound(_) => { (StatusCode::NOT_FOUND, not_found.to_string()).into_response() } other => Internal::from(other).into_response(), diff --git a/src/channel/routes/test/on_send.rs b/src/channel/routes/test/on_send.rs index 1027b29..3297093 100644 --- a/src/channel/routes/test/on_send.rs +++ b/src/channel/routes/test/on_send.rs @@ -5,7 +5,7 @@ use crate::{ channel, channel::routes, event, - message::app, + message::app::SendError, test::fixtures::{self, future::Immediately as _}, }; @@ -77,7 +77,7 @@ async fn nonexistent_channel() { let request = routes::SendRequest { message: fixtures::message::propose(), }; - let routes::ErrorResponse(error) = routes::on_send( + let routes::SendErrorResponse(error) = routes::on_send( State(app), Path(channel.clone()), sent_at, @@ -91,6 +91,6 @@ async fn nonexistent_channel() { assert!(matches!( error, - app::Error::ChannelNotFound(error_channel) if channel == error_channel + SendError::ChannelNotFound(error_channel) if channel == error_channel )); } diff --git a/src/cli.rs b/src/cli.rs index 893fae2..2d9f512 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -10,7 +10,7 @@ use clap::Parser; use sqlx::sqlite::SqlitePool; use tokio::net; -use crate::{app::App, channel, clock, db, event, expire, login}; +use crate::{app::App, channel, clock, db, event, expire, login, message}; /// Command-line entry point for running the `hi` server. /// @@ -105,9 +105,14 @@ impl Args { } fn routers() -> Router { - [channel::router(), event::router(), login::router()] - .into_iter() - .fold(Router::default(), Router::merge) + [ + channel::router(), + event::router(), + login::router(), + message::router(), + ] + .into_iter() + .fold(Router::default(), Router::merge) } fn started_msg(listener: &net::TcpListener) -> io::Result { diff --git a/src/event/app.rs b/src/event/app.rs index e58bea9..32f0a97 100644 --- a/src/event/app.rs +++ b/src/event/app.rs @@ -61,6 +61,7 @@ impl<'a> Events<'a> { // Filtering on the broadcast resume point filters out messages // before resume_at, and filters out messages duplicated from // `replay_events`. + .flat_map(stream::iter) .filter(Self::resume(resume_live_at)); Ok(replay.chain(live_messages)) diff --git a/src/event/broadcaster.rs b/src/event/broadcaster.rs index de2513a..3c4efac 100644 --- a/src/event/broadcaster.rs +++ b/src/event/broadcaster.rs @@ -1,3 +1,3 @@ use crate::broadcast; -pub type Broadcaster = broadcast::Broadcaster; +pub type Broadcaster = broadcast::Broadcaster>; diff --git a/src/message/app.rs b/src/message/app.rs index 51f772e..1d34c14 100644 --- a/src/message/app.rs +++ b/src/message/app.rs @@ -2,12 +2,12 @@ use chrono::TimeDelta; use itertools::Itertools; use sqlx::sqlite::SqlitePool; -use super::{repo::Provider as _, Message}; +use super::{repo::Provider as _, Id, Message}; use crate::{ channel::{self, repo::Provider as _}, clock::DateTime, db::NotFound as _, - event::{broadcaster::Broadcaster, repo::Provider as _, Sequence}, + event::{broadcaster::Broadcaster, repo::Provider as _, Event, Sequence}, login::Login, }; @@ -27,13 +27,13 @@ impl<'a> Messages<'a> { sender: &Login, sent_at: &DateTime, body: &str, - ) -> Result { + ) -> Result { let mut tx = self.db.begin().await?; let channel = tx .channels() .by_id(channel) .await - .not_found(|| Error::ChannelNotFound(channel.clone()))?; + .not_found(|| SendError::ChannelNotFound(channel.clone()))?; let sent = tx.sequence().next(sent_at).await?; let message = tx .messages() @@ -41,24 +41,40 @@ impl<'a> Messages<'a> { .await?; tx.commit().await?; - for event in message.events() { - self.events.broadcast(event); - } + self.events + .broadcast(message.events().map(Event::from).collect::>()); Ok(message.snapshot()) } + pub async fn delete(&self, message: &Id, deleted_at: &DateTime) -> Result<(), DeleteError> { + let mut tx = self.db.begin().await?; + let deleted = tx.sequence().next(deleted_at).await?; + let message = tx.messages().delete(message, &deleted).await?; + tx.commit().await?; + + self.events.broadcast( + message + .events() + .filter(Sequence::start_from(deleted.sequence)) + .map(Event::from) + .collect::>(), + ); + + Ok(()) + } + pub async fn expire(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> { // Somewhat arbitrarily, expire after 90 days. let expire_at = relative_to.to_owned() - TimeDelta::days(90); let mut tx = self.db.begin().await?; - let expired = tx.messages().expired(&expire_at).await?; + let expired = tx.messages().expired(&expire_at).await?; let mut events = Vec::with_capacity(expired.len()); - for (channel, message) in expired { + for message in expired { let deleted = tx.sequence().next(relative_to).await?; - let message = tx.messages().delete(&channel, &message, &deleted).await?; + let message = tx.messages().delete(&message, &deleted).await?; events.push( message .events() @@ -68,21 +84,32 @@ impl<'a> Messages<'a> { tx.commit().await?; - for event in events - .into_iter() - .kmerge_by(|a, b| a.instant.sequence < b.instant.sequence) - { - self.events.broadcast(event); - } + self.events.broadcast( + events + .into_iter() + .kmerge_by(|a, b| a.instant.sequence < b.instant.sequence) + .map(Event::from) + .collect::>(), + ); Ok(()) } } #[derive(Debug, thiserror::Error)] -pub enum Error { +pub enum SendError { + #[error("channel {0} not found")] + ChannelNotFound(channel::Id), + #[error(transparent)] + DatabaseError(#[from] sqlx::Error), +} + +#[derive(Debug, thiserror::Error)] +pub enum DeleteError { #[error("channel {0} not found")] ChannelNotFound(channel::Id), + #[error("message {0} not found")] + NotFound(Id), #[error(transparent)] DatabaseError(#[from] sqlx::Error), } diff --git a/src/message/mod.rs b/src/message/mod.rs index 52d56c1..a8f51ab 100644 --- a/src/message/mod.rs +++ b/src/message/mod.rs @@ -3,6 +3,7 @@ pub mod event; mod history; mod id; pub mod repo; +mod routes; mod snapshot; -pub use self::{event::Event, history::History, id::Id, snapshot::Message}; +pub use self::{event::Event, history::History, id::Id, routes::router, snapshot::Message}; diff --git a/src/message/repo.rs b/src/message/repo.rs index 3b2b8f7..ae41736 100644 --- a/src/message/repo.rs +++ b/src/message/repo.rs @@ -62,7 +62,25 @@ impl<'c> Messages<'c> { Ok(message) } - async fn by_id(&mut self, channel: &Channel, message: &Id) -> Result { + pub async fn in_channel(&mut self, channel: &Channel) -> Result, sqlx::Error> { + let messages = sqlx::query_scalar!( + r#" + select + message.id as "id: Id" + from message + join channel on message.channel = channel.id + where channel.id = $1 + order by message.sent_sequence + "#, + channel.id, + ) + .fetch_all(&mut *self.0) + .await?; + + Ok(messages) + } + + async fn by_id(&mut self, message: &Id) -> Result { let message = sqlx::query!( r#" select @@ -78,10 +96,8 @@ impl<'c> Messages<'c> { join channel on message.channel = channel.id join login as sender on message.sender = sender.id where message.id = $1 - and message.channel = $2 "#, message, - channel.id, ) .map(|row| History { message: Message { @@ -110,11 +126,10 @@ impl<'c> Messages<'c> { pub async fn delete( &mut self, - channel: &Channel, message: &Id, deleted: &Instant, ) -> Result { - let history = self.by_id(channel, message).await?; + let history = self.by_id(message).await?; sqlx::query_scalar!( r#" @@ -134,31 +149,16 @@ impl<'c> Messages<'c> { }) } - pub async fn expired( - &mut self, - expire_at: &DateTime, - ) -> Result, sqlx::Error> { - let messages = sqlx::query!( + pub async fn expired(&mut self, expire_at: &DateTime) -> Result, sqlx::Error> { + let messages = sqlx::query_scalar!( r#" select - channel.id as "channel_id: channel::Id", - channel.name as "channel_name", - message.id as "message: Id" + id as "message: Id" from message - join channel on message.channel = channel.id where sent_at < $1 "#, expire_at, ) - .map(|row| { - ( - Channel { - id: row.channel_id, - name: row.channel_name, - }, - row.message, - ) - }) .fetch_all(&mut *self.0) .await?; diff --git a/src/message/routes.rs b/src/message/routes.rs new file mode 100644 index 0000000..29fe3d7 --- /dev/null +++ b/src/message/routes.rs @@ -0,0 +1,46 @@ +use axum::{ + extract::{Path, State}, + http::StatusCode, + response::{IntoResponse, Response}, + routing::delete, + Router, +}; + +use crate::{ + app::App, + clock::RequestedAt, + error::Internal, + login::Login, + message::{self, app::DeleteError}, +}; + +pub fn router() -> Router { + Router::new().route("/api/messages/:message", delete(on_delete)) +} + +async fn on_delete( + State(app): State, + Path(message): Path, + RequestedAt(deleted_at): RequestedAt, + _: Login, +) -> Result { + app.messages().delete(&message, &deleted_at).await?; + + Ok(StatusCode::ACCEPTED) +} + +#[derive(Debug, thiserror::Error)] +#[error(transparent)] +struct ErrorResponse(#[from] DeleteError); + +impl IntoResponse for ErrorResponse { + fn into_response(self) -> Response { + let Self(error) = self; + match error { + not_found @ (DeleteError::ChannelNotFound(_) | DeleteError::NotFound(_)) => { + (StatusCode::NOT_FOUND, not_found.to_string()).into_response() + } + other => Internal::from(other).into_response(), + } + } +} -- cgit v1.2.3