diff options
| -rw-r--r-- | .sqlx/query-61ebab676719aed700404185eb71f113b18d581b50d2c4c7d0462438a71e5f1d.json | 12 | ||||
| -rw-r--r-- | src/channel/app.rs | 41 | ||||
| -rw-r--r-- | src/channel/repo/broadcast.rs | 14 | ||||
| -rw-r--r-- | src/events.rs | 4 | ||||
| -rw-r--r-- | src/login/app.rs | 8 | ||||
| -rw-r--r-- | src/repo/login/extract.rs | 2 | ||||
| -rw-r--r-- | src/repo/token.rs | 14 |
7 files changed, 68 insertions, 27 deletions
diff --git a/.sqlx/query-61ebab676719aed700404185eb71f113b18d581b50d2c4c7d0462438a71e5f1d.json b/.sqlx/query-61ebab676719aed700404185eb71f113b18d581b50d2c4c7d0462438a71e5f1d.json new file mode 100644 index 0000000..9edc1af --- /dev/null +++ b/.sqlx/query-61ebab676719aed700404185eb71f113b18d581b50d2c4c7d0462438a71e5f1d.json @@ -0,0 +1,12 @@ +{ + "db_name": "SQLite", + "query": "\n delete from message\n where sent_at < $1\n ", + "describe": { + "columns": [], + "parameters": { + "Right": 1 + }, + "nullable": [] + }, + "hash": "61ebab676719aed700404185eb71f113b18d581b50d2c4c7d0462438a71e5f1d" +} diff --git a/src/channel/app.rs b/src/channel/app.rs index 5aabe31..2f37878 100644 --- a/src/channel/app.rs +++ b/src/channel/app.rs @@ -1,6 +1,7 @@ use std::collections::{hash_map::Entry, HashMap}; use std::sync::{Arc, Mutex, MutexGuard}; +use chrono::TimeDelta; use futures::{ future, stream::{self, StreamExt as _}, @@ -73,19 +74,12 @@ impl<'a> Channels<'a> { pub async fn events( &self, channel: &channel::Id, + subscribed_at: &DateTime, resume_at: Option<&DateTime>, ) -> Result<impl Stream<Item = broadcast::Message>, EventsError> { - fn skip_stale( - resume_at: Option<&DateTime>, - ) -> impl for<'m> FnMut(&'m broadcast::Message) -> future::Ready<bool> { - let resume_at = resume_at.cloned(); - move |msg| { - future::ready(match resume_at { - None => false, - Some(resume_at) => msg.sent_at <= resume_at, - }) - } - } + // Somewhat arbitrarily, expire after 90 days. + let expire_at = subscribed_at.to_owned() - TimeDelta::days(90); + let mut tx = self .db .begin() @@ -96,8 +90,10 @@ impl<'a> Channels<'a> { let live_messages = self .broadcaster .listen(&channel.id) - .skip_while(skip_stale(resume_at)); + .filter(Self::skip_stale(resume_at)) + .filter(Self::skip_expired(&expire_at)); + tx.broadcast().expire(&expire_at).await?; let stored_messages = tx.broadcast().replay(&channel, resume_at).await?; tx.commit().await?; @@ -105,6 +101,24 @@ impl<'a> Channels<'a> { Ok(stored_messages.chain(live_messages)) } + + fn skip_stale( + resume_at: Option<&DateTime>, + ) -> impl for<'m> FnMut(&'m broadcast::Message) -> future::Ready<bool> { + let resume_at = resume_at.cloned(); + move |msg| { + future::ready(match resume_at { + None => true, + Some(resume_at) => msg.sent_at > resume_at, + }) + } + } + fn skip_expired( + expire_at: &DateTime, + ) -> impl for<'m> FnMut(&'m broadcast::Message) -> future::Ready<bool> { + let expire_at = expire_at.to_owned(); + move |msg| future::ready(msg.sent_at > expire_at) + } } #[derive(Debug, thiserror::Error)] @@ -200,8 +214,7 @@ impl Broadcaster { // should always hold. // // See also <https://users.rust-lang.org/t/taking-from-stream-while-ok/48854>. - debug_assert!(r.is_ok()); - r.unwrap() + r.expect("after filtering, only `Ok` messages should remain") }) } diff --git a/src/channel/repo/broadcast.rs b/src/channel/repo/broadcast.rs index ff16cd0..182203a 100644 --- a/src/channel/repo/broadcast.rs +++ b/src/channel/repo/broadcast.rs @@ -68,6 +68,20 @@ impl<'c> Broadcast<'c> { Ok(message) } + pub async fn expire(&mut self, expire_at: &DateTime) -> Result<(), sqlx::Error> { + sqlx::query!( + r#" + delete from message + where sent_at < $1 + "#, + expire_at, + ) + .execute(&mut *self.0) + .await?; + + Ok(()) + } + pub async fn replay( &mut self, channel: &Channel, diff --git a/src/events.rs b/src/events.rs index fd73d63..2f5e145 100644 --- a/src/events.rs +++ b/src/events.rs @@ -15,6 +15,7 @@ use futures::stream::{self, Stream, StreamExt as _, TryStreamExt as _}; use crate::{ app::App, channel::{app::EventsError, repo::broadcast}, + clock::RequestedAt, error::InternalError, header::LastEventId, repo::{channel, login::Login}, @@ -32,6 +33,7 @@ struct EventsQuery { async fn on_events( State(app): State<App>, + RequestedAt(now): RequestedAt, _: Login, // requires auth, but doesn't actually care who you are last_event_id: Option<LastEventId>, Query(query): Query<EventsQuery>, @@ -50,7 +52,7 @@ async fn on_events( async move { let events = app .channels() - .events(&channel, resume_at.as_ref()) + .events(&channel, &now, resume_at.as_ref()) .await? .map(ChannelEvent::wrap(channel)); diff --git a/src/login/app.rs b/src/login/app.rs index 637d852..34ecd52 100644 --- a/src/login/app.rs +++ b/src/login/app.rs @@ -1,3 +1,4 @@ +use chrono::TimeDelta; use sqlx::sqlite::SqlitePool; use super::repo::auth::Provider as _; @@ -47,9 +48,12 @@ impl<'a> Logins<'a> { Ok(token) } - pub async fn validate(&self, secret: &str, used_at: DateTime) -> Result<Login, ValidateError> { + pub async fn validate(&self, secret: &str, used_at: &DateTime) -> Result<Login, ValidateError> { + // Somewhat arbitrarily, expire after 7 days. + let expire_at = used_at.to_owned() - TimeDelta::days(7); + let mut tx = self.db.begin().await?; - tx.tokens().expire(used_at).await?; + tx.tokens().expire(&expire_at).await?; let login = tx .tokens() .validate(secret, used_at) diff --git a/src/repo/login/extract.rs b/src/repo/login/extract.rs index a45a1cd..e808f4b 100644 --- a/src/repo/login/extract.rs +++ b/src/repo/login/extract.rs @@ -27,7 +27,7 @@ impl FromRequestParts<App> for Login { let secret = identity_token.secret().ok_or(LoginError::Unauthorized)?; let app = State::<App>::from_request_parts(parts, state).await?; - match app.logins().validate(secret, used_at).await { + match app.logins().validate(secret, &used_at).await { Ok(login) => Ok(login), Err(ValidateError::InvalidToken) => Err(LoginError::Unauthorized), Err(other) => Err(other.into()), diff --git a/src/repo/token.rs b/src/repo/token.rs index 5674c92..a2393e3 100644 --- a/src/repo/token.rs +++ b/src/repo/token.rs @@ -1,4 +1,3 @@ -use chrono::TimeDelta; use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction}; use uuid::Uuid; @@ -61,19 +60,16 @@ impl<'c> Tokens<'c> { Ok(()) } - /// Expire and delete all tokens that haven't been used within the expiry - /// interval (right now, 7 days) prior to `expire_at`. Tokens that are in - /// use within that period will be retained. - pub async fn expire(&mut self, expire_at: DateTime) -> Result<(), sqlx::Error> { - // Somewhat arbitrarily, expire after 7 days. - let expired_issue_at = expire_at - TimeDelta::days(7); + /// Expire and delete all tokens that haven't been used more recently than + /// ``expire_at``. + pub async fn expire(&mut self, expire_at: &DateTime) -> Result<(), sqlx::Error> { sqlx::query!( r#" delete from token where last_used_at < $1 "#, - expired_issue_at, + expire_at, ) .execute(&mut *self.0) .await?; @@ -87,7 +83,7 @@ impl<'c> Tokens<'c> { pub async fn validate( &mut self, secret: &str, - used_at: DateTime, + used_at: &DateTime, ) -> Result<Login, sqlx::Error> { // I would use `update … returning` to do this in one query, but // sqlite3, as of this writing, does not allow an update's `returning` |
