summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/broadcast.rs22
-rw-r--r--src/conversation/app.rs17
-rw-r--r--src/login/app.rs7
-rw-r--r--src/message/app.rs17
-rw-r--r--src/token/app.rs7
-rw-r--r--src/user/create.rs2
6 files changed, 39 insertions, 33 deletions
diff --git a/src/broadcast.rs b/src/broadcast.rs
index 6e1f04d..ee42c08 100644
--- a/src/broadcast.rs
+++ b/src/broadcast.rs
@@ -30,7 +30,7 @@ impl<M> Broadcaster<M>
where
M: Clone + Send + std::fmt::Debug + 'static,
{
- pub fn broadcast(&self, message: impl Into<M>) {
+ pub fn broadcast(&self, message: M) {
let tx = self.sender();
// Per the Tokio docs, the returned error is only used to indicate that
@@ -40,7 +40,25 @@ where
//
// The successful return value, which includes the number of active
// receivers, also isn't that interesting to us.
- let _ = tx.send(message.into());
+ let _ = tx.send(message);
+ }
+
+ // If `M` is a type that can be obtained from an iterator, such as a `Vec`, and if `I` is an
+ // iterable of items that can be collected into `M`, then this will construct an `M` from the
+ // passed event iterator, converting each element as it goes. This emits one message (as `M`),
+ // containing whatever we collect out of `messages`.
+ //
+ // This is mostly meant for handling synchronized entity events, which tend to be generated as
+ // iterables of domain-specific event types, like `user::Event`, but broadcast as `Vec<event::Event>`
+ // for consumption by outside clients.
+ pub fn broadcast_from<I, E>(&self, messages: I)
+ where
+ I: IntoIterator,
+ M: FromIterator<E>,
+ E: From<I::Item>,
+ {
+ let message = messages.into_iter().map(Into::into).collect();
+ self.broadcast(message);
}
pub fn subscribe(&self) -> impl Stream<Item = M> + std::fmt::Debug + use<M> {
diff --git a/src/conversation/app.rs b/src/conversation/app.rs
index 5e07292..26886af 100644
--- a/src/conversation/app.rs
+++ b/src/conversation/app.rs
@@ -10,7 +10,7 @@ use super::{
use crate::{
clock::DateTime,
db::{Duplicate as _, NotFound as _},
- event::{Broadcaster, Event, Sequence, repo::Provider as _},
+ event::{Broadcaster, Sequence, repo::Provider as _},
message::repo::Provider as _,
name::{self, Name},
};
@@ -49,8 +49,7 @@ impl<'a> Conversations<'a> {
tx.commit().await?;
- self.events
- .broadcast(events.map(Event::from).collect::<Vec<_>>());
+ self.events.broadcast_from(events);
Ok(conversation.as_created())
}
@@ -105,8 +104,7 @@ impl<'a> Conversations<'a> {
tx.commit().await?;
- self.events
- .broadcast(events.map(Event::from).collect::<Vec<_>>());
+ self.events.broadcast_from(events);
Ok(())
}
@@ -134,13 +132,8 @@ impl<'a> Conversations<'a> {
tx.commit().await?;
- self.events.broadcast(
- events
- .into_iter()
- .kmerge_by(Sequence::merge)
- .map(Event::from)
- .collect::<Vec<_>>(),
- );
+ self.events
+ .broadcast_from(events.into_iter().kmerge_by(Sequence::merge));
Ok(())
}
diff --git a/src/login/app.rs b/src/login/app.rs
index 77d4ac3..e471000 100644
--- a/src/login/app.rs
+++ b/src/login/app.rs
@@ -80,9 +80,10 @@ impl<'a> Logins<'a> {
tx.tokens().create(&token, &secret).await?;
tx.commit().await?;
- for event in revoked.into_iter().map(TokenEvent::Revoked) {
- self.token_events.broadcast(event);
- }
+ revoked
+ .into_iter()
+ .map(TokenEvent::Revoked)
+ .for_each(|event| self.token_events.broadcast(event));
Ok(secret)
} else {
diff --git a/src/message/app.rs b/src/message/app.rs
index f0a62d0..647152e 100644
--- a/src/message/app.rs
+++ b/src/message/app.rs
@@ -7,7 +7,7 @@ use crate::{
clock::DateTime,
conversation::{self, repo::Provider as _},
db::NotFound as _,
- event::{Broadcaster, Event, Sequence, repo::Provider as _},
+ event::{Broadcaster, Sequence, repo::Provider as _},
login::Login,
name,
user::{self, repo::Provider as _},
@@ -62,8 +62,7 @@ impl<'a> Messages<'a> {
tx.commit().await?;
- self.events
- .broadcast(events.map(Event::from).collect::<Vec<_>>());
+ self.events.broadcast_from(events);
Ok(message.as_sent())
}
@@ -92,8 +91,7 @@ impl<'a> Messages<'a> {
tx.messages().record_events(events.clone()).await?;
tx.commit().await?;
- self.events
- .broadcast(events.map(Event::from).collect::<Vec<_>>());
+ self.events.broadcast_from(events);
Ok(())
} else {
@@ -126,13 +124,8 @@ impl<'a> Messages<'a> {
tx.commit().await?;
- self.events.broadcast(
- events
- .into_iter()
- .kmerge_by(Sequence::merge)
- .map(Event::from)
- .collect::<Vec<_>>(),
- );
+ self.events
+ .broadcast_from(events.into_iter().kmerge_by(Sequence::merge));
Ok(())
}
diff --git a/src/token/app.rs b/src/token/app.rs
index fb5d712..1d68f32 100644
--- a/src/token/app.rs
+++ b/src/token/app.rs
@@ -102,9 +102,10 @@ impl<'a> Tokens<'a> {
let tokens = tx.tokens().expire(&expire_at).await?;
tx.commit().await?;
- for event in tokens.into_iter().map(TokenEvent::Revoked) {
- self.token_events.broadcast(event);
- }
+ tokens
+ .into_iter()
+ .map(TokenEvent::Revoked)
+ .for_each(|event| self.token_events.broadcast(event));
Ok(())
}
diff --git a/src/user/create.rs b/src/user/create.rs
index 21c61d1..d6656e5 100644
--- a/src/user/create.rs
+++ b/src/user/create.rs
@@ -99,7 +99,7 @@ where
login: _,
} = self;
- events.broadcast(user_events.into_iter().collect::<Vec<_>>());
+ events.broadcast_from(user_events);
}
pub fn login(&self) -> &Login {