From 067e3da1900d052a416c56e1c047640aa23441ae Mon Sep 17 00:00:00 2001 From: Owen Jacobson Date: Fri, 13 Sep 2024 00:26:03 -0400 Subject: Transmit messages via `/:chan/send` and `/:chan/events`. --- ...11a01d9a530898094b2cb7a1fa03ff2393e044cb1d.json | 12 -- ...9e775a7f9dbeabf8094a1e781803f34a128af29075.json | 44 +++++++ ...205a7b6d6aecbc52c56a7a8ad38f96cb82ebcbd53e.json | 44 +++++++ ...a1bb3b1887fe08a02a9d647f24e1ed59d9cf922a19.json | 20 +++ Cargo.lock | 55 +++++++- Cargo.toml | 6 +- migrations/20240912145249_message.sql | 17 +++ src/app.rs | 16 ++- src/channel/app.rs | 141 ++++++++++++++++++++- src/channel/repo.rs | 86 ------------- src/channel/repo/channels.rs | 87 +++++++++++++ src/channel/repo/messages.rs | 111 ++++++++++++++++ src/channel/repo/mod.rs | 2 + src/channel/routes.rs | 56 +++++++- src/cli.rs | 3 +- src/id.rs | 2 +- src/index/app.rs | 2 +- src/index/templates.rs | 2 +- src/login/repo/logins.rs | 2 +- 19 files changed, 586 insertions(+), 122 deletions(-) delete mode 100644 .sqlx/query-2722ea4a4c5134c209771211a01d9a530898094b2cb7a1fa03ff2393e044cb1d.json create mode 100644 .sqlx/query-2fc4e7c9085fbd3c42a0d19e775a7f9dbeabf8094a1e781803f34a128af29075.json create mode 100644 .sqlx/query-9d7f46ceefa71aad1aec42205a7b6d6aecbc52c56a7a8ad38f96cb82ebcbd53e.json create mode 100644 .sqlx/query-d50791669f27ddafe83adfa1bb3b1887fe08a02a9d647f24e1ed59d9cf922a19.json create mode 100644 migrations/20240912145249_message.sql delete mode 100644 src/channel/repo.rs create mode 100644 src/channel/repo/channels.rs create mode 100644 src/channel/repo/messages.rs create mode 100644 src/channel/repo/mod.rs diff --git a/.sqlx/query-2722ea4a4c5134c209771211a01d9a530898094b2cb7a1fa03ff2393e044cb1d.json b/.sqlx/query-2722ea4a4c5134c209771211a01d9a530898094b2cb7a1fa03ff2393e044cb1d.json deleted file mode 100644 index 704e480..0000000 --- a/.sqlx/query-2722ea4a4c5134c209771211a01d9a530898094b2cb7a1fa03ff2393e044cb1d.json +++ /dev/null @@ -1,12 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n insert\n into channel (id, name)\n values ($1, $2)\n ", - "describe": { - "columns": [], - "parameters": { - "Right": 2 - }, - "nullable": [] - }, - "hash": "2722ea4a4c5134c209771211a01d9a530898094b2cb7a1fa03ff2393e044cb1d" -} diff --git a/.sqlx/query-2fc4e7c9085fbd3c42a0d19e775a7f9dbeabf8094a1e781803f34a128af29075.json b/.sqlx/query-2fc4e7c9085fbd3c42a0d19e775a7f9dbeabf8094a1e781803f34a128af29075.json new file mode 100644 index 0000000..f7e5590 --- /dev/null +++ b/.sqlx/query-2fc4e7c9085fbd3c42a0d19e775a7f9dbeabf8094a1e781803f34a128af29075.json @@ -0,0 +1,44 @@ +{ + "db_name": "SQLite", + "query": "\n select\n id as \"id: Id\",\n sender as \"sender: LoginId\",\n channel as \"channel: ChannelId\",\n body,\n sent_at as \"sent_at: DateTime\"\n from message\n where channel = $1\n order by sent_at asc\n ", + "describe": { + "columns": [ + { + "name": "id: Id", + "ordinal": 0, + "type_info": "Text" + }, + { + "name": "sender: LoginId", + "ordinal": 1, + "type_info": "Text" + }, + { + "name": "channel: ChannelId", + "ordinal": 2, + "type_info": "Text" + }, + { + "name": "body", + "ordinal": 3, + "type_info": "Text" + }, + { + "name": "sent_at: DateTime", + "ordinal": 4, + "type_info": "Text" + } + ], + "parameters": { + "Right": 1 + }, + "nullable": [ + false, + false, + false, + false, + false + ] + }, + "hash": "2fc4e7c9085fbd3c42a0d19e775a7f9dbeabf8094a1e781803f34a128af29075" +} diff --git a/.sqlx/query-9d7f46ceefa71aad1aec42205a7b6d6aecbc52c56a7a8ad38f96cb82ebcbd53e.json b/.sqlx/query-9d7f46ceefa71aad1aec42205a7b6d6aecbc52c56a7a8ad38f96cb82ebcbd53e.json new file mode 100644 index 0000000..93bbe5e --- /dev/null +++ b/.sqlx/query-9d7f46ceefa71aad1aec42205a7b6d6aecbc52c56a7a8ad38f96cb82ebcbd53e.json @@ -0,0 +1,44 @@ +{ + "db_name": "SQLite", + "query": "\n insert into message\n (id, sender, channel, body, sent_at)\n values ($1, $2, $3, $4, $5)\n returning\n id as \"id: Id\",\n sender as \"sender: LoginId\",\n channel as \"channel: ChannelId\",\n body,\n sent_at as \"sent_at: DateTime\"\n ", + "describe": { + "columns": [ + { + "name": "id: Id", + "ordinal": 0, + "type_info": "Text" + }, + { + "name": "sender: LoginId", + "ordinal": 1, + "type_info": "Text" + }, + { + "name": "channel: ChannelId", + "ordinal": 2, + "type_info": "Text" + }, + { + "name": "body", + "ordinal": 3, + "type_info": "Text" + }, + { + "name": "sent_at: DateTime", + "ordinal": 4, + "type_info": "Text" + } + ], + "parameters": { + "Right": 5 + }, + "nullable": [ + false, + false, + false, + false, + false + ] + }, + "hash": "9d7f46ceefa71aad1aec42205a7b6d6aecbc52c56a7a8ad38f96cb82ebcbd53e" +} diff --git a/.sqlx/query-d50791669f27ddafe83adfa1bb3b1887fe08a02a9d647f24e1ed59d9cf922a19.json b/.sqlx/query-d50791669f27ddafe83adfa1bb3b1887fe08a02a9d647f24e1ed59d9cf922a19.json new file mode 100644 index 0000000..de6ab44 --- /dev/null +++ b/.sqlx/query-d50791669f27ddafe83adfa1bb3b1887fe08a02a9d647f24e1ed59d9cf922a19.json @@ -0,0 +1,20 @@ +{ + "db_name": "SQLite", + "query": "\n insert\n into channel (id, name)\n values ($1, $2)\n returning id as \"id: Id\"\n ", + "describe": { + "columns": [ + { + "name": "id: Id", + "ordinal": 0, + "type_info": "Text" + } + ], + "parameters": { + "Right": 2 + }, + "nullable": [ + false + ] + }, + "hash": "d50791669f27ddafe83adfa1bb3b1887fe08a02a9d647f24e1ed59d9cf922a19" +} diff --git a/Cargo.lock b/Cargo.lock index 687863e..f48e75d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -318,6 +318,7 @@ dependencies = [ "iana-time-zone", "js-sys", "num-traits", + "serde", "wasm-bindgen", "windows-targets 0.52.6", ] @@ -566,6 +567,21 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "futures" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + [[package]] name = "futures-channel" version = "0.3.30" @@ -610,6 +626,17 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1" +[[package]] +name = "futures-macro" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "futures-sink" version = "0.3.30" @@ -628,8 +655,10 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" dependencies = [ + "futures-channel", "futures-core", "futures-io", + "futures-macro", "futures-sink", "futures-task", "memchr", @@ -712,13 +741,17 @@ dependencies = [ "axum-extra", "chrono", "clap", + "futures", "maud", "password-hash", "rand", "rand_core", "serde", + "serde_json", "sqlx", + "thiserror", "tokio", + "tokio-stream", "uuid", ] @@ -1407,9 +1440,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.127" +version = "1.0.128" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8043c06d9f82bd7271361ed64f415fe5e12a77fdb52e573e7f06a516dea329ad" +checksum = "6ff5456707a1de34e7e37f2a6fd3d3f808c318259cbd01ab6377795054b483d8" dependencies = [ "itoa", "memchr", @@ -1886,13 +1919,27 @@ dependencies = [ [[package]] name = "tokio-stream" -version = "0.1.15" +version = "0.1.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "267ac89e0bec6e691e5813911606935d77c476ff49024f98abcea3e7b15e37af" +checksum = "4f4e6ce100d0eb49a2734f8c0812bcd324cf357d21810932c5df6b96ef2b86f1" dependencies = [ "futures-core", "pin-project-lite", "tokio", + "tokio-util", +] + +[[package]] +name = "tokio-util" +version = "0.7.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "61e7c3654c13bcd040d4a03abee2c75b1d14a37b423cf5a813ceae1cc903ec6a" +dependencies = [ + "bytes", + "futures-core", + "futures-sink", + "pin-project-lite", + "tokio", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index eb748ba..e69de87 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,13 +8,17 @@ argon2 = "0.5.3" async-trait = "0.1.82" axum = { version = "0.7.5", features = ["form"] } axum-extra = { version = "0.9.3", features = ["cookie"] } -chrono = "0.4.38" +chrono = { version = "0.4.38", features = ["serde"] } clap = { version = "4.5.16", features = ["derive", "env"] } +futures = "0.3.30" maud = { version = "0.26.0", features = ["axum"] } password-hash = { version = "0.5.0", features = ["std"] } rand = "0.8.5" rand_core = { version = "0.6.4", features = ["getrandom"] } serde = { version = "1.0.209", features = ["derive"] } +serde_json = "1.0.128" sqlx = { version = "0.8.1", features = ["chrono", "runtime-tokio", "sqlite"] } +thiserror = "1.0.63" tokio = { version = "1.40.0", features = ["rt", "macros", "rt-multi-thread"] } +tokio-stream = { version = "0.1.16", features = ["sync"] } uuid = { version = "1.10.0", features = ["v4"] } diff --git a/migrations/20240912145249_message.sql b/migrations/20240912145249_message.sql new file mode 100644 index 0000000..ce9db0d --- /dev/null +++ b/migrations/20240912145249_message.sql @@ -0,0 +1,17 @@ +create table message ( + id text + not null + primary key, + channel text + not null + references channel (id), + sender text + not null + references login (id), + body text + not null, + sent_at text + not null +); + +create index message_sent_at on message (channel, sent_at); diff --git a/src/app.rs b/src/app.rs index 4195fdc..f349fd4 100644 --- a/src/app.rs +++ b/src/app.rs @@ -1,15 +1,23 @@ use sqlx::sqlite::SqlitePool; -use crate::{channel::app::Channels, index::app::Index, login::app::Logins}; +use crate::error::BoxedError; + +use crate::{ + channel::app::{Broadcaster, Channels}, + index::app::Index, + login::app::Logins, +}; #[derive(Clone)] pub struct App { db: SqlitePool, + broadcaster: Broadcaster, } impl App { - pub fn from(db: SqlitePool) -> Self { - Self { db } + pub async fn from(db: SqlitePool) -> Result { + let broadcaster = Broadcaster::from_database(&db).await?; + Ok(Self { db, broadcaster }) } } @@ -23,6 +31,6 @@ impl App { } pub fn channels(&self) -> Channels { - Channels::new(&self.db) + Channels::new(&self.db, &self.broadcaster) } } diff --git a/src/channel/app.rs b/src/channel/app.rs index 84822cb..7b02300 100644 --- a/src/channel/app.rs +++ b/src/channel/app.rs @@ -1,21 +1,152 @@ +use std::collections::{hash_map::Entry, HashMap}; +use std::sync::{Arc, Mutex}; + +use futures::{ + stream::{self, StreamExt as _, TryStreamExt as _}, + Stream, +}; use sqlx::sqlite::SqlitePool; +use tokio::sync::broadcast::{channel, Sender}; +use tokio_stream::wrappers::BroadcastStream; -use super::repo::Provider as _; -use crate::error::BoxedError; +use super::repo::{ + channels::{Id as ChannelId, Provider as _}, + messages::{Message, Provider as _}, +}; +use crate::{clock::DateTime, error::BoxedError, login::repo::logins::Login}; pub struct Channels<'a> { db: &'a SqlitePool, + broadcaster: &'a Broadcaster, } impl<'a> Channels<'a> { - pub fn new(db: &'a SqlitePool) -> Self { - Self { db } + pub fn new(db: &'a SqlitePool, broadcaster: &'a Broadcaster) -> Self { + Self { db, broadcaster } } pub async fn create(&self, name: &str) -> Result<(), BoxedError> { let mut tx = self.db.begin().await?; - tx.channels().create(name).await?; + let channel = tx.channels().create(name).await?; + tx.commit().await?; + + self.broadcaster.register_channel(&channel)?; + Ok(()) + } + + pub async fn send( + &self, + login: &Login, + channel: &ChannelId, + body: &str, + sent_at: &DateTime, + ) -> Result<(), BoxedError> { + let mut tx = self.db.begin().await?; + let message = tx + .messages() + .create(&login.id, channel, body, sent_at) + .await?; + tx.commit().await?; + + self.broadcaster.broadcast(channel, message)?; + Ok(()) + } + + pub async fn events( + &self, + channel: &ChannelId, + ) -> Result>, BoxedError> { + let live_messages = self.broadcaster.listen(channel)?.map_err(BoxedError::from); + + let mut tx = self.db.begin().await?; + let stored_messages = tx.messages().all(channel).await?; + tx.commit().await?; + + let stored_messages = stream::iter(stored_messages.into_iter().map(Ok)); + + Ok(stored_messages.chain(live_messages)) + } +} + +// Clones will share the same senders collection. +#[derive(Clone)] +pub struct Broadcaster { + // The use of std::sync::Mutex, and not tokio::sync::Mutex, follows Tokio's + // own advice: . Methods that + // lock it must be sync. + senders: Arc>>>, +} + +impl Broadcaster { + pub async fn from_database(db: &SqlitePool) -> Result { + let mut tx = db.begin().await?; + let channels = tx.channels().all().await?; tx.commit().await?; + + let channels = channels.iter().map(|c| &c.id); + let broadcaster = Broadcaster::new(channels); + Ok(broadcaster) + } + + fn new<'i>(channels: impl IntoIterator) -> Self { + let senders: HashMap<_, _> = channels + .into_iter() + .cloned() + .map(|id| (id, Self::make_sender())) + .collect(); + + Self { + senders: Arc::new(Mutex::new(senders)), + } + } + + pub fn register_channel(&self, channel: &ChannelId) -> Result<(), RegisterError> { + match self.senders.lock().unwrap().entry(channel.clone()) { + Entry::Occupied(_) => Err(RegisterError::Duplicate), + vacant => { + vacant.or_insert_with(Self::make_sender); + Ok(()) + } + } + } + + pub fn broadcast(&self, channel: &ChannelId, message: Message) -> Result<(), BroadcastError> { + let lock = self.senders.lock().unwrap(); + let tx = lock.get(channel).ok_or(BroadcastError::Unregistered)?; + + // Per the Tokio docs, the returned error is only used to indicate that + // there are no receivers. In this use case, that's fine; a lack of + // listening consumers (chat clients) when a message is sent isn't an + // error. + let _ = tx.send(message); Ok(()) } + + pub fn listen(&self, channel: &ChannelId) -> Result, BroadcastError> { + let lock = self.senders.lock().unwrap(); + let tx = lock.get(channel).ok_or(BroadcastError::Unregistered)?; + let rx = tx.subscribe(); + let stream = BroadcastStream::from(rx); + + Ok(stream) + } + + fn make_sender() -> Sender { + // Queue depth of 16 chosen entirely arbitrarily. Don't read too much + // into it. + let (tx, _) = channel(16); + tx + } +} + +#[derive(Debug, thiserror::Error)] +pub enum RegisterError { + #[error("duplicate channel registered")] + Duplicate, +} + +#[derive(Debug, thiserror::Error)] +pub enum BroadcastError { + #[error("requested channel not registered")] + Unregistered, } diff --git a/src/channel/repo.rs b/src/channel/repo.rs deleted file mode 100644 index a04cac5..0000000 --- a/src/channel/repo.rs +++ /dev/null @@ -1,86 +0,0 @@ -use std::fmt; - -use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction}; - -use crate::error::BoxedError; -use crate::id::Id as BaseId; - -pub trait Provider { - fn channels(&mut self) -> Channels; -} - -impl<'c> Provider for Transaction<'c, Sqlite> { - fn channels(&mut self) -> Channels { - Channels(self) - } -} - -pub struct Channels<'t>(&'t mut SqliteConnection); - -#[derive(Debug)] -pub struct Channel { - pub id: Id, - pub name: String, -} - -impl<'c> Channels<'c> { - /// Create a new channel. - pub async fn create(&mut self, name: &str) -> Result<(), BoxedError> { - let id = Id::generate(); - - sqlx::query!( - r#" - insert - into channel (id, name) - values ($1, $2) - "#, - id, - name, - ) - .execute(&mut *self.0) - .await?; - - Ok(()) - } - - pub async fn all(&mut self) -> Result, BoxedError> { - let channels = sqlx::query_as!( - Channel, - r#" - select - channel.id as "id: Id", - channel.name - from channel - order by channel.name - "#, - ) - .fetch_all(&mut *self.0) - .await?; - - Ok(channels) - } -} - -/// Stable identifier for a [Channel]. Prefixed with `C`. -#[derive(Debug, sqlx::Type, serde::Deserialize)] -#[sqlx(transparent)] -#[serde(transparent)] -pub struct Id(BaseId); - -impl From for Id { - fn from(id: BaseId) -> Self { - Self(id) - } -} - -impl Id { - pub fn generate() -> Self { - BaseId::generate("C") - } -} - -impl fmt::Display for Id { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - self.0.fmt(f) - } -} diff --git a/src/channel/repo/channels.rs b/src/channel/repo/channels.rs new file mode 100644 index 0000000..6fb0c23 --- /dev/null +++ b/src/channel/repo/channels.rs @@ -0,0 +1,87 @@ +use std::fmt; + +use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction}; + +use crate::error::BoxedError; +use crate::id::Id as BaseId; + +pub trait Provider { + fn channels(&mut self) -> Channels; +} + +impl<'c> Provider for Transaction<'c, Sqlite> { + fn channels(&mut self) -> Channels { + Channels(self) + } +} + +pub struct Channels<'t>(&'t mut SqliteConnection); + +#[derive(Debug)] +pub struct Channel { + pub id: Id, + pub name: String, +} + +impl<'c> Channels<'c> { + /// Create a new channel. + pub async fn create(&mut self, name: &str) -> Result { + let id = Id::generate(); + + let channel = sqlx::query_scalar!( + r#" + insert + into channel (id, name) + values ($1, $2) + returning id as "id: Id" + "#, + id, + name, + ) + .fetch_one(&mut *self.0) + .await?; + + Ok(channel) + } + + pub async fn all(&mut self) -> Result, BoxedError> { + let channels = sqlx::query_as!( + Channel, + r#" + select + channel.id as "id: Id", + channel.name + from channel + order by channel.name + "#, + ) + .fetch_all(&mut *self.0) + .await?; + + Ok(channels) + } +} + +/// Stable identifier for a [Channel]. Prefixed with `C`. +#[derive(Clone, Debug, Eq, Hash, PartialEq, sqlx::Type, serde::Deserialize, serde::Serialize)] +#[sqlx(transparent)] +#[serde(transparent)] +pub struct Id(BaseId); + +impl From for Id { + fn from(id: BaseId) -> Self { + Self(id) + } +} + +impl Id { + pub fn generate() -> Self { + BaseId::generate("C") + } +} + +impl fmt::Display for Id { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.0.fmt(f) + } +} diff --git a/src/channel/repo/messages.rs b/src/channel/repo/messages.rs new file mode 100644 index 0000000..bdb0d29 --- /dev/null +++ b/src/channel/repo/messages.rs @@ -0,0 +1,111 @@ +use std::fmt; + +use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction}; + +use super::channels::Id as ChannelId; +use crate::{ + clock::DateTime, error::BoxedError, id::Id as BaseId, login::repo::logins::Id as LoginId, +}; + +pub trait Provider { + fn messages(&mut self) -> Messages; +} + +impl<'c> Provider for Transaction<'c, Sqlite> { + fn messages(&mut self) -> Messages { + Messages(self) + } +} + +pub struct Messages<'t>(&'t mut SqliteConnection); + +#[derive(Clone, Debug, serde::Serialize)] +pub struct Message { + pub id: Id, + pub sender: LoginId, + pub channel: ChannelId, + pub body: String, + pub sent_at: DateTime, +} + +impl<'c> Messages<'c> { + pub async fn create( + &mut self, + sender: &LoginId, + channel: &ChannelId, + body: &str, + sent_at: &DateTime, + ) -> Result { + let id = Id::generate(); + + let message = sqlx::query_as!( + Message, + r#" + insert into message + (id, sender, channel, body, sent_at) + values ($1, $2, $3, $4, $5) + returning + id as "id: Id", + sender as "sender: LoginId", + channel as "channel: ChannelId", + body, + sent_at as "sent_at: DateTime" + "#, + id, + sender, + channel, + body, + sent_at, + ) + .fetch_one(&mut *self.0) + .await?; + + Ok(message) + } + + pub async fn all(&mut self, channel: &ChannelId) -> Result, BoxedError> { + let messages = sqlx::query_as!( + Message, + r#" + select + id as "id: Id", + sender as "sender: LoginId", + channel as "channel: ChannelId", + body, + sent_at as "sent_at: DateTime" + from message + where channel = $1 + order by sent_at asc + "#, + channel, + ) + .fetch_all(&mut *self.0) + .await?; + + Ok(messages) + } +} + +/// Stable identifier for a [Message]. Prefixed with `M`. +#[derive(Clone, Debug, Eq, Hash, PartialEq, sqlx::Type, serde::Deserialize, serde::Serialize)] +#[sqlx(transparent)] +#[serde(transparent)] +pub struct Id(BaseId); + +impl From for Id { + fn from(id: BaseId) -> Self { + Self(id) + } +} + +impl Id { + pub fn generate() -> Self { + BaseId::generate("M") + } +} + +impl fmt::Display for Id { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.0.fmt(f) + } +} diff --git a/src/channel/repo/mod.rs b/src/channel/repo/mod.rs new file mode 100644 index 0000000..345897d --- /dev/null +++ b/src/channel/repo/mod.rs @@ -0,0 +1,2 @@ +pub mod channels; +pub mod messages; diff --git a/src/channel/routes.rs b/src/channel/routes.rs index 864f1b3..83c733c 100644 --- a/src/channel/routes.rs +++ b/src/channel/routes.rs @@ -1,14 +1,23 @@ use axum::{ - extract::{Form, State}, - response::{IntoResponse, Redirect}, - routing::post, + extract::{Form, Path, State}, + http::StatusCode, + response::{ + sse::{self, Sse}, + IntoResponse, Redirect, + }, + routing::{get, post}, Router, }; +use futures::stream::{StreamExt as _, TryStreamExt as _}; -use crate::{app::App, error::InternalError, login::repo::logins::Login}; +use super::repo::channels::Id as ChannelId; +use crate::{app::App, clock::RequestedAt, error::InternalError, login::repo::logins::Login}; pub fn router() -> Router { - Router::new().route("/create", post(on_create)) + Router::new() + .route("/create", post(on_create)) + .route("/:channel/send", post(on_send)) + .route("/:channel/events", get(on_events)) } #[derive(serde::Deserialize)] @@ -25,3 +34,40 @@ async fn on_create( Ok(Redirect::to("/")) } + +#[derive(serde::Deserialize)] +struct SendRequest { + message: String, +} + +async fn on_send( + Path(channel): Path, + RequestedAt(sent_at): RequestedAt, + State(app): State, + login: Login, + Form(form): Form, +) -> Result { + app.channels() + .send(&login, &channel, &form.message, &sent_at) + .await?; + + Ok(StatusCode::ACCEPTED) +} + +async fn on_events( + Path(channel): Path, + State(app): State, + _: Login, // requires auth, but doesn't actually care who you are +) -> Result { + let stream = app + .channels() + .events(&channel) + .await? + .map(|msg| match msg { + Ok(msg) => Ok(serde_json::to_string(&msg)?), + Err(err) => Err(err), + }) + .map_ok(|msg| sse::Event::default().data(&msg)); + + Ok(Sse::new(stream).keep_alive(sse::KeepAlive::default())) +} diff --git a/src/cli.rs b/src/cli.rs index e374834..fa7c499 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -28,9 +28,10 @@ impl Args { sqlx::migrate!().run(&pool).await?; + let app = App::from(pool).await?; let app = routers() .route_layer(middleware::from_fn(clock::middleware)) - .with_state(App::from(pool)); + .with_state(app); let listener = self.listener().await?; let started_msg = started_msg(&listener)?; diff --git a/src/id.rs b/src/id.rs index 4e12f2a..c69b341 100644 --- a/src/id.rs +++ b/src/id.rs @@ -27,7 +27,7 @@ pub const ID_SIZE: usize = 15; // // By convention, the prefix should be UPPERCASE - note that the alphabet for this // is entirely lowercase. -#[derive(Debug, Hash, PartialEq, Eq, sqlx::Type, serde::Deserialize)] +#[derive(Clone, Debug, Hash, PartialEq, Eq, sqlx::Type, serde::Deserialize, serde::Serialize)] #[sqlx(transparent)] #[serde(transparent)] pub struct Id(String); diff --git a/src/index/app.rs b/src/index/app.rs index 79f5a9a..d6eef18 100644 --- a/src/index/app.rs +++ b/src/index/app.rs @@ -1,7 +1,7 @@ use sqlx::sqlite::SqlitePool; use crate::{ - channel::repo::{Channel, Provider as _}, + channel::repo::channels::{Channel, Provider as _}, error::BoxedError, }; diff --git a/src/index/templates.rs b/src/index/templates.rs index fdb750b..38cd93f 100644 --- a/src/index/templates.rs +++ b/src/index/templates.rs @@ -1,6 +1,6 @@ use maud::{html, Markup, DOCTYPE}; -use crate::{channel::repo::Channel, login::repo::logins::Login}; +use crate::{channel::repo::channels::Channel, login::repo::logins::Login}; pub fn authenticated<'c>(login: Login, channels: impl IntoIterator) -> Markup { html! { diff --git a/src/login/repo/logins.rs b/src/login/repo/logins.rs index 26a5b09..142d8fb 100644 --- a/src/login/repo/logins.rs +++ b/src/login/repo/logins.rs @@ -90,7 +90,7 @@ impl<'c> Logins<'c> { } /// Stable identifier for a [Login]. Prefixed with `L`. -#[derive(Debug, sqlx::Type)] +#[derive(Clone, Debug, sqlx::Type, serde::Serialize)] #[sqlx(transparent)] pub struct Id(BaseId); -- cgit v1.2.3