summaryrefslogtreecommitdiff
path: root/src/channel
diff options
context:
space:
mode:
authorOwen Jacobson <owen@grimoire.ca>2025-06-30 22:00:57 -0400
committerOwen Jacobson <owen@grimoire.ca>2025-07-03 22:43:42 -0400
commita15e3d580124f561864c6a39f1e035eb1b3aab13 (patch)
treeef80f725e7b02547a23b5c29a482fbf3fd188c0d /src/channel
parent5af4aea1e2f143499529b70f9cf191c6994265c6 (diff)
Rename "channel" to "conversation" within the server.
I've split this from the schema and API changes because, frankly, it's huge. Annoyingly so. There are no semantic changes in this, it's all symbol changes, but there are a _lot_ of them because the term "channel" leaks all over everything in a service whose primary role is managing messages sent to channels (now, conversations). I found a buggy test while working on this! It's not fixed in this commit, because it felt mean to hide a real change in the middle of this much chaff.
Diffstat (limited to 'src/channel')
-rw-r--r--src/channel/app.rs224
-rw-r--r--src/channel/event.rs46
-rw-r--r--src/channel/handlers/create/mod.rs67
-rw-r--r--src/channel/handlers/create/test.rs250
-rw-r--r--src/channel/handlers/delete/mod.rs59
-rw-r--r--src/channel/handlers/delete/test.rs184
-rw-r--r--src/channel/handlers/mod.rs9
-rw-r--r--src/channel/handlers/send/mod.rs63
-rw-r--r--src/channel/handlers/send/test.rs130
-rw-r--r--src/channel/history.rs69
-rw-r--r--src/channel/id.rs38
-rw-r--r--src/channel/mod.rs10
-rw-r--r--src/channel/repo.rs336
-rw-r--r--src/channel/snapshot.rs43
-rw-r--r--src/channel/validate.rs25
15 files changed, 0 insertions, 1553 deletions
diff --git a/src/channel/app.rs b/src/channel/app.rs
deleted file mode 100644
index e3b169c..0000000
--- a/src/channel/app.rs
+++ /dev/null
@@ -1,224 +0,0 @@
-use chrono::TimeDelta;
-use itertools::Itertools;
-use sqlx::sqlite::SqlitePool;
-
-use super::{
- Channel, Id,
- repo::{LoadError, Provider as _},
- validate,
-};
-use crate::{
- clock::DateTime,
- db::{Duplicate as _, NotFound as _},
- event::{Broadcaster, Event, Sequence, repo::Provider as _},
- message::{self, repo::Provider as _},
- name::{self, Name},
-};
-
-pub struct Channels<'a> {
- db: &'a SqlitePool,
- events: &'a Broadcaster,
-}
-
-impl<'a> Channels<'a> {
- pub const fn new(db: &'a SqlitePool, events: &'a Broadcaster) -> Self {
- Self { db, events }
- }
-
- pub async fn create(&self, name: &Name, created_at: &DateTime) -> Result<Channel, CreateError> {
- if !validate::name(name) {
- return Err(CreateError::InvalidName(name.clone()));
- }
-
- let mut tx = self.db.begin().await?;
- let created = tx.sequence().next(created_at).await?;
- let channel = tx
- .channels()
- .create(name, &created)
- .await
- .duplicate(|| CreateError::DuplicateName(name.clone()))?;
- tx.commit().await?;
-
- self.events
- .broadcast(channel.events().map(Event::from).collect::<Vec<_>>());
-
- Ok(channel.as_created())
- }
-
- // This function is careless with respect to time, and gets you the channel as
- // it exists in the specific moment when you call it.
- pub async fn get(&self, channel: &Id) -> Result<Channel, Error> {
- let to_not_found = || Error::NotFound(channel.clone());
- let to_deleted = || Error::Deleted(channel.clone());
-
- let mut tx = self.db.begin().await?;
- let channel = tx.channels().by_id(channel).await.not_found(to_not_found)?;
- tx.commit().await?;
-
- channel.as_snapshot().ok_or_else(to_deleted)
- }
-
- pub async fn delete(&self, channel: &Id, deleted_at: &DateTime) -> Result<(), DeleteError> {
- let mut tx = self.db.begin().await?;
-
- let channel = tx
- .channels()
- .by_id(channel)
- .await
- .not_found(|| DeleteError::NotFound(channel.clone()))?;
- channel
- .as_snapshot()
- .ok_or_else(|| DeleteError::Deleted(channel.id().clone()))?;
-
- let mut events = Vec::new();
-
- let messages = tx.messages().live(&channel).await?;
- let has_messages = messages
- .iter()
- .map(message::History::as_snapshot)
- .any(|message| message.is_some());
- if has_messages {
- return Err(DeleteError::NotEmpty(channel.id().clone()));
- }
-
- let deleted = tx.sequence().next(deleted_at).await?;
- let channel = tx.channels().delete(&channel, &deleted).await?;
- events.extend(
- channel
- .events()
- .filter(Sequence::start_from(deleted.sequence))
- .map(Event::from),
- );
-
- tx.commit().await?;
-
- self.events.broadcast(events);
-
- Ok(())
- }
-
- pub async fn expire(&self, relative_to: &DateTime) -> Result<(), ExpireError> {
- // Somewhat arbitrarily, expire after 7 days. Active channels will not be
- // expired until their messages expire.
- let expire_at = relative_to.to_owned() - TimeDelta::days(7);
-
- let mut tx = self.db.begin().await?;
- let expired = tx.channels().expired(&expire_at).await?;
-
- let mut events = Vec::with_capacity(expired.len());
- for channel in expired {
- let deleted = tx.sequence().next(relative_to).await?;
- let channel = tx.channels().delete(&channel, &deleted).await?;
- events.push(
- channel
- .events()
- .filter(Sequence::start_from(deleted.sequence)),
- );
- }
-
- tx.commit().await?;
-
- self.events.broadcast(
- events
- .into_iter()
- .kmerge_by(Sequence::merge)
- .map(Event::from)
- .collect::<Vec<_>>(),
- );
-
- Ok(())
- }
-
- pub async fn purge(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> {
- // Somewhat arbitrarily, purge after 6 hours.
- let purge_at = relative_to.to_owned() - TimeDelta::hours(6);
-
- let mut tx = self.db.begin().await?;
- tx.channels().purge(&purge_at).await?;
- tx.commit().await?;
-
- Ok(())
- }
-}
-
-#[derive(Debug, thiserror::Error)]
-pub enum CreateError {
- #[error("channel named {0} already exists")]
- DuplicateName(Name),
- #[error("invalid channel name: {0}")]
- InvalidName(Name),
- #[error(transparent)]
- Database(#[from] sqlx::Error),
- #[error(transparent)]
- Name(#[from] name::Error),
-}
-
-impl From<LoadError> for CreateError {
- fn from(error: LoadError) -> Self {
- match error {
- LoadError::Database(error) => error.into(),
- LoadError::Name(error) => error.into(),
- }
- }
-}
-
-#[derive(Debug, thiserror::Error)]
-pub enum Error {
- #[error("channel {0} not found")]
- NotFound(Id),
- #[error("channel {0} deleted")]
- Deleted(Id),
- #[error(transparent)]
- Database(#[from] sqlx::Error),
- #[error(transparent)]
- Name(#[from] name::Error),
-}
-
-impl From<LoadError> for Error {
- fn from(error: LoadError) -> Self {
- match error {
- LoadError::Database(error) => error.into(),
- LoadError::Name(error) => error.into(),
- }
- }
-}
-
-#[derive(Debug, thiserror::Error)]
-pub enum DeleteError {
- #[error("channel {0} not found")]
- NotFound(Id),
- #[error("channel {0} deleted")]
- Deleted(Id),
- #[error("channel {0} not empty")]
- NotEmpty(Id),
- #[error(transparent)]
- Database(#[from] sqlx::Error),
- #[error(transparent)]
- Name(#[from] name::Error),
-}
-
-impl From<LoadError> for DeleteError {
- fn from(error: LoadError) -> Self {
- match error {
- LoadError::Database(error) => error.into(),
- LoadError::Name(error) => error.into(),
- }
- }
-}
-
-#[derive(Debug, thiserror::Error)]
-pub enum ExpireError {
- #[error(transparent)]
- Database(#[from] sqlx::Error),
- #[error(transparent)]
- Name(#[from] name::Error),
-}
-
-impl From<LoadError> for ExpireError {
- fn from(error: LoadError) -> Self {
- match error {
- LoadError::Database(error) => error.into(),
- LoadError::Name(error) => error.into(),
- }
- }
-}
diff --git a/src/channel/event.rs b/src/channel/event.rs
deleted file mode 100644
index a5739f9..0000000
--- a/src/channel/event.rs
+++ /dev/null
@@ -1,46 +0,0 @@
-use super::Channel;
-use crate::{
- channel,
- event::{Instant, Sequenced},
-};
-
-#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
-#[serde(tag = "event", rename_all = "snake_case")]
-pub enum Event {
- Created(Created),
- Deleted(Deleted),
-}
-
-impl Sequenced for Event {
- fn instant(&self) -> Instant {
- match self {
- Self::Created(event) => event.channel.created,
- Self::Deleted(event) => event.instant,
- }
- }
-}
-
-#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
-pub struct Created {
- #[serde(flatten)]
- pub channel: Channel,
-}
-
-impl From<Created> for Event {
- fn from(event: Created) -> Self {
- Self::Created(event)
- }
-}
-
-#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
-pub struct Deleted {
- #[serde(flatten)]
- pub instant: Instant,
- pub id: channel::Id,
-}
-
-impl From<Deleted> for Event {
- fn from(event: Deleted) -> Self {
- Self::Deleted(event)
- }
-}
diff --git a/src/channel/handlers/create/mod.rs b/src/channel/handlers/create/mod.rs
deleted file mode 100644
index 2c860fc..0000000
--- a/src/channel/handlers/create/mod.rs
+++ /dev/null
@@ -1,67 +0,0 @@
-use axum::{
- extract::{Json, State},
- http::StatusCode,
- response::{self, IntoResponse},
-};
-
-use crate::{
- app::App,
- channel::{Channel, app},
- clock::RequestedAt,
- error::Internal,
- name::Name,
- token::extract::Identity,
-};
-
-#[cfg(test)]
-mod test;
-
-pub async fn handler(
- State(app): State<App>,
- _: Identity, // requires auth, but doesn't actually care who you are
- RequestedAt(created_at): RequestedAt,
- Json(request): Json<Request>,
-) -> Result<Response, Error> {
- let channel = app
- .channels()
- .create(&request.name, &created_at)
- .await
- .map_err(Error)?;
-
- Ok(Response(channel))
-}
-
-#[derive(serde::Deserialize)]
-pub struct Request {
- pub name: Name,
-}
-
-#[derive(Debug)]
-pub struct Response(pub Channel);
-
-impl IntoResponse for Response {
- fn into_response(self) -> response::Response {
- let Self(channel) = self;
- (StatusCode::ACCEPTED, Json(channel)).into_response()
- }
-}
-
-#[derive(Debug)]
-pub struct Error(pub app::CreateError);
-
-impl IntoResponse for Error {
- fn into_response(self) -> response::Response {
- let Self(error) = self;
- match error {
- app::CreateError::DuplicateName(_) => {
- (StatusCode::CONFLICT, error.to_string()).into_response()
- }
- app::CreateError::InvalidName(_) => {
- (StatusCode::BAD_REQUEST, error.to_string()).into_response()
- }
- app::CreateError::Name(_) | app::CreateError::Database(_) => {
- Internal::from(error).into_response()
- }
- }
- }
-}
diff --git a/src/channel/handlers/create/test.rs b/src/channel/handlers/create/test.rs
deleted file mode 100644
index 31bb778..0000000
--- a/src/channel/handlers/create/test.rs
+++ /dev/null
@@ -1,250 +0,0 @@
-use std::future;
-
-use axum::extract::{Json, State};
-use futures::stream::StreamExt as _;
-use itertools::Itertools;
-
-use crate::{
- channel::app,
- name::Name,
- test::fixtures::{self, future::Expect as _},
-};
-
-#[tokio::test]
-async fn new_channel() {
- // Set up the environment
-
- let app = fixtures::scratch_app().await;
- let creator = fixtures::identity::create(&app, &fixtures::now()).await;
- let resume_point = fixtures::boot::resume_point(&app).await;
-
- // Call the endpoint
-
- let name = fixtures::channel::propose();
- let request = super::Request { name: name.clone() };
- let super::Response(response) =
- super::handler(State(app.clone()), creator, fixtures::now(), Json(request))
- .await
- .expect("creating a channel in an empty app succeeds");
-
- // Verify the structure of the response
-
- assert_eq!(name, response.name);
-
- // Verify the semantics
-
- let snapshot = app.boot().snapshot().await.expect("boot always succeeds");
- let created = snapshot
- .events
- .into_iter()
- .filter_map(fixtures::event::channel)
- .filter_map(fixtures::event::channel::created)
- .exactly_one()
- .expect("only one channel has been created");
- assert_eq!(response, created.channel);
-
- let channel = app
- .channels()
- .get(&response.id)
- .await
- .expect("the newly-created channel exists");
- assert_eq!(response, channel);
-
- let mut events = app
- .events()
- .subscribe(resume_point)
- .await
- .expect("subscribing never fails")
- .filter_map(fixtures::event::stream::channel)
- .filter_map(fixtures::event::stream::channel::created)
- .filter(|event| future::ready(event.channel == response));
-
- let event = events.next().expect_some("creation event published").await;
-
- assert_eq!(event.channel, response);
-}
-
-#[tokio::test]
-async fn duplicate_name() {
- // Set up the environment
-
- let app = fixtures::scratch_app().await;
- let creator = fixtures::identity::create(&app, &fixtures::now()).await;
- let channel = fixtures::channel::create(&app, &fixtures::now()).await;
-
- // Call the endpoint
-
- let request = super::Request {
- name: channel.name.clone(),
- };
- let super::Error(error) =
- super::handler(State(app.clone()), creator, fixtures::now(), Json(request))
- .await
- .expect_err("duplicate channel name should fail the request");
-
- // Verify the structure of the response
-
- assert!(matches!(
- error,
- app::CreateError::DuplicateName(name) if channel.name == name
- ));
-}
-
-#[tokio::test]
-async fn conflicting_canonical_name() {
- // Set up the environment
-
- let app = fixtures::scratch_app().await;
- let creator = fixtures::identity::create(&app, &fixtures::now()).await;
-
- let existing_name = Name::from("rijksmuseum");
- app.channels()
- .create(&existing_name, &fixtures::now())
- .await
- .expect("creating a channel in an empty environment succeeds");
-
- let conflicting_name = Name::from("r\u{0133}ksmuseum");
-
- // Call the endpoint
-
- let request = super::Request {
- name: conflicting_name.clone(),
- };
- let super::Error(error) =
- super::handler(State(app.clone()), creator, fixtures::now(), Json(request))
- .await
- .expect_err("duplicate channel name should fail the request");
-
- // Verify the structure of the response
-
- assert!(matches!(
- error,
- app::CreateError::DuplicateName(name) if conflicting_name == name
- ));
-}
-
-#[tokio::test]
-async fn invalid_name() {
- // Set up the environment
-
- let app = fixtures::scratch_app().await;
- let creator = fixtures::identity::create(&app, &fixtures::now()).await;
-
- // Call the endpoint
-
- let name = fixtures::channel::propose_invalid_name();
- let request = super::Request { name: name.clone() };
- let super::Error(error) = crate::channel::handlers::create::handler(
- State(app.clone()),
- creator,
- fixtures::now(),
- Json(request),
- )
- .await
- .expect_err("invalid channel name should fail the request");
-
- // Verify the structure of the response
-
- assert!(matches!(
- error,
- app::CreateError::InvalidName(error_name) if name == error_name
- ));
-}
-
-#[tokio::test]
-async fn name_reusable_after_delete() {
- // Set up the environment
-
- let app = fixtures::scratch_app().await;
- let creator = fixtures::identity::create(&app, &fixtures::now()).await;
- let name = fixtures::channel::propose();
-
- // Call the endpoint (first time)
-
- let request = super::Request { name: name.clone() };
- let super::Response(response) = super::handler(
- State(app.clone()),
- creator.clone(),
- fixtures::now(),
- Json(request),
- )
- .await
- .expect("new channel in an empty app");
-
- // Delete the channel
-
- app.channels()
- .delete(&response.id, &fixtures::now())
- .await
- .expect("deleting a newly-created channel succeeds");
-
- // Call the endpoint (second time)
-
- let request = super::Request { name: name.clone() };
- let super::Response(response) =
- super::handler(State(app.clone()), creator, fixtures::now(), Json(request))
- .await
- .expect("creation succeeds after original channel deleted");
-
- // Verify the structure of the response
-
- assert_eq!(name, response.name);
-
- // Verify the semantics
-
- let channel = app
- .channels()
- .get(&response.id)
- .await
- .expect("the newly-created channel exists");
- assert_eq!(response, channel);
-}
-
-#[tokio::test]
-async fn name_reusable_after_expiry() {
- // Set up the environment
-
- let app = fixtures::scratch_app().await;
- let creator = fixtures::identity::create(&app, &fixtures::ancient()).await;
- let name = fixtures::channel::propose();
-
- // Call the endpoint (first time)
-
- let request = super::Request { name: name.clone() };
- let super::Response(_) = super::handler(
- State(app.clone()),
- creator.clone(),
- fixtures::ancient(),
- Json(request),
- )
- .await
- .expect("new channel in an empty app");
-
- // Delete the channel
-
- app.channels()
- .expire(&fixtures::now())
- .await
- .expect("expiry always succeeds");
-
- // Call the endpoint (second time)
-
- let request = super::Request { name: name.clone() };
- let super::Response(response) =
- super::handler(State(app.clone()), creator, fixtures::now(), Json(request))
- .await
- .expect("creation succeeds after original channel expired");
-
- // Verify the structure of the response
-
- assert_eq!(name, response.name);
-
- // Verify the semantics
-
- let channel = app
- .channels()
- .get(&response.id)
- .await
- .expect("the newly-created channel exists");
- assert_eq!(response, channel);
-}
diff --git a/src/channel/handlers/delete/mod.rs b/src/channel/handlers/delete/mod.rs
deleted file mode 100644
index b986bec..0000000
--- a/src/channel/handlers/delete/mod.rs
+++ /dev/null
@@ -1,59 +0,0 @@
-use axum::{
- extract::{Json, Path, State},
- http::StatusCode,
- response::{self, IntoResponse},
-};
-
-use crate::{
- app::App,
- channel::{self, app, handlers::PathInfo},
- clock::RequestedAt,
- error::{Internal, NotFound},
- token::extract::Identity,
-};
-
-#[cfg(test)]
-mod test;
-
-pub async fn handler(
- State(app): State<App>,
- Path(channel): Path<PathInfo>,
- RequestedAt(deleted_at): RequestedAt,
- _: Identity,
-) -> Result<Response, Error> {
- app.channels().delete(&channel, &deleted_at).await?;
-
- Ok(Response { id: channel })
-}
-
-#[derive(Debug, serde::Serialize)]
-pub struct Response {
- pub id: channel::Id,
-}
-
-impl IntoResponse for Response {
- fn into_response(self) -> response::Response {
- (StatusCode::ACCEPTED, Json(self)).into_response()
- }
-}
-
-#[derive(Debug, thiserror::Error)]
-#[error(transparent)]
-pub struct Error(#[from] pub app::DeleteError);
-
-impl IntoResponse for Error {
- fn into_response(self) -> response::Response {
- let Self(error) = self;
- match error {
- app::DeleteError::NotFound(_) | app::DeleteError::Deleted(_) => {
- NotFound(error).into_response()
- }
- app::DeleteError::NotEmpty(_) => {
- (StatusCode::CONFLICT, error.to_string()).into_response()
- }
- app::DeleteError::Name(_) | app::DeleteError::Database(_) => {
- Internal::from(error).into_response()
- }
- }
- }
-}
diff --git a/src/channel/handlers/delete/test.rs b/src/channel/handlers/delete/test.rs
deleted file mode 100644
index 99c19db..0000000
--- a/src/channel/handlers/delete/test.rs
+++ /dev/null
@@ -1,184 +0,0 @@
-use axum::extract::{Path, State};
-use itertools::Itertools;
-
-use crate::{channel::app, test::fixtures};
-
-#[tokio::test]
-pub async fn valid_channel() {
- // Set up the environment
-
- let app = fixtures::scratch_app().await;
- let channel = fixtures::channel::create(&app, &fixtures::now()).await;
-
- // Send the request
-
- let deleter = fixtures::identity::create(&app, &fixtures::now()).await;
- let response = super::handler(
- State(app.clone()),
- Path(channel.id.clone()),
- fixtures::now(),
- deleter,
- )
- .await
- .expect("deleting a valid channel succeeds");
-
- // Verify the response
-
- assert_eq!(channel.id, response.id);
-
- // Verify the semantics
-
- let snapshot = app.boot().snapshot().await.expect("boot always succeeds");
- let created = snapshot
- .events
- .into_iter()
- .filter_map(fixtures::event::channel)
- .filter_map(fixtures::event::channel::created)
- .exactly_one()
- .expect("only one channel has been created");
- // We don't expect `channel` to match the event exactly, as the name will have been
- // tombstoned and the channel given a `deleted_at` date.
- assert_eq!(channel.id, created.channel.id);
-}
-
-#[tokio::test]
-pub async fn invalid_channel_id() {
- // Set up the environment
-
- let app = fixtures::scratch_app().await;
-
- // Send the request
-
- let deleter = fixtures::identity::create(&app, &fixtures::now()).await;
- let channel = fixtures::channel::fictitious();
- let super::Error(error) = super::handler(
- State(app.clone()),
- Path(channel.clone()),
- fixtures::now(),
- deleter,
- )
- .await
- .expect_err("deleting a nonexistent channel fails");
-
- // Verify the response
-
- assert!(matches!(error, app::DeleteError::NotFound(id) if id == channel));
-}
-
-#[tokio::test]
-pub async fn channel_deleted() {
- // Set up the environment
-
- let app = fixtures::scratch_app().await;
- let channel = fixtures::channel::create(&app, &fixtures::now()).await;
-
- app.channels()
- .delete(&channel.id, &fixtures::now())
- .await
- .expect("deleting a recently-sent channel succeeds");
-
- // Send the request
-
- let deleter = fixtures::identity::create(&app, &fixtures::now()).await;
- let super::Error(error) = super::handler(
- State(app.clone()),
- Path(channel.id.clone()),
- fixtures::now(),
- deleter,
- )
- .await
- .expect_err("deleting a deleted channel fails");
-
- // Verify the response
-
- assert!(matches!(error, app::DeleteError::Deleted(id) if id == channel.id));
-}
-
-#[tokio::test]
-pub async fn channel_expired() {
- // Set up the environment
-
- let app = fixtures::scratch_app().await;
- let channel = fixtures::channel::create(&app, &fixtures::ancient()).await;
-
- app.channels()
- .expire(&fixtures::now())
- .await
- .expect("expiring channels always succeeds");
-
- // Send the request
-
- let deleter = fixtures::identity::create(&app, &fixtures::now()).await;
- let super::Error(error) = super::handler(
- State(app.clone()),
- Path(channel.id.clone()),
- fixtures::now(),
- deleter,
- )
- .await
- .expect_err("deleting an expired channel fails");
-
- // Verify the response
-
- assert!(matches!(error, app::DeleteError::Deleted(id) if id == channel.id));
-}
-
-#[tokio::test]
-pub async fn channel_purged() {
- // Set up the environment
-
- let app = fixtures::scratch_app().await;
- let channel = fixtures::channel::create(&app, &fixtures::ancient()).await;
-
- app.channels()
- .expire(&fixtures::old())
- .await
- .expect("expiring channels always succeeds");
-
- app.channels()
- .purge(&fixtures::now())
- .await
- .expect("purging channels always succeeds");
-
- // Send the request
-
- let deleter = fixtures::identity::create(&app, &fixtures::now()).await;
- let super::Error(error) = super::handler(
- State(app.clone()),
- Path(channel.id.clone()),
- fixtures::now(),
- deleter,
- )
- .await
- .expect_err("deleting a purged channel fails");
-
- // Verify the response
-
- assert!(matches!(error, app::DeleteError::NotFound(id) if id == channel.id));
-}
-
-#[tokio::test]
-pub async fn channel_not_empty() {
- // Set up the environment
-
- let app = fixtures::scratch_app().await;
- let channel = fixtures::channel::create(&app, &fixtures::now()).await;
- let sender = fixtures::user::create(&app, &fixtures::now()).await;
- fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await;
-
- // Send the request
-
- let deleter = fixtures::identity::create(&app, &fixtures::now()).await;
- let super::Error(error) = super::handler(
- State(app.clone()),
- Path(channel.id.clone()),
- fixtures::now(),
- deleter,
- )
- .await
- .expect_err("deleting a channel with messages fails");
-
- // Verify the response
-
- assert!(matches!(error, app::DeleteError::NotEmpty(id) if id == channel.id));
-}
diff --git a/src/channel/handlers/mod.rs b/src/channel/handlers/mod.rs
deleted file mode 100644
index f2ffd0d..0000000
--- a/src/channel/handlers/mod.rs
+++ /dev/null
@@ -1,9 +0,0 @@
-mod create;
-mod delete;
-mod send;
-
-pub use create::handler as create;
-pub use delete::handler as delete;
-pub use send::handler as send;
-
-type PathInfo = crate::channel::Id;
diff --git a/src/channel/handlers/send/mod.rs b/src/channel/handlers/send/mod.rs
deleted file mode 100644
index bde39e5..0000000
--- a/src/channel/handlers/send/mod.rs
+++ /dev/null
@@ -1,63 +0,0 @@
-use axum::{
- extract::{Json, Path, State},
- http::StatusCode,
- response::{self, IntoResponse},
-};
-
-use crate::channel::handlers::PathInfo;
-use crate::{
- app::App,
- clock::RequestedAt,
- error::{Internal, NotFound},
- message::{Body, Message, app::SendError},
- token::extract::Identity,
-};
-
-#[cfg(test)]
-mod test;
-
-pub async fn handler(
- State(app): State<App>,
- Path(channel): Path<PathInfo>,
- RequestedAt(sent_at): RequestedAt,
- identity: Identity,
- Json(request): Json<Request>,
-) -> Result<Response, Error> {
- let message = app
- .messages()
- .send(&channel, &identity.user, &sent_at, &request.body)
- .await?;
-
- Ok(Response(message))
-}
-
-#[derive(serde::Deserialize)]
-pub struct Request {
- pub body: Body,
-}
-
-#[derive(Debug)]
-pub struct Response(pub Message);
-
-impl IntoResponse for Response {
- fn into_response(self) -> response::Response {
- let Self(message) = self;
- (StatusCode::ACCEPTED, Json(message)).into_response()
- }
-}
-
-#[derive(Debug, thiserror::Error)]
-#[error(transparent)]
-pub struct Error(#[from] pub SendError);
-
-impl IntoResponse for Error {
- fn into_response(self) -> response::Response {
- let Self(error) = self;
- match error {
- SendError::ChannelNotFound(_) | SendError::ChannelDeleted(_) => {
- NotFound(error).into_response()
- }
- SendError::Name(_) | SendError::Database(_) => Internal::from(error).into_response(),
- }
- }
-}
diff --git a/src/channel/handlers/send/test.rs b/src/channel/handlers/send/test.rs
deleted file mode 100644
index 70d45eb..0000000
--- a/src/channel/handlers/send/test.rs
+++ /dev/null
@@ -1,130 +0,0 @@
-use axum::extract::{Json, Path, State};
-use futures::stream::{self, StreamExt as _};
-
-use crate::{
- channel,
- event::Sequenced,
- message::app::SendError,
- test::fixtures::{self, future::Expect as _},
-};
-
-#[tokio::test]
-async fn messages_in_order() {
- // Set up the environment
-
- let app = fixtures::scratch_app().await;
- let sender = fixtures::identity::create(&app, &fixtures::now()).await;
- let channel = fixtures::channel::create(&app, &fixtures::now()).await;
- let resume_point = fixtures::boot::resume_point(&app).await;
-
- // Call the endpoint (twice)
-
- let requests = vec![
- (fixtures::now(), fixtures::message::propose()),
- (fixtures::now(), fixtures::message::propose()),
- ];
-
- for (sent_at, body) in &requests {
- let request = super::Request { body: body.clone() };
-
- let _ = super::handler(
- State(app.clone()),
- Path(channel.id.clone()),
- sent_at.clone(),
- sender.clone(),
- Json(request),
- )
- .await
- .expect("sending to a valid channel succeeds");
- }
-
- // Verify the semantics
-
- let mut events = app
- .events()
- .subscribe(resume_point)
- .await
- .expect("subscribing to a valid channel succeeds")
- .filter_map(fixtures::event::stream::message)
- .filter_map(fixtures::event::stream::message::sent)
- .zip(stream::iter(requests));
-
- while let Some((event, (sent_at, body))) = events
- .next()
- .expect_ready("an event should be ready for each message")
- .await
- {
- assert_eq!(*sent_at, event.at());
- assert_eq!(sender.user.id, event.message.sender);
- assert_eq!(body, event.message.body);
- }
-}
-
-#[tokio::test]
-async fn nonexistent_channel() {
- // Set up the environment
-
- let app = fixtures::scratch_app().await;
- let sender = fixtures::identity::create(&app, &fixtures::now()).await;
-
- // Call the endpoint
-
- let sent_at = fixtures::now();
- let channel = channel::Id::generate();
- let request = super::Request {
- body: fixtures::message::propose(),
- };
- let super::Error(error) = super::handler(
- State(app),
- Path(channel.clone()),
- sent_at,
- sender,
- Json(request),
- )
- .await
- .expect_err("sending to a nonexistent channel fails");
-
- // Verify the structure of the response
-
- assert!(matches!(
- error,
- SendError::ChannelNotFound(error_channel) if channel == error_channel
- ));
-}
-
-#[tokio::test]
-async fn deleted_channel() {
- // Set up the environment
-
- let app = fixtures::scratch_app().await;
- let sender = fixtures::identity::create(&app, &fixtures::now()).await;
- let channel = fixtures::channel::create(&app, &fixtures::now()).await;
-
- app.channels()
- .delete(&channel.id, &fixtures::now())
- .await
- .expect("deleting a new channel succeeds");
-
- // Call the endpoint
-
- let sent_at = fixtures::now();
- let request = super::Request {
- body: fixtures::message::propose(),
- };
- let super::Error(error) = super::handler(
- State(app),
- Path(channel.id.clone()),
- sent_at,
- sender,
- Json(request),
- )
- .await
- .expect_err("sending to a deleted channel fails");
-
- // Verify the structure of the response
-
- assert!(matches!(
- error,
- SendError::ChannelDeleted(error_channel) if channel.id == error_channel
- ));
-}
diff --git a/src/channel/history.rs b/src/channel/history.rs
deleted file mode 100644
index 85da5a5..0000000
--- a/src/channel/history.rs
+++ /dev/null
@@ -1,69 +0,0 @@
-use itertools::Itertools as _;
-
-use super::{
- Channel, Id,
- event::{Created, Deleted, Event},
-};
-use crate::event::{Instant, Sequence};
-
-#[derive(Clone, Debug, Eq, PartialEq)]
-pub struct History {
- pub channel: Channel,
- pub deleted: Option<Instant>,
-}
-
-// State interface
-impl History {
- pub fn id(&self) -> &Id {
- &self.channel.id
- }
-
- // Snapshot of this channel as it was when created. (Note to the future: it's
- // okay if this returns a redacted or modified version of the channel. If we
- // implement renames by redacting the original name, then this should return the
- // renamed channel, not the original, even if that's not how it was "as
- // created.")
- pub fn as_created(&self) -> Channel {
- self.channel.clone()
- }
-
- pub fn as_of<S>(&self, sequence: S) -> Option<Channel>
- where
- S: Into<Sequence>,
- {
- self.events()
- .filter(Sequence::up_to(sequence.into()))
- .collect()
- }
-
- // Snapshot of this channel as of all events recorded in this history.
- pub fn as_snapshot(&self) -> Option<Channel> {
- self.events().collect()
- }
-}
-
-// Event factories
-impl History {
- pub fn events(&self) -> impl Iterator<Item = Event> + use<> {
- [self.created()]
- .into_iter()
- .merge_by(self.deleted(), Sequence::merge)
- }
-
- fn created(&self) -> Event {
- Created {
- channel: self.channel.clone(),
- }
- .into()
- }
-
- fn deleted(&self) -> Option<Event> {
- self.deleted.map(|instant| {
- Deleted {
- instant,
- id: self.channel.id.clone(),
- }
- .into()
- })
- }
-}
diff --git a/src/channel/id.rs b/src/channel/id.rs
deleted file mode 100644
index 22a2700..0000000
--- a/src/channel/id.rs
+++ /dev/null
@@ -1,38 +0,0 @@
-use std::fmt;
-
-use crate::id::Id as BaseId;
-
-// Stable identifier for a [Channel]. Prefixed with `C`.
-#[derive(
- Clone,
- Debug,
- Eq,
- Hash,
- Ord,
- PartialEq,
- PartialOrd,
- sqlx::Type,
- serde::Deserialize,
- serde::Serialize,
-)]
-#[sqlx(transparent)]
-#[serde(transparent)]
-pub struct Id(BaseId);
-
-impl From<BaseId> for Id {
- fn from(id: BaseId) -> Self {
- Self(id)
- }
-}
-
-impl Id {
- pub fn generate() -> Self {
- BaseId::generate("C")
- }
-}
-
-impl fmt::Display for Id {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- self.0.fmt(f)
- }
-}
diff --git a/src/channel/mod.rs b/src/channel/mod.rs
deleted file mode 100644
index bbaf33e..0000000
--- a/src/channel/mod.rs
+++ /dev/null
@@ -1,10 +0,0 @@
-pub mod app;
-pub mod event;
-pub mod handlers;
-mod history;
-mod id;
-pub mod repo;
-mod snapshot;
-mod validate;
-
-pub use self::{event::Event, history::History, id::Id, snapshot::Channel};
diff --git a/src/channel/repo.rs b/src/channel/repo.rs
deleted file mode 100644
index fd2173a..0000000
--- a/src/channel/repo.rs
+++ /dev/null
@@ -1,336 +0,0 @@
-use futures::stream::{StreamExt as _, TryStreamExt as _};
-use sqlx::{SqliteConnection, Transaction, sqlite::Sqlite};
-
-use crate::{
- channel::{Channel, History, Id},
- clock::DateTime,
- db::NotFound,
- event::{Instant, Sequence},
- name::{self, Name},
-};
-
-pub trait Provider {
- fn channels(&mut self) -> Channels;
-}
-
-impl Provider for Transaction<'_, Sqlite> {
- fn channels(&mut self) -> Channels {
- Channels(self)
- }
-}
-
-pub struct Channels<'t>(&'t mut SqliteConnection);
-
-impl Channels<'_> {
- pub async fn create(&mut self, name: &Name, created: &Instant) -> Result<History, sqlx::Error> {
- let id = Id::generate();
- let name = name.clone();
- let display_name = name.display();
- let canonical_name = name.canonical();
- let created = *created;
-
- sqlx::query!(
- r#"
- insert into conversation (id, created_at, created_sequence, last_sequence)
- values ($1, $2, $3, $4)
- "#,
- id,
- created.at,
- created.sequence,
- created.sequence,
- )
- .execute(&mut *self.0)
- .await?;
-
- sqlx::query!(
- r#"
- insert into conversation_name (id, display_name, canonical_name)
- values ($1, $2, $3)
- "#,
- id,
- display_name,
- canonical_name,
- )
- .execute(&mut *self.0)
- .await?;
-
- let channel = History {
- channel: Channel {
- created,
- id,
- name: name.clone(),
- deleted_at: None,
- },
- deleted: None,
- };
-
- Ok(channel)
- }
-
- pub async fn by_id(&mut self, channel: &Id) -> Result<History, LoadError> {
- let channel = sqlx::query!(
- r#"
- select
- id as "id: Id",
- name.display_name as "display_name?: String",
- name.canonical_name as "canonical_name?: String",
- conversation.created_at as "created_at: DateTime",
- conversation.created_sequence as "created_sequence: Sequence",
- deleted.deleted_at as "deleted_at?: DateTime",
- deleted.deleted_sequence as "deleted_sequence?: Sequence"
- from conversation
- left join conversation_name as name
- using (id)
- left join conversation_deleted as deleted
- using (id)
- where id = $1
- "#,
- channel,
- )
- .map(|row| {
- Ok::<_, name::Error>(History {
- channel: Channel {
- created: Instant::new(row.created_at, row.created_sequence),
- id: row.id,
- name: Name::optional(row.display_name, row.canonical_name)?.unwrap_or_default(),
- deleted_at: row.deleted_at,
- },
- deleted: Instant::optional(row.deleted_at, row.deleted_sequence),
- })
- })
- .fetch_one(&mut *self.0)
- .await??;
-
- Ok(channel)
- }
-
- pub async fn all(&mut self, resume_at: Sequence) -> Result<Vec<History>, LoadError> {
- let channels = sqlx::query!(
- r#"
- select
- id as "id: Id",
- name.display_name as "display_name?: String",
- name.canonical_name as "canonical_name?: String",
- conversation.created_at as "created_at: DateTime",
- conversation.created_sequence as "created_sequence: Sequence",
- deleted.deleted_at as "deleted_at?: DateTime",
- deleted.deleted_sequence as "deleted_sequence?: Sequence"
- from conversation
- left join conversation_name as name
- using (id)
- left join conversation_deleted as deleted
- using (id)
- where conversation.created_sequence <= $1
- order by name.canonical_name
- "#,
- resume_at,
- )
- .map(|row| {
- Ok::<_, name::Error>(History {
- channel: Channel {
- created: Instant::new(row.created_at, row.created_sequence),
- id: row.id,
- name: Name::optional(row.display_name, row.canonical_name)?.unwrap_or_default(),
- deleted_at: row.deleted_at,
- },
- deleted: Instant::optional(row.deleted_at, row.deleted_sequence),
- })
- })
- .fetch(&mut *self.0)
- .map(|res| Ok::<_, LoadError>(res??))
- .try_collect()
- .await?;
-
- Ok(channels)
- }
-
- pub async fn replay(&mut self, resume_at: Sequence) -> Result<Vec<History>, LoadError> {
- let channels = sqlx::query!(
- r#"
- select
- id as "id: Id",
- name.display_name as "display_name?: String",
- name.canonical_name as "canonical_name?: String",
- conversation.created_at as "created_at: DateTime",
- conversation.created_sequence as "created_sequence: Sequence",
- deleted.deleted_at as "deleted_at?: DateTime",
- deleted.deleted_sequence as "deleted_sequence?: Sequence"
- from conversation
- left join conversation_name as name
- using (id)
- left join conversation_deleted as deleted
- using (id)
- where conversation.last_sequence > $1
- "#,
- resume_at,
- )
- .map(|row| {
- Ok::<_, name::Error>(History {
- channel: Channel {
- created: Instant::new(row.created_at, row.created_sequence),
- id: row.id,
- name: Name::optional(row.display_name, row.canonical_name)?.unwrap_or_default(),
- deleted_at: row.deleted_at,
- },
- deleted: Instant::optional(row.deleted_at, row.deleted_sequence),
- })
- })
- .fetch(&mut *self.0)
- .map(|res| Ok::<_, LoadError>(res??))
- .try_collect()
- .await?;
-
- Ok(channels)
- }
-
- pub async fn delete(
- &mut self,
- channel: &History,
- deleted: &Instant,
- ) -> Result<History, LoadError> {
- let id = channel.id();
- sqlx::query!(
- r#"
- update conversation
- set last_sequence = max(last_sequence, $1)
- where id = $2
- returning id as "id: Id"
- "#,
- deleted.sequence,
- id,
- )
- .fetch_one(&mut *self.0)
- .await?;
-
- sqlx::query!(
- r#"
- insert into conversation_deleted (id, deleted_at, deleted_sequence)
- values ($1, $2, $3)
- "#,
- id,
- deleted.at,
- deleted.sequence,
- )
- .execute(&mut *self.0)
- .await?;
-
- // Small social responsibility hack here: when a conversation is deleted, its
- // name is retconned to have been the empty string. Someone reading the event
- // stream afterwards, or looking at channels via the API, cannot retrieve the
- // "deleted" channel's information by ignoring the deletion event.
- //
- // This also avoids the need for a separate name reservation table to ensure
- // that live channels have unique names, since the `channel` table's name field
- // is unique over non-null values.
- sqlx::query!(
- r#"
- delete from conversation_name
- where id = $1
- "#,
- id,
- )
- .execute(&mut *self.0)
- .await?;
-
- let channel = self.by_id(id).await?;
-
- Ok(channel)
- }
-
- pub async fn purge(&mut self, purge_at: &DateTime) -> Result<(), sqlx::Error> {
- let channels = sqlx::query_scalar!(
- r#"
- with has_messages as (
- select conversation
- from message
- group by conversation
- )
- delete from conversation_deleted
- where deleted_at < $1
- and id not in has_messages
- returning id as "id: Id"
- "#,
- purge_at,
- )
- .fetch_all(&mut *self.0)
- .await?;
-
- for channel in channels {
- // Wanted: a way to batch these up into one query.
- sqlx::query!(
- r#"
- delete from conversation
- where id = $1
- "#,
- channel,
- )
- .execute(&mut *self.0)
- .await?;
- }
-
- Ok(())
- }
-
- pub async fn expired(&mut self, expired_at: &DateTime) -> Result<Vec<History>, LoadError> {
- let channels = sqlx::query!(
- r#"
- select
- conversation.id as "id: Id",
- name.display_name as "display_name?: String",
- name.canonical_name as "canonical_name?: String",
- conversation.created_at as "created_at: DateTime",
- conversation.created_sequence as "created_sequence: Sequence",
- deleted.deleted_at as "deleted_at?: DateTime",
- deleted.deleted_sequence as "deleted_sequence?: Sequence"
- from conversation
- left join conversation_name as name
- using (id)
- left join conversation_deleted as deleted
- using (id)
- left join message
- on conversation.id = message.conversation
- where conversation.created_at < $1
- and message.id is null
- and deleted.id is null
- "#,
- expired_at,
- )
- .map(|row| {
- Ok::<_, name::Error>(History {
- channel: Channel {
- created: Instant::new(row.created_at, row.created_sequence),
- id: row.id,
- name: Name::optional(row.display_name, row.canonical_name)?.unwrap_or_default(),
- deleted_at: row.deleted_at,
- },
- deleted: Instant::optional(row.deleted_at, row.deleted_sequence),
- })
- })
- .fetch(&mut *self.0)
- .map(|res| Ok::<_, LoadError>(res??))
- .try_collect()
- .await?;
-
- Ok(channels)
- }
-}
-
-#[derive(Debug, thiserror::Error)]
-#[error(transparent)]
-pub enum LoadError {
- Database(#[from] sqlx::Error),
- Name(#[from] name::Error),
-}
-
-impl<T> NotFound for Result<T, LoadError> {
- type Ok = T;
- type Error = LoadError;
-
- fn optional(self) -> Result<Option<T>, LoadError> {
- match self {
- Ok(value) => Ok(Some(value)),
- Err(LoadError::Database(sqlx::Error::RowNotFound)) => Ok(None),
- Err(other) => Err(other),
- }
- }
-}
diff --git a/src/channel/snapshot.rs b/src/channel/snapshot.rs
deleted file mode 100644
index 96801b8..0000000
--- a/src/channel/snapshot.rs
+++ /dev/null
@@ -1,43 +0,0 @@
-use super::{
- Id,
- event::{Created, Event},
-};
-use crate::{clock::DateTime, event::Instant, name::Name};
-
-#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
-pub struct Channel {
- #[serde(flatten)]
- pub created: Instant,
- pub id: Id,
- pub name: Name,
- #[serde(skip_serializing_if = "Option::is_none")]
- pub deleted_at: Option<DateTime>,
-}
-
-impl Channel {
- fn apply(state: Option<Self>, event: Event) -> Option<Self> {
- match (state, event) {
- (None, Event::Created(event)) => Some(event.into()),
- (Some(channel), Event::Deleted(event)) if channel.id == event.id => None,
- (state, event) => panic!("invalid channel event {event:#?} for state {state:#?}"),
- }
- }
-}
-
-impl FromIterator<Event> for Option<Channel> {
- fn from_iter<I: IntoIterator<Item = Event>>(events: I) -> Self {
- events.into_iter().fold(None, Channel::apply)
- }
-}
-
-impl From<&Created> for Channel {
- fn from(event: &Created) -> Self {
- event.channel.clone()
- }
-}
-
-impl From<Created> for Channel {
- fn from(event: Created) -> Self {
- event.channel
- }
-}
diff --git a/src/channel/validate.rs b/src/channel/validate.rs
deleted file mode 100644
index 7894e0c..0000000
--- a/src/channel/validate.rs
+++ /dev/null
@@ -1,25 +0,0 @@
-use std::ops::Not as _;
-
-use unicode_segmentation::UnicodeSegmentation as _;
-
-use crate::name::Name;
-
-// Picked out of a hat. The power of two is not meaningful.
-const NAME_TOO_LONG: usize = 64;
-
-pub fn name(name: &Name) -> bool {
- let display = name.display();
-
- [
- display.graphemes(true).count() < NAME_TOO_LONG,
- display.chars().any(char::is_control).not(),
- display.chars().next().is_some_and(|c| !c.is_whitespace()),
- display.chars().last().is_some_and(|c| !c.is_whitespace()),
- display
- .chars()
- .zip(display.chars().skip(1))
- .all(|(a, b)| !(a.is_whitespace() && b.is_whitespace())),
- ]
- .into_iter()
- .all(|value| value)
-}