summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorOwen Jacobson <owen@grimoire.ca>2024-09-27 23:46:55 -0400
committerOwen Jacobson <owen@grimoire.ca>2024-09-28 01:00:12 -0400
commit155f6f2556b21e6b25afe096b19adcde1255c598 (patch)
treefb20184cd244d2b9138603fbd4909d7968cf0796 /src
parent60b711c844f8624348d5d1dac3a625532a8e2a82 (diff)
Expire channels, too.
Diffstat (limited to 'src')
-rw-r--r--src/channel/app.rs29
-rw-r--r--src/channel/routes/test/on_create.rs10
-rw-r--r--src/cli.rs4
-rw-r--r--src/events/app.rs11
-rw-r--r--src/events/mod.rs1
-rw-r--r--src/events/repo/message.rs11
-rw-r--r--src/events/types.rs55
-rw-r--r--src/expire.rs (renamed from src/events/expire.rs)2
-rw-r--r--src/lib.rs1
-rw-r--r--src/login/app.rs15
-rw-r--r--src/login/routes/test/login.rs6
-rw-r--r--src/repo/channel.rs52
-rw-r--r--src/test/fixtures/filter.rs6
13 files changed, 172 insertions, 31 deletions
diff --git a/src/channel/app.rs b/src/channel/app.rs
index 1eeca79..d7312e4 100644
--- a/src/channel/app.rs
+++ b/src/channel/app.rs
@@ -1,8 +1,9 @@
+use chrono::TimeDelta;
use sqlx::sqlite::SqlitePool;
use crate::{
clock::DateTime,
- events::{broadcaster::Broadcaster, types::ChannelEvent},
+ events::{broadcaster::Broadcaster, repo::message::Provider as _, types::ChannelEvent},
repo::channel::{Channel, Provider as _},
};
@@ -38,6 +39,32 @@ impl<'a> Channels<'a> {
Ok(channels)
}
+
+ pub async fn expire(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> {
+ // Somewhat arbitrarily, expire after 90 days.
+ let expire_at = relative_to.to_owned() - TimeDelta::days(90);
+
+ let mut tx = self.db.begin().await?;
+ let expired = tx.channels().expired(&expire_at).await?;
+
+ let mut events = Vec::with_capacity(expired.len());
+ for channel in expired {
+ let sequence = tx.message_events().assign_sequence(&channel).await?;
+ let event = tx
+ .channels()
+ .delete_expired(&channel, sequence, relative_to)
+ .await?;
+ events.push(event);
+ }
+
+ tx.commit().await?;
+
+ for event in events {
+ self.broadcaster.broadcast(&event);
+ }
+
+ Ok(())
+ }
}
#[derive(Debug, thiserror::Error)]
diff --git a/src/channel/routes/test/on_create.rs b/src/channel/routes/test/on_create.rs
index 5e62d7f..e2610a5 100644
--- a/src/channel/routes/test/on_create.rs
+++ b/src/channel/routes/test/on_create.rs
@@ -1,5 +1,5 @@
use axum::extract::{Json, State};
-use futures::{future, stream::StreamExt as _};
+use futures::stream::StreamExt as _;
use crate::{
channel::{app, routes},
@@ -41,7 +41,7 @@ async fn new_channel() {
.subscribe(types::ResumePoint::default())
.await
.expect("subscribing never fails")
- .filter(|types::ResumableEvent(_, event)| future::ready(event.channel == response_channel));
+ .filter(fixtures::filter::created());
let types::ResumableEvent(_, event) = events
.next()
@@ -50,7 +50,11 @@ async fn new_channel() {
.expect("creation event published");
assert_eq!(types::Sequence::default(), event.sequence);
- assert_eq!(types::ChannelEventData::Created, event.data);
+ assert!(matches!(
+ event.data,
+ types::ChannelEventData::Created(event)
+ if event.channel == response_channel
+ ));
}
#[tokio::test]
diff --git a/src/cli.rs b/src/cli.rs
index 472d68f..132baf8 100644
--- a/src/cli.rs
+++ b/src/cli.rs
@@ -10,7 +10,7 @@ use clap::Parser;
use sqlx::sqlite::SqlitePool;
use tokio::net;
-use crate::{app::App, channel, clock, events, login, repo::pool};
+use crate::{app::App, channel, clock, events, expire, login, repo::pool};
/// Command-line entry point for running the `hi` server.
///
@@ -74,7 +74,7 @@ impl Args {
let app = routers()
.route_layer(middleware::from_fn_with_state(
app.clone(),
- events::expire::middleware,
+ expire::middleware,
))
.route_layer(middleware::from_fn(clock::middleware))
.with_state(app);
diff --git a/src/events/app.rs b/src/events/app.rs
index 03f3ee6..5162c67 100644
--- a/src/events/app.rs
+++ b/src/events/app.rs
@@ -64,9 +64,10 @@ impl<'a> Events<'a> {
let mut events = Vec::with_capacity(expired.len());
for (channel, message) in expired {
+ let sequence = tx.message_events().assign_sequence(&channel).await?;
let event = tx
.message_events()
- .delete_expired(&channel, &message, relative_to)
+ .delete_expired(&channel, &message, sequence, relative_to)
.await?;
events.push(event);
}
@@ -134,9 +135,11 @@ impl<'a> Events<'a> {
Ok(created_events.chain(replay).chain(live_messages).scan(
resume_at,
|resume_point, event| {
- let channel = &event.channel.id;
- let sequence = event.sequence;
- resume_point.advance(channel, sequence);
+ let channel = &event.channel_id();
+ match event.data {
+ types::ChannelEventData::Deleted(_) => resume_point.forget(channel),
+ _ => resume_point.advance(channel, event.sequence),
+ }
let event = types::ResumableEvent(resume_point.clone(), event);
diff --git a/src/events/mod.rs b/src/events/mod.rs
index 86bc5e9..711ae64 100644
--- a/src/events/mod.rs
+++ b/src/events/mod.rs
@@ -1,6 +1,5 @@
pub mod app;
pub mod broadcaster;
-pub mod expire;
mod extract;
pub mod repo;
mod routes;
diff --git a/src/events/repo/message.rs b/src/events/repo/message.rs
index 32419d5..f8bae2b 100644
--- a/src/events/repo/message.rs
+++ b/src/events/repo/message.rs
@@ -56,8 +56,8 @@ impl<'c> Events<'c> {
.map(|row| types::ChannelEvent {
sequence: row.sequence,
at: row.sent_at,
- channel: channel.clone(),
data: types::MessageEvent {
+ channel: channel.clone(),
sender: sender.clone(),
message: Message {
id: row.id,
@@ -72,7 +72,7 @@ impl<'c> Events<'c> {
Ok(message)
}
- async fn assign_sequence(&mut self, channel: &Channel) -> Result<Sequence, sqlx::Error> {
+ pub async fn assign_sequence(&mut self, channel: &Channel) -> Result<Sequence, sqlx::Error> {
let next = sqlx::query_scalar!(
r#"
update channel
@@ -92,10 +92,9 @@ impl<'c> Events<'c> {
&mut self,
channel: &Channel,
message: &message::Id,
+ sequence: Sequence,
deleted_at: &DateTime,
) -> Result<types::ChannelEvent, sqlx::Error> {
- let sequence = self.assign_sequence(channel).await?;
-
sqlx::query_scalar!(
r#"
delete from message
@@ -110,8 +109,8 @@ impl<'c> Events<'c> {
Ok(types::ChannelEvent {
sequence,
at: *deleted_at,
- channel: channel.clone(),
data: types::MessageDeletedEvent {
+ channel: channel.clone(),
message: message.clone(),
}
.into(),
@@ -178,8 +177,8 @@ impl<'c> Events<'c> {
.map(|row| types::ChannelEvent {
sequence: row.sequence,
at: row.sent_at,
- channel: channel.clone(),
data: types::MessageEvent {
+ channel: channel.clone(),
sender: login::Login {
id: row.sender_id,
name: row.sender_name,
diff --git a/src/events/types.rs b/src/events/types.rs
index 9a65207..966842d 100644
--- a/src/events/types.rs
+++ b/src/events/types.rs
@@ -56,6 +56,11 @@ impl ResumePoint {
elements.insert(channel.clone(), sequence);
}
+ pub fn forget(&mut self, channel: &channel::Id) {
+ let Self(elements) = self;
+ elements.remove(channel);
+ }
+
pub fn get(&self, channel: &channel::Id) -> Option<Sequence> {
let Self(elements) = self;
elements.get(channel).copied()
@@ -92,7 +97,6 @@ pub struct ChannelEvent {
#[serde(skip)]
pub sequence: Sequence,
pub at: DateTime,
- pub channel: Channel,
#[serde(flatten)]
pub data: ChannelEventData,
}
@@ -102,45 +106,78 @@ impl ChannelEvent {
Self {
at: channel.created_at,
sequence: Sequence::default(),
- channel,
- data: ChannelEventData::Created,
+ data: CreatedEvent { channel }.into(),
+ }
+ }
+
+ pub fn channel_id(&self) -> &channel::Id {
+ match &self.data {
+ ChannelEventData::Created(event) => &event.channel.id,
+ ChannelEventData::Message(event) => &event.channel.id,
+ ChannelEventData::MessageDeleted(event) => &event.channel.id,
+ ChannelEventData::Deleted(event) => &event.channel,
}
}
}
impl ResumeElement for ChannelEvent {
fn element(&self) -> (&channel::Id, Sequence) {
- (&self.channel.id, self.sequence)
+ (self.channel_id(), self.sequence)
}
}
#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum ChannelEventData {
- Created,
+ Created(CreatedEvent),
Message(MessageEvent),
MessageDeleted(MessageDeletedEvent),
+ Deleted(DeletedEvent),
+}
+
+#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
+pub struct CreatedEvent {
+ pub channel: Channel,
+}
+
+impl From<CreatedEvent> for ChannelEventData {
+ fn from(event: CreatedEvent) -> Self {
+ Self::Created(event)
+ }
}
#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
pub struct MessageEvent {
+ pub channel: Channel,
pub sender: Login,
pub message: message::Message,
}
impl From<MessageEvent> for ChannelEventData {
- fn from(message: MessageEvent) -> Self {
- Self::Message(message)
+ fn from(event: MessageEvent) -> Self {
+ Self::Message(event)
}
}
#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
pub struct MessageDeletedEvent {
+ pub channel: Channel,
pub message: message::Id,
}
impl From<MessageDeletedEvent> for ChannelEventData {
- fn from(message: MessageDeletedEvent) -> Self {
- Self::MessageDeleted(message)
+ fn from(event: MessageDeletedEvent) -> Self {
+ Self::MessageDeleted(event)
+ }
+}
+
+#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
+pub struct DeletedEvent {
+ pub channel: channel::Id,
+}
+
+impl From<DeletedEvent> for ChannelEventData {
+ fn from(event: DeletedEvent) -> Self {
+ Self::Deleted(event)
}
}
diff --git a/src/events/expire.rs b/src/expire.rs
index d92142d..16006d1 100644
--- a/src/events/expire.rs
+++ b/src/expire.rs
@@ -13,6 +13,8 @@ pub async fn middleware(
req: Request,
next: Next,
) -> Result<Response, Internal> {
+ app.logins().expire(&expired_at).await?;
app.events().expire(&expired_at).await?;
+ app.channels().expire(&expired_at).await?;
Ok(next.run(req).await)
}
diff --git a/src/lib.rs b/src/lib.rs
index f731e57..4139d4d 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -8,6 +8,7 @@ pub mod cli;
mod clock;
mod error;
mod events;
+mod expire;
mod id;
mod login;
mod password;
diff --git a/src/login/app.rs b/src/login/app.rs
index 10609c6..292b95f 100644
--- a/src/login/app.rs
+++ b/src/login/app.rs
@@ -60,11 +60,7 @@ impl<'a> Logins<'a> {
}
pub async fn validate(&self, secret: &str, used_at: &DateTime) -> Result<Login, ValidateError> {
- // Somewhat arbitrarily, expire after 7 days.
- let expire_at = used_at.to_owned() - TimeDelta::days(7);
-
let mut tx = self.db.begin().await?;
- tx.tokens().expire(&expire_at).await?;
let login = tx
.tokens()
.validate(secret, used_at)
@@ -75,6 +71,17 @@ impl<'a> Logins<'a> {
Ok(login)
}
+ pub async fn expire(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> {
+ // Somewhat arbitrarily, expire after 7 days.
+ let expire_at = relative_to.to_owned() - TimeDelta::days(7);
+
+ let mut tx = self.db.begin().await?;
+ tx.tokens().expire(&expire_at).await?;
+ tx.commit().await?;
+
+ Ok(())
+ }
+
pub async fn logout(&self, secret: &str) -> Result<(), ValidateError> {
let mut tx = self.db.begin().await?;
tx.tokens()
diff --git a/src/login/routes/test/login.rs b/src/login/routes/test/login.rs
index d92c01b..719ccca 100644
--- a/src/login/routes/test/login.rs
+++ b/src/login/routes/test/login.rs
@@ -126,6 +126,12 @@ async fn token_expires() {
// Verify the semantics
+ let expired_at = fixtures::now();
+ app.logins()
+ .expire(&expired_at)
+ .await
+ .expect("expiring tokens never fails");
+
let verified_at = fixtures::now();
let error = app
.logins()
diff --git a/src/repo/channel.rs b/src/repo/channel.rs
index 6514426..3c7468f 100644
--- a/src/repo/channel.rs
+++ b/src/repo/channel.rs
@@ -2,7 +2,11 @@ use std::fmt;
use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction};
-use crate::{clock::DateTime, events::types::Sequence, id::Id as BaseId};
+use crate::{
+ clock::DateTime,
+ events::types::{self, Sequence},
+ id::Id as BaseId,
+};
pub trait Provider {
fn channels(&mut self) -> Channels;
@@ -91,6 +95,52 @@ impl<'c> Channels<'c> {
Ok(channels)
}
+
+ pub async fn delete_expired(
+ &mut self,
+ channel: &Channel,
+ sequence: Sequence,
+ deleted_at: &DateTime,
+ ) -> Result<types::ChannelEvent, sqlx::Error> {
+ let channel = channel.id.clone();
+ sqlx::query_scalar!(
+ r#"
+ delete from channel
+ where id = $1
+ returning 1 as "row: i64"
+ "#,
+ channel,
+ )
+ .fetch_one(&mut *self.0)
+ .await?;
+
+ Ok(types::ChannelEvent {
+ sequence,
+ at: *deleted_at,
+ data: types::DeletedEvent { channel }.into(),
+ })
+ }
+
+ pub async fn expired(&mut self, expired_at: &DateTime) -> Result<Vec<Channel>, sqlx::Error> {
+ let channels = sqlx::query_as!(
+ Channel,
+ r#"
+ select
+ channel.id as "id: Id",
+ channel.name,
+ channel.created_at as "created_at: DateTime"
+ from channel
+ left join message
+ where created_at < $1
+ and message.id is null
+ "#,
+ expired_at,
+ )
+ .fetch_all(&mut *self.0)
+ .await?;
+
+ Ok(channels)
+ }
}
// Stable identifier for a [Channel]. Prefixed with `C`.
diff --git a/src/test/fixtures/filter.rs b/src/test/fixtures/filter.rs
index 8847e13..fbebced 100644
--- a/src/test/fixtures/filter.rs
+++ b/src/test/fixtures/filter.rs
@@ -7,3 +7,9 @@ pub fn messages() -> impl FnMut(&types::ResumableEvent) -> future::Ready<bool> {
future::ready(matches!(event.data, types::ChannelEventData::Message(_)))
}
}
+
+pub fn created() -> impl FnMut(&types::ResumableEvent) -> future::Ready<bool> {
+ |types::ResumableEvent(_, event)| {
+ future::ready(matches!(event.data, types::ChannelEventData::Created(_)))
+ }
+}