| author | Owen Jacobson <owen@grimoire.ca> | 2024-09-15 23:50:41 -0400 |
|---|---|---|
| committer | Owen Jacobson <owen@grimoire.ca> | 2024-09-16 11:03:22 -0400 |
| commit | 491cb3eb34d20140aed80dbb9edc39c4db5335d2 | |
| tree | e1e2e009f064dc6dfc8c98d2bf97d8d1f7b45615 | /src/channel |
| parent | 99b33023332393e46f5a661901b980b78e6fb133 | |
Consolidate most repository types into a repo module.
Having them contained in the individual endpoint groups conveyed the unintended sense that their scope was _only_ that endpoint group. It also made most repo-related import paths _quite_ long. This splits the repos up as follows:
* "General applicability" repos - those that are only loosely connected to a single task, and are likely to be shared between tasks - go in crate::repo.
* Specialized repos - those tightly connected to a specific task - go in the module for that task, under crate::PATH::repo.
In both cases, each repo goes in its own submodule, to make it easier to use the module name as a namespace.
Which category a repo goes in is a judgment call. `crate::channel::repo::broadcast` (formerly `channel::repo::messages`) is used outside of `crate::channel`, for example, but its main purpose is to support channel message broadcasts. It could arguably live under `crate::event::repo::channel`, but the resulting namespace is less legible to me.
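For concreteness, here is how the import paths in this tree change shape. The "before" paths are the ones removed from `src/channel/app.rs` in this commit (written from the crate root rather than via `super`); the "after" paths are their replacements. Only the repos touched by this diff are shown.

```rust
// Before this commit, repo types lived under their endpoint group:
//
//     use crate::channel::repo::channels::{Channel, Id as ChannelId, Provider as _};
//     use crate::channel::repo::messages::{BroadcastMessage, Provider as _};
//     use crate::login::repo::logins::Login;

// After: general-applicability repos live under crate::repo, one submodule per
// repo, so the module name doubles as a namespace (channel::Id, login::Id, ...).
use crate::repo::{
    channel::{self, Channel, Provider as _},
    login::Login,
};

// Specialized repos stay with their task; channel message broadcasts, for example:
use crate::channel::repo::broadcast::{self, Provider as _};
```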
Diffstat (limited to 'src/channel')
| mode | path | changes |
|---|---|---|
| -rw-r--r-- | src/channel/app.rs | 40 |
| -rw-r--r-- | src/channel/repo/broadcast.rs | 112 |
| -rw-r--r-- | src/channel/repo/channels.rs | 102 |
| -rw-r--r-- | src/channel/repo/messages.rs | 136 |
| -rw-r--r-- | src/channel/repo/mod.rs | 3 |
| -rw-r--r-- | src/channel/routes.rs | 10 |
6 files changed, 142 insertions, 261 deletions
diff --git a/src/channel/app.rs b/src/channel/app.rs
index 36fa552..b0b63b3 100644
--- a/src/channel/app.rs
+++ b/src/channel/app.rs
@@ -10,11 +10,15 @@
 use sqlx::sqlite::SqlitePool;
 use tokio::sync::broadcast::{channel, Sender};
 use tokio_stream::wrappers::BroadcastStream;
-use super::repo::{
-    channels::{Channel, Id as ChannelId, Provider as _},
-    messages::{BroadcastMessage, Provider as _},
+use super::repo::broadcast::{self, Provider as _};
+use crate::{
+    clock::DateTime,
+    error::BoxedError,
+    repo::{
+        channel::{self, Channel, Provider as _},
+        login::Login,
+    },
 };
-use crate::{clock::DateTime, error::BoxedError, login::repo::logins::Login};
 
 pub struct Channels<'a> {
     db: &'a SqlitePool,
@@ -46,13 +50,13 @@ impl<'a> Channels<'a> {
     pub async fn send(
         &self,
         login: &Login,
-        channel: &ChannelId,
+        channel: &channel::Id,
         body: &str,
         sent_at: &DateTime,
     ) -> Result<(), BoxedError> {
         let mut tx = self.db.begin().await?;
         let message = tx
-            .messages()
+            .broadcast()
             .create(&login.id, channel, body, sent_at)
             .await?;
         tx.commit().await?;
@@ -63,13 +67,13 @@ impl<'a> Channels<'a> {
 
     pub async fn events(
         &self,
-        channel: &ChannelId,
+        channel: &channel::Id,
         resume_at: Option<&DateTime>,
-    ) -> Result<impl Stream<Item = Result<BroadcastMessage, BoxedError>> + 'static, BoxedError>
+    ) -> Result<impl Stream<Item = Result<broadcast::Message, BoxedError>> + 'static, BoxedError>
     {
         fn skip_stale<E>(
             resume_at: Option<&DateTime>,
-        ) -> impl for<'m> FnMut(&'m BroadcastMessage) -> future::Ready<Result<bool, E>> {
+        ) -> impl for<'m> FnMut(&'m broadcast::Message) -> future::Ready<Result<bool, E>> {
            let resume_at = resume_at.cloned();
            move |msg| {
                future::ready(Ok(match resume_at {
@@ -86,7 +90,7 @@ impl<'a> Channels<'a> {
             .try_skip_while(skip_stale(resume_at));
 
         let mut tx = self.db.begin().await?;
-        let stored_messages = tx.messages().for_replay(channel, resume_at).await?;
+        let stored_messages = tx.broadcast().replay(channel, resume_at).await?;
         tx.commit().await?;
 
         let stored_messages = stream::iter(stored_messages).map(Ok);
@@ -101,7 +105,7 @@ pub struct Broadcaster {
     // The use of std::sync::Mutex, and not tokio::sync::Mutex, follows Tokio's
    // own advice: <https://tokio.rs/tokio/tutorial/shared-state>. Methods that
    // lock it must be sync.
-    senders: Arc<Mutex<HashMap<ChannelId, Sender<BroadcastMessage>>>>,
+    senders: Arc<Mutex<HashMap<channel::Id, Sender<broadcast::Message>>>>,
 }
 
 impl Broadcaster {
@@ -115,7 +119,7 @@ impl Broadcaster {
         Ok(broadcaster)
     }
 
-    fn new<'i>(channels: impl IntoIterator<Item = &'i ChannelId>) -> Self {
+    fn new<'i>(channels: impl IntoIterator<Item = &'i channel::Id>) -> Self {
         let senders: HashMap<_, _> = channels
             .into_iter()
             .cloned()
@@ -128,7 +132,7 @@ impl Broadcaster {
     }
 
     // panic: if ``channel`` is already registered.
-    pub fn register_channel(&self, channel: &ChannelId) {
+    pub fn register_channel(&self, channel: &channel::Id) {
         match self.senders().entry(channel.clone()) {
             // This ever happening indicates a serious logic error.
             Entry::Occupied(_) => panic!("duplicate channel registration for channel {channel}"),
@@ -140,7 +144,7 @@ impl Broadcaster {
 
     // panic: if ``channel`` has not been previously registered, and was not
     // part of the initial set of channels.
-    pub fn broadcast(&self, channel: &ChannelId, message: BroadcastMessage) {
+    pub fn broadcast(&self, channel: &channel::Id, message: broadcast::Message) {
         let tx = self.sender(channel);
 
         // Per the Tokio docs, the returned error is only used to indicate that
@@ -152,7 +156,7 @@ impl Broadcaster {
 
     // panic: if ``channel`` has not been previously registered, and was not
     // part of the initial set of channels.
-    pub fn listen(&self, channel: &ChannelId) -> BroadcastStream<BroadcastMessage> {
+    pub fn listen(&self, channel: &channel::Id) -> BroadcastStream<broadcast::Message> {
         let rx = self.sender(channel).subscribe();
 
         BroadcastStream::from(rx)
@@ -160,15 +164,15 @@ impl Broadcaster {
 
     // panic: if ``channel`` has not been previously registered, and was not
     // part of the initial set of channels.
-    fn sender(&self, channel: &ChannelId) -> Sender<BroadcastMessage> {
+    fn sender(&self, channel: &channel::Id) -> Sender<broadcast::Message> {
         self.senders()[channel].clone()
     }
 
-    fn senders(&self) -> MutexGuard<HashMap<ChannelId, Sender<BroadcastMessage>>> {
+    fn senders(&self) -> MutexGuard<HashMap<channel::Id, Sender<broadcast::Message>>> {
         self.senders.lock().unwrap() // propagate panics when mutex is poisoned
     }
 
-    fn make_sender() -> Sender<BroadcastMessage> {
+    fn make_sender() -> Sender<broadcast::Message> {
         // Queue depth of 16 chosen entirely arbitrarily. Don't read too much
         // into it.
         let (tx, _) = channel(16);
diff --git a/src/channel/repo/broadcast.rs b/src/channel/repo/broadcast.rs
new file mode 100644
index 0000000..3ca7396
--- /dev/null
+++ b/src/channel/repo/broadcast.rs
@@ -0,0 +1,112 @@
+use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction};
+
+use crate::{
+    clock::DateTime,
+    repo::{
+        channel,
+        login::{self, Login, Logins},
+        message,
+    },
+};
+
+pub trait Provider {
+    fn broadcast(&mut self) -> Broadcast;
+}
+
+impl<'c> Provider for Transaction<'c, Sqlite> {
+    fn broadcast(&mut self) -> Broadcast {
+        Broadcast(self)
+    }
+}
+
+pub struct Broadcast<'t>(&'t mut SqliteConnection);
+
+#[derive(Clone, Debug, serde::Serialize)]
+pub struct Message {
+    pub id: message::Id,
+    pub sender: Login,
+    pub body: String,
+    pub sent_at: DateTime,
+}
+
+impl<'c> Broadcast<'c> {
+    pub async fn create(
+        &mut self,
+        sender: &login::Id,
+        channel: &channel::Id,
+        body: &str,
+        sent_at: &DateTime,
+    ) -> Result<Message, sqlx::Error> {
+        let id = message::Id::generate();
+
+        let sender = Logins::from(&mut *self.0).by_id(sender).await?;
+
+        let message = sqlx::query!(
+            r#"
+            insert into message
+                (id, sender, channel, body, sent_at)
+            values ($1, $2, $3, $4, $5)
+            returning
+                id as "id: message::Id",
+                sender as "sender: login::Id",
+                body,
+                sent_at as "sent_at: DateTime"
+            "#,
+            id,
+            sender.id,
+            channel,
+            body,
+            sent_at,
+        )
+        .map(|row| {
+            debug_assert!(row.sender == sender.id);
+            Message {
+                id: row.id,
+                sender: sender.clone(),
+                body: row.body,
+                sent_at: row.sent_at,
+            }
+        })
+        .fetch_one(&mut *self.0)
+        .await?;
+
+        Ok(message)
+    }
+
+    pub async fn replay(
+        &mut self,
+        channel: &channel::Id,
+        resume_at: Option<&DateTime>,
+    ) -> Result<Vec<Message>, sqlx::Error> {
+        let messages = sqlx::query!(
+            r#"
+            select
+                message.id as "id: message::Id",
+                login.id as "sender_id: login::Id",
+                login.name as sender_name,
+                message.body,
+                message.sent_at as "sent_at: DateTime"
+            from message
+            join login on message.sender = login.id
+            where channel = $1
+                and coalesce(sent_at > $2, true)
+            order by sent_at asc
+            "#,
+            channel,
+            resume_at,
+        )
+        .map(|row| Message {
+            id: row.id,
+            sender: Login {
+                id: row.sender_id,
+                name: row.sender_name,
+            },
+            body: row.body,
+            sent_at: row.sent_at,
+        })
+        .fetch_all(&mut *self.0)
+        .await?;
+
+        Ok(messages)
+    }
+}
diff --git a/src/channel/repo/channels.rs b/src/channel/repo/channels.rs
deleted file mode 100644
index ab7489c..0000000
--- a/src/channel/repo/channels.rs
+++ /dev/null
@@ -1,102 +0,0 @@
-use std::fmt;
-
-use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction};
-
-use crate::id::Id as BaseId;
-
-pub trait Provider {
-    fn channels(&mut self) -> Channels;
-}
-
-impl<'c> Provider for Transaction<'c, Sqlite> {
-    fn channels(&mut self) -> Channels {
-        Channels(self)
-    }
-}
-
-pub struct Channels<'t>(&'t mut SqliteConnection);
-
-#[derive(Debug)]
-pub struct Channel {
-    pub id: Id,
-    pub name: String,
-}
-
-impl<'c> Channels<'c> {
-    /// Create a new channel.
-    pub async fn create(&mut self, name: &str) -> Result<Id, sqlx::Error> {
-        let id = Id::generate();
-
-        let channel = sqlx::query_scalar!(
-            r#"
-            insert
-            into channel (id, name)
-            values ($1, $2)
-            returning id as "id: Id"
-            "#,
-            id,
-            name,
-        )
-        .fetch_one(&mut *self.0)
-        .await?;
-
-        Ok(channel)
-    }
-
-    pub async fn by_id(&mut self, channel: Id) -> Result<Channel, sqlx::Error> {
-        let channel = sqlx::query_as!(
-            Channel,
-            r#"
-            select id as "id: Id", name
-            from channel
-            where id = $1
-            "#,
-            channel,
-        )
-        .fetch_one(&mut *self.0)
-        .await?;
-
-        Ok(channel)
-    }
-
-    pub async fn all(&mut self) -> Result<Vec<Channel>, sqlx::Error> {
-        let channels = sqlx::query_as!(
-            Channel,
-            r#"
-            select
-                channel.id as "id: Id",
-                channel.name
-            from channel
-            order by channel.name
-            "#,
-        )
-        .fetch_all(&mut *self.0)
-        .await?;
-
-        Ok(channels)
-    }
-}
-
-/// Stable identifier for a [Channel]. Prefixed with `C`.
-#[derive(Clone, Debug, Eq, Hash, PartialEq, sqlx::Type, serde::Deserialize, serde::Serialize)]
-#[sqlx(transparent)]
-#[serde(transparent)]
-pub struct Id(BaseId);
-
-impl From<BaseId> for Id {
-    fn from(id: BaseId) -> Self {
-        Self(id)
-    }
-}
-
-impl Id {
-    pub fn generate() -> Self {
-        BaseId::generate("C")
-    }
-}
-
-impl fmt::Display for Id {
-    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        self.0.fmt(f)
-    }
-}
diff --git a/src/channel/repo/messages.rs b/src/channel/repo/messages.rs
deleted file mode 100644
index a30e6da..0000000
--- a/src/channel/repo/messages.rs
+++ /dev/null
@@ -1,136 +0,0 @@
-use std::fmt;
-
-use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction};
-
-use super::channels::Id as ChannelId;
-use crate::{
-    clock::DateTime,
-    id::Id as BaseId,
-    login::repo::logins::{Id as LoginId, Login, Logins},
-};
-
-pub trait Provider {
-    fn messages(&mut self) -> Messages;
-}
-
-impl<'c> Provider for Transaction<'c, Sqlite> {
-    fn messages(&mut self) -> Messages {
-        Messages(self)
-    }
-}
-
-pub struct Messages<'t>(&'t mut SqliteConnection);
-
-#[derive(Clone, Debug, serde::Serialize)]
-pub struct BroadcastMessage {
-    pub id: Id,
-    pub sender: Login,
-    pub body: String,
-    pub sent_at: DateTime,
-}
-
-impl<'c> Messages<'c> {
-    pub async fn create(
-        &mut self,
-        sender: &LoginId,
-        channel: &ChannelId,
-        body: &str,
-        sent_at: &DateTime,
-    ) -> Result<BroadcastMessage, sqlx::Error> {
-        let id = Id::generate();
-
-        let sender = Logins::from(&mut *self.0).by_id(sender).await?;
-
-        let message = sqlx::query!(
-            r#"
-            insert into message
-                (id, sender, channel, body, sent_at)
-            values ($1, $2, $3, $4, $5)
-            returning
-                id as "id: Id",
-                sender as "sender: LoginId",
-                body,
-                sent_at as "sent_at: DateTime"
-            "#,
-            id,
-            sender.id,
-            channel,
-            body,
-            sent_at,
-        )
-        .map(|row| {
-            debug_assert!(row.sender == sender.id);
-            BroadcastMessage {
-                id: row.id,
-                sender: sender.clone(),
-                body: row.body,
-                sent_at: row.sent_at,
-            }
-        })
-        .fetch_one(&mut *self.0)
-        .await?;
-
-        Ok(message)
-    }
-
-    pub async fn for_replay(
-        &mut self,
-        channel: &ChannelId,
-        resume_at: Option<&DateTime>,
-    ) -> Result<Vec<BroadcastMessage>, sqlx::Error> {
-        let messages = sqlx::query!(
-            r#"
-            select
-                message.id as "id: Id",
-                login.id as "sender_id: LoginId",
-                login.name as sender_name,
-                message.body,
-                message.sent_at as "sent_at: DateTime"
-            from message
-            join login on message.sender = login.id
-            where channel = $1
-                and coalesce(sent_at > $2, true)
-            order by sent_at asc
-            "#,
-            channel,
-            resume_at,
-        )
-        .map(|row| BroadcastMessage {
-            id: row.id,
-            sender: Login {
-                id: row.sender_id,
-                name: row.sender_name,
-            },
-            body: row.body,
-            sent_at: row.sent_at,
-        })
-        .fetch_all(&mut *self.0)
-        .await?;
-
-        Ok(messages)
-    }
-}
-
-/// Stable identifier for a [Message]. Prefixed with `M`.
-#[derive(Clone, Debug, Eq, Hash, PartialEq, sqlx::Type, serde::Deserialize, serde::Serialize)]
-#[sqlx(transparent)]
-#[serde(transparent)]
-pub struct Id(BaseId);
-
-impl From<BaseId> for Id {
-    fn from(id: BaseId) -> Self {
-        Self(id)
-    }
-}
-
-impl Id {
-    pub fn generate() -> Self {
-        BaseId::generate("M")
-    }
-}
-
-impl fmt::Display for Id {
-    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        self.0.fmt(f)
-    }
-}
diff --git a/src/channel/repo/mod.rs b/src/channel/repo/mod.rs
index 345897d..2ed3062 100644
--- a/src/channel/repo/mod.rs
+++ b/src/channel/repo/mod.rs
@@ -1,2 +1 @@
-pub mod channels;
-pub mod messages;
+pub mod broadcast;
diff --git a/src/channel/routes.rs b/src/channel/routes.rs
index 3c2353b..1379153 100644
--- a/src/channel/routes.rs
+++ b/src/channel/routes.rs
@@ -5,8 +5,12 @@ use axum::{
     Router,
 };
 
-use super::repo::channels::Id as ChannelId;
-use crate::{app::App, clock::RequestedAt, error::InternalError, login::repo::logins::Login};
+use crate::{
+    app::App,
+    clock::RequestedAt,
+    error::InternalError,
+    repo::{channel, login::Login},
+};
 
 pub fn router() -> Router<App> {
     Router::new()
@@ -35,7 +39,7 @@ struct SendRequest {
 }
 
 async fn on_send(
-    Path(channel): Path<ChannelId>,
+    Path(channel): Path<channel::Id>,
     RequestedAt(sent_at): RequestedAt,
     State(app): State<App>,
     login: Login,
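The repos touched here all share one access pattern: each repo module defines a `Provider` trait, implements it for `sqlx::Transaction`, and hands out a short-lived struct that borrows the connection. The sketch below restates that pattern in isolation; the `Widgets` repo and `widget` table are hypothetical stand-ins, not part of this tree.

```rust
use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction};

// Each repo module exposes a Provider trait...
pub trait Provider {
    fn widgets(&mut self) -> Widgets;
}

// ...and implements it for sqlx transactions, so `tx.widgets()` works wherever
// the trait is in scope (usually imported as `Provider as _`).
impl<'c> Provider for Transaction<'c, Sqlite> {
    fn widgets(&mut self) -> Widgets {
        // Transaction derefs to the underlying SqliteConnection.
        Widgets(self)
    }
}

// The repo borrows the connection only for the duration of each call.
pub struct Widgets<'t>(&'t mut SqliteConnection);

impl<'c> Widgets<'c> {
    pub async fn count(&mut self) -> Result<i64, sqlx::Error> {
        // An unchecked query_scalar keeps this sketch independent of any schema;
        // the real repos use the compile-time-checked query! macros instead.
        sqlx::query_scalar("select count(*) from widget")
            .fetch_one(&mut *self.0)
            .await
    }
}
```

With the trait in scope, a call site inside an open transaction reads `let n = tx.widgets().count().await?;`, the same shape as `tx.broadcast().create(...)` in `app.rs` above.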
