summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.sqlx/query-61ebab676719aed700404185eb71f113b18d581b50d2c4c7d0462438a71e5f1d.json12
-rw-r--r--src/channel/app.rs41
-rw-r--r--src/channel/repo/broadcast.rs14
-rw-r--r--src/events.rs4
-rw-r--r--src/login/app.rs8
-rw-r--r--src/repo/login/extract.rs2
-rw-r--r--src/repo/token.rs14
7 files changed, 68 insertions, 27 deletions
diff --git a/.sqlx/query-61ebab676719aed700404185eb71f113b18d581b50d2c4c7d0462438a71e5f1d.json b/.sqlx/query-61ebab676719aed700404185eb71f113b18d581b50d2c4c7d0462438a71e5f1d.json
new file mode 100644
index 0000000..9edc1af
--- /dev/null
+++ b/.sqlx/query-61ebab676719aed700404185eb71f113b18d581b50d2c4c7d0462438a71e5f1d.json
@@ -0,0 +1,12 @@
+{
+ "db_name": "SQLite",
+ "query": "\n delete from message\n where sent_at < $1\n ",
+ "describe": {
+ "columns": [],
+ "parameters": {
+ "Right": 1
+ },
+ "nullable": []
+ },
+ "hash": "61ebab676719aed700404185eb71f113b18d581b50d2c4c7d0462438a71e5f1d"
+}
diff --git a/src/channel/app.rs b/src/channel/app.rs
index 5aabe31..2f37878 100644
--- a/src/channel/app.rs
+++ b/src/channel/app.rs
@@ -1,6 +1,7 @@
use std::collections::{hash_map::Entry, HashMap};
use std::sync::{Arc, Mutex, MutexGuard};
+use chrono::TimeDelta;
use futures::{
future,
stream::{self, StreamExt as _},
@@ -73,19 +74,12 @@ impl<'a> Channels<'a> {
pub async fn events(
&self,
channel: &channel::Id,
+ subscribed_at: &DateTime,
resume_at: Option<&DateTime>,
) -> Result<impl Stream<Item = broadcast::Message>, EventsError> {
- fn skip_stale(
- resume_at: Option<&DateTime>,
- ) -> impl for<'m> FnMut(&'m broadcast::Message) -> future::Ready<bool> {
- let resume_at = resume_at.cloned();
- move |msg| {
- future::ready(match resume_at {
- None => false,
- Some(resume_at) => msg.sent_at <= resume_at,
- })
- }
- }
+ // Somewhat arbitrarily, expire after 90 days.
+ let expire_at = subscribed_at.to_owned() - TimeDelta::days(90);
+
let mut tx = self
.db
.begin()
@@ -96,8 +90,10 @@ impl<'a> Channels<'a> {
let live_messages = self
.broadcaster
.listen(&channel.id)
- .skip_while(skip_stale(resume_at));
+ .filter(Self::skip_stale(resume_at))
+ .filter(Self::skip_expired(&expire_at));
+ tx.broadcast().expire(&expire_at).await?;
let stored_messages = tx.broadcast().replay(&channel, resume_at).await?;
tx.commit().await?;
@@ -105,6 +101,24 @@ impl<'a> Channels<'a> {
Ok(stored_messages.chain(live_messages))
}
+
+ fn skip_stale(
+ resume_at: Option<&DateTime>,
+ ) -> impl for<'m> FnMut(&'m broadcast::Message) -> future::Ready<bool> {
+ let resume_at = resume_at.cloned();
+ move |msg| {
+ future::ready(match resume_at {
+ None => true,
+ Some(resume_at) => msg.sent_at > resume_at,
+ })
+ }
+ }
+ fn skip_expired(
+ expire_at: &DateTime,
+ ) -> impl for<'m> FnMut(&'m broadcast::Message) -> future::Ready<bool> {
+ let expire_at = expire_at.to_owned();
+ move |msg| future::ready(msg.sent_at > expire_at)
+ }
}
#[derive(Debug, thiserror::Error)]
@@ -200,8 +214,7 @@ impl Broadcaster {
// should always hold.
//
// See also <https://users.rust-lang.org/t/taking-from-stream-while-ok/48854>.
- debug_assert!(r.is_ok());
- r.unwrap()
+ r.expect("after filtering, only `Ok` messages should remain")
})
}
diff --git a/src/channel/repo/broadcast.rs b/src/channel/repo/broadcast.rs
index ff16cd0..182203a 100644
--- a/src/channel/repo/broadcast.rs
+++ b/src/channel/repo/broadcast.rs
@@ -68,6 +68,20 @@ impl<'c> Broadcast<'c> {
Ok(message)
}
+ pub async fn expire(&mut self, expire_at: &DateTime) -> Result<(), sqlx::Error> {
+ sqlx::query!(
+ r#"
+ delete from message
+ where sent_at < $1
+ "#,
+ expire_at,
+ )
+ .execute(&mut *self.0)
+ .await?;
+
+ Ok(())
+ }
+
pub async fn replay(
&mut self,
channel: &Channel,
diff --git a/src/events.rs b/src/events.rs
index fd73d63..2f5e145 100644
--- a/src/events.rs
+++ b/src/events.rs
@@ -15,6 +15,7 @@ use futures::stream::{self, Stream, StreamExt as _, TryStreamExt as _};
use crate::{
app::App,
channel::{app::EventsError, repo::broadcast},
+ clock::RequestedAt,
error::InternalError,
header::LastEventId,
repo::{channel, login::Login},
@@ -32,6 +33,7 @@ struct EventsQuery {
async fn on_events(
State(app): State<App>,
+ RequestedAt(now): RequestedAt,
_: Login, // requires auth, but doesn't actually care who you are
last_event_id: Option<LastEventId>,
Query(query): Query<EventsQuery>,
@@ -50,7 +52,7 @@ async fn on_events(
async move {
let events = app
.channels()
- .events(&channel, resume_at.as_ref())
+ .events(&channel, &now, resume_at.as_ref())
.await?
.map(ChannelEvent::wrap(channel));
diff --git a/src/login/app.rs b/src/login/app.rs
index 637d852..34ecd52 100644
--- a/src/login/app.rs
+++ b/src/login/app.rs
@@ -1,3 +1,4 @@
+use chrono::TimeDelta;
use sqlx::sqlite::SqlitePool;
use super::repo::auth::Provider as _;
@@ -47,9 +48,12 @@ impl<'a> Logins<'a> {
Ok(token)
}
- pub async fn validate(&self, secret: &str, used_at: DateTime) -> Result<Login, ValidateError> {
+ pub async fn validate(&self, secret: &str, used_at: &DateTime) -> Result<Login, ValidateError> {
+ // Somewhat arbitrarily, expire after 7 days.
+ let expire_at = used_at.to_owned() - TimeDelta::days(7);
+
let mut tx = self.db.begin().await?;
- tx.tokens().expire(used_at).await?;
+ tx.tokens().expire(&expire_at).await?;
let login = tx
.tokens()
.validate(secret, used_at)
diff --git a/src/repo/login/extract.rs b/src/repo/login/extract.rs
index a45a1cd..e808f4b 100644
--- a/src/repo/login/extract.rs
+++ b/src/repo/login/extract.rs
@@ -27,7 +27,7 @@ impl FromRequestParts<App> for Login {
let secret = identity_token.secret().ok_or(LoginError::Unauthorized)?;
let app = State::<App>::from_request_parts(parts, state).await?;
- match app.logins().validate(secret, used_at).await {
+ match app.logins().validate(secret, &used_at).await {
Ok(login) => Ok(login),
Err(ValidateError::InvalidToken) => Err(LoginError::Unauthorized),
Err(other) => Err(other.into()),
diff --git a/src/repo/token.rs b/src/repo/token.rs
index 5674c92..a2393e3 100644
--- a/src/repo/token.rs
+++ b/src/repo/token.rs
@@ -1,4 +1,3 @@
-use chrono::TimeDelta;
use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction};
use uuid::Uuid;
@@ -61,19 +60,16 @@ impl<'c> Tokens<'c> {
Ok(())
}
- /// Expire and delete all tokens that haven't been used within the expiry
- /// interval (right now, 7 days) prior to `expire_at`. Tokens that are in
- /// use within that period will be retained.
- pub async fn expire(&mut self, expire_at: DateTime) -> Result<(), sqlx::Error> {
- // Somewhat arbitrarily, expire after 7 days.
- let expired_issue_at = expire_at - TimeDelta::days(7);
+ /// Expire and delete all tokens that haven't been used more recently than
+ /// ``expire_at``.
+ pub async fn expire(&mut self, expire_at: &DateTime) -> Result<(), sqlx::Error> {
sqlx::query!(
r#"
delete
from token
where last_used_at < $1
"#,
- expired_issue_at,
+ expire_at,
)
.execute(&mut *self.0)
.await?;
@@ -87,7 +83,7 @@ impl<'c> Tokens<'c> {
pub async fn validate(
&mut self,
secret: &str,
- used_at: DateTime,
+ used_at: &DateTime,
) -> Result<Login, sqlx::Error> {
// I would use `update … returning` to do this in one query, but
// sqlite3, as of this writing, does not allow an update's `returning`