summaryrefslogtreecommitdiff
path: root/src/channel
diff options
context:
space:
mode:
Diffstat (limited to 'src/channel')
-rw-r--r--src/channel/app.rs30
-rw-r--r--src/channel/history.rs5
-rw-r--r--src/channel/repo.rs196
-rw-r--r--src/channel/routes/channel/delete.rs2
-rw-r--r--src/channel/routes/channel/test.rs6
-rw-r--r--src/channel/routes/test.rs106
-rw-r--r--src/channel/snapshot.rs3
7 files changed, 266 insertions, 82 deletions
diff --git a/src/channel/app.rs b/src/channel/app.rs
index 46eaba8..0409076 100644
--- a/src/channel/app.rs
+++ b/src/channel/app.rs
@@ -23,9 +23,9 @@ impl<'a> Channels<'a> {
pub async fn create(&self, name: &str, created_at: &DateTime) -> Result<Channel, CreateError> {
let mut tx = self.db.begin().await?;
let created = tx.sequence().next(created_at).await?;
- let channel = tx
- .channels()
- .create(name, &created)
+ let channel = tx.channels().create(name, &created).await?;
+ tx.channels()
+ .reserve_name(&channel, name)
.await
.duplicate(|| CreateError::DuplicateName(name.into()))?;
tx.commit().await?;
@@ -43,7 +43,7 @@ impl<'a> Channels<'a> {
let channel = tx.channels().by_id(channel).await.optional()?;
tx.commit().await?;
- Ok(channel.iter().flat_map(History::events).collect())
+ Ok(channel.as_ref().and_then(History::as_snapshot))
}
pub async fn delete(&self, channel: &Id, deleted_at: &DateTime) -> Result<(), Error> {
@@ -54,13 +54,16 @@ impl<'a> Channels<'a> {
.by_id(channel)
.await
.not_found(|| Error::NotFound(channel.clone()))?;
+ channel
+ .as_snapshot()
+ .ok_or_else(|| Error::Deleted(channel.id().clone()))?;
let mut events = Vec::new();
- let messages = tx.messages().in_channel(&channel, None).await?;
+ let messages = tx.messages().live(&channel).await?;
for message in messages {
let deleted = tx.sequence().next(deleted_at).await?;
- let message = tx.messages().delete(message.id(), &deleted).await?;
+ let message = tx.messages().delete(&message, &deleted).await?;
events.extend(
message
.events()
@@ -70,7 +73,7 @@ impl<'a> Channels<'a> {
}
let deleted = tx.sequence().next(deleted_at).await?;
- let channel = tx.channels().delete(channel.id(), &deleted).await?;
+ let channel = tx.channels().delete(&channel, &deleted).await?;
events.extend(
channel
.events()
@@ -115,6 +118,17 @@ impl<'a> Channels<'a> {
Ok(())
}
+
+ pub async fn purge(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> {
+ // Somewhat arbitrarily, purge after 7 days.
+ let purge_at = relative_to.to_owned() - TimeDelta::days(7);
+
+ let mut tx = self.db.begin().await?;
+ tx.channels().purge(&purge_at).await?;
+ tx.commit().await?;
+
+ Ok(())
+ }
}
#[derive(Debug, thiserror::Error)]
@@ -129,6 +143,8 @@ pub enum CreateError {
pub enum Error {
#[error("channel {0} not found")]
NotFound(Id),
+ #[error("channel {0} deleted")]
+ Deleted(Id),
#[error(transparent)]
Database(#[from] sqlx::Error),
}
diff --git a/src/channel/history.rs b/src/channel/history.rs
index 78b3437..4b9fcc7 100644
--- a/src/channel/history.rs
+++ b/src/channel/history.rs
@@ -31,6 +31,11 @@ impl History {
.filter(Sequence::up_to(resume_point.into()))
.collect()
}
+
+ // Snapshot of this channel as of all events recorded in this history.
+ pub fn as_snapshot(&self) -> Option<Channel> {
+ self.events().collect()
+ }
}
// Event factories
diff --git a/src/channel/repo.rs b/src/channel/repo.rs
index 2f57581..4b10c54 100644
--- a/src/channel/repo.rs
+++ b/src/channel/repo.rs
@@ -41,11 +41,9 @@ impl<'c> Channels<'c> {
channel: Channel {
id: row.id,
name: row.name,
+ deleted_at: None,
},
- created: Instant {
- at: row.created_at,
- sequence: row.created_sequence,
- },
+ created: Instant::new(row.created_at, row.created_sequence),
deleted: None,
})
.fetch_one(&mut *self.0)
@@ -54,15 +52,35 @@ impl<'c> Channels<'c> {
Ok(channel)
}
+ pub async fn reserve_name(&mut self, channel: &History, name: &str) -> Result<(), sqlx::Error> {
+ let channel = channel.id();
+ sqlx::query!(
+ r#"
+ insert into channel_name_reservation (id, name)
+ values ($1, $2)
+ "#,
+ channel,
+ name,
+ )
+ .execute(&mut *self.0)
+ .await?;
+
+ Ok(())
+ }
+
pub async fn by_id(&mut self, channel: &Id) -> Result<History, sqlx::Error> {
let channel = sqlx::query!(
r#"
select
id as "id: Id",
- name,
- created_at as "created_at: DateTime",
- created_sequence as "created_sequence: Sequence"
+ channel.name,
+ channel.created_at as "created_at: DateTime",
+ channel.created_sequence as "created_sequence: Sequence",
+ deleted.deleted_at as "deleted_at: DateTime",
+ deleted.deleted_sequence as "deleted_sequence: Sequence"
from channel
+ left join channel_deleted as deleted
+ using (id)
where id = $1
"#,
channel,
@@ -71,12 +89,10 @@ impl<'c> Channels<'c> {
channel: Channel {
id: row.id,
name: row.name,
+ deleted_at: row.deleted_at,
},
- created: Instant {
- at: row.created_at,
- sequence: row.created_sequence,
- },
- deleted: None,
+ created: Instant::new(row.created_at, row.created_sequence),
+ deleted: Instant::optional(row.deleted_at, row.deleted_sequence),
})
.fetch_one(&mut *self.0)
.await?;
@@ -89,11 +105,15 @@ impl<'c> Channels<'c> {
r#"
select
id as "id: Id",
- name,
- created_at as "created_at: DateTime",
- created_sequence as "created_sequence: Sequence"
+ channel.name,
+ channel.created_at as "created_at: DateTime",
+ channel.created_sequence as "created_sequence: Sequence",
+ deleted.deleted_at as "deleted_at: DateTime",
+ deleted.deleted_sequence as "deleted_sequence: Sequence"
from channel
- where coalesce(created_sequence <= $1, true)
+ left join channel_deleted as deleted
+ using (id)
+ where coalesce(channel.created_sequence <= $1, true)
order by channel.name
"#,
resume_at,
@@ -102,12 +122,10 @@ impl<'c> Channels<'c> {
channel: Channel {
id: row.id,
name: row.name,
+ deleted_at: row.deleted_at,
},
- created: Instant {
- at: row.created_at,
- sequence: row.created_sequence,
- },
- deleted: None,
+ created: Instant::new(row.created_at, row.created_sequence),
+ deleted: Instant::optional(row.deleted_at, row.deleted_sequence),
})
.fetch_all(&mut *self.0)
.await?;
@@ -123,11 +141,15 @@ impl<'c> Channels<'c> {
r#"
select
id as "id: Id",
- name,
- created_at as "created_at: DateTime",
- created_sequence as "created_sequence: Sequence"
+ channel.name,
+ channel.created_at as "created_at: DateTime",
+ channel.created_sequence as "created_sequence: Sequence",
+ deleted.deleted_at as "deleted_at: DateTime",
+ deleted.deleted_sequence as "deleted_sequence: Sequence"
from channel
- where coalesce(created_sequence > $1, true)
+ left join channel_deleted as deleted
+ using (id)
+ where coalesce(channel.created_sequence > $1, true)
"#,
resume_at,
)
@@ -135,12 +157,10 @@ impl<'c> Channels<'c> {
channel: Channel {
id: row.id,
name: row.name,
+ deleted_at: row.deleted_at,
},
- created: Instant {
- at: row.created_at,
- sequence: row.created_sequence,
- },
- deleted: None,
+ created: Instant::new(row.created_at, row.created_sequence),
+ deleted: Instant::optional(row.deleted_at, row.deleted_sequence),
})
.fetch_all(&mut *self.0)
.await?;
@@ -150,50 +170,118 @@ impl<'c> Channels<'c> {
pub async fn delete(
&mut self,
- channel: &Id,
+ channel: &History,
deleted: &Instant,
) -> Result<History, sqlx::Error> {
- let channel = sqlx::query!(
+ let id = channel.id();
+ sqlx::query_scalar!(
r#"
- delete from channel
+ delete from channel_name_reservation
where id = $1
- returning
- id as "id: Id",
- name,
- created_at as "created_at: DateTime",
- created_sequence as "created_sequence: Sequence"
+ returning 1 as "deleted: bool"
"#,
- channel,
+ id,
+ )
+ .fetch_one(&mut *self.0)
+ .await?;
+
+ sqlx::query_scalar!(
+ r#"
+ insert into channel_deleted (id, deleted_at, deleted_sequence)
+ values ($1, $2, $3)
+ returning 1 as "deleted: bool"
+ "#,
+ id,
+ deleted.at,
+ deleted.sequence,
+ )
+ .fetch_one(&mut *self.0)
+ .await?;
+
+ // Small social responsibility hack here: when a channel is deleted, its name is
+ // retconned to have been the empty string. Someone reading the event stream
+ // afterwards, or looking at channels via the API, cannot retrieve the
+ // "deleted" channel's information by ignoring the deletion event.
+ sqlx::query_scalar!(
+ r#"
+ update channel
+ set name = ""
+ where id = $1
+ returning 1 as "updated: bool"
+ "#,
+ id,
)
- .map(|row| History {
- channel: Channel {
- id: row.id,
- name: row.name,
- },
- created: Instant {
- at: row.created_at,
- sequence: row.created_sequence,
- },
- deleted: Some(*deleted),
- })
.fetch_one(&mut *self.0)
.await?;
+ let channel = self.by_id(id).await?;
+
Ok(channel)
}
- pub async fn expired(&mut self, expired_at: &DateTime) -> Result<Vec<Id>, sqlx::Error> {
+ pub async fn purge(&mut self, purge_at: &DateTime) -> Result<(), sqlx::Error> {
let channels = sqlx::query_scalar!(
r#"
+ with has_messages as (
+ select channel
+ from message
+ group by channel
+ )
+ delete from channel_deleted
+ where deleted_at < $1
+ and id not in has_messages
+ returning id as "id: Id"
+ "#,
+ purge_at,
+ )
+ .fetch_all(&mut *self.0)
+ .await?;
+
+ for channel in channels {
+ // Wanted: a way to batch these up into one query.
+ sqlx::query!(
+ r#"
+ delete from channel
+ where id = $1
+ "#,
+ channel,
+ )
+ .execute(&mut *self.0)
+ .await?;
+ }
+
+ Ok(())
+ }
+
+ pub async fn expired(&mut self, expired_at: &DateTime) -> Result<Vec<History>, sqlx::Error> {
+ let channels = sqlx::query!(
+ r#"
select
- channel.id as "id: Id"
+ channel.id as "id: Id",
+ channel.name,
+ channel.created_at as "created_at: DateTime",
+ channel.created_sequence as "created_sequence: Sequence",
+ deleted.deleted_at as "deleted_at: DateTime",
+ deleted.deleted_sequence as "deleted_sequence: Sequence"
from channel
- left join message
- where created_at < $1
+ left join channel_deleted as deleted
+ using (id)
+ left join message
+ where channel.created_at < $1
and message.id is null
+ and deleted.id is null
"#,
expired_at,
)
+ .map(|row| History {
+ channel: Channel {
+ id: row.id,
+ name: row.name,
+ deleted_at: row.deleted_at,
+ },
+ created: Instant::new(row.created_at, row.created_sequence),
+ deleted: Instant::optional(row.deleted_at, row.deleted_sequence),
+ })
.fetch_all(&mut *self.0)
.await?;
diff --git a/src/channel/routes/channel/delete.rs b/src/channel/routes/channel/delete.rs
index efac0c0..5f40ddf 100644
--- a/src/channel/routes/channel/delete.rs
+++ b/src/channel/routes/channel/delete.rs
@@ -32,7 +32,7 @@ impl IntoResponse for Error {
let Self(error) = self;
#[allow(clippy::match_wildcard_for_single_variants)]
match error {
- app::Error::NotFound(_) => NotFound(error).into_response(),
+ app::Error::NotFound(_) | app::Error::Deleted(_) => NotFound(error).into_response(),
other => Internal::from(other).into_response(),
}
}
diff --git a/src/channel/routes/channel/test.rs b/src/channel/routes/channel/test.rs
index bc02b20..c6541cd 100644
--- a/src/channel/routes/channel/test.rs
+++ b/src/channel/routes/channel/test.rs
@@ -4,7 +4,7 @@ use futures::stream::StreamExt;
use super::post;
use crate::{
channel,
- event::{self, Sequenced},
+ event::Sequenced,
message::{self, app::SendError},
test::fixtures::{self, future::Immediately as _},
};
@@ -45,7 +45,7 @@ async fn messages_in_order() {
.subscribe(None)
.await
.expect("subscribing to a valid channel")
- .filter(fixtures::filter::messages())
+ .filter_map(fixtures::message::events)
.take(requests.len());
let events = events.collect::<Vec<_>>().immediately().await;
@@ -54,7 +54,7 @@ async fn messages_in_order() {
assert_eq!(*sent_at, event.at());
assert!(matches!(
event,
- event::Event::Message(message::Event::Sent(event))
+ message::Event::Sent(event)
if event.message.sender == sender.id
&& event.message.body == message
));
diff --git a/src/channel/routes/test.rs b/src/channel/routes/test.rs
index 81f1465..ffd8484 100644
--- a/src/channel/routes/test.rs
+++ b/src/channel/routes/test.rs
@@ -1,10 +1,11 @@
+use std::future;
+
use axum::extract::{Json, State};
use futures::stream::StreamExt as _;
use super::post;
use crate::{
- channel::{self, app},
- event,
+ channel::app,
test::fixtures::{self, future::Immediately as _},
};
@@ -19,29 +20,35 @@ async fn new_channel() {
let name = fixtures::channel::propose();
let request = post::Request { name: name.clone() };
- let Json(response_channel) =
- post::handler(State(app.clone()), creator, fixtures::now(), Json(request))
- .await
- .expect("new channel in an empty app");
+ let Json(response) = post::handler(State(app.clone()), creator, fixtures::now(), Json(request))
+ .await
+ .expect("creating a channel in an empty app succeeds");
// Verify the structure of the response
- assert_eq!(name, response_channel.name);
+ assert_eq!(name, response.name);
// Verify the semantics
let snapshot = app.boot().snapshot().await.expect("boot always succeeds");
- assert!(snapshot
- .channels
- .iter()
- .any(|channel| channel.name == response_channel.name && channel.id == response_channel.id));
+ assert!(snapshot.channels.iter().any(|channel| channel == &response));
+
+ let channel = app
+ .channels()
+ .get(&response.id)
+ .await
+ .expect("searching for channels by ID never fails")
+ .expect("the newly-created channel exists");
+ assert_eq!(response, channel);
let mut events = app
.events()
.subscribe(None)
.await
.expect("subscribing never fails")
- .filter(fixtures::filter::created());
+ .filter_map(fixtures::channel::events)
+ .filter_map(fixtures::channel::created)
+ .filter(|event| future::ready(event.channel == response));
let event = events
.next()
@@ -49,11 +56,7 @@ async fn new_channel() {
.await
.expect("creation event published");
- assert!(matches!(
- event,
- event::Event::Channel(channel::Event::Created(event))
- if event.channel == response_channel
- ));
+ assert_eq!(event.channel, response);
}
#[tokio::test]
@@ -81,3 +84,72 @@ async fn duplicate_name() {
app::CreateError::DuplicateName(name) if channel.name == name
));
}
+
+#[tokio::test]
+async fn name_reusable_after_delete() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let creator = fixtures::login::create(&app, &fixtures::now()).await;
+ let name = fixtures::channel::propose();
+
+ // Call the endpoint (first time)
+
+ let request = post::Request { name: name.clone() };
+ let Json(response) = post::handler(
+ State(app.clone()),
+ creator.clone(),
+ fixtures::now(),
+ Json(request),
+ )
+ .await
+ .expect("new channel in an empty app");
+
+ // Delete the channel
+
+ app.channels()
+ .delete(&response.id, &fixtures::now())
+ .await
+ .expect("deleting a newly-created channel succeeds");
+
+ // Call the endpoint (second time)
+
+ let request = post::Request { name: name.clone() };
+ let Json(response) = post::handler(State(app.clone()), creator, fixtures::now(), Json(request))
+ .await
+ .expect("new channel in an empty app");
+
+ // Verify the structure of the response
+
+ assert_eq!(name, response.name);
+
+ // Verify the semantics
+
+ let snapshot = app.boot().snapshot().await.expect("boot always succeeds");
+ assert!(snapshot.channels.iter().any(|channel| channel == &response));
+
+ let channel = app
+ .channels()
+ .get(&response.id)
+ .await
+ .expect("searching for channels by ID never fails")
+ .expect("the newly-created channel exists");
+ assert_eq!(response, channel);
+
+ let mut events = app
+ .events()
+ .subscribe(None)
+ .await
+ .expect("subscribing never fails")
+ .filter_map(fixtures::channel::events)
+ .filter_map(fixtures::channel::created)
+ .filter(|event| future::ready(event.channel == response));
+
+ let event = events
+ .next()
+ .immediately()
+ .await
+ .expect("creation event published");
+
+ assert_eq!(event.channel, response);
+}
diff --git a/src/channel/snapshot.rs b/src/channel/snapshot.rs
index d4d1d27..2b7d89a 100644
--- a/src/channel/snapshot.rs
+++ b/src/channel/snapshot.rs
@@ -2,11 +2,14 @@ use super::{
event::{Created, Event},
Id,
};
+use crate::clock::DateTime;
#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
pub struct Channel {
pub id: Id,
pub name: String,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub deleted_at: Option<DateTime>,
}
impl Channel {