Diffstat (limited to 'src/events')
-rw-r--r--   src/events/app.rs            40
-rw-r--r--   src/events/expire.rs         18
-rw-r--r--   src/events/mod.rs             1
-rw-r--r--   src/events/repo/message.rs   70
-rw-r--r--   src/events/routes.rs          5
-rw-r--r--   src/events/routes/test.rs    91
-rw-r--r--   src/events/types.rs          12
7 files changed, 146 insertions, 91 deletions
diff --git a/src/events/app.rs b/src/events/app.rs
index 134e86a..03f3ee6 100644
--- a/src/events/app.rs
+++ b/src/events/app.rs
@@ -55,14 +55,35 @@ impl<'a> Events<'a> {
Ok(event)
}
+ pub async fn expire(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> {
+ // Somewhat arbitrarily, expire after 90 days.
+ let expire_at = relative_to.to_owned() - TimeDelta::days(90);
+
+ let mut tx = self.db.begin().await?;
+ let expired = tx.message_events().expired(&expire_at).await?;
+
+ let mut events = Vec::with_capacity(expired.len());
+ for (channel, message) in expired {
+ let event = tx
+ .message_events()
+ .delete_expired(&channel, &message, relative_to)
+ .await?;
+ events.push(event);
+ }
+
+ tx.commit().await?;
+
+ for event in events {
+ self.broadcaster.broadcast(&event);
+ }
+
+ Ok(())
+ }
+
pub async fn subscribe(
&self,
- subscribed_at: &DateTime,
resume_at: ResumePoint,
) -> Result<impl Stream<Item = types::ResumableEvent> + std::fmt::Debug, sqlx::Error> {
- // Somewhat arbitrarily, expire after 90 days.
- let expire_at = subscribed_at.to_owned() - TimeDelta::days(90);
-
let mut tx = self.db.begin().await?;
let channels = tx.channels().all().await?;
@@ -81,8 +102,6 @@ impl<'a> Events<'a> {
// querying the DB. We'll prune out duplicates later.
let live_messages = self.broadcaster.subscribe();
- tx.message_events().expire(&expire_at).await?;
-
let mut replays = BTreeMap::new();
let mut resume_live_at = resume_at.clone();
for channel in channels {
@@ -107,9 +126,6 @@ impl<'a> Events<'a> {
// * resume is redundant with the resume_at argument to
// `tx.broadcasts().replay(…)`.
let live_messages = live_messages
- // Sure, it's temporally improbable that we'll ever skip a message
- // that's 90 days old, but there's no reason not to be thorough.
- .filter(Self::skip_expired(&expire_at))
// Filtering on the broadcast resume point filters out messages
// before resume_at, and filters out messages duplicated from
// stored_messages.
@@ -134,12 +150,6 @@ impl<'a> Events<'a> {
) -> impl for<'m> FnMut(&'m types::ChannelEvent) -> future::Ready<bool> {
move |event| future::ready(resume_at.not_after(event))
}
- fn skip_expired(
- expire_at: &DateTime,
- ) -> impl for<'m> FnMut(&'m types::ChannelEvent) -> future::Ready<bool> {
- let expire_at = expire_at.to_owned();
- move |event| future::ready(expire_at < event.at)
- }
}
#[derive(Debug, thiserror::Error)]
diff --git a/src/events/expire.rs b/src/events/expire.rs
new file mode 100644
index 0000000..d92142d
--- /dev/null
+++ b/src/events/expire.rs
@@ -0,0 +1,18 @@
+use axum::{
+ extract::{Request, State},
+ middleware::Next,
+ response::Response,
+};
+
+use crate::{app::App, clock::RequestedAt, error::Internal};
+
+// Expires messages and channels before each request.
+pub async fn middleware(
+ State(app): State<App>,
+ RequestedAt(expired_at): RequestedAt,
+ req: Request,
+ next: Next,
+) -> Result<Response, Internal> {
+ app.events().expire(&expired_at).await?;
+ Ok(next.run(req).await)
+}
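
Note: the router wiring for this middleware isn't part of the diff. A minimal sketch of one plausible way to attach it with axum's `from_fn_with_state`, assuming `App` is `Clone` and that `event_routes` stands in for the crate's real route builders:

    use axum::{middleware, Router};

    use crate::{app::App, events::expire};

    // Hypothetical wiring: the expiry pass runs before every request served
    // by the routes already registered on `event_routes`.
    fn with_expiry(app: App, event_routes: Router<App>) -> Router {
        event_routes
            .layer(middleware::from_fn_with_state(app.clone(), expire::middleware))
            .with_state(app)
    }
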
diff --git a/src/events/mod.rs b/src/events/mod.rs
index 711ae64..86bc5e9 100644
--- a/src/events/mod.rs
+++ b/src/events/mod.rs
@@ -1,5 +1,6 @@
pub mod app;
pub mod broadcaster;
+pub mod expire;
mod extract;
pub mod repo;
mod routes;
diff --git a/src/events/repo/message.rs b/src/events/repo/message.rs
index f6fce0e..32419d5 100644
--- a/src/events/repo/message.rs
+++ b/src/events/repo/message.rs
@@ -6,7 +6,7 @@ use crate::{
repo::{
channel::{self, Channel},
login::{self, Login},
- message,
+ message::{self, Message},
},
};
@@ -30,7 +30,7 @@ impl<'c> Events<'c> {
body: &str,
sent_at: &DateTime,
) -> Result<types::ChannelEvent, sqlx::Error> {
- let sequence = self.assign_sequence(&channel.id).await?;
+ let sequence = self.assign_sequence(channel).await?;
let id = message::Id::generate();
@@ -59,7 +59,7 @@ impl<'c> Events<'c> {
channel: channel.clone(),
data: types::MessageEvent {
sender: sender.clone(),
- message: message::Message {
+ message: Message {
id: row.id,
body: row.body,
},
@@ -72,7 +72,7 @@ impl<'c> Events<'c> {
Ok(message)
}
- async fn assign_sequence(&mut self, channel: &channel::Id) -> Result<Sequence, sqlx::Error> {
+ async fn assign_sequence(&mut self, channel: &Channel) -> Result<Sequence, sqlx::Error> {
let next = sqlx::query_scalar!(
r#"
update channel
@@ -80,7 +80,7 @@ impl<'c> Events<'c> {
where id = $1
returning last_sequence as "next_sequence: Sequence"
"#,
- channel,
+ channel.id,
)
.fetch_one(&mut *self.0)
.await?;
@@ -88,18 +88,68 @@ impl<'c> Events<'c> {
Ok(next)
}
- pub async fn expire(&mut self, expire_at: &DateTime) -> Result<(), sqlx::Error> {
- sqlx::query!(
+ pub async fn delete_expired(
+ &mut self,
+ channel: &Channel,
+ message: &message::Id,
+ deleted_at: &DateTime,
+ ) -> Result<types::ChannelEvent, sqlx::Error> {
+ let sequence = self.assign_sequence(channel).await?;
+
+ sqlx::query_scalar!(
r#"
delete from message
+ where id = $1
+ returning 1 as "row: i64"
+ "#,
+ message,
+ )
+ .fetch_one(&mut *self.0)
+ .await?;
+
+ Ok(types::ChannelEvent {
+ sequence,
+ at: *deleted_at,
+ channel: channel.clone(),
+ data: types::MessageDeletedEvent {
+ message: message.clone(),
+ }
+ .into(),
+ })
+ }
+
+ pub async fn expired(
+ &mut self,
+ expire_at: &DateTime,
+ ) -> Result<Vec<(Channel, message::Id)>, sqlx::Error> {
+ let messages = sqlx::query!(
+ r#"
+ select
+ channel.id as "channel_id: channel::Id",
+ channel.name as "channel_name",
+ channel.created_at as "channel_created_at: DateTime",
+ message.id as "message: message::Id"
+ from message
+ join channel on message.channel = channel.id
+ join login as sender on message.sender = sender.id
where sent_at < $1
"#,
expire_at,
)
- .execute(&mut *self.0)
+ .map(|row| {
+ (
+ Channel {
+ id: row.channel_id,
+ name: row.channel_name,
+ created_at: row.channel_created_at,
+ },
+ row.message,
+ )
+ })
+ .fetch_all(&mut *self.0)
.await?;
- Ok(())
+ Ok(messages)
}
pub async fn replay(
@@ -134,7 +184,7 @@ impl<'c> Events<'c> {
id: row.sender_id,
name: row.sender_name,
},
- message: message::Message {
+ message: Message {
id: row.id,
body: row.body,
},
diff --git a/src/events/routes.rs b/src/events/routes.rs
index 3f70dcd..89c942c 100644
--- a/src/events/routes.rs
+++ b/src/events/routes.rs
@@ -13,7 +13,7 @@ use super::{
extract::LastEventId,
types::{self, ResumePoint},
};
-use crate::{app::App, clock::RequestedAt, error::Internal, repo::login::Login};
+use crate::{app::App, error::Internal, repo::login::Login};
#[cfg(test)]
mod test;
@@ -24,7 +24,6 @@ pub fn router() -> Router<App> {
async fn events(
State(app): State<App>,
- RequestedAt(subscribed_at): RequestedAt,
_: Login, // requires auth, but doesn't actually care who you are
last_event_id: Option<LastEventId<ResumePoint>>,
) -> Result<Events<impl Stream<Item = types::ResumableEvent> + std::fmt::Debug>, Internal> {
@@ -32,7 +31,7 @@ async fn events(
.map(LastEventId::into_inner)
.unwrap_or_default();
- let stream = app.events().subscribe(&subscribed_at, resume_at).await?;
+ let stream = app.events().subscribe(resume_at).await?;
Ok(Events(stream))
}
diff --git a/src/events/routes/test.rs b/src/events/routes/test.rs
index 55ada95..a6e2275 100644
--- a/src/events/routes/test.rs
+++ b/src/events/routes/test.rs
@@ -21,14 +21,14 @@ async fn includes_historical_message() {
// Call the endpoint
let subscriber = fixtures::login::create(&app).await;
- let subscribed_at = fixtures::now();
- let routes::Events(mut events) = routes::events(State(app), subscribed_at, subscriber, None)
+ let routes::Events(events) = routes::events(State(app), subscriber, None)
.await
.expect("subscribe never fails");
// Verify the structure of the response.
let types::ResumableEvent(_, event) = events
+ .filter(fixtures::filter::messages())
.next()
.immediately()
.await
@@ -47,11 +47,9 @@ async fn includes_live_message() {
// Call the endpoint
let subscriber = fixtures::login::create(&app).await;
- let subscribed_at = fixtures::now();
- let routes::Events(mut events) =
- routes::events(State(app.clone()), subscribed_at, subscriber, None)
- .await
- .expect("subscribe never fails");
+ let routes::Events(events) = routes::events(State(app.clone()), subscriber, None)
+ .await
+ .expect("subscribe never fails");
// Verify the semantics
@@ -59,6 +57,7 @@ async fn includes_live_message() {
let message = fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await;
let types::ResumableEvent(_, event) = events
+ .filter(fixtures::filter::messages())
.next()
.immediately()
.await
@@ -92,14 +91,14 @@ async fn includes_multiple_channels() {
// Call the endpoint
let subscriber = fixtures::login::create(&app).await;
- let subscribed_at = fixtures::now();
- let routes::Events(events) = routes::events(State(app), subscribed_at, subscriber, None)
+ let routes::Events(events) = routes::events(State(app), subscriber, None)
.await
.expect("subscribe never fails");
// Verify the structure of the response.
let events = events
+ .filter(fixtures::filter::messages())
.take(messages.len())
.collect::<Vec<_>>()
.immediately()
@@ -129,8 +128,7 @@ async fn sequential_messages() {
// Call the endpoint
let subscriber = fixtures::login::create(&app).await;
- let subscribed_at = fixtures::now();
- let routes::Events(events) = routes::events(State(app), subscribed_at, subscriber, None)
+ let routes::Events(events) = routes::events(State(app), subscriber, None)
.await
.expect("subscribe never fails");
@@ -169,17 +167,19 @@ async fn resumes_from() {
// Call the endpoint
let subscriber = fixtures::login::create(&app).await;
- let subscribed_at = fixtures::now();
let resume_at = {
// First subscription
- let routes::Events(mut events) =
- routes::events(State(app.clone()), subscribed_at, subscriber.clone(), None)
- .await
- .expect("subscribe never fails");
+ let routes::Events(events) = routes::events(State(app.clone()), subscriber.clone(), None)
+ .await
+ .expect("subscribe never fails");
- let types::ResumableEvent(last_event_id, event) =
- events.next().immediately().await.expect("delivered events");
+ let types::ResumableEvent(last_event_id, event) = events
+ .filter(fixtures::filter::messages())
+ .next()
+ .immediately()
+ .await
+ .expect("delivered events");
assert_eq!(initial_message, event);
@@ -187,11 +187,9 @@ async fn resumes_from() {
};
// Resume after disconnect
- let reconnect_at = fixtures::now();
- let routes::Events(resumed) =
- routes::events(State(app), reconnect_at, subscriber, Some(resume_at.into()))
- .await
- .expect("subscribe never fails");
+ let routes::Events(resumed) = routes::events(State(app), subscriber, Some(resume_at.into()))
+ .await
+ .expect("subscribe never fails");
// Verify the structure of the response.
@@ -243,13 +241,12 @@ async fn serial_resume() {
];
// First subscription
- let subscribed_at = fixtures::now();
- let routes::Events(events) =
- routes::events(State(app.clone()), subscribed_at, subscriber.clone(), None)
- .await
- .expect("subscribe never fails");
+ let routes::Events(events) = routes::events(State(app.clone()), subscriber.clone(), None)
+ .await
+ .expect("subscribe never fails");
let events = events
+ .filter(fixtures::filter::messages())
.take(initial_messages.len())
.collect::<Vec<_>>()
.immediately()
@@ -277,10 +274,8 @@ async fn serial_resume() {
];
// Second subscription
- let resubscribed_at = fixtures::now();
let routes::Events(events) = routes::events(
State(app.clone()),
- resubscribed_at,
subscriber.clone(),
Some(resume_at.into()),
)
@@ -288,6 +283,7 @@ async fn serial_resume() {
.expect("subscribe never fails");
let events = events
+ .filter(fixtures::filter::messages())
.take(resume_messages.len())
.collect::<Vec<_>>()
.immediately()
@@ -314,11 +310,9 @@ async fn serial_resume() {
fixtures::message::send(&app, &sender, &channel_b, &fixtures::now()).await,
];
- // Second subscription
- let resubscribed_at = fixtures::now();
+ // Third subscription
let routes::Events(events) = routes::events(
State(app.clone()),
- resubscribed_at,
subscriber.clone(),
Some(resume_at.into()),
)
@@ -326,6 +320,7 @@ async fn serial_resume() {
.expect("subscribe never fails");
let events = events
+ .filter(fixtures::filter::messages())
.take(final_messages.len())
.collect::<Vec<_>>()
.immediately()
@@ -340,33 +335,3 @@ async fn serial_resume() {
}
};
}
-
-#[tokio::test]
-async fn removes_expired_messages() {
- // Set up the environment
- let app = fixtures::scratch_app().await;
- let sender = fixtures::login::create(&app).await;
- let channel = fixtures::channel::create(&app, &fixtures::now()).await;
-
- fixtures::message::send(&app, &sender, &channel, &fixtures::ancient()).await;
- let message = fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await;
-
- // Call the endpoint
-
- let subscriber = fixtures::login::create(&app).await;
- let subscribed_at = fixtures::now();
-
- let routes::Events(mut events) = routes::events(State(app), subscribed_at, subscriber, None)
- .await
- .expect("subscribe never fails");
-
- // Verify the semantics
-
- let types::ResumableEvent(_, event) = events
- .next()
- .immediately()
- .await
- .expect("delivered messages");
-
- assert_eq!(message, event);
-}
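
Note: the `fixtures::filter::messages()` helper these tests now rely on isn't shown in this diff. One plausible shape for it, assuming it simply keeps `Message` events so that expiry-related deletion events can't interleave with the assertions (mirroring the closure style of `resume_from`/`skip_expired` in app.rs):

    use futures::future;

    use crate::events::types;

    // Hypothetical fixture: keep only Message events from a subscription stream.
    pub fn messages(
    ) -> impl for<'e> FnMut(&'e types::ResumableEvent) -> future::Ready<bool> {
        |event| {
            let types::ResumableEvent(_, event) = event;
            future::ready(matches!(event.data, types::ChannelEventData::Message(_)))
        }
    }
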
diff --git a/src/events/types.rs b/src/events/types.rs
index 944321a..9a65207 100644
--- a/src/events/types.rs
+++ b/src/events/types.rs
@@ -119,6 +119,7 @@ impl ResumeElement for ChannelEvent {
pub enum ChannelEventData {
Created,
Message(MessageEvent),
+ MessageDeleted(MessageDeletedEvent),
}
#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
@@ -132,3 +133,14 @@ impl From<MessageEvent> for ChannelEventData {
Self::Message(message)
}
}
+
+#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
+pub struct MessageDeletedEvent {
+ pub message: message::Id,
+}
+
+impl From<MessageDeletedEvent> for ChannelEventData {
+ fn from(message: MessageDeletedEvent) -> Self {
+ Self::MessageDeleted(message)
+ }
+}
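
For illustration, a sketch of how a consumer matching on `ChannelEventData` might handle the new variant; `render_message` and `remove_message` are hypothetical handlers, not part of this change:

    use crate::{
        events::types,
        repo::{channel::Channel, message},
    };

    // Hypothetical consumer of the expanded event enum.
    fn apply(event: &types::ChannelEvent) {
        match &event.data {
            types::ChannelEventData::Created => { /* a channel came into existence */ }
            types::ChannelEventData::Message(m) => render_message(&event.channel, &m.message),
            types::ChannelEventData::MessageDeleted(d) => remove_message(&event.channel, &d.message),
        }
    }

    fn render_message(_channel: &Channel, _message: &message::Message) { /* stub */ }
    fn remove_message(_channel: &Channel, _message: &message::Id) { /* stub */ }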