diff options
Diffstat (limited to 'src/channel/app.rs')
| -rw-r--r-- | src/channel/app.rs | 27 |
1 files changed, 17 insertions, 10 deletions
diff --git a/src/channel/app.rs b/src/channel/app.rs index f9a75d7..48e3e3c 100644 --- a/src/channel/app.rs +++ b/src/channel/app.rs @@ -56,7 +56,7 @@ impl<'a> Channels<'a> { channel: &channel::Id, body: &str, sent_at: &DateTime, - ) -> Result<(), EventsError> { + ) -> Result<broadcast::Message, EventsError> { let mut tx = self.db.begin().await?; let channel = tx .channels() @@ -69,34 +69,39 @@ impl<'a> Channels<'a> { .await?; tx.commit().await?; - self.broadcaster.broadcast(&channel.id, message); - Ok(()) + self.broadcaster.broadcast(&channel.id, &message); + Ok(message) } pub async fn events( &self, channel: &channel::Id, subscribed_at: &DateTime, - resume_at: Option<&DateTime>, + resume_at: Option<&str>, ) -> Result<impl Stream<Item = broadcast::Message>, EventsError> { // Somewhat arbitrarily, expire after 90 days. let expire_at = subscribed_at.to_owned() - TimeDelta::days(90); - let mut tx = self - .db - .begin() + let resume_at = resume_at + .map(chrono::DateTime::parse_from_rfc3339) + .transpose()? + .map(|resume_at| resume_at.to_utc()); + + let mut tx = self.db.begin().await?; + let channel = tx + .channels() + .by_id(channel) .await .not_found(|| EventsError::ChannelNotFound(channel.clone()))?; - let channel = tx.channels().by_id(channel).await?; let live_messages = self .broadcaster .listen(&channel.id) - .filter(Self::skip_stale(resume_at)) + .filter(Self::skip_stale(resume_at.as_ref())) .filter(Self::skip_expired(&expire_at)); tx.broadcast().expire(&expire_at).await?; - let stored_messages = tx.broadcast().replay(&channel, resume_at).await?; + let stored_messages = tx.broadcast().replay(&channel, resume_at.as_ref()).await?; tx.commit().await?; let stored_messages = stream::iter(stored_messages); @@ -154,5 +159,7 @@ pub enum EventsError { #[error("channel {0} not found")] ChannelNotFound(channel::Id), #[error(transparent)] + ResumeAtError(#[from] chrono::ParseError), + #[error(transparent)] DatabaseError(#[from] sqlx::Error), } |
