summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorOwen Jacobson <owen@grimoire.ca>2024-10-18 23:42:08 -0400
committerOwen Jacobson <owen@grimoire.ca>2024-10-18 23:42:08 -0400
commit82338ddcb7f14ffbd584a954689f02b6e6a7988e (patch)
tree1e0a525766ca45067bb122cad3af69437db504ca /src
parentbde5aea211e9838b4511a2b57c6a256fe89b66ab (diff)
parent17b62b3458e3a992b93cd485b05d3fb112dd349a (diff)
Merge branch 'wip/retain-deleted'
Diffstat (limited to 'src')
-rw-r--r--src/channel/app.rs24
-rw-r--r--src/channel/history.rs5
-rw-r--r--src/channel/repo.rs179
-rw-r--r--src/channel/routes/channel/delete.rs2
-rw-r--r--src/channel/routes/channel/test.rs6
-rw-r--r--src/channel/routes/test.rs106
-rw-r--r--src/channel/snapshot.rs3
-rw-r--r--src/event/repo.rs5
-rw-r--r--src/event/routes/test.rs33
-rw-r--r--src/event/sequence.rs11
-rw-r--r--src/expire.rs2
-rw-r--r--src/login/repo.rs15
-rw-r--r--src/message/app.rs24
-rw-r--r--src/message/history.rs5
-rw-r--r--src/message/repo.rs214
-rw-r--r--src/message/routes/message.rs6
-rw-r--r--src/message/snapshot.rs4
-rw-r--r--src/test/fixtures/channel.rs23
-rw-r--r--src/test/fixtures/event.rs7
-rw-r--r--src/test/fixtures/filter.rs11
-rw-r--r--src/test/fixtures/message.rs18
-rw-r--r--src/test/fixtures/mod.rs1
-rw-r--r--src/token/repo/auth.rs5
23 files changed, 493 insertions, 216 deletions
diff --git a/src/channel/app.rs b/src/channel/app.rs
index 46eaba8..75c662d 100644
--- a/src/channel/app.rs
+++ b/src/channel/app.rs
@@ -43,7 +43,7 @@ impl<'a> Channels<'a> {
let channel = tx.channels().by_id(channel).await.optional()?;
tx.commit().await?;
- Ok(channel.iter().flat_map(History::events).collect())
+ Ok(channel.as_ref().and_then(History::as_snapshot))
}
pub async fn delete(&self, channel: &Id, deleted_at: &DateTime) -> Result<(), Error> {
@@ -54,13 +54,16 @@ impl<'a> Channels<'a> {
.by_id(channel)
.await
.not_found(|| Error::NotFound(channel.clone()))?;
+ channel
+ .as_snapshot()
+ .ok_or_else(|| Error::Deleted(channel.id().clone()))?;
let mut events = Vec::new();
- let messages = tx.messages().in_channel(&channel, None).await?;
+ let messages = tx.messages().live(&channel).await?;
for message in messages {
let deleted = tx.sequence().next(deleted_at).await?;
- let message = tx.messages().delete(message.id(), &deleted).await?;
+ let message = tx.messages().delete(&message, &deleted).await?;
events.extend(
message
.events()
@@ -70,7 +73,7 @@ impl<'a> Channels<'a> {
}
let deleted = tx.sequence().next(deleted_at).await?;
- let channel = tx.channels().delete(channel.id(), &deleted).await?;
+ let channel = tx.channels().delete(&channel, &deleted).await?;
events.extend(
channel
.events()
@@ -115,6 +118,17 @@ impl<'a> Channels<'a> {
Ok(())
}
+
+ pub async fn purge(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> {
+ // Somewhat arbitrarily, purge after 7 days.
+ let purge_at = relative_to.to_owned() - TimeDelta::days(7);
+
+ let mut tx = self.db.begin().await?;
+ tx.channels().purge(&purge_at).await?;
+ tx.commit().await?;
+
+ Ok(())
+ }
}
#[derive(Debug, thiserror::Error)]
@@ -129,6 +143,8 @@ pub enum CreateError {
pub enum Error {
#[error("channel {0} not found")]
NotFound(Id),
+ #[error("channel {0} deleted")]
+ Deleted(Id),
#[error(transparent)]
Database(#[from] sqlx::Error),
}
diff --git a/src/channel/history.rs b/src/channel/history.rs
index 78b3437..4b9fcc7 100644
--- a/src/channel/history.rs
+++ b/src/channel/history.rs
@@ -31,6 +31,11 @@ impl History {
.filter(Sequence::up_to(resume_point.into()))
.collect()
}
+
+ // Snapshot of this channel as of all events recorded in this history.
+ pub fn as_snapshot(&self) -> Option<Channel> {
+ self.events().collect()
+ }
}
// Event factories
diff --git a/src/channel/repo.rs b/src/channel/repo.rs
index 2f57581..1cd1c80 100644
--- a/src/channel/repo.rs
+++ b/src/channel/repo.rs
@@ -28,7 +28,7 @@ impl<'c> Channels<'c> {
values ($1, $2, $3, $4)
returning
id as "id: Id",
- name,
+ name as "name!", -- known non-null as we just set it
created_at as "created_at: DateTime",
created_sequence as "created_sequence: Sequence"
"#,
@@ -41,11 +41,9 @@ impl<'c> Channels<'c> {
channel: Channel {
id: row.id,
name: row.name,
+ deleted_at: None,
},
- created: Instant {
- at: row.created_at,
- sequence: row.created_sequence,
- },
+ created: Instant::new(row.created_at, row.created_sequence),
deleted: None,
})
.fetch_one(&mut *self.0)
@@ -59,10 +57,14 @@ impl<'c> Channels<'c> {
r#"
select
id as "id: Id",
- name,
- created_at as "created_at: DateTime",
- created_sequence as "created_sequence: Sequence"
+ channel.name,
+ channel.created_at as "created_at: DateTime",
+ channel.created_sequence as "created_sequence: Sequence",
+ deleted.deleted_at as "deleted_at?: DateTime",
+ deleted.deleted_sequence as "deleted_sequence?: Sequence"
from channel
+ left join channel_deleted as deleted
+ using (id)
where id = $1
"#,
channel,
@@ -70,13 +72,11 @@ impl<'c> Channels<'c> {
.map(|row| History {
channel: Channel {
id: row.id,
- name: row.name,
+ name: row.name.unwrap_or_default(),
+ deleted_at: row.deleted_at,
},
- created: Instant {
- at: row.created_at,
- sequence: row.created_sequence,
- },
- deleted: None,
+ created: Instant::new(row.created_at, row.created_sequence),
+ deleted: Instant::optional(row.deleted_at, row.deleted_sequence),
})
.fetch_one(&mut *self.0)
.await?;
@@ -89,11 +89,15 @@ impl<'c> Channels<'c> {
r#"
select
id as "id: Id",
- name,
- created_at as "created_at: DateTime",
- created_sequence as "created_sequence: Sequence"
+ channel.name,
+ channel.created_at as "created_at: DateTime",
+ channel.created_sequence as "created_sequence: Sequence",
+ deleted.deleted_at as "deleted_at: DateTime",
+ deleted.deleted_sequence as "deleted_sequence: Sequence"
from channel
- where coalesce(created_sequence <= $1, true)
+ left join channel_deleted as deleted
+ using (id)
+ where coalesce(channel.created_sequence <= $1, true)
order by channel.name
"#,
resume_at,
@@ -101,13 +105,11 @@ impl<'c> Channels<'c> {
.map(|row| History {
channel: Channel {
id: row.id,
- name: row.name,
+ name: row.name.unwrap_or_default(),
+ deleted_at: row.deleted_at,
},
- created: Instant {
- at: row.created_at,
- sequence: row.created_sequence,
- },
- deleted: None,
+ created: Instant::new(row.created_at, row.created_sequence),
+ deleted: Instant::optional(row.deleted_at, row.deleted_sequence),
})
.fetch_all(&mut *self.0)
.await?;
@@ -123,24 +125,26 @@ impl<'c> Channels<'c> {
r#"
select
id as "id: Id",
- name,
- created_at as "created_at: DateTime",
- created_sequence as "created_sequence: Sequence"
+ channel.name,
+ channel.created_at as "created_at: DateTime",
+ channel.created_sequence as "created_sequence: Sequence",
+ deleted.deleted_at as "deleted_at: DateTime",
+ deleted.deleted_sequence as "deleted_sequence: Sequence"
from channel
- where coalesce(created_sequence > $1, true)
+ left join channel_deleted as deleted
+ using (id)
+ where coalesce(channel.created_sequence > $1, true)
"#,
resume_at,
)
.map(|row| History {
channel: Channel {
id: row.id,
- name: row.name,
+ name: row.name.unwrap_or_default(),
+ deleted_at: row.deleted_at,
},
- created: Instant {
- at: row.created_at,
- sequence: row.created_sequence,
- },
- deleted: None,
+ created: Instant::new(row.created_at, row.created_sequence),
+ deleted: Instant::optional(row.deleted_at, row.deleted_sequence),
})
.fetch_all(&mut *self.0)
.await?;
@@ -150,50 +154,109 @@ impl<'c> Channels<'c> {
pub async fn delete(
&mut self,
- channel: &Id,
+ channel: &History,
deleted: &Instant,
) -> Result<History, sqlx::Error> {
- let channel = sqlx::query!(
+ let id = channel.id();
+ sqlx::query_scalar!(
r#"
- delete from channel
+ insert into channel_deleted (id, deleted_at, deleted_sequence)
+ values ($1, $2, $3)
+ returning 1 as "deleted: bool"
+ "#,
+ id,
+ deleted.at,
+ deleted.sequence,
+ )
+ .fetch_one(&mut *self.0)
+ .await?;
+
+ // Small social responsibility hack here: when a channel is deleted, its name is
+ // retconned to have been the empty string. Someone reading the event stream
+ // afterwards, or looking at channels via the API, cannot retrieve the
+ // "deleted" channel's information by ignoring the deletion event.
+ //
+ // This also avoids the need for a separate name reservation table to ensure that live channels have unique names, since the `channel` table's name field is unique over non-null values.
+ sqlx::query_scalar!(
+ r#"
+ update channel
+ set name = null
where id = $1
- returning
- id as "id: Id",
- name,
- created_at as "created_at: DateTime",
- created_sequence as "created_sequence: Sequence"
+ returning 1 as "updated: bool"
"#,
- channel,
+ id,
)
- .map(|row| History {
- channel: Channel {
- id: row.id,
- name: row.name,
- },
- created: Instant {
- at: row.created_at,
- sequence: row.created_sequence,
- },
- deleted: Some(*deleted),
- })
.fetch_one(&mut *self.0)
.await?;
+ let channel = self.by_id(id).await?;
+
Ok(channel)
}
- pub async fn expired(&mut self, expired_at: &DateTime) -> Result<Vec<Id>, sqlx::Error> {
+ pub async fn purge(&mut self, purge_at: &DateTime) -> Result<(), sqlx::Error> {
let channels = sqlx::query_scalar!(
r#"
+ with has_messages as (
+ select channel
+ from message
+ group by channel
+ )
+ delete from channel_deleted
+ where deleted_at < $1
+ and id not in has_messages
+ returning id as "id: Id"
+ "#,
+ purge_at,
+ )
+ .fetch_all(&mut *self.0)
+ .await?;
+
+ for channel in channels {
+ // Wanted: a way to batch these up into one query.
+ sqlx::query!(
+ r#"
+ delete from channel
+ where id = $1
+ "#,
+ channel,
+ )
+ .execute(&mut *self.0)
+ .await?;
+ }
+
+ Ok(())
+ }
+
+ pub async fn expired(&mut self, expired_at: &DateTime) -> Result<Vec<History>, sqlx::Error> {
+ let channels = sqlx::query!(
+ r#"
select
- channel.id as "id: Id"
+ channel.id as "id: Id",
+ channel.name,
+ channel.created_at as "created_at: DateTime",
+ channel.created_sequence as "created_sequence: Sequence",
+ deleted.deleted_at as "deleted_at?: DateTime",
+ deleted.deleted_sequence as "deleted_sequence?: Sequence"
from channel
- left join message
- where created_at < $1
+ left join channel_deleted as deleted
+ using (id)
+ left join message
+ where channel.created_at < $1
and message.id is null
+ and deleted.id is null
"#,
expired_at,
)
+ .map(|row| History {
+ channel: Channel {
+ id: row.id,
+ name: row.name.unwrap_or_default(),
+ deleted_at: row.deleted_at,
+ },
+ created: Instant::new(row.created_at, row.created_sequence),
+ deleted: Instant::optional(row.deleted_at, row.deleted_sequence),
+ })
.fetch_all(&mut *self.0)
.await?;
diff --git a/src/channel/routes/channel/delete.rs b/src/channel/routes/channel/delete.rs
index efac0c0..5f40ddf 100644
--- a/src/channel/routes/channel/delete.rs
+++ b/src/channel/routes/channel/delete.rs
@@ -32,7 +32,7 @@ impl IntoResponse for Error {
let Self(error) = self;
#[allow(clippy::match_wildcard_for_single_variants)]
match error {
- app::Error::NotFound(_) => NotFound(error).into_response(),
+ app::Error::NotFound(_) | app::Error::Deleted(_) => NotFound(error).into_response(),
other => Internal::from(other).into_response(),
}
}
diff --git a/src/channel/routes/channel/test.rs b/src/channel/routes/channel/test.rs
index bc02b20..c6541cd 100644
--- a/src/channel/routes/channel/test.rs
+++ b/src/channel/routes/channel/test.rs
@@ -4,7 +4,7 @@ use futures::stream::StreamExt;
use super::post;
use crate::{
channel,
- event::{self, Sequenced},
+ event::Sequenced,
message::{self, app::SendError},
test::fixtures::{self, future::Immediately as _},
};
@@ -45,7 +45,7 @@ async fn messages_in_order() {
.subscribe(None)
.await
.expect("subscribing to a valid channel")
- .filter(fixtures::filter::messages())
+ .filter_map(fixtures::message::events)
.take(requests.len());
let events = events.collect::<Vec<_>>().immediately().await;
@@ -54,7 +54,7 @@ async fn messages_in_order() {
assert_eq!(*sent_at, event.at());
assert!(matches!(
event,
- event::Event::Message(message::Event::Sent(event))
+ message::Event::Sent(event)
if event.message.sender == sender.id
&& event.message.body == message
));
diff --git a/src/channel/routes/test.rs b/src/channel/routes/test.rs
index 81f1465..ffd8484 100644
--- a/src/channel/routes/test.rs
+++ b/src/channel/routes/test.rs
@@ -1,10 +1,11 @@
+use std::future;
+
use axum::extract::{Json, State};
use futures::stream::StreamExt as _;
use super::post;
use crate::{
- channel::{self, app},
- event,
+ channel::app,
test::fixtures::{self, future::Immediately as _},
};
@@ -19,29 +20,35 @@ async fn new_channel() {
let name = fixtures::channel::propose();
let request = post::Request { name: name.clone() };
- let Json(response_channel) =
- post::handler(State(app.clone()), creator, fixtures::now(), Json(request))
- .await
- .expect("new channel in an empty app");
+ let Json(response) = post::handler(State(app.clone()), creator, fixtures::now(), Json(request))
+ .await
+ .expect("creating a channel in an empty app succeeds");
// Verify the structure of the response
- assert_eq!(name, response_channel.name);
+ assert_eq!(name, response.name);
// Verify the semantics
let snapshot = app.boot().snapshot().await.expect("boot always succeeds");
- assert!(snapshot
- .channels
- .iter()
- .any(|channel| channel.name == response_channel.name && channel.id == response_channel.id));
+ assert!(snapshot.channels.iter().any(|channel| channel == &response));
+
+ let channel = app
+ .channels()
+ .get(&response.id)
+ .await
+ .expect("searching for channels by ID never fails")
+ .expect("the newly-created channel exists");
+ assert_eq!(response, channel);
let mut events = app
.events()
.subscribe(None)
.await
.expect("subscribing never fails")
- .filter(fixtures::filter::created());
+ .filter_map(fixtures::channel::events)
+ .filter_map(fixtures::channel::created)
+ .filter(|event| future::ready(event.channel == response));
let event = events
.next()
@@ -49,11 +56,7 @@ async fn new_channel() {
.await
.expect("creation event published");
- assert!(matches!(
- event,
- event::Event::Channel(channel::Event::Created(event))
- if event.channel == response_channel
- ));
+ assert_eq!(event.channel, response);
}
#[tokio::test]
@@ -81,3 +84,72 @@ async fn duplicate_name() {
app::CreateError::DuplicateName(name) if channel.name == name
));
}
+
+#[tokio::test]
+async fn name_reusable_after_delete() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let creator = fixtures::login::create(&app, &fixtures::now()).await;
+ let name = fixtures::channel::propose();
+
+ // Call the endpoint (first time)
+
+ let request = post::Request { name: name.clone() };
+ let Json(response) = post::handler(
+ State(app.clone()),
+ creator.clone(),
+ fixtures::now(),
+ Json(request),
+ )
+ .await
+ .expect("new channel in an empty app");
+
+ // Delete the channel
+
+ app.channels()
+ .delete(&response.id, &fixtures::now())
+ .await
+ .expect("deleting a newly-created channel succeeds");
+
+ // Call the endpoint (second time)
+
+ let request = post::Request { name: name.clone() };
+ let Json(response) = post::handler(State(app.clone()), creator, fixtures::now(), Json(request))
+ .await
+ .expect("new channel in an empty app");
+
+ // Verify the structure of the response
+
+ assert_eq!(name, response.name);
+
+ // Verify the semantics
+
+ let snapshot = app.boot().snapshot().await.expect("boot always succeeds");
+ assert!(snapshot.channels.iter().any(|channel| channel == &response));
+
+ let channel = app
+ .channels()
+ .get(&response.id)
+ .await
+ .expect("searching for channels by ID never fails")
+ .expect("the newly-created channel exists");
+ assert_eq!(response, channel);
+
+ let mut events = app
+ .events()
+ .subscribe(None)
+ .await
+ .expect("subscribing never fails")
+ .filter_map(fixtures::channel::events)
+ .filter_map(fixtures::channel::created)
+ .filter(|event| future::ready(event.channel == response));
+
+ let event = events
+ .next()
+ .immediately()
+ .await
+ .expect("creation event published");
+
+ assert_eq!(event.channel, response);
+}
diff --git a/src/channel/snapshot.rs b/src/channel/snapshot.rs
index d4d1d27..2b7d89a 100644
--- a/src/channel/snapshot.rs
+++ b/src/channel/snapshot.rs
@@ -2,11 +2,14 @@ use super::{
event::{Created, Event},
Id,
};
+use crate::clock::DateTime;
#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
pub struct Channel {
pub id: Id,
pub name: String,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub deleted_at: Option<DateTime>,
}
impl Channel {
diff --git a/src/event/repo.rs b/src/event/repo.rs
index 40d6a53..56beeea 100644
--- a/src/event/repo.rs
+++ b/src/event/repo.rs
@@ -29,10 +29,7 @@ impl<'c> Sequences<'c> {
.fetch_one(&mut *self.0)
.await?;
- Ok(Instant {
- at: *at,
- sequence: next,
- })
+ Ok(Instant::new(*at, next))
}
pub async fn current(&mut self) -> Result<Sequence, sqlx::Error> {
diff --git a/src/event/routes/test.rs b/src/event/routes/test.rs
index 249f5c2..e6a8b9d 100644
--- a/src/event/routes/test.rs
+++ b/src/event/routes/test.rs
@@ -31,7 +31,7 @@ async fn includes_historical_message() {
// Verify the structure of the response.
let event = events
- .filter(fixtures::filter::messages())
+ .filter_map(fixtures::message::events)
.next()
.immediately()
.await
@@ -62,7 +62,7 @@ async fn includes_live_message() {
let message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await;
let event = events
- .filter(fixtures::filter::messages())
+ .filter_map(fixtures::message::events)
.next()
.immediately()
.await
@@ -104,7 +104,7 @@ async fn includes_multiple_channels() {
// Verify the structure of the response.
let events = events
- .filter(fixtures::filter::messages())
+ .filter_map(fixtures::message::events)
.take(messages.len())
.collect::<Vec<_>>()
.immediately()
@@ -141,13 +141,15 @@ async fn sequential_messages() {
// Verify the structure of the response.
- let mut events = events.filter(|event| {
- future::ready(
- messages
- .iter()
- .any(|message| fixtures::event::message_sent(event, message)),
- )
- });
+ let mut events = events
+ .filter_map(fixtures::message::events)
+ .filter(|event| {
+ future::ready(
+ messages
+ .iter()
+ .any(|message| fixtures::event::message_sent(event, message)),
+ )
+ });
// Verify delivery in order
for message in &messages {
@@ -193,7 +195,7 @@ async fn resumes_from() {
.expect("subscribe never fails");
let event = events
- .filter(fixtures::filter::messages())
+ .filter_map(fixtures::message::events)
.next()
.immediately()
.await
@@ -217,6 +219,7 @@ async fn resumes_from() {
// Verify the structure of the response.
let events = resumed
+ .filter_map(fixtures::message::events)
.take(later_messages.len())
.collect::<Vec<_>>()
.immediately()
@@ -275,7 +278,7 @@ async fn serial_resume() {
.expect("subscribe never fails");
let events = events
- .filter(fixtures::filter::messages())
+ .filter_map(fixtures::message::events)
.take(initial_messages.len())
.collect::<Vec<_>>()
.immediately()
@@ -313,7 +316,7 @@ async fn serial_resume() {
.expect("subscribe never fails");
let events = events
- .filter(fixtures::filter::messages())
+ .filter_map(fixtures::message::events)
.take(resume_messages.len())
.collect::<Vec<_>>()
.immediately()
@@ -351,7 +354,7 @@ async fn serial_resume() {
.expect("subscribe never fails");
let events = events
- .filter(fixtures::filter::messages())
+ .filter_map(fixtures::message::events)
.take(final_messages.len())
.collect::<Vec<_>>()
.immediately()
@@ -401,6 +404,7 @@ async fn terminates_on_token_expiry() {
];
assert!(events
+ .filter_map(fixtures::message::events)
.filter(|event| future::ready(
messages
.iter()
@@ -452,6 +456,7 @@ async fn terminates_on_logout() {
];
assert!(events
+ .filter_map(fixtures::message::events)
.filter(|event| future::ready(
messages
.iter()
diff --git a/src/event/sequence.rs b/src/event/sequence.rs
index bf6d5b8..9bc399b 100644
--- a/src/event/sequence.rs
+++ b/src/event/sequence.rs
@@ -10,6 +10,17 @@ pub struct Instant {
pub sequence: Sequence,
}
+impl Instant {
+ pub fn new(at: DateTime, sequence: Sequence) -> Self {
+ Self { at, sequence }
+ }
+
+ pub fn optional(at: Option<DateTime>, sequence: Option<Sequence>) -> Option<Self> {
+ at.zip(sequence)
+ .map(|(at, sequence)| Self::new(at, sequence))
+ }
+}
+
impl From<Instant> for Sequence {
fn from(instant: Instant) -> Self {
instant.sequence
diff --git a/src/expire.rs b/src/expire.rs
index eaedc44..1427a8d 100644
--- a/src/expire.rs
+++ b/src/expire.rs
@@ -16,6 +16,8 @@ pub async fn middleware(
app.tokens().expire(&expired_at).await?;
app.invites().expire(&expired_at).await?;
app.messages().expire(&expired_at).await?;
+ app.messages().purge(&expired_at).await?;
app.channels().expire(&expired_at).await?;
+ app.channels().purge(&expired_at).await?;
Ok(next.run(req).await)
}
diff --git a/src/login/repo.rs b/src/login/repo.rs
index 6d6510c..7d0fcb1 100644
--- a/src/login/repo.rs
+++ b/src/login/repo.rs
@@ -49,10 +49,7 @@ impl<'c> Logins<'c> {
id: row.id,
name: row.name,
},
- created: Instant {
- at: row.created_at,
- sequence: row.created_sequence,
- },
+ created: Instant::new(row.created_at, row.created_sequence),
})
.fetch_one(&mut *self.0)
.await?;
@@ -79,10 +76,7 @@ impl<'c> Logins<'c> {
id: row.id,
name: row.name,
},
- created: Instant {
- at: row.created_at,
- sequence: row.created_sequence,
- },
+ created: Instant::new(row.created_at, row.created_sequence),
})
.fetch_all(&mut *self.0)
.await?;
@@ -107,10 +101,7 @@ impl<'c> Logins<'c> {
id: row.id,
name: row.name,
},
- created: Instant {
- at: row.created_at,
- sequence: row.created_sequence,
- },
+ created: Instant::new(row.created_at, row.created_sequence),
})
.fetch_all(&mut *self.0)
.await?;
diff --git a/src/message/app.rs b/src/message/app.rs
index c1bcde6..4e50513 100644
--- a/src/message/app.rs
+++ b/src/message/app.rs
@@ -46,8 +46,17 @@ impl<'a> Messages<'a> {
pub async fn delete(&self, message: &Id, deleted_at: &DateTime) -> Result<(), DeleteError> {
let mut tx = self.db.begin().await?;
+ let message = tx
+ .messages()
+ .by_id(message)
+ .await
+ .not_found(|| DeleteError::NotFound(message.clone()))?;
+ message
+ .as_snapshot()
+ .ok_or_else(|| DeleteError::Deleted(message.id().clone()))?;
+
let deleted = tx.sequence().next(deleted_at).await?;
- let message = tx.messages().delete(message, &deleted).await?;
+ let message = tx.messages().delete(&message, &deleted).await?;
tx.commit().await?;
self.events.broadcast(
@@ -91,6 +100,17 @@ impl<'a> Messages<'a> {
Ok(())
}
+
+ pub async fn purge(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> {
+ // Somewhat arbitrarily, purge after 6 hours.
+ let purge_at = relative_to.to_owned() - TimeDelta::hours(6);
+
+ let mut tx = self.db.begin().await?;
+ tx.messages().purge(&purge_at).await?;
+ tx.commit().await?;
+
+ Ok(())
+ }
}
#[derive(Debug, thiserror::Error)]
@@ -107,6 +127,8 @@ pub enum DeleteError {
ChannelNotFound(channel::Id),
#[error("message {0} not found")]
NotFound(Id),
+ #[error("message {0} deleted")]
+ Deleted(Id),
#[error(transparent)]
Database(#[from] sqlx::Error),
}
diff --git a/src/message/history.rs b/src/message/history.rs
index 09e69b7..0424d0d 100644
--- a/src/message/history.rs
+++ b/src/message/history.rs
@@ -30,6 +30,11 @@ impl History {
.filter(Sequence::up_to(resume_point.into()))
.collect()
}
+
+ // Snapshot of this message as of all events recorded in this history.
+ pub fn as_snapshot(&self) -> Option<Message> {
+ self.events().collect()
+ }
}
// Events interface
diff --git a/src/message/repo.rs b/src/message/repo.rs
index 71c6d10..85a69fc 100644
--- a/src/message/repo.rs
+++ b/src/message/repo.rs
@@ -53,14 +53,12 @@ impl<'c> Messages<'c> {
)
.map(|row| History {
message: Message {
- sent: Instant {
- at: row.sent_at,
- sequence: row.sent_sequence,
- },
+ sent: Instant::new(row.sent_at, row.sent_sequence),
channel: row.channel,
sender: row.sender,
id: row.id,
- body: row.body,
+ body: row.body.unwrap_or_default(),
+ deleted_at: None,
},
deleted: None,
})
@@ -70,41 +68,37 @@ impl<'c> Messages<'c> {
Ok(message)
}
- pub async fn in_channel(
- &mut self,
- channel: &channel::History,
- resume_at: ResumePoint,
- ) -> Result<Vec<History>, sqlx::Error> {
+ pub async fn live(&mut self, channel: &channel::History) -> Result<Vec<History>, sqlx::Error> {
let channel_id = channel.id();
let messages = sqlx::query!(
r#"
select
- channel as "channel: channel::Id",
- sender as "sender: login::Id",
+ message.channel as "channel: channel::Id",
+ message.sender as "sender: login::Id",
id as "id: Id",
- body,
- sent_at as "sent_at: DateTime",
- sent_sequence as "sent_sequence: Sequence"
+ message.body,
+ message.sent_at as "sent_at: DateTime",
+ message.sent_sequence as "sent_sequence: Sequence",
+ deleted.deleted_at as "deleted_at: DateTime",
+ deleted.deleted_sequence as "deleted_sequence: Sequence"
from message
- where channel = $1
- and coalesce(sent_sequence <= $2, true)
- order by sent_sequence
+ left join message_deleted as deleted
+ using (id)
+ where message.channel = $1
+ and deleted.id is null
"#,
channel_id,
- resume_at,
)
.map(|row| History {
message: Message {
- sent: Instant {
- at: row.sent_at,
- sequence: row.sent_sequence,
- },
+ sent: Instant::new(row.sent_at, row.sent_sequence),
channel: row.channel,
sender: row.sender,
id: row.id,
- body: row.body,
+ body: row.body.unwrap_or_default(),
+ deleted_at: row.deleted_at,
},
- deleted: None,
+ deleted: Instant::optional(row.deleted_at, row.deleted_sequence),
})
.fetch_all(&mut *self.0)
.await?;
@@ -116,30 +110,32 @@ impl<'c> Messages<'c> {
let messages = sqlx::query!(
r#"
select
- channel as "channel: channel::Id",
- sender as "sender: login::Id",
+ message.channel as "channel: channel::Id",
+ message.sender as "sender: login::Id",
id as "id: Id",
- body,
- sent_at as "sent_at: DateTime",
- sent_sequence as "sent_sequence: Sequence"
+ message.body,
+ message.sent_at as "sent_at: DateTime",
+ message.sent_sequence as "sent_sequence: Sequence",
+ deleted.deleted_at as "deleted_at: DateTime",
+ deleted.deleted_sequence as "deleted_sequence: Sequence"
from message
- where coalesce(sent_sequence <= $2, true)
- order by sent_sequence
+ left join message_deleted as deleted
+ using (id)
+ where coalesce(message.sent_sequence <= $2, true)
+ order by message.sent_sequence
"#,
resume_at,
)
.map(|row| History {
message: Message {
- sent: Instant {
- at: row.sent_at,
- sequence: row.sent_sequence,
- },
+ sent: Instant::new(row.sent_at, row.sent_sequence),
channel: row.channel,
sender: row.sender,
id: row.id,
- body: row.body,
+ body: row.body.unwrap_or_default(),
+ deleted_at: row.deleted_at,
},
- deleted: None,
+ deleted: Instant::optional(row.deleted_at, row.deleted_sequence),
})
.fetch_all(&mut *self.0)
.await?;
@@ -147,33 +143,35 @@ impl<'c> Messages<'c> {
Ok(messages)
}
- async fn by_id(&mut self, message: &Id) -> Result<History, sqlx::Error> {
+ pub async fn by_id(&mut self, message: &Id) -> Result<History, sqlx::Error> {
let message = sqlx::query!(
r#"
select
- channel as "channel: channel::Id",
- sender as "sender: login::Id",
+ message.channel as "channel: channel::Id",
+ message.sender as "sender: login::Id",
id as "id: Id",
- body,
- sent_at as "sent_at: DateTime",
- sent_sequence as "sent_sequence: Sequence"
+ message.body,
+ message.sent_at as "sent_at: DateTime",
+ message.sent_sequence as "sent_sequence: Sequence",
+ deleted.deleted_at as "deleted_at?: DateTime",
+ deleted.deleted_sequence as "deleted_sequence?: Sequence"
from message
+ left join message_deleted as deleted
+ using (id)
where id = $1
"#,
message,
)
.map(|row| History {
message: Message {
- sent: Instant {
- at: row.sent_at,
- sequence: row.sent_sequence,
- },
+ sent: Instant::new(row.sent_at, row.sent_sequence),
channel: row.channel,
sender: row.sender,
id: row.id,
- body: row.body,
+ body: row.body.unwrap_or_default(),
+ deleted_at: row.deleted_at,
},
- deleted: None,
+ deleted: Instant::optional(row.deleted_at, row.deleted_sequence),
})
.fetch_one(&mut *self.0)
.await?;
@@ -183,39 +181,103 @@ impl<'c> Messages<'c> {
pub async fn delete(
&mut self,
- message: &Id,
+ message: &History,
deleted: &Instant,
) -> Result<History, sqlx::Error> {
- let history = self.by_id(message).await?;
+ let id = message.id();
sqlx::query_scalar!(
r#"
- delete from message
- where
- id = $1
- returning 1 as "deleted: i64"
+ insert into message_deleted (id, deleted_at, deleted_sequence)
+ values ($1, $2, $3)
+ returning 1 as "deleted: bool"
"#,
- history.message.id,
+ id,
+ deleted.at,
+ deleted.sequence,
)
.fetch_one(&mut *self.0)
.await?;
- Ok(History {
- deleted: Some(*deleted),
- ..history
- })
+ // Small social responsibility hack here: when a message is deleted, its body is
+ // retconned to have been the empty string. Someone reading the event stream
+ // afterwards, or looking at messages in the channel, cannot retrieve the
+ // "deleted" message by ignoring the deletion event.
+ sqlx::query_scalar!(
+ r#"
+ update message
+ set body = ""
+ where id = $1
+ returning 1 as "blanked: bool"
+ "#,
+ id,
+ )
+ .fetch_one(&mut *self.0)
+ .await?;
+
+ let message = self.by_id(id).await?;
+
+ Ok(message)
}
- pub async fn expired(&mut self, expire_at: &DateTime) -> Result<Vec<Id>, sqlx::Error> {
+ pub async fn purge(&mut self, purge_at: &DateTime) -> Result<(), sqlx::Error> {
let messages = sqlx::query_scalar!(
r#"
+ delete from message_deleted
+ where deleted_at < $1
+ returning id as "id: Id"
+ "#,
+ purge_at,
+ )
+ .fetch_all(&mut *self.0)
+ .await?;
+
+ for message in messages {
+ sqlx::query!(
+ r#"
+ delete from message
+ where id = $1
+ "#,
+ message,
+ )
+ .execute(&mut *self.0)
+ .await?;
+ }
+
+ Ok(())
+ }
+
+ pub async fn expired(&mut self, expire_at: &DateTime) -> Result<Vec<History>, sqlx::Error> {
+ let messages = sqlx::query!(
+ r#"
select
- id as "message: Id"
+ id as "id: Id",
+ message.channel as "channel: channel::Id",
+ message.sender as "sender: login::Id",
+ message.sent_at as "sent_at: DateTime",
+ message.sent_sequence as "sent_sequence: Sequence",
+ message.body,
+ deleted.deleted_at as "deleted_at?: DateTime",
+ deleted.deleted_sequence as "deleted_sequence?: Sequence"
from message
- where sent_at < $1
+ left join message_deleted as deleted
+ using (id)
+ where message.sent_at < $1
+ and deleted.id is null
"#,
expire_at,
)
+ .map(|row| History {
+ message: Message {
+ sent: Instant::new(row.sent_at, row.sent_sequence),
+ id: row.id,
+ channel: row.channel,
+ sender: row.sender,
+ body: row.body.unwrap_or_default(),
+ deleted_at: row.deleted_at,
+ },
+ deleted: Instant::optional(row.deleted_at, row.deleted_sequence),
+ })
.fetch_all(&mut *self.0)
.await?;
@@ -226,29 +288,31 @@ impl<'c> Messages<'c> {
let messages = sqlx::query!(
r#"
select
- channel as "channel: channel::Id",
- sender as "sender: login::Id",
id as "id: Id",
- body,
- sent_at as "sent_at: DateTime",
- sent_sequence as "sent_sequence: Sequence"
+ message.channel as "channel: channel::Id",
+ message.sender as "sender: login::Id",
+ message.sent_at as "sent_at: DateTime",
+ message.sent_sequence as "sent_sequence: Sequence",
+ message.body,
+ deleted.deleted_at as "deleted_at: DateTime",
+ deleted.deleted_sequence as "deleted_sequence: Sequence"
from message
+ left join message_deleted as deleted
+ using (id)
where coalesce(message.sent_sequence > $1, true)
"#,
resume_at,
)
.map(|row| History {
message: Message {
- sent: Instant {
- at: row.sent_at,
- sequence: row.sent_sequence,
- },
+ sent: Instant::new(row.sent_at, row.sent_sequence),
channel: row.channel,
sender: row.sender,
id: row.id,
- body: row.body,
+ body: row.body.unwrap_or_default(),
+ deleted_at: row.deleted_at,
},
- deleted: None,
+ deleted: Instant::optional(row.deleted_at, row.deleted_sequence),
})
.fetch_all(&mut *self.0)
.await?;
diff --git a/src/message/routes/message.rs b/src/message/routes/message.rs
index 059b8c1..fbef35a 100644
--- a/src/message/routes/message.rs
+++ b/src/message/routes/message.rs
@@ -33,9 +33,9 @@ pub mod delete {
let Self(error) = self;
#[allow(clippy::match_wildcard_for_single_variants)]
match error {
- DeleteError::ChannelNotFound(_) | DeleteError::NotFound(_) => {
- NotFound(error).into_response()
- }
+ DeleteError::ChannelNotFound(_)
+ | DeleteError::NotFound(_)
+ | DeleteError::Deleted(_) => NotFound(error).into_response(),
other => Internal::from(other).into_response(),
}
}
diff --git a/src/message/snapshot.rs b/src/message/snapshot.rs
index 0eb37bb..7300918 100644
--- a/src/message/snapshot.rs
+++ b/src/message/snapshot.rs
@@ -2,7 +2,7 @@ use super::{
event::{Event, Sent},
Id,
};
-use crate::{channel, event::Instant, login};
+use crate::{channel, clock::DateTime, event::Instant, login};
#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
pub struct Message {
@@ -12,6 +12,8 @@ pub struct Message {
pub sender: login::Id,
pub id: Id,
pub body: String,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub deleted_at: Option<DateTime>,
}
impl Message {
diff --git a/src/test/fixtures/channel.rs b/src/test/fixtures/channel.rs
index b678717..a1dda61 100644
--- a/src/test/fixtures/channel.rs
+++ b/src/test/fixtures/channel.rs
@@ -1,10 +1,17 @@
+use std::future;
+
use faker_rand::{
en_us::{addresses::CityName, names::FullName},
faker_impl_from_templates,
};
use rand;
-use crate::{app::App, channel::Channel, clock::RequestedAt};
+use crate::{
+ app::App,
+ channel::{self, Channel},
+ clock::RequestedAt,
+ event::Event,
+};
pub async fn create(app: &App, created_at: &RequestedAt) -> Channel {
let name = propose();
@@ -22,3 +29,17 @@ struct Name(String);
faker_impl_from_templates! {
Name; "{} {}", CityName, FullName;
}
+
+pub fn events(event: Event) -> future::Ready<Option<channel::Event>> {
+ future::ready(match event {
+ Event::Channel(channel) => Some(channel),
+ _ => None,
+ })
+}
+
+pub fn created(event: channel::Event) -> future::Ready<Option<channel::event::Created>> {
+ future::ready(match event {
+ channel::Event::Created(event) => Some(event),
+ channel::Event::Deleted(_) => None,
+ })
+}
diff --git a/src/test/fixtures/event.rs b/src/test/fixtures/event.rs
index 7fe2bf3..fa4fbc0 100644
--- a/src/test/fixtures/event.rs
+++ b/src/test/fixtures/event.rs
@@ -1,11 +1,8 @@
-use crate::{
- event::Event,
- message::{Event::Sent, Message},
-};
+use crate::message::{Event, Message};
pub fn message_sent(event: &Event, message: &Message) -> bool {
matches!(
&event,
- Event::Message(Sent(event)) if message == &event.into()
+ Event::Sent(event) if message == &event.into()
)
}
diff --git a/src/test/fixtures/filter.rs b/src/test/fixtures/filter.rs
deleted file mode 100644
index 84d27b0..0000000
--- a/src/test/fixtures/filter.rs
+++ /dev/null
@@ -1,11 +0,0 @@
-use futures::future;
-
-use crate::{channel::Event::Created, event::Event, message::Event::Sent};
-
-pub fn messages() -> impl FnMut(&Event) -> future::Ready<bool> {
- |event| future::ready(matches!(event, Event::Message(Sent(_))))
-}
-
-pub fn created() -> impl FnMut(&Event) -> future::Ready<bool> {
- |event| future::ready(matches!(event, Event::Channel(Created(_))))
-}
diff --git a/src/test/fixtures/message.rs b/src/test/fixtures/message.rs
index 381b10b..eb00e7c 100644
--- a/src/test/fixtures/message.rs
+++ b/src/test/fixtures/message.rs
@@ -1,6 +1,15 @@
+use std::future;
+
use faker_rand::lorem::Paragraphs;
-use crate::{app::App, channel::Channel, clock::RequestedAt, login::Login, message::Message};
+use crate::{
+ app::App,
+ channel::Channel,
+ clock::RequestedAt,
+ event::Event,
+ login::Login,
+ message::{self, Message},
+};
pub async fn send(app: &App, channel: &Channel, login: &Login, sent_at: &RequestedAt) -> Message {
let body = propose();
@@ -14,3 +23,10 @@ pub async fn send(app: &App, channel: &Channel, login: &Login, sent_at: &Request
pub fn propose() -> String {
rand::random::<Paragraphs>().to_string()
}
+
+pub fn events(event: Event) -> future::Ready<Option<message::Event>> {
+ future::ready(match event {
+ Event::Message(event) => Some(event),
+ _ => None,
+ })
+}
diff --git a/src/test/fixtures/mod.rs b/src/test/fixtures/mod.rs
index 41f7e13..9658831 100644
--- a/src/test/fixtures/mod.rs
+++ b/src/test/fixtures/mod.rs
@@ -4,7 +4,6 @@ use crate::{app::App, clock::RequestedAt, db};
pub mod channel;
pub mod event;
-pub mod filter;
pub mod future;
pub mod identity;
pub mod login;
diff --git a/src/token/repo/auth.rs b/src/token/repo/auth.rs
index 9aee81f..88d0878 100644
--- a/src/token/repo/auth.rs
+++ b/src/token/repo/auth.rs
@@ -40,10 +40,7 @@ impl<'t> Auth<'t> {
id: row.id,
name: row.name,
},
- created: Instant {
- at: row.created_at,
- sequence: row.created_sequence,
- },
+ created: Instant::new(row.created_at, row.created_sequence),
},
row.password_hash,
)