summaryrefslogtreecommitdiff
path: root/src/channel/app.rs
diff options
context:
space:
mode:
authorOwen Jacobson <owen@grimoire.ca>2024-09-15 23:50:41 -0400
committerOwen Jacobson <owen@grimoire.ca>2024-09-16 11:03:22 -0400
commit491cb3eb34d20140aed80dbb9edc39c4db5335d2 (patch)
treee1e2e009f064dc6dfc8c98d2bf97d8d1f7b45615 /src/channel/app.rs
parent99b33023332393e46f5a661901b980b78e6fb133 (diff)
Consolidate most repository types into a repo module.
Having them contained in the individual endpoint groups conveyed an unintended sense that their intended scope was _only_ that endpoint group. It also made most repo-related import paths _quite_ long. This splits up the repos as follows: * "General applicability" repos - those that are only loosely connected to a single task, and are likely to be shared between tasks - go in crate::repo. * Specialized repos - those tightly connected to a specific task - go in the module for that task, under crate::PATH::repo. In both cases, each repo goes in its own submodule, to make it easier to use the module name as a namespace. Which category a repo goes in is a judgment call. `crate::channel::repo::broadcast` (formerly `channel::repo::messages`) is used outside of `crate::channel`, for example, but its main purpose is to support channel message broadcasts. It could arguably live under `crate::event::repo::channel`, but the resulting namespace is less legible to me.
Diffstat (limited to 'src/channel/app.rs')
-rw-r--r--src/channel/app.rs40
1 files changed, 22 insertions, 18 deletions
diff --git a/src/channel/app.rs b/src/channel/app.rs
index 36fa552..b0b63b3 100644
--- a/src/channel/app.rs
+++ b/src/channel/app.rs
@@ -10,11 +10,15 @@ use sqlx::sqlite::SqlitePool;
use tokio::sync::broadcast::{channel, Sender};
use tokio_stream::wrappers::BroadcastStream;
-use super::repo::{
- channels::{Channel, Id as ChannelId, Provider as _},
- messages::{BroadcastMessage, Provider as _},
+use super::repo::broadcast::{self, Provider as _};
+use crate::{
+ clock::DateTime,
+ error::BoxedError,
+ repo::{
+ channel::{self, Channel, Provider as _},
+ login::Login,
+ },
};
-use crate::{clock::DateTime, error::BoxedError, login::repo::logins::Login};
pub struct Channels<'a> {
db: &'a SqlitePool,
@@ -46,13 +50,13 @@ impl<'a> Channels<'a> {
pub async fn send(
&self,
login: &Login,
- channel: &ChannelId,
+ channel: &channel::Id,
body: &str,
sent_at: &DateTime,
) -> Result<(), BoxedError> {
let mut tx = self.db.begin().await?;
let message = tx
- .messages()
+ .broadcast()
.create(&login.id, channel, body, sent_at)
.await?;
tx.commit().await?;
@@ -63,13 +67,13 @@ impl<'a> Channels<'a> {
pub async fn events(
&self,
- channel: &ChannelId,
+ channel: &channel::Id,
resume_at: Option<&DateTime>,
- ) -> Result<impl Stream<Item = Result<BroadcastMessage, BoxedError>> + 'static, BoxedError>
+ ) -> Result<impl Stream<Item = Result<broadcast::Message, BoxedError>> + 'static, BoxedError>
{
fn skip_stale<E>(
resume_at: Option<&DateTime>,
- ) -> impl for<'m> FnMut(&'m BroadcastMessage) -> future::Ready<Result<bool, E>> {
+ ) -> impl for<'m> FnMut(&'m broadcast::Message) -> future::Ready<Result<bool, E>> {
let resume_at = resume_at.cloned();
move |msg| {
future::ready(Ok(match resume_at {
@@ -86,7 +90,7 @@ impl<'a> Channels<'a> {
.try_skip_while(skip_stale(resume_at));
let mut tx = self.db.begin().await?;
- let stored_messages = tx.messages().for_replay(channel, resume_at).await?;
+ let stored_messages = tx.broadcast().replay(channel, resume_at).await?;
tx.commit().await?;
let stored_messages = stream::iter(stored_messages).map(Ok);
@@ -101,7 +105,7 @@ pub struct Broadcaster {
// The use of std::sync::Mutex, and not tokio::sync::Mutex, follows Tokio's
// own advice: <https://tokio.rs/tokio/tutorial/shared-state>. Methods that
// lock it must be sync.
- senders: Arc<Mutex<HashMap<ChannelId, Sender<BroadcastMessage>>>>,
+ senders: Arc<Mutex<HashMap<channel::Id, Sender<broadcast::Message>>>>,
}
impl Broadcaster {
@@ -115,7 +119,7 @@ impl Broadcaster {
Ok(broadcaster)
}
- fn new<'i>(channels: impl IntoIterator<Item = &'i ChannelId>) -> Self {
+ fn new<'i>(channels: impl IntoIterator<Item = &'i channel::Id>) -> Self {
let senders: HashMap<_, _> = channels
.into_iter()
.cloned()
@@ -128,7 +132,7 @@ impl Broadcaster {
}
// panic: if ``channel`` is already registered.
- pub fn register_channel(&self, channel: &ChannelId) {
+ pub fn register_channel(&self, channel: &channel::Id) {
match self.senders().entry(channel.clone()) {
// This ever happening indicates a serious logic error.
Entry::Occupied(_) => panic!("duplicate channel registration for channel {channel}"),
@@ -140,7 +144,7 @@ impl Broadcaster {
// panic: if ``channel`` has not been previously registered, and was not
// part of the initial set of channels.
- pub fn broadcast(&self, channel: &ChannelId, message: BroadcastMessage) {
+ pub fn broadcast(&self, channel: &channel::Id, message: broadcast::Message) {
let tx = self.sender(channel);
// Per the Tokio docs, the returned error is only used to indicate that
@@ -152,7 +156,7 @@ impl Broadcaster {
// panic: if ``channel`` has not been previously registered, and was not
// part of the initial set of channels.
- pub fn listen(&self, channel: &ChannelId) -> BroadcastStream<BroadcastMessage> {
+ pub fn listen(&self, channel: &channel::Id) -> BroadcastStream<broadcast::Message> {
let rx = self.sender(channel).subscribe();
BroadcastStream::from(rx)
@@ -160,15 +164,15 @@ impl Broadcaster {
// panic: if ``channel`` has not been previously registered, and was not
// part of the initial set of channels.
- fn sender(&self, channel: &ChannelId) -> Sender<BroadcastMessage> {
+ fn sender(&self, channel: &channel::Id) -> Sender<broadcast::Message> {
self.senders()[channel].clone()
}
- fn senders(&self) -> MutexGuard<HashMap<ChannelId, Sender<BroadcastMessage>>> {
+ fn senders(&self) -> MutexGuard<HashMap<channel::Id, Sender<broadcast::Message>>> {
self.senders.lock().unwrap() // propagate panics when mutex is poisoned
}
- fn make_sender() -> Sender<BroadcastMessage> {
+ fn make_sender() -> Sender<broadcast::Message> {
// Queue depth of 16 chosen entirely arbitrarily. Don't read too much
// into it.
let (tx, _) = channel(16);