summaryrefslogtreecommitdiff
path: root/src/channel
diff options
context:
space:
mode:
Diffstat (limited to 'src/channel')
-rw-r--r--src/channel/app.rs89
-rw-r--r--src/channel/history.rs5
-rw-r--r--src/channel/repo.rs349
-rw-r--r--src/channel/routes.rs121
-rw-r--r--src/channel/routes/channel/delete.rs39
-rw-r--r--src/channel/routes/channel/mod.rs9
-rw-r--r--src/channel/routes/channel/post.rs58
-rw-r--r--src/channel/routes/channel/test/delete.rs154
-rw-r--r--src/channel/routes/channel/test/mod.rs2
-rw-r--r--src/channel/routes/channel/test/post.rs (renamed from src/channel/routes/test/on_send.rs)71
-rw-r--r--src/channel/routes/mod.rs19
-rw-r--r--src/channel/routes/post.rs60
-rw-r--r--src/channel/routes/test.rs221
-rw-r--r--src/channel/routes/test/mod.rs2
-rw-r--r--src/channel/routes/test/on_create.rs88
-rw-r--r--src/channel/snapshot.rs5
16 files changed, 951 insertions, 341 deletions
diff --git a/src/channel/app.rs b/src/channel/app.rs
index 5d6cada..7bfa0f7 100644
--- a/src/channel/app.rs
+++ b/src/channel/app.rs
@@ -2,12 +2,16 @@ use chrono::TimeDelta;
use itertools::Itertools;
use sqlx::sqlite::SqlitePool;
-use super::{repo::Provider as _, Channel, History, Id};
+use super::{
+ repo::{LoadError, Provider as _},
+ Channel, History, Id,
+};
use crate::{
clock::DateTime,
db::{Duplicate as _, NotFound as _},
event::{repo::Provider as _, Broadcaster, Event, Sequence},
message::repo::Provider as _,
+ name::{self, Name},
};
pub struct Channels<'a> {
@@ -20,14 +24,14 @@ impl<'a> Channels<'a> {
Self { db, events }
}
- pub async fn create(&self, name: &str, created_at: &DateTime) -> Result<Channel, CreateError> {
+ pub async fn create(&self, name: &Name, created_at: &DateTime) -> Result<Channel, CreateError> {
let mut tx = self.db.begin().await?;
let created = tx.sequence().next(created_at).await?;
let channel = tx
.channels()
.create(name, &created)
.await
- .duplicate(|| CreateError::DuplicateName(name.into()))?;
+ .duplicate(|| CreateError::DuplicateName(name.clone()))?;
tx.commit().await?;
self.events
@@ -38,12 +42,12 @@ impl<'a> Channels<'a> {
// This function is careless with respect to time, and gets you the channel as
// it exists in the specific moment when you call it.
- pub async fn get(&self, channel: &Id) -> Result<Option<Channel>, sqlx::Error> {
+ pub async fn get(&self, channel: &Id) -> Result<Option<Channel>, Error> {
let mut tx = self.db.begin().await?;
let channel = tx.channels().by_id(channel).await.optional()?;
tx.commit().await?;
- Ok(channel.iter().flat_map(History::events).collect())
+ Ok(channel.as_ref().and_then(History::as_snapshot))
}
pub async fn delete(&self, channel: &Id, deleted_at: &DateTime) -> Result<(), Error> {
@@ -54,13 +58,16 @@ impl<'a> Channels<'a> {
.by_id(channel)
.await
.not_found(|| Error::NotFound(channel.clone()))?;
+ channel
+ .as_snapshot()
+ .ok_or_else(|| Error::Deleted(channel.id().clone()))?;
let mut events = Vec::new();
- let messages = tx.messages().in_channel(&channel, None).await?;
+ let messages = tx.messages().live(&channel).await?;
for message in messages {
let deleted = tx.sequence().next(deleted_at).await?;
- let message = tx.messages().delete(message.id(), &deleted).await?;
+ let message = tx.messages().delete(&message, &deleted).await?;
events.extend(
message
.events()
@@ -70,7 +77,7 @@ impl<'a> Channels<'a> {
}
let deleted = tx.sequence().next(deleted_at).await?;
- let channel = tx.channels().delete(channel.id(), &deleted).await?;
+ let channel = tx.channels().delete(&channel, &deleted).await?;
events.extend(
channel
.events()
@@ -85,7 +92,7 @@ impl<'a> Channels<'a> {
Ok(())
}
- pub async fn expire(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> {
+ pub async fn expire(&self, relative_to: &DateTime) -> Result<(), ExpireError> {
// Somewhat arbitrarily, expire after 90 days.
let expire_at = relative_to.to_owned() - TimeDelta::days(90);
@@ -115,26 +122,80 @@ impl<'a> Channels<'a> {
Ok(())
}
+
+ pub async fn purge(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> {
+ // Somewhat arbitrarily, purge after 7 days.
+ let purge_at = relative_to.to_owned() - TimeDelta::days(7);
+
+ let mut tx = self.db.begin().await?;
+ tx.channels().purge(&purge_at).await?;
+ tx.commit().await?;
+
+ Ok(())
+ }
+
+ pub async fn recanonicalize(&self) -> Result<(), sqlx::Error> {
+ let mut tx = self.db.begin().await?;
+ tx.channels().recanonicalize().await?;
+ tx.commit().await?;
+
+ Ok(())
+ }
}
#[derive(Debug, thiserror::Error)]
pub enum CreateError {
#[error("channel named {0} already exists")]
- DuplicateName(String),
+ DuplicateName(Name),
+ #[error(transparent)]
+ Database(#[from] sqlx::Error),
#[error(transparent)]
- DatabaseError(#[from] sqlx::Error),
+ Name(#[from] name::Error),
+}
+
+impl From<LoadError> for CreateError {
+ fn from(error: LoadError) -> Self {
+ match error {
+ LoadError::Database(error) => error.into(),
+ LoadError::Name(error) => error.into(),
+ }
+ }
}
#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("channel {0} not found")]
NotFound(Id),
+ #[error("channel {0} deleted")]
+ Deleted(Id),
#[error(transparent)]
- DatabaseError(#[from] sqlx::Error),
+ Database(#[from] sqlx::Error),
+ #[error(transparent)]
+ Name(#[from] name::Error),
+}
+
+impl From<LoadError> for Error {
+ fn from(error: LoadError) -> Self {
+ match error {
+ LoadError::Database(error) => error.into(),
+ LoadError::Name(error) => error.into(),
+ }
+ }
}
#[derive(Debug, thiserror::Error)]
-pub enum InternalError {
+pub enum ExpireError {
#[error(transparent)]
- DatabaseError(#[from] sqlx::Error),
+ Database(#[from] sqlx::Error),
+ #[error(transparent)]
+ Name(#[from] name::Error),
+}
+
+impl From<LoadError> for ExpireError {
+ fn from(error: LoadError) -> Self {
+ match error {
+ LoadError::Database(error) => error.into(),
+ LoadError::Name(error) => error.into(),
+ }
+ }
}
diff --git a/src/channel/history.rs b/src/channel/history.rs
index 78b3437..4b9fcc7 100644
--- a/src/channel/history.rs
+++ b/src/channel/history.rs
@@ -31,6 +31,11 @@ impl History {
.filter(Sequence::up_to(resume_point.into()))
.collect()
}
+
+ // Snapshot of this channel as of all events recorded in this history.
+ pub fn as_snapshot(&self) -> Option<Channel> {
+ self.events().collect()
+ }
}
// Event factories
diff --git a/src/channel/repo.rs b/src/channel/repo.rs
index 2f57581..a49db52 100644
--- a/src/channel/repo.rs
+++ b/src/channel/repo.rs
@@ -1,9 +1,12 @@
+use futures::stream::{StreamExt as _, TryStreamExt as _};
use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction};
use crate::{
channel::{Channel, History, Id},
clock::DateTime,
+ db::NotFound,
event::{Instant, ResumePoint, Sequence},
+ name::{self, Name},
};
pub trait Provider {
@@ -19,130 +22,162 @@ impl<'c> Provider for Transaction<'c, Sqlite> {
pub struct Channels<'t>(&'t mut SqliteConnection);
impl<'c> Channels<'c> {
- pub async fn create(&mut self, name: &str, created: &Instant) -> Result<History, sqlx::Error> {
+ pub async fn create(&mut self, name: &Name, created: &Instant) -> Result<History, sqlx::Error> {
let id = Id::generate();
- let channel = sqlx::query!(
+ let name = name.clone();
+ let display_name = name.display();
+ let canonical_name = name.canonical();
+ let created = *created;
+
+ sqlx::query!(
r#"
insert
- into channel (id, name, created_at, created_sequence)
- values ($1, $2, $3, $4)
- returning
- id as "id: Id",
- name,
- created_at as "created_at: DateTime",
- created_sequence as "created_sequence: Sequence"
+ into channel (id, created_at, created_sequence)
+ values ($1, $2, $3)
"#,
id,
- name,
created.at,
created.sequence,
)
- .map(|row| History {
+ .execute(&mut *self.0)
+ .await?;
+
+ sqlx::query!(
+ r#"
+ insert into channel_name (id, display_name, canonical_name)
+ values ($1, $2, $3)
+ "#,
+ id,
+ display_name,
+ canonical_name,
+ )
+ .execute(&mut *self.0)
+ .await?;
+
+ let channel = History {
channel: Channel {
- id: row.id,
- name: row.name,
- },
- created: Instant {
- at: row.created_at,
- sequence: row.created_sequence,
+ id,
+ name: name.clone(),
+ deleted_at: None,
},
+ created,
deleted: None,
- })
- .fetch_one(&mut *self.0)
- .await?;
+ };
Ok(channel)
}
- pub async fn by_id(&mut self, channel: &Id) -> Result<History, sqlx::Error> {
+ pub async fn by_id(&mut self, channel: &Id) -> Result<History, LoadError> {
let channel = sqlx::query!(
r#"
select
id as "id: Id",
- name,
- created_at as "created_at: DateTime",
- created_sequence as "created_sequence: Sequence"
+ name.display_name as "display_name?: String",
+ name.canonical_name as "canonical_name?: String",
+ channel.created_at as "created_at: DateTime",
+ channel.created_sequence as "created_sequence: Sequence",
+ deleted.deleted_at as "deleted_at?: DateTime",
+ deleted.deleted_sequence as "deleted_sequence?: Sequence"
from channel
+ left join channel_name as name
+ using (id)
+ left join channel_deleted as deleted
+ using (id)
where id = $1
"#,
channel,
)
- .map(|row| History {
- channel: Channel {
- id: row.id,
- name: row.name,
- },
- created: Instant {
- at: row.created_at,
- sequence: row.created_sequence,
- },
- deleted: None,
+ .map(|row| {
+ Ok::<_, name::Error>(History {
+ channel: Channel {
+ id: row.id,
+ name: Name::optional(row.display_name, row.canonical_name)?.unwrap_or_default(),
+ deleted_at: row.deleted_at,
+ },
+ created: Instant::new(row.created_at, row.created_sequence),
+ deleted: Instant::optional(row.deleted_at, row.deleted_sequence),
+ })
})
.fetch_one(&mut *self.0)
- .await?;
+ .await??;
Ok(channel)
}
- pub async fn all(&mut self, resume_at: ResumePoint) -> Result<Vec<History>, sqlx::Error> {
+ pub async fn all(&mut self, resume_at: Sequence) -> Result<Vec<History>, LoadError> {
let channels = sqlx::query!(
r#"
select
id as "id: Id",
- name,
- created_at as "created_at: DateTime",
- created_sequence as "created_sequence: Sequence"
+ name.display_name as "display_name?: String",
+ name.canonical_name as "canonical_name?: String",
+ channel.created_at as "created_at: DateTime",
+ channel.created_sequence as "created_sequence: Sequence",
+ deleted.deleted_at as "deleted_at?: DateTime",
+ deleted.deleted_sequence as "deleted_sequence?: Sequence"
from channel
- where coalesce(created_sequence <= $1, true)
- order by channel.name
+ left join channel_name as name
+ using (id)
+ left join channel_deleted as deleted
+ using (id)
+ where channel.created_sequence <= $1
+ order by name.canonical_name
"#,
resume_at,
)
- .map(|row| History {
- channel: Channel {
- id: row.id,
- name: row.name,
- },
- created: Instant {
- at: row.created_at,
- sequence: row.created_sequence,
- },
- deleted: None,
+ .map(|row| {
+ Ok::<_, name::Error>(History {
+ channel: Channel {
+ id: row.id,
+ name: Name::optional(row.display_name, row.canonical_name)?.unwrap_or_default(),
+ deleted_at: row.deleted_at,
+ },
+ created: Instant::new(row.created_at, row.created_sequence),
+ deleted: Instant::optional(row.deleted_at, row.deleted_sequence),
+ })
})
- .fetch_all(&mut *self.0)
+ .fetch(&mut *self.0)
+ .map(|res| Ok::<_, LoadError>(res??))
+ .try_collect()
.await?;
Ok(channels)
}
- pub async fn replay(
- &mut self,
- resume_at: Option<Sequence>,
- ) -> Result<Vec<History>, sqlx::Error> {
+ pub async fn replay(&mut self, resume_at: ResumePoint) -> Result<Vec<History>, LoadError> {
let channels = sqlx::query!(
r#"
select
id as "id: Id",
- name,
- created_at as "created_at: DateTime",
- created_sequence as "created_sequence: Sequence"
+ name.display_name as "display_name: String",
+ name.canonical_name as "canonical_name: String",
+ channel.created_at as "created_at: DateTime",
+ channel.created_sequence as "created_sequence: Sequence",
+ deleted.deleted_at as "deleted_at?: DateTime",
+ deleted.deleted_sequence as "deleted_sequence?: Sequence"
from channel
- where coalesce(created_sequence > $1, true)
+ left join channel_name as name
+ using (id)
+ left join channel_deleted as deleted
+ using (id)
+ where coalesce(channel.created_sequence > $1, true)
"#,
resume_at,
)
- .map(|row| History {
- channel: Channel {
- id: row.id,
- name: row.name,
- },
- created: Instant {
- at: row.created_at,
- sequence: row.created_sequence,
- },
- deleted: None,
+ .map(|row| {
+ Ok::<_, name::Error>(History {
+ channel: Channel {
+ id: row.id,
+ name: Name::optional(row.display_name, row.canonical_name)?.unwrap_or_default(),
+ deleted_at: row.deleted_at,
+ },
+ created: Instant::new(row.created_at, row.created_sequence),
+ deleted: Instant::optional(row.deleted_at, row.deleted_sequence),
+ })
})
- .fetch_all(&mut *self.0)
+ .fetch(&mut *self.0)
+ .map(|res| Ok::<_, LoadError>(res??))
+ .try_collect()
.await?;
Ok(channels)
@@ -150,53 +185,171 @@ impl<'c> Channels<'c> {
pub async fn delete(
&mut self,
- channel: &Id,
+ channel: &History,
deleted: &Instant,
- ) -> Result<History, sqlx::Error> {
- let channel = sqlx::query!(
+ ) -> Result<History, LoadError> {
+ let id = channel.id();
+ sqlx::query!(
r#"
- delete from channel
+ insert into channel_deleted (id, deleted_at, deleted_sequence)
+ values ($1, $2, $3)
+ "#,
+ id,
+ deleted.at,
+ deleted.sequence,
+ )
+ .execute(&mut *self.0)
+ .await?;
+
+ // Small social responsibility hack here: when a channel is deleted, its name is
+ // retconned to have been the empty string. Someone reading the event stream
+ // afterwards, or looking at channels via the API, cannot retrieve the
+ // "deleted" channel's information by ignoring the deletion event.
+ //
+ // This also avoids the need for a separate name reservation table to ensure
+ // that live channels have unique names, since the `channel` table's name field
+ // is unique over non-null values.
+ sqlx::query!(
+ r#"
+ delete from channel_name
where id = $1
- returning
- id as "id: Id",
- name,
- created_at as "created_at: DateTime",
- created_sequence as "created_sequence: Sequence"
"#,
- channel,
+ id,
)
- .map(|row| History {
- channel: Channel {
- id: row.id,
- name: row.name,
- },
- created: Instant {
- at: row.created_at,
- sequence: row.created_sequence,
- },
- deleted: Some(*deleted),
- })
- .fetch_one(&mut *self.0)
+ .execute(&mut *self.0)
.await?;
+ let channel = self.by_id(id).await?;
+
Ok(channel)
}
- pub async fn expired(&mut self, expired_at: &DateTime) -> Result<Vec<Id>, sqlx::Error> {
+ pub async fn purge(&mut self, purge_at: &DateTime) -> Result<(), sqlx::Error> {
let channels = sqlx::query_scalar!(
r#"
+ with has_messages as (
+ select channel
+ from message
+ group by channel
+ )
+ delete from channel_deleted
+ where deleted_at < $1
+ and id not in has_messages
+ returning id as "id: Id"
+ "#,
+ purge_at,
+ )
+ .fetch_all(&mut *self.0)
+ .await?;
+
+ for channel in channels {
+ // Wanted: a way to batch these up into one query.
+ sqlx::query!(
+ r#"
+ delete from channel
+ where id = $1
+ "#,
+ channel,
+ )
+ .execute(&mut *self.0)
+ .await?;
+ }
+
+ Ok(())
+ }
+
+ pub async fn expired(&mut self, expired_at: &DateTime) -> Result<Vec<History>, LoadError> {
+ let channels = sqlx::query!(
+ r#"
select
- channel.id as "id: Id"
+ channel.id as "id: Id",
+ name.display_name as "display_name?: String",
+ name.canonical_name as "canonical_name?: String",
+ channel.created_at as "created_at: DateTime",
+ channel.created_sequence as "created_sequence: Sequence",
+ deleted.deleted_at as "deleted_at?: DateTime",
+ deleted.deleted_sequence as "deleted_sequence?: Sequence"
from channel
- left join message
- where created_at < $1
+ left join channel_name as name
+ using (id)
+ left join channel_deleted as deleted
+ using (id)
+ left join message
+ on channel.id = message.channel
+ where channel.created_at < $1
and message.id is null
+ and deleted.id is null
"#,
expired_at,
)
- .fetch_all(&mut *self.0)
+ .map(|row| {
+ Ok::<_, name::Error>(History {
+ channel: Channel {
+ id: row.id,
+ name: Name::optional(row.display_name, row.canonical_name)?.unwrap_or_default(),
+ deleted_at: row.deleted_at,
+ },
+ created: Instant::new(row.created_at, row.created_sequence),
+ deleted: Instant::optional(row.deleted_at, row.deleted_sequence),
+ })
+ })
+ .fetch(&mut *self.0)
+ .map(|res| Ok::<_, LoadError>(res??))
+ .try_collect()
.await?;
Ok(channels)
}
+
+ pub async fn recanonicalize(&mut self) -> Result<(), sqlx::Error> {
+ let channels = sqlx::query!(
+ r#"
+ select
+ id as "id: Id",
+ display_name as "display_name: String"
+ from channel_name
+ "#,
+ )
+ .fetch_all(&mut *self.0)
+ .await?;
+
+ for channel in channels {
+ let name = Name::from(channel.display_name);
+ let canonical_name = name.canonical();
+
+ sqlx::query!(
+ r#"
+ update channel_name
+ set canonical_name = $1
+ where id = $2
+ "#,
+ canonical_name,
+ channel.id,
+ )
+ .execute(&mut *self.0)
+ .await?;
+ }
+
+ Ok(())
+ }
+}
+
+#[derive(Debug, thiserror::Error)]
+#[error(transparent)]
+pub enum LoadError {
+ Database(#[from] sqlx::Error),
+ Name(#[from] name::Error),
+}
+
+impl<T> NotFound for Result<T, LoadError> {
+ type Ok = T;
+ type Error = LoadError;
+
+ fn optional(self) -> Result<Option<T>, LoadError> {
+ match self {
+ Ok(value) => Ok(Some(value)),
+ Err(LoadError::Database(sqlx::Error::RowNotFound)) => Ok(None),
+ Err(other) => Err(other),
+ }
+ }
}
diff --git a/src/channel/routes.rs b/src/channel/routes.rs
deleted file mode 100644
index eaf7962..0000000
--- a/src/channel/routes.rs
+++ /dev/null
@@ -1,121 +0,0 @@
-use axum::{
- extract::{Json, Path, State},
- http::StatusCode,
- response::{IntoResponse, Response},
- routing::{delete, post},
- Router,
-};
-
-use super::{app, Channel, Id};
-use crate::{
- app::App,
- clock::RequestedAt,
- error::{Internal, NotFound},
- login::Login,
- message::app::SendError,
-};
-
-#[cfg(test)]
-mod test;
-
-pub fn router() -> Router<App> {
- Router::new()
- .route("/api/channels", post(on_create))
- .route("/api/channels/:channel", post(on_send))
- .route("/api/channels/:channel", delete(on_delete))
-}
-
-#[derive(Clone, serde::Deserialize)]
-struct CreateRequest {
- name: String,
-}
-
-async fn on_create(
- State(app): State<App>,
- _: Login, // requires auth, but doesn't actually care who you are
- RequestedAt(created_at): RequestedAt,
- Json(form): Json<CreateRequest>,
-) -> Result<Json<Channel>, CreateError> {
- let channel = app
- .channels()
- .create(&form.name, &created_at)
- .await
- .map_err(CreateError)?;
-
- Ok(Json(channel))
-}
-
-#[derive(Debug)]
-struct CreateError(app::CreateError);
-
-impl IntoResponse for CreateError {
- fn into_response(self) -> Response {
- let Self(error) = self;
- match error {
- duplicate @ app::CreateError::DuplicateName(_) => {
- (StatusCode::CONFLICT, duplicate.to_string()).into_response()
- }
- other => Internal::from(other).into_response(),
- }
- }
-}
-
-#[derive(Clone, serde::Deserialize)]
-struct SendRequest {
- body: String,
-}
-
-async fn on_send(
- State(app): State<App>,
- Path(channel): Path<Id>,
- RequestedAt(sent_at): RequestedAt,
- login: Login,
- Json(request): Json<SendRequest>,
-) -> Result<StatusCode, SendErrorResponse> {
- app.messages()
- .send(&channel, &login, &sent_at, &request.body)
- .await?;
-
- Ok(StatusCode::ACCEPTED)
-}
-
-#[derive(Debug, thiserror::Error)]
-#[error(transparent)]
-struct SendErrorResponse(#[from] SendError);
-
-impl IntoResponse for SendErrorResponse {
- fn into_response(self) -> Response {
- let Self(error) = self;
- match error {
- not_found @ SendError::ChannelNotFound(_) => NotFound(not_found).into_response(),
- other => Internal::from(other).into_response(),
- }
- }
-}
-
-async fn on_delete(
- State(app): State<App>,
- Path(channel): Path<Id>,
- RequestedAt(deleted_at): RequestedAt,
- _: Login,
-) -> Result<StatusCode, ErrorResponse> {
- app.channels().delete(&channel, &deleted_at).await?;
-
- Ok(StatusCode::ACCEPTED)
-}
-
-#[derive(Debug, thiserror::Error)]
-#[error(transparent)]
-struct ErrorResponse(#[from] app::Error);
-
-impl IntoResponse for ErrorResponse {
- fn into_response(self) -> Response {
- let Self(error) = self;
- match error {
- not_found @ app::Error::NotFound(_) => {
- (StatusCode::NOT_FOUND, not_found.to_string()).into_response()
- }
- other => Internal::from(other).into_response(),
- }
- }
-}
diff --git a/src/channel/routes/channel/delete.rs b/src/channel/routes/channel/delete.rs
new file mode 100644
index 0000000..91eb506
--- /dev/null
+++ b/src/channel/routes/channel/delete.rs
@@ -0,0 +1,39 @@
+use axum::{
+ extract::{Path, State},
+ http::StatusCode,
+ response::{IntoResponse, Response},
+};
+
+use crate::{
+ app::App,
+ channel::app,
+ clock::RequestedAt,
+ error::{Internal, NotFound},
+ token::extract::Identity,
+};
+
+pub async fn handler(
+ State(app): State<App>,
+ Path(channel): Path<super::PathInfo>,
+ RequestedAt(deleted_at): RequestedAt,
+ _: Identity,
+) -> Result<StatusCode, Error> {
+ app.channels().delete(&channel, &deleted_at).await?;
+
+ Ok(StatusCode::ACCEPTED)
+}
+
+#[derive(Debug, thiserror::Error)]
+#[error(transparent)]
+pub struct Error(#[from] pub app::Error);
+
+impl IntoResponse for Error {
+ fn into_response(self) -> Response {
+ let Self(error) = self;
+ #[allow(clippy::match_wildcard_for_single_variants)]
+ match error {
+ app::Error::NotFound(_) | app::Error::Deleted(_) => NotFound(error).into_response(),
+ other => Internal::from(other).into_response(),
+ }
+ }
+}
diff --git a/src/channel/routes/channel/mod.rs b/src/channel/routes/channel/mod.rs
new file mode 100644
index 0000000..31a9142
--- /dev/null
+++ b/src/channel/routes/channel/mod.rs
@@ -0,0 +1,9 @@
+use crate::channel::Id;
+
+pub mod delete;
+pub mod post;
+
+#[cfg(test)]
+mod test;
+
+type PathInfo = Id;
diff --git a/src/channel/routes/channel/post.rs b/src/channel/routes/channel/post.rs
new file mode 100644
index 0000000..b51e691
--- /dev/null
+++ b/src/channel/routes/channel/post.rs
@@ -0,0 +1,58 @@
+use axum::{
+ extract::{Json, Path, State},
+ http::StatusCode,
+ response::{self, IntoResponse},
+};
+
+use crate::{
+ app::App,
+ clock::RequestedAt,
+ error::{Internal, NotFound},
+ message::{app::SendError, Body, Message},
+ token::extract::Identity,
+};
+
+pub async fn handler(
+ State(app): State<App>,
+ Path(channel): Path<super::PathInfo>,
+ RequestedAt(sent_at): RequestedAt,
+ identity: Identity,
+ Json(request): Json<Request>,
+) -> Result<Response, Error> {
+ let message = app
+ .messages()
+ .send(&channel, &identity.login, &sent_at, &request.body)
+ .await?;
+
+ Ok(Response(message))
+}
+
+#[derive(serde::Deserialize)]
+pub struct Request {
+ pub body: Body,
+}
+
+#[derive(Debug)]
+pub struct Response(pub Message);
+
+impl IntoResponse for Response {
+ fn into_response(self) -> response::Response {
+ let Self(message) = self;
+ (StatusCode::ACCEPTED, Json(message)).into_response()
+ }
+}
+
+#[derive(Debug, thiserror::Error)]
+#[error(transparent)]
+pub struct Error(#[from] pub SendError);
+
+impl IntoResponse for Error {
+ fn into_response(self) -> response::Response {
+ let Self(error) = self;
+ #[allow(clippy::match_wildcard_for_single_variants)]
+ match error {
+ SendError::ChannelNotFound(_) => NotFound(error).into_response(),
+ other => Internal::from(other).into_response(),
+ }
+ }
+}
diff --git a/src/channel/routes/channel/test/delete.rs b/src/channel/routes/channel/test/delete.rs
new file mode 100644
index 0000000..e9af12f
--- /dev/null
+++ b/src/channel/routes/channel/test/delete.rs
@@ -0,0 +1,154 @@
+use axum::{
+ extract::{Path, State},
+ http::StatusCode,
+};
+
+use crate::{
+ channel::{app, routes::channel::delete},
+ test::fixtures,
+};
+
+#[tokio::test]
+pub async fn delete_channel() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let channel = fixtures::channel::create(&app, &fixtures::now()).await;
+
+ // Send the request
+
+ let deleter = fixtures::identity::create(&app, &fixtures::now()).await;
+ let response = delete::handler(
+ State(app.clone()),
+ Path(channel.id.clone()),
+ fixtures::now(),
+ deleter,
+ )
+ .await
+ .expect("deleting a valid channel succeeds");
+
+ // Verify the response
+
+ assert_eq!(response, StatusCode::ACCEPTED);
+
+ // Verify the semantics
+
+ let snapshot = app.boot().snapshot().await.expect("boot always succeeds");
+ assert!(!snapshot.channels.contains(&channel));
+}
+
+#[tokio::test]
+pub async fn delete_invalid_channel_id() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+
+ // Send the request
+
+ let deleter = fixtures::identity::create(&app, &fixtures::now()).await;
+ let channel = fixtures::channel::fictitious();
+ let delete::Error(error) = delete::handler(
+ State(app.clone()),
+ Path(channel.clone()),
+ fixtures::now(),
+ deleter,
+ )
+ .await
+ .expect_err("deleting a nonexistent channel fails");
+
+ // Verify the response
+
+ assert!(matches!(error, app::Error::NotFound(id) if id == channel));
+}
+
+#[tokio::test]
+pub async fn delete_deleted() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let channel = fixtures::channel::create(&app, &fixtures::now()).await;
+
+ app.channels()
+ .delete(&channel.id, &fixtures::now())
+ .await
+ .expect("deleting a recently-sent channel succeeds");
+
+ // Send the request
+
+ let deleter = fixtures::identity::create(&app, &fixtures::now()).await;
+ let delete::Error(error) = delete::handler(
+ State(app.clone()),
+ Path(channel.id.clone()),
+ fixtures::now(),
+ deleter,
+ )
+ .await
+ .expect_err("deleting a deleted channel fails");
+
+ // Verify the response
+
+ assert!(matches!(error, app::Error::Deleted(id) if id == channel.id));
+}
+
+#[tokio::test]
+pub async fn delete_expired() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let channel = fixtures::channel::create(&app, &fixtures::ancient()).await;
+
+ app.channels()
+ .expire(&fixtures::now())
+ .await
+ .expect("expiring channels always succeeds");
+
+ // Send the request
+
+ let deleter = fixtures::identity::create(&app, &fixtures::now()).await;
+ let delete::Error(error) = delete::handler(
+ State(app.clone()),
+ Path(channel.id.clone()),
+ fixtures::now(),
+ deleter,
+ )
+ .await
+ .expect_err("deleting an expired channel fails");
+
+ // Verify the response
+
+ assert!(matches!(error, app::Error::Deleted(id) if id == channel.id));
+}
+
+#[tokio::test]
+pub async fn delete_purged() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let channel = fixtures::channel::create(&app, &fixtures::ancient()).await;
+
+ app.channels()
+ .expire(&fixtures::old())
+ .await
+ .expect("expiring channels always succeeds");
+
+ app.channels()
+ .purge(&fixtures::now())
+ .await
+ .expect("purging channels always succeeds");
+
+ // Send the request
+
+ let deleter = fixtures::identity::create(&app, &fixtures::now()).await;
+ let delete::Error(error) = delete::handler(
+ State(app.clone()),
+ Path(channel.id.clone()),
+ fixtures::now(),
+ deleter,
+ )
+ .await
+ .expect_err("deleting a purged channel fails");
+
+ // Verify the response
+
+ assert!(matches!(error, app::Error::NotFound(id) if id == channel.id));
+}
diff --git a/src/channel/routes/channel/test/mod.rs b/src/channel/routes/channel/test/mod.rs
new file mode 100644
index 0000000..78bf86e
--- /dev/null
+++ b/src/channel/routes/channel/test/mod.rs
@@ -0,0 +1,2 @@
+mod delete;
+mod post;
diff --git a/src/channel/routes/test/on_send.rs b/src/channel/routes/channel/test/post.rs
index 293cc56..67e7d36 100644
--- a/src/channel/routes/test/on_send.rs
+++ b/src/channel/routes/channel/test/post.rs
@@ -2,9 +2,8 @@ use axum::extract::{Json, Path, State};
use futures::stream::StreamExt;
use crate::{
- channel,
- channel::routes,
- event::{self, Sequenced},
+ channel::{self, routes::channel::post},
+ event::Sequenced,
message::{self, app::SendError},
test::fixtures::{self, future::Immediately as _},
};
@@ -14,7 +13,7 @@ async fn messages_in_order() {
// Set up the environment
let app = fixtures::scratch_app().await;
- let sender = fixtures::login::create(&app, &fixtures::now()).await;
+ let sender = fixtures::identity::create(&app, &fixtures::now()).await;
let channel = fixtures::channel::create(&app, &fixtures::now()).await;
// Call the endpoint (twice)
@@ -25,17 +24,17 @@ async fn messages_in_order() {
];
for (sent_at, body) in &requests {
- let request = routes::SendRequest { body: body.clone() };
+ let request = post::Request { body: body.clone() };
- routes::on_send(
+ let _ = post::handler(
State(app.clone()),
Path(channel.id.clone()),
sent_at.clone(),
sender.clone(),
- Json(request.clone()),
+ Json(request),
)
.await
- .expect("sending to a valid channel");
+ .expect("sending to a valid channel succeeds");
}
// Verify the semantics
@@ -44,8 +43,8 @@ async fn messages_in_order() {
.events()
.subscribe(None)
.await
- .expect("subscribing to a valid channel")
- .filter(fixtures::filter::messages())
+ .expect("subscribing to a valid channel succeeds")
+ .filter_map(fixtures::message::events)
.take(requests.len());
let events = events.collect::<Vec<_>>().immediately().await;
@@ -54,8 +53,8 @@ async fn messages_in_order() {
assert_eq!(*sent_at, event.at());
assert!(matches!(
event,
- event::Event::Message(message::Event::Sent(event))
- if event.message.sender == sender.id
+ message::Event::Sent(event)
+ if event.message.sender == sender.login.id
&& event.message.body == message
));
}
@@ -66,24 +65,62 @@ async fn nonexistent_channel() {
// Set up the environment
let app = fixtures::scratch_app().await;
- let login = fixtures::login::create(&app, &fixtures::now()).await;
+ let sender = fixtures::identity::create(&app, &fixtures::now()).await;
// Call the endpoint
let sent_at = fixtures::now();
let channel = channel::Id::generate();
- let request = routes::SendRequest {
+ let request = post::Request {
body: fixtures::message::propose(),
};
- let routes::SendErrorResponse(error) = routes::on_send(
+ let post::Error(error) = post::handler(
State(app),
Path(channel.clone()),
sent_at,
- login,
+ sender,
Json(request),
)
.await
- .expect_err("sending to a nonexistent channel");
+ .expect_err("sending to a nonexistent channel fails");
+
+ // Verify the structure of the response
+
+ assert!(matches!(
+ error,
+ SendError::ChannelNotFound(error_channel) if channel == error_channel
+ ));
+}
+
+#[tokio::test]
+async fn deleted_channel() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let sender = fixtures::identity::create(&app, &fixtures::now()).await;
+ let channel = fixtures::channel::create(&app, &fixtures::now()).await;
+
+ app.channels()
+ .delete(&channel.id, &fixtures::now())
+ .await
+ .expect("deleting a new channel succeeds");
+
+ // Call the endpoint
+
+ let sent_at = fixtures::now();
+ let channel = channel::Id::generate();
+ let request = post::Request {
+ body: fixtures::message::propose(),
+ };
+ let post::Error(error) = post::handler(
+ State(app),
+ Path(channel.clone()),
+ sent_at,
+ sender,
+ Json(request),
+ )
+ .await
+ .expect_err("sending to a deleted channel fails");
// Verify the structure of the response
diff --git a/src/channel/routes/mod.rs b/src/channel/routes/mod.rs
new file mode 100644
index 0000000..696bd72
--- /dev/null
+++ b/src/channel/routes/mod.rs
@@ -0,0 +1,19 @@
+use axum::{
+ routing::{delete, post},
+ Router,
+};
+
+use crate::app::App;
+
+mod channel;
+mod post;
+
+#[cfg(test)]
+mod test;
+
+pub fn router() -> Router<App> {
+ Router::new()
+ .route("/api/channels", post(post::handler))
+ .route("/api/channels/:channel", post(channel::post::handler))
+ .route("/api/channels/:channel", delete(channel::delete::handler))
+}
diff --git a/src/channel/routes/post.rs b/src/channel/routes/post.rs
new file mode 100644
index 0000000..810445c
--- /dev/null
+++ b/src/channel/routes/post.rs
@@ -0,0 +1,60 @@
+use axum::{
+ extract::{Json, State},
+ http::StatusCode,
+ response::{self, IntoResponse},
+};
+
+use crate::{
+ app::App,
+ channel::{app, Channel},
+ clock::RequestedAt,
+ error::Internal,
+ name::Name,
+ token::extract::Identity,
+};
+
+pub async fn handler(
+ State(app): State<App>,
+ _: Identity, // requires auth, but doesn't actually care who you are
+ RequestedAt(created_at): RequestedAt,
+ Json(request): Json<Request>,
+) -> Result<Response, Error> {
+ let channel = app
+ .channels()
+ .create(&request.name, &created_at)
+ .await
+ .map_err(Error)?;
+
+ Ok(Response(channel))
+}
+
+#[derive(serde::Deserialize)]
+pub struct Request {
+ pub name: Name,
+}
+
+#[derive(Debug)]
+pub struct Response(pub Channel);
+
+impl IntoResponse for Response {
+ fn into_response(self) -> response::Response {
+ let Self(channel) = self;
+ (StatusCode::ACCEPTED, Json(channel)).into_response()
+ }
+}
+
+#[derive(Debug)]
+pub struct Error(pub app::CreateError);
+
+impl IntoResponse for Error {
+ fn into_response(self) -> response::Response {
+ let Self(error) = self;
+ #[allow(clippy::match_wildcard_for_single_variants)]
+ match error {
+ app::CreateError::DuplicateName(_) => {
+ (StatusCode::CONFLICT, error.to_string()).into_response()
+ }
+ other => Internal::from(other).into_response(),
+ }
+ }
+}
diff --git a/src/channel/routes/test.rs b/src/channel/routes/test.rs
new file mode 100644
index 0000000..216eba1
--- /dev/null
+++ b/src/channel/routes/test.rs
@@ -0,0 +1,221 @@
+use std::future;
+
+use axum::extract::{Json, State};
+use futures::stream::StreamExt as _;
+
+use super::post;
+use crate::{
+ channel::app,
+ name::Name,
+ test::fixtures::{self, future::Immediately as _},
+};
+
+#[tokio::test]
+async fn new_channel() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let creator = fixtures::identity::create(&app, &fixtures::now()).await;
+
+ // Call the endpoint
+
+ let name = fixtures::channel::propose();
+ let request = post::Request { name: name.clone() };
+ let post::Response(response) =
+ post::handler(State(app.clone()), creator, fixtures::now(), Json(request))
+ .await
+ .expect("creating a channel in an empty app succeeds");
+
+ // Verify the structure of the response
+
+ assert_eq!(name, response.name);
+
+ // Verify the semantics
+
+ let snapshot = app.boot().snapshot().await.expect("boot always succeeds");
+ assert!(snapshot.channels.iter().any(|channel| channel == &response));
+
+ let channel = app
+ .channels()
+ .get(&response.id)
+ .await
+ .expect("searching for channels by ID never fails")
+ .expect("the newly-created channel exists");
+ assert_eq!(response, channel);
+
+ let mut events = app
+ .events()
+ .subscribe(None)
+ .await
+ .expect("subscribing never fails")
+ .filter_map(fixtures::channel::events)
+ .filter_map(fixtures::channel::created)
+ .filter(|event| future::ready(event.channel == response));
+
+ let event = events
+ .next()
+ .immediately()
+ .await
+ .expect("creation event published");
+
+ assert_eq!(event.channel, response);
+}
+
+#[tokio::test]
+async fn duplicate_name() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let creator = fixtures::identity::create(&app, &fixtures::now()).await;
+ let channel = fixtures::channel::create(&app, &fixtures::now()).await;
+
+ // Call the endpoint
+
+ let request = post::Request {
+ name: channel.name.clone(),
+ };
+ let post::Error(error) =
+ post::handler(State(app.clone()), creator, fixtures::now(), Json(request))
+ .await
+ .expect_err("duplicate channel name should fail the request");
+
+ // Verify the structure of the response
+
+ assert!(matches!(
+ error,
+ app::CreateError::DuplicateName(name) if channel.name == name
+ ));
+}
+
+#[tokio::test]
+async fn conflicting_canonical_name() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let creator = fixtures::identity::create(&app, &fixtures::now()).await;
+
+ let existing_name = Name::from("rijksmuseum");
+ app.channels()
+ .create(&existing_name, &fixtures::now())
+ .await
+ .expect("creating a channel in an empty environment succeeds");
+
+ let conflicting_name = Name::from("r\u{0133}ksmuseum");
+
+ // Call the endpoint
+
+ let request = post::Request {
+ name: conflicting_name.clone(),
+ };
+ let post::Error(error) =
+ post::handler(State(app.clone()), creator, fixtures::now(), Json(request))
+ .await
+ .expect_err("duplicate channel name should fail the request");
+
+ // Verify the structure of the response
+
+ assert!(matches!(
+ error,
+ app::CreateError::DuplicateName(name) if conflicting_name == name
+ ));
+}
+
+#[tokio::test]
+async fn name_reusable_after_delete() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let creator = fixtures::identity::create(&app, &fixtures::now()).await;
+ let name = fixtures::channel::propose();
+
+ // Call the endpoint (first time)
+
+ let request = post::Request { name: name.clone() };
+ let post::Response(response) = post::handler(
+ State(app.clone()),
+ creator.clone(),
+ fixtures::now(),
+ Json(request),
+ )
+ .await
+ .expect("new channel in an empty app");
+
+ // Delete the channel
+
+ app.channels()
+ .delete(&response.id, &fixtures::now())
+ .await
+ .expect("deleting a newly-created channel succeeds");
+
+ // Call the endpoint (second time)
+
+ let request = post::Request { name: name.clone() };
+ let post::Response(response) =
+ post::handler(State(app.clone()), creator, fixtures::now(), Json(request))
+ .await
+ .expect("creation succeeds after original channel deleted");
+
+ // Verify the structure of the response
+
+ assert_eq!(name, response.name);
+
+ // Verify the semantics
+
+ let channel = app
+ .channels()
+ .get(&response.id)
+ .await
+ .expect("searching for channels by ID never fails")
+ .expect("the newly-created channel exists");
+ assert_eq!(response, channel);
+}
+
+#[tokio::test]
+async fn name_reusable_after_expiry() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let creator = fixtures::identity::create(&app, &fixtures::ancient()).await;
+ let name = fixtures::channel::propose();
+
+ // Call the endpoint (first time)
+
+ let request = post::Request { name: name.clone() };
+ let post::Response(_) = post::handler(
+ State(app.clone()),
+ creator.clone(),
+ fixtures::ancient(),
+ Json(request),
+ )
+ .await
+ .expect("new channel in an empty app");
+
+ // Delete the channel
+
+ app.channels()
+ .expire(&fixtures::now())
+ .await
+ .expect("expiry always succeeds");
+
+ // Call the endpoint (second time)
+
+ let request = post::Request { name: name.clone() };
+ let post::Response(response) =
+ post::handler(State(app.clone()), creator, fixtures::now(), Json(request))
+ .await
+ .expect("creation succeeds after original channel expired");
+
+ // Verify the structure of the response
+
+ assert_eq!(name, response.name);
+
+ // Verify the semantics
+
+ let channel = app
+ .channels()
+ .get(&response.id)
+ .await
+ .expect("searching for channels by ID never fails")
+ .expect("the newly-created channel exists");
+ assert_eq!(response, channel);
+}
diff --git a/src/channel/routes/test/mod.rs b/src/channel/routes/test/mod.rs
deleted file mode 100644
index 3e5aa17..0000000
--- a/src/channel/routes/test/mod.rs
+++ /dev/null
@@ -1,2 +0,0 @@
-mod on_create;
-mod on_send;
diff --git a/src/channel/routes/test/on_create.rs b/src/channel/routes/test/on_create.rs
deleted file mode 100644
index eeecc7f..0000000
--- a/src/channel/routes/test/on_create.rs
+++ /dev/null
@@ -1,88 +0,0 @@
-use axum::extract::{Json, State};
-use futures::stream::StreamExt as _;
-
-use crate::{
- channel::{self, app, routes},
- event,
- test::fixtures::{self, future::Immediately as _},
-};
-
-#[tokio::test]
-async fn new_channel() {
- // Set up the environment
-
- let app = fixtures::scratch_app().await;
- let creator = fixtures::login::create(&app, &fixtures::now()).await;
-
- // Call the endpoint
-
- let name = fixtures::channel::propose();
- let request = routes::CreateRequest { name };
- let Json(response_channel) = routes::on_create(
- State(app.clone()),
- creator,
- fixtures::now(),
- Json(request.clone()),
- )
- .await
- .expect("new channel in an empty app");
-
- // Verify the structure of the response
-
- assert_eq!(request.name, response_channel.name);
-
- // Verify the semantics
-
- let snapshot = app.boot().snapshot().await.expect("boot always succeeds");
- assert!(snapshot
- .channels
- .iter()
- .any(|channel| channel.name == response_channel.name && channel.id == response_channel.id));
-
- let mut events = app
- .events()
- .subscribe(None)
- .await
- .expect("subscribing never fails")
- .filter(fixtures::filter::created());
-
- let event = events
- .next()
- .immediately()
- .await
- .expect("creation event published");
-
- assert!(matches!(
- event,
- event::Event::Channel(channel::Event::Created(event))
- if event.channel == response_channel
- ));
-}
-
-#[tokio::test]
-async fn duplicate_name() {
- // Set up the environment
-
- let app = fixtures::scratch_app().await;
- let creator = fixtures::login::create(&app, &fixtures::now()).await;
- let channel = fixtures::channel::create(&app, &fixtures::now()).await;
-
- // Call the endpoint
-
- let request = routes::CreateRequest { name: channel.name };
- let routes::CreateError(error) = routes::on_create(
- State(app.clone()),
- creator,
- fixtures::now(),
- Json(request.clone()),
- )
- .await
- .expect_err("duplicate channel name");
-
- // Verify the structure of the response
-
- assert!(matches!(
- error,
- app::CreateError::DuplicateName(name) if request.name == name
- ));
-}
diff --git a/src/channel/snapshot.rs b/src/channel/snapshot.rs
index d4d1d27..129c0d6 100644
--- a/src/channel/snapshot.rs
+++ b/src/channel/snapshot.rs
@@ -2,11 +2,14 @@ use super::{
event::{Created, Event},
Id,
};
+use crate::{clock::DateTime, name::Name};
#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
pub struct Channel {
pub id: Id,
- pub name: String,
+ pub name: Name,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub deleted_at: Option<DateTime>,
}
impl Channel {