diff options
Diffstat (limited to 'src/channel')
| -rw-r--r-- | src/channel/app.rs | 30 | ||||
| -rw-r--r-- | src/channel/history.rs | 5 | ||||
| -rw-r--r-- | src/channel/repo.rs | 196 | ||||
| -rw-r--r-- | src/channel/routes/channel/delete.rs | 2 | ||||
| -rw-r--r-- | src/channel/routes/channel/test.rs | 6 | ||||
| -rw-r--r-- | src/channel/routes/test.rs | 106 | ||||
| -rw-r--r-- | src/channel/snapshot.rs | 3 |
7 files changed, 266 insertions, 82 deletions
diff --git a/src/channel/app.rs b/src/channel/app.rs index 46eaba8..0409076 100644 --- a/src/channel/app.rs +++ b/src/channel/app.rs @@ -23,9 +23,9 @@ impl<'a> Channels<'a> { pub async fn create(&self, name: &str, created_at: &DateTime) -> Result<Channel, CreateError> { let mut tx = self.db.begin().await?; let created = tx.sequence().next(created_at).await?; - let channel = tx - .channels() - .create(name, &created) + let channel = tx.channels().create(name, &created).await?; + tx.channels() + .reserve_name(&channel, name) .await .duplicate(|| CreateError::DuplicateName(name.into()))?; tx.commit().await?; @@ -43,7 +43,7 @@ impl<'a> Channels<'a> { let channel = tx.channels().by_id(channel).await.optional()?; tx.commit().await?; - Ok(channel.iter().flat_map(History::events).collect()) + Ok(channel.as_ref().and_then(History::as_snapshot)) } pub async fn delete(&self, channel: &Id, deleted_at: &DateTime) -> Result<(), Error> { @@ -54,13 +54,16 @@ impl<'a> Channels<'a> { .by_id(channel) .await .not_found(|| Error::NotFound(channel.clone()))?; + channel + .as_snapshot() + .ok_or_else(|| Error::Deleted(channel.id().clone()))?; let mut events = Vec::new(); - let messages = tx.messages().in_channel(&channel, None).await?; + let messages = tx.messages().live(&channel).await?; for message in messages { let deleted = tx.sequence().next(deleted_at).await?; - let message = tx.messages().delete(message.id(), &deleted).await?; + let message = tx.messages().delete(&message, &deleted).await?; events.extend( message .events() @@ -70,7 +73,7 @@ impl<'a> Channels<'a> { } let deleted = tx.sequence().next(deleted_at).await?; - let channel = tx.channels().delete(channel.id(), &deleted).await?; + let channel = tx.channels().delete(&channel, &deleted).await?; events.extend( channel .events() @@ -115,6 +118,17 @@ impl<'a> Channels<'a> { Ok(()) } + + pub async fn purge(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> { + // Somewhat arbitrarily, purge after 7 days. + let purge_at = relative_to.to_owned() - TimeDelta::days(7); + + let mut tx = self.db.begin().await?; + tx.channels().purge(&purge_at).await?; + tx.commit().await?; + + Ok(()) + } } #[derive(Debug, thiserror::Error)] @@ -129,6 +143,8 @@ pub enum CreateError { pub enum Error { #[error("channel {0} not found")] NotFound(Id), + #[error("channel {0} deleted")] + Deleted(Id), #[error(transparent)] Database(#[from] sqlx::Error), } diff --git a/src/channel/history.rs b/src/channel/history.rs index 78b3437..4b9fcc7 100644 --- a/src/channel/history.rs +++ b/src/channel/history.rs @@ -31,6 +31,11 @@ impl History { .filter(Sequence::up_to(resume_point.into())) .collect() } + + // Snapshot of this channel as of all events recorded in this history. + pub fn as_snapshot(&self) -> Option<Channel> { + self.events().collect() + } } // Event factories diff --git a/src/channel/repo.rs b/src/channel/repo.rs index 2f57581..4b10c54 100644 --- a/src/channel/repo.rs +++ b/src/channel/repo.rs @@ -41,11 +41,9 @@ impl<'c> Channels<'c> { channel: Channel { id: row.id, name: row.name, + deleted_at: None, }, - created: Instant { - at: row.created_at, - sequence: row.created_sequence, - }, + created: Instant::new(row.created_at, row.created_sequence), deleted: None, }) .fetch_one(&mut *self.0) @@ -54,15 +52,35 @@ impl<'c> Channels<'c> { Ok(channel) } + pub async fn reserve_name(&mut self, channel: &History, name: &str) -> Result<(), sqlx::Error> { + let channel = channel.id(); + sqlx::query!( + r#" + insert into channel_name_reservation (id, name) + values ($1, $2) + "#, + channel, + name, + ) + .execute(&mut *self.0) + .await?; + + Ok(()) + } + pub async fn by_id(&mut self, channel: &Id) -> Result<History, sqlx::Error> { let channel = sqlx::query!( r#" select id as "id: Id", - name, - created_at as "created_at: DateTime", - created_sequence as "created_sequence: Sequence" + channel.name, + channel.created_at as "created_at: DateTime", + channel.created_sequence as "created_sequence: Sequence", + deleted.deleted_at as "deleted_at: DateTime", + deleted.deleted_sequence as "deleted_sequence: Sequence" from channel + left join channel_deleted as deleted + using (id) where id = $1 "#, channel, @@ -71,12 +89,10 @@ impl<'c> Channels<'c> { channel: Channel { id: row.id, name: row.name, + deleted_at: row.deleted_at, }, - created: Instant { - at: row.created_at, - sequence: row.created_sequence, - }, - deleted: None, + created: Instant::new(row.created_at, row.created_sequence), + deleted: Instant::optional(row.deleted_at, row.deleted_sequence), }) .fetch_one(&mut *self.0) .await?; @@ -89,11 +105,15 @@ impl<'c> Channels<'c> { r#" select id as "id: Id", - name, - created_at as "created_at: DateTime", - created_sequence as "created_sequence: Sequence" + channel.name, + channel.created_at as "created_at: DateTime", + channel.created_sequence as "created_sequence: Sequence", + deleted.deleted_at as "deleted_at: DateTime", + deleted.deleted_sequence as "deleted_sequence: Sequence" from channel - where coalesce(created_sequence <= $1, true) + left join channel_deleted as deleted + using (id) + where coalesce(channel.created_sequence <= $1, true) order by channel.name "#, resume_at, @@ -102,12 +122,10 @@ impl<'c> Channels<'c> { channel: Channel { id: row.id, name: row.name, + deleted_at: row.deleted_at, }, - created: Instant { - at: row.created_at, - sequence: row.created_sequence, - }, - deleted: None, + created: Instant::new(row.created_at, row.created_sequence), + deleted: Instant::optional(row.deleted_at, row.deleted_sequence), }) .fetch_all(&mut *self.0) .await?; @@ -123,11 +141,15 @@ impl<'c> Channels<'c> { r#" select id as "id: Id", - name, - created_at as "created_at: DateTime", - created_sequence as "created_sequence: Sequence" + channel.name, + channel.created_at as "created_at: DateTime", + channel.created_sequence as "created_sequence: Sequence", + deleted.deleted_at as "deleted_at: DateTime", + deleted.deleted_sequence as "deleted_sequence: Sequence" from channel - where coalesce(created_sequence > $1, true) + left join channel_deleted as deleted + using (id) + where coalesce(channel.created_sequence > $1, true) "#, resume_at, ) @@ -135,12 +157,10 @@ impl<'c> Channels<'c> { channel: Channel { id: row.id, name: row.name, + deleted_at: row.deleted_at, }, - created: Instant { - at: row.created_at, - sequence: row.created_sequence, - }, - deleted: None, + created: Instant::new(row.created_at, row.created_sequence), + deleted: Instant::optional(row.deleted_at, row.deleted_sequence), }) .fetch_all(&mut *self.0) .await?; @@ -150,50 +170,118 @@ impl<'c> Channels<'c> { pub async fn delete( &mut self, - channel: &Id, + channel: &History, deleted: &Instant, ) -> Result<History, sqlx::Error> { - let channel = sqlx::query!( + let id = channel.id(); + sqlx::query_scalar!( r#" - delete from channel + delete from channel_name_reservation where id = $1 - returning - id as "id: Id", - name, - created_at as "created_at: DateTime", - created_sequence as "created_sequence: Sequence" + returning 1 as "deleted: bool" "#, - channel, + id, + ) + .fetch_one(&mut *self.0) + .await?; + + sqlx::query_scalar!( + r#" + insert into channel_deleted (id, deleted_at, deleted_sequence) + values ($1, $2, $3) + returning 1 as "deleted: bool" + "#, + id, + deleted.at, + deleted.sequence, + ) + .fetch_one(&mut *self.0) + .await?; + + // Small social responsibility hack here: when a channel is deleted, its name is + // retconned to have been the empty string. Someone reading the event stream + // afterwards, or looking at channels via the API, cannot retrieve the + // "deleted" channel's information by ignoring the deletion event. + sqlx::query_scalar!( + r#" + update channel + set name = "" + where id = $1 + returning 1 as "updated: bool" + "#, + id, ) - .map(|row| History { - channel: Channel { - id: row.id, - name: row.name, - }, - created: Instant { - at: row.created_at, - sequence: row.created_sequence, - }, - deleted: Some(*deleted), - }) .fetch_one(&mut *self.0) .await?; + let channel = self.by_id(id).await?; + Ok(channel) } - pub async fn expired(&mut self, expired_at: &DateTime) -> Result<Vec<Id>, sqlx::Error> { + pub async fn purge(&mut self, purge_at: &DateTime) -> Result<(), sqlx::Error> { let channels = sqlx::query_scalar!( r#" + with has_messages as ( + select channel + from message + group by channel + ) + delete from channel_deleted + where deleted_at < $1 + and id not in has_messages + returning id as "id: Id" + "#, + purge_at, + ) + .fetch_all(&mut *self.0) + .await?; + + for channel in channels { + // Wanted: a way to batch these up into one query. + sqlx::query!( + r#" + delete from channel + where id = $1 + "#, + channel, + ) + .execute(&mut *self.0) + .await?; + } + + Ok(()) + } + + pub async fn expired(&mut self, expired_at: &DateTime) -> Result<Vec<History>, sqlx::Error> { + let channels = sqlx::query!( + r#" select - channel.id as "id: Id" + channel.id as "id: Id", + channel.name, + channel.created_at as "created_at: DateTime", + channel.created_sequence as "created_sequence: Sequence", + deleted.deleted_at as "deleted_at: DateTime", + deleted.deleted_sequence as "deleted_sequence: Sequence" from channel - left join message - where created_at < $1 + left join channel_deleted as deleted + using (id) + left join message + where channel.created_at < $1 and message.id is null + and deleted.id is null "#, expired_at, ) + .map(|row| History { + channel: Channel { + id: row.id, + name: row.name, + deleted_at: row.deleted_at, + }, + created: Instant::new(row.created_at, row.created_sequence), + deleted: Instant::optional(row.deleted_at, row.deleted_sequence), + }) .fetch_all(&mut *self.0) .await?; diff --git a/src/channel/routes/channel/delete.rs b/src/channel/routes/channel/delete.rs index efac0c0..5f40ddf 100644 --- a/src/channel/routes/channel/delete.rs +++ b/src/channel/routes/channel/delete.rs @@ -32,7 +32,7 @@ impl IntoResponse for Error { let Self(error) = self; #[allow(clippy::match_wildcard_for_single_variants)] match error { - app::Error::NotFound(_) => NotFound(error).into_response(), + app::Error::NotFound(_) | app::Error::Deleted(_) => NotFound(error).into_response(), other => Internal::from(other).into_response(), } } diff --git a/src/channel/routes/channel/test.rs b/src/channel/routes/channel/test.rs index bc02b20..c6541cd 100644 --- a/src/channel/routes/channel/test.rs +++ b/src/channel/routes/channel/test.rs @@ -4,7 +4,7 @@ use futures::stream::StreamExt; use super::post; use crate::{ channel, - event::{self, Sequenced}, + event::Sequenced, message::{self, app::SendError}, test::fixtures::{self, future::Immediately as _}, }; @@ -45,7 +45,7 @@ async fn messages_in_order() { .subscribe(None) .await .expect("subscribing to a valid channel") - .filter(fixtures::filter::messages()) + .filter_map(fixtures::message::events) .take(requests.len()); let events = events.collect::<Vec<_>>().immediately().await; @@ -54,7 +54,7 @@ async fn messages_in_order() { assert_eq!(*sent_at, event.at()); assert!(matches!( event, - event::Event::Message(message::Event::Sent(event)) + message::Event::Sent(event) if event.message.sender == sender.id && event.message.body == message )); diff --git a/src/channel/routes/test.rs b/src/channel/routes/test.rs index 81f1465..ffd8484 100644 --- a/src/channel/routes/test.rs +++ b/src/channel/routes/test.rs @@ -1,10 +1,11 @@ +use std::future; + use axum::extract::{Json, State}; use futures::stream::StreamExt as _; use super::post; use crate::{ - channel::{self, app}, - event, + channel::app, test::fixtures::{self, future::Immediately as _}, }; @@ -19,29 +20,35 @@ async fn new_channel() { let name = fixtures::channel::propose(); let request = post::Request { name: name.clone() }; - let Json(response_channel) = - post::handler(State(app.clone()), creator, fixtures::now(), Json(request)) - .await - .expect("new channel in an empty app"); + let Json(response) = post::handler(State(app.clone()), creator, fixtures::now(), Json(request)) + .await + .expect("creating a channel in an empty app succeeds"); // Verify the structure of the response - assert_eq!(name, response_channel.name); + assert_eq!(name, response.name); // Verify the semantics let snapshot = app.boot().snapshot().await.expect("boot always succeeds"); - assert!(snapshot - .channels - .iter() - .any(|channel| channel.name == response_channel.name && channel.id == response_channel.id)); + assert!(snapshot.channels.iter().any(|channel| channel == &response)); + + let channel = app + .channels() + .get(&response.id) + .await + .expect("searching for channels by ID never fails") + .expect("the newly-created channel exists"); + assert_eq!(response, channel); let mut events = app .events() .subscribe(None) .await .expect("subscribing never fails") - .filter(fixtures::filter::created()); + .filter_map(fixtures::channel::events) + .filter_map(fixtures::channel::created) + .filter(|event| future::ready(event.channel == response)); let event = events .next() @@ -49,11 +56,7 @@ async fn new_channel() { .await .expect("creation event published"); - assert!(matches!( - event, - event::Event::Channel(channel::Event::Created(event)) - if event.channel == response_channel - )); + assert_eq!(event.channel, response); } #[tokio::test] @@ -81,3 +84,72 @@ async fn duplicate_name() { app::CreateError::DuplicateName(name) if channel.name == name )); } + +#[tokio::test] +async fn name_reusable_after_delete() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let creator = fixtures::login::create(&app, &fixtures::now()).await; + let name = fixtures::channel::propose(); + + // Call the endpoint (first time) + + let request = post::Request { name: name.clone() }; + let Json(response) = post::handler( + State(app.clone()), + creator.clone(), + fixtures::now(), + Json(request), + ) + .await + .expect("new channel in an empty app"); + + // Delete the channel + + app.channels() + .delete(&response.id, &fixtures::now()) + .await + .expect("deleting a newly-created channel succeeds"); + + // Call the endpoint (second time) + + let request = post::Request { name: name.clone() }; + let Json(response) = post::handler(State(app.clone()), creator, fixtures::now(), Json(request)) + .await + .expect("new channel in an empty app"); + + // Verify the structure of the response + + assert_eq!(name, response.name); + + // Verify the semantics + + let snapshot = app.boot().snapshot().await.expect("boot always succeeds"); + assert!(snapshot.channels.iter().any(|channel| channel == &response)); + + let channel = app + .channels() + .get(&response.id) + .await + .expect("searching for channels by ID never fails") + .expect("the newly-created channel exists"); + assert_eq!(response, channel); + + let mut events = app + .events() + .subscribe(None) + .await + .expect("subscribing never fails") + .filter_map(fixtures::channel::events) + .filter_map(fixtures::channel::created) + .filter(|event| future::ready(event.channel == response)); + + let event = events + .next() + .immediately() + .await + .expect("creation event published"); + + assert_eq!(event.channel, response); +} diff --git a/src/channel/snapshot.rs b/src/channel/snapshot.rs index d4d1d27..2b7d89a 100644 --- a/src/channel/snapshot.rs +++ b/src/channel/snapshot.rs @@ -2,11 +2,14 @@ use super::{ event::{Created, Event}, Id, }; +use crate::clock::DateTime; #[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] pub struct Channel { pub id: Id, pub name: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub deleted_at: Option<DateTime>, } impl Channel { |
