diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/broadcast.rs | 22 | ||||
| -rw-r--r-- | src/conversation/app.rs | 17 | ||||
| -rw-r--r-- | src/login/app.rs | 7 | ||||
| -rw-r--r-- | src/message/app.rs | 17 | ||||
| -rw-r--r-- | src/token/app.rs | 7 | ||||
| -rw-r--r-- | src/user/create.rs | 2 |
6 files changed, 39 insertions, 33 deletions
diff --git a/src/broadcast.rs b/src/broadcast.rs index 6e1f04d..ee42c08 100644 --- a/src/broadcast.rs +++ b/src/broadcast.rs @@ -30,7 +30,7 @@ impl<M> Broadcaster<M> where M: Clone + Send + std::fmt::Debug + 'static, { - pub fn broadcast(&self, message: impl Into<M>) { + pub fn broadcast(&self, message: M) { let tx = self.sender(); // Per the Tokio docs, the returned error is only used to indicate that @@ -40,7 +40,25 @@ where // // The successful return value, which includes the number of active // receivers, also isn't that interesting to us. - let _ = tx.send(message.into()); + let _ = tx.send(message); + } + + // If `M` is a type that can be obtained from an iterator, such as a `Vec`, and if `I` is an + // iterable of items that can be collected into `M`, then this will construct an `M` from the + // passed event iterator, converting each element as it goes. This emits one message (as `M`), + // containing whatever we collect out of `messages`. + // + // This is mostly meant for handling synchronized entity events, which tend to be generated as + // iterables of domain-specific event types, like `user::Event`, but broadcast as `Vec<event::Event>` + // for consumption by outside clients. + pub fn broadcast_from<I, E>(&self, messages: I) + where + I: IntoIterator, + M: FromIterator<E>, + E: From<I::Item>, + { + let message = messages.into_iter().map(Into::into).collect(); + self.broadcast(message); } pub fn subscribe(&self) -> impl Stream<Item = M> + std::fmt::Debug + use<M> { diff --git a/src/conversation/app.rs b/src/conversation/app.rs index 5e07292..26886af 100644 --- a/src/conversation/app.rs +++ b/src/conversation/app.rs @@ -10,7 +10,7 @@ use super::{ use crate::{ clock::DateTime, db::{Duplicate as _, NotFound as _}, - event::{Broadcaster, Event, Sequence, repo::Provider as _}, + event::{Broadcaster, Sequence, repo::Provider as _}, message::repo::Provider as _, name::{self, Name}, }; @@ -49,8 +49,7 @@ impl<'a> Conversations<'a> { tx.commit().await?; - self.events - .broadcast(events.map(Event::from).collect::<Vec<_>>()); + self.events.broadcast_from(events); Ok(conversation.as_created()) } @@ -105,8 +104,7 @@ impl<'a> Conversations<'a> { tx.commit().await?; - self.events - .broadcast(events.map(Event::from).collect::<Vec<_>>()); + self.events.broadcast_from(events); Ok(()) } @@ -134,13 +132,8 @@ impl<'a> Conversations<'a> { tx.commit().await?; - self.events.broadcast( - events - .into_iter() - .kmerge_by(Sequence::merge) - .map(Event::from) - .collect::<Vec<_>>(), - ); + self.events + .broadcast_from(events.into_iter().kmerge_by(Sequence::merge)); Ok(()) } diff --git a/src/login/app.rs b/src/login/app.rs index 77d4ac3..e471000 100644 --- a/src/login/app.rs +++ b/src/login/app.rs @@ -80,9 +80,10 @@ impl<'a> Logins<'a> { tx.tokens().create(&token, &secret).await?; tx.commit().await?; - for event in revoked.into_iter().map(TokenEvent::Revoked) { - self.token_events.broadcast(event); - } + revoked + .into_iter() + .map(TokenEvent::Revoked) + .for_each(|event| self.token_events.broadcast(event)); Ok(secret) } else { diff --git a/src/message/app.rs b/src/message/app.rs index f0a62d0..647152e 100644 --- a/src/message/app.rs +++ b/src/message/app.rs @@ -7,7 +7,7 @@ use crate::{ clock::DateTime, conversation::{self, repo::Provider as _}, db::NotFound as _, - event::{Broadcaster, Event, Sequence, repo::Provider as _}, + event::{Broadcaster, Sequence, repo::Provider as _}, login::Login, name, user::{self, repo::Provider as _}, @@ -62,8 +62,7 @@ impl<'a> Messages<'a> { tx.commit().await?; - self.events - .broadcast(events.map(Event::from).collect::<Vec<_>>()); + self.events.broadcast_from(events); Ok(message.as_sent()) } @@ -92,8 +91,7 @@ impl<'a> Messages<'a> { tx.messages().record_events(events.clone()).await?; tx.commit().await?; - self.events - .broadcast(events.map(Event::from).collect::<Vec<_>>()); + self.events.broadcast_from(events); Ok(()) } else { @@ -126,13 +124,8 @@ impl<'a> Messages<'a> { tx.commit().await?; - self.events.broadcast( - events - .into_iter() - .kmerge_by(Sequence::merge) - .map(Event::from) - .collect::<Vec<_>>(), - ); + self.events + .broadcast_from(events.into_iter().kmerge_by(Sequence::merge)); Ok(()) } diff --git a/src/token/app.rs b/src/token/app.rs index fb5d712..1d68f32 100644 --- a/src/token/app.rs +++ b/src/token/app.rs @@ -102,9 +102,10 @@ impl<'a> Tokens<'a> { let tokens = tx.tokens().expire(&expire_at).await?; tx.commit().await?; - for event in tokens.into_iter().map(TokenEvent::Revoked) { - self.token_events.broadcast(event); - } + tokens + .into_iter() + .map(TokenEvent::Revoked) + .for_each(|event| self.token_events.broadcast(event)); Ok(()) } diff --git a/src/user/create.rs b/src/user/create.rs index 21c61d1..d6656e5 100644 --- a/src/user/create.rs +++ b/src/user/create.rs @@ -99,7 +99,7 @@ where login: _, } = self; - events.broadcast(user_events.into_iter().collect::<Vec<_>>()); + events.broadcast_from(user_events); } pub fn login(&self) -> &Login { |
