summaryrefslogtreecommitdiff
path: root/src/channel/app.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/channel/app.rs')
-rw-r--r--src/channel/app.rs27
1 files changed, 17 insertions, 10 deletions
diff --git a/src/channel/app.rs b/src/channel/app.rs
index f9a75d7..48e3e3c 100644
--- a/src/channel/app.rs
+++ b/src/channel/app.rs
@@ -56,7 +56,7 @@ impl<'a> Channels<'a> {
channel: &channel::Id,
body: &str,
sent_at: &DateTime,
- ) -> Result<(), EventsError> {
+ ) -> Result<broadcast::Message, EventsError> {
let mut tx = self.db.begin().await?;
let channel = tx
.channels()
@@ -69,34 +69,39 @@ impl<'a> Channels<'a> {
.await?;
tx.commit().await?;
- self.broadcaster.broadcast(&channel.id, message);
- Ok(())
+ self.broadcaster.broadcast(&channel.id, &message);
+ Ok(message)
}
pub async fn events(
&self,
channel: &channel::Id,
subscribed_at: &DateTime,
- resume_at: Option<&DateTime>,
+ resume_at: Option<&str>,
) -> Result<impl Stream<Item = broadcast::Message>, EventsError> {
// Somewhat arbitrarily, expire after 90 days.
let expire_at = subscribed_at.to_owned() - TimeDelta::days(90);
- let mut tx = self
- .db
- .begin()
+ let resume_at = resume_at
+ .map(chrono::DateTime::parse_from_rfc3339)
+ .transpose()?
+ .map(|resume_at| resume_at.to_utc());
+
+ let mut tx = self.db.begin().await?;
+ let channel = tx
+ .channels()
+ .by_id(channel)
.await
.not_found(|| EventsError::ChannelNotFound(channel.clone()))?;
- let channel = tx.channels().by_id(channel).await?;
let live_messages = self
.broadcaster
.listen(&channel.id)
- .filter(Self::skip_stale(resume_at))
+ .filter(Self::skip_stale(resume_at.as_ref()))
.filter(Self::skip_expired(&expire_at));
tx.broadcast().expire(&expire_at).await?;
- let stored_messages = tx.broadcast().replay(&channel, resume_at).await?;
+ let stored_messages = tx.broadcast().replay(&channel, resume_at.as_ref()).await?;
tx.commit().await?;
let stored_messages = stream::iter(stored_messages);
@@ -154,5 +159,7 @@ pub enum EventsError {
#[error("channel {0} not found")]
ChannelNotFound(channel::Id),
#[error(transparent)]
+ ResumeAtError(#[from] chrono::ParseError),
+ #[error(transparent)]
DatabaseError(#[from] sqlx::Error),
}