diff options
Diffstat (limited to 'src/channel/app.rs')
| -rw-r--r-- | src/channel/app.rs | 40 |
1 files changed, 22 insertions, 18 deletions
diff --git a/src/channel/app.rs b/src/channel/app.rs index 36fa552..b0b63b3 100644 --- a/src/channel/app.rs +++ b/src/channel/app.rs @@ -10,11 +10,15 @@ use sqlx::sqlite::SqlitePool; use tokio::sync::broadcast::{channel, Sender}; use tokio_stream::wrappers::BroadcastStream; -use super::repo::{ - channels::{Channel, Id as ChannelId, Provider as _}, - messages::{BroadcastMessage, Provider as _}, +use super::repo::broadcast::{self, Provider as _}; +use crate::{ + clock::DateTime, + error::BoxedError, + repo::{ + channel::{self, Channel, Provider as _}, + login::Login, + }, }; -use crate::{clock::DateTime, error::BoxedError, login::repo::logins::Login}; pub struct Channels<'a> { db: &'a SqlitePool, @@ -46,13 +50,13 @@ impl<'a> Channels<'a> { pub async fn send( &self, login: &Login, - channel: &ChannelId, + channel: &channel::Id, body: &str, sent_at: &DateTime, ) -> Result<(), BoxedError> { let mut tx = self.db.begin().await?; let message = tx - .messages() + .broadcast() .create(&login.id, channel, body, sent_at) .await?; tx.commit().await?; @@ -63,13 +67,13 @@ impl<'a> Channels<'a> { pub async fn events( &self, - channel: &ChannelId, + channel: &channel::Id, resume_at: Option<&DateTime>, - ) -> Result<impl Stream<Item = Result<BroadcastMessage, BoxedError>> + 'static, BoxedError> + ) -> Result<impl Stream<Item = Result<broadcast::Message, BoxedError>> + 'static, BoxedError> { fn skip_stale<E>( resume_at: Option<&DateTime>, - ) -> impl for<'m> FnMut(&'m BroadcastMessage) -> future::Ready<Result<bool, E>> { + ) -> impl for<'m> FnMut(&'m broadcast::Message) -> future::Ready<Result<bool, E>> { let resume_at = resume_at.cloned(); move |msg| { future::ready(Ok(match resume_at { @@ -86,7 +90,7 @@ impl<'a> Channels<'a> { .try_skip_while(skip_stale(resume_at)); let mut tx = self.db.begin().await?; - let stored_messages = tx.messages().for_replay(channel, resume_at).await?; + let stored_messages = tx.broadcast().replay(channel, resume_at).await?; tx.commit().await?; let stored_messages = stream::iter(stored_messages).map(Ok); @@ -101,7 +105,7 @@ pub struct Broadcaster { // The use of std::sync::Mutex, and not tokio::sync::Mutex, follows Tokio's // own advice: <https://tokio.rs/tokio/tutorial/shared-state>. Methods that // lock it must be sync. - senders: Arc<Mutex<HashMap<ChannelId, Sender<BroadcastMessage>>>>, + senders: Arc<Mutex<HashMap<channel::Id, Sender<broadcast::Message>>>>, } impl Broadcaster { @@ -115,7 +119,7 @@ impl Broadcaster { Ok(broadcaster) } - fn new<'i>(channels: impl IntoIterator<Item = &'i ChannelId>) -> Self { + fn new<'i>(channels: impl IntoIterator<Item = &'i channel::Id>) -> Self { let senders: HashMap<_, _> = channels .into_iter() .cloned() @@ -128,7 +132,7 @@ impl Broadcaster { } // panic: if ``channel`` is already registered. - pub fn register_channel(&self, channel: &ChannelId) { + pub fn register_channel(&self, channel: &channel::Id) { match self.senders().entry(channel.clone()) { // This ever happening indicates a serious logic error. Entry::Occupied(_) => panic!("duplicate channel registration for channel {channel}"), @@ -140,7 +144,7 @@ impl Broadcaster { // panic: if ``channel`` has not been previously registered, and was not // part of the initial set of channels. - pub fn broadcast(&self, channel: &ChannelId, message: BroadcastMessage) { + pub fn broadcast(&self, channel: &channel::Id, message: broadcast::Message) { let tx = self.sender(channel); // Per the Tokio docs, the returned error is only used to indicate that @@ -152,7 +156,7 @@ impl Broadcaster { // panic: if ``channel`` has not been previously registered, and was not // part of the initial set of channels. - pub fn listen(&self, channel: &ChannelId) -> BroadcastStream<BroadcastMessage> { + pub fn listen(&self, channel: &channel::Id) -> BroadcastStream<broadcast::Message> { let rx = self.sender(channel).subscribe(); BroadcastStream::from(rx) @@ -160,15 +164,15 @@ impl Broadcaster { // panic: if ``channel`` has not been previously registered, and was not // part of the initial set of channels. - fn sender(&self, channel: &ChannelId) -> Sender<BroadcastMessage> { + fn sender(&self, channel: &channel::Id) -> Sender<broadcast::Message> { self.senders()[channel].clone() } - fn senders(&self) -> MutexGuard<HashMap<ChannelId, Sender<BroadcastMessage>>> { + fn senders(&self) -> MutexGuard<HashMap<channel::Id, Sender<broadcast::Message>>> { self.senders.lock().unwrap() // propagate panics when mutex is poisoned } - fn make_sender() -> Sender<BroadcastMessage> { + fn make_sender() -> Sender<broadcast::Message> { // Queue depth of 16 chosen entirely arbitrarily. Don't read too much // into it. let (tx, _) = channel(16); |
