summaryrefslogtreecommitdiff
path: root/src/channel
diff options
context:
space:
mode:
Diffstat (limited to 'src/channel')
-rw-r--r--src/channel/app.rs224
-rw-r--r--src/channel/event.rs46
-rw-r--r--src/channel/handlers/create/mod.rs67
-rw-r--r--src/channel/handlers/create/test.rs250
-rw-r--r--src/channel/handlers/delete/mod.rs59
-rw-r--r--src/channel/handlers/delete/test.rs184
-rw-r--r--src/channel/handlers/mod.rs9
-rw-r--r--src/channel/handlers/send/mod.rs63
-rw-r--r--src/channel/handlers/send/test.rs130
-rw-r--r--src/channel/history.rs69
-rw-r--r--src/channel/id.rs38
-rw-r--r--src/channel/mod.rs10
-rw-r--r--src/channel/repo.rs336
-rw-r--r--src/channel/snapshot.rs43
-rw-r--r--src/channel/validate.rs25
15 files changed, 0 insertions, 1553 deletions
diff --git a/src/channel/app.rs b/src/channel/app.rs
deleted file mode 100644
index e3b169c..0000000
--- a/src/channel/app.rs
+++ /dev/null
@@ -1,224 +0,0 @@
-use chrono::TimeDelta;
-use itertools::Itertools;
-use sqlx::sqlite::SqlitePool;
-
-use super::{
- Channel, Id,
- repo::{LoadError, Provider as _},
- validate,
-};
-use crate::{
- clock::DateTime,
- db::{Duplicate as _, NotFound as _},
- event::{Broadcaster, Event, Sequence, repo::Provider as _},
- message::{self, repo::Provider as _},
- name::{self, Name},
-};
-
-pub struct Channels<'a> {
- db: &'a SqlitePool,
- events: &'a Broadcaster,
-}
-
-impl<'a> Channels<'a> {
- pub const fn new(db: &'a SqlitePool, events: &'a Broadcaster) -> Self {
- Self { db, events }
- }
-
- pub async fn create(&self, name: &Name, created_at: &DateTime) -> Result<Channel, CreateError> {
- if !validate::name(name) {
- return Err(CreateError::InvalidName(name.clone()));
- }
-
- let mut tx = self.db.begin().await?;
- let created = tx.sequence().next(created_at).await?;
- let channel = tx
- .channels()
- .create(name, &created)
- .await
- .duplicate(|| CreateError::DuplicateName(name.clone()))?;
- tx.commit().await?;
-
- self.events
- .broadcast(channel.events().map(Event::from).collect::<Vec<_>>());
-
- Ok(channel.as_created())
- }
-
- // This function is careless with respect to time, and gets you the channel as
- // it exists in the specific moment when you call it.
- pub async fn get(&self, channel: &Id) -> Result<Channel, Error> {
- let to_not_found = || Error::NotFound(channel.clone());
- let to_deleted = || Error::Deleted(channel.clone());
-
- let mut tx = self.db.begin().await?;
- let channel = tx.channels().by_id(channel).await.not_found(to_not_found)?;
- tx.commit().await?;
-
- channel.as_snapshot().ok_or_else(to_deleted)
- }
-
- pub async fn delete(&self, channel: &Id, deleted_at: &DateTime) -> Result<(), DeleteError> {
- let mut tx = self.db.begin().await?;
-
- let channel = tx
- .channels()
- .by_id(channel)
- .await
- .not_found(|| DeleteError::NotFound(channel.clone()))?;
- channel
- .as_snapshot()
- .ok_or_else(|| DeleteError::Deleted(channel.id().clone()))?;
-
- let mut events = Vec::new();
-
- let messages = tx.messages().live(&channel).await?;
- let has_messages = messages
- .iter()
- .map(message::History::as_snapshot)
- .any(|message| message.is_some());
- if has_messages {
- return Err(DeleteError::NotEmpty(channel.id().clone()));
- }
-
- let deleted = tx.sequence().next(deleted_at).await?;
- let channel = tx.channels().delete(&channel, &deleted).await?;
- events.extend(
- channel
- .events()
- .filter(Sequence::start_from(deleted.sequence))
- .map(Event::from),
- );
-
- tx.commit().await?;
-
- self.events.broadcast(events);
-
- Ok(())
- }
-
- pub async fn expire(&self, relative_to: &DateTime) -> Result<(), ExpireError> {
- // Somewhat arbitrarily, expire after 7 days. Active channels will not be
- // expired until their messages expire.
- let expire_at = relative_to.to_owned() - TimeDelta::days(7);
-
- let mut tx = self.db.begin().await?;
- let expired = tx.channels().expired(&expire_at).await?;
-
- let mut events = Vec::with_capacity(expired.len());
- for channel in expired {
- let deleted = tx.sequence().next(relative_to).await?;
- let channel = tx.channels().delete(&channel, &deleted).await?;
- events.push(
- channel
- .events()
- .filter(Sequence::start_from(deleted.sequence)),
- );
- }
-
- tx.commit().await?;
-
- self.events.broadcast(
- events
- .into_iter()
- .kmerge_by(Sequence::merge)
- .map(Event::from)
- .collect::<Vec<_>>(),
- );
-
- Ok(())
- }
-
- pub async fn purge(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> {
- // Somewhat arbitrarily, purge after 6 hours.
- let purge_at = relative_to.to_owned() - TimeDelta::hours(6);
-
- let mut tx = self.db.begin().await?;
- tx.channels().purge(&purge_at).await?;
- tx.commit().await?;
-
- Ok(())
- }
-}
-
-#[derive(Debug, thiserror::Error)]
-pub enum CreateError {
- #[error("channel named {0} already exists")]
- DuplicateName(Name),
- #[error("invalid channel name: {0}")]
- InvalidName(Name),
- #[error(transparent)]
- Database(#[from] sqlx::Error),
- #[error(transparent)]
- Name(#[from] name::Error),
-}
-
-impl From<LoadError> for CreateError {
- fn from(error: LoadError) -> Self {
- match error {
- LoadError::Database(error) => error.into(),
- LoadError::Name(error) => error.into(),
- }
- }
-}
-
-#[derive(Debug, thiserror::Error)]
-pub enum Error {
- #[error("channel {0} not found")]
- NotFound(Id),
- #[error("channel {0} deleted")]
- Deleted(Id),
- #[error(transparent)]
- Database(#[from] sqlx::Error),
- #[error(transparent)]
- Name(#[from] name::Error),
-}
-
-impl From<LoadError> for Error {
- fn from(error: LoadError) -> Self {
- match error {
- LoadError::Database(error) => error.into(),
- LoadError::Name(error) => error.into(),
- }
- }
-}
-
-#[derive(Debug, thiserror::Error)]
-pub enum DeleteError {
- #[error("channel {0} not found")]
- NotFound(Id),
- #[error("channel {0} deleted")]
- Deleted(Id),
- #[error("channel {0} not empty")]
- NotEmpty(Id),
- #[error(transparent)]
- Database(#[from] sqlx::Error),
- #[error(transparent)]
- Name(#[from] name::Error),
-}
-
-impl From<LoadError> for DeleteError {
- fn from(error: LoadError) -> Self {
- match error {
- LoadError::Database(error) => error.into(),
- LoadError::Name(error) => error.into(),
- }
- }
-}
-
-#[derive(Debug, thiserror::Error)]
-pub enum ExpireError {
- #[error(transparent)]
- Database(#[from] sqlx::Error),
- #[error(transparent)]
- Name(#[from] name::Error),
-}
-
-impl From<LoadError> for ExpireError {
- fn from(error: LoadError) -> Self {
- match error {
- LoadError::Database(error) => error.into(),
- LoadError::Name(error) => error.into(),
- }
- }
-}
diff --git a/src/channel/event.rs b/src/channel/event.rs
deleted file mode 100644
index a5739f9..0000000
--- a/src/channel/event.rs
+++ /dev/null
@@ -1,46 +0,0 @@
-use super::Channel;
-use crate::{
- channel,
- event::{Instant, Sequenced},
-};
-
-#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
-#[serde(tag = "event", rename_all = "snake_case")]
-pub enum Event {
- Created(Created),
- Deleted(Deleted),
-}
-
-impl Sequenced for Event {
- fn instant(&self) -> Instant {
- match self {
- Self::Created(event) => event.channel.created,
- Self::Deleted(event) => event.instant,
- }
- }
-}
-
-#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
-pub struct Created {
- #[serde(flatten)]
- pub channel: Channel,
-}
-
-impl From<Created> for Event {
- fn from(event: Created) -> Self {
- Self::Created(event)
- }
-}
-
-#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
-pub struct Deleted {
- #[serde(flatten)]
- pub instant: Instant,
- pub id: channel::Id,
-}
-
-impl From<Deleted> for Event {
- fn from(event: Deleted) -> Self {
- Self::Deleted(event)
- }
-}
diff --git a/src/channel/handlers/create/mod.rs b/src/channel/handlers/create/mod.rs
deleted file mode 100644
index 2c860fc..0000000
--- a/src/channel/handlers/create/mod.rs
+++ /dev/null
@@ -1,67 +0,0 @@
-use axum::{
- extract::{Json, State},
- http::StatusCode,
- response::{self, IntoResponse},
-};
-
-use crate::{
- app::App,
- channel::{Channel, app},
- clock::RequestedAt,
- error::Internal,
- name::Name,
- token::extract::Identity,
-};
-
-#[cfg(test)]
-mod test;
-
-pub async fn handler(
- State(app): State<App>,
- _: Identity, // requires auth, but doesn't actually care who you are
- RequestedAt(created_at): RequestedAt,
- Json(request): Json<Request>,
-) -> Result<Response, Error> {
- let channel = app
- .channels()
- .create(&request.name, &created_at)
- .await
- .map_err(Error)?;
-
- Ok(Response(channel))
-}
-
-#[derive(serde::Deserialize)]
-pub struct Request {
- pub name: Name,
-}
-
-#[derive(Debug)]
-pub struct Response(pub Channel);
-
-impl IntoResponse for Response {
- fn into_response(self) -> response::Response {
- let Self(channel) = self;
- (StatusCode::ACCEPTED, Json(channel)).into_response()
- }
-}
-
-#[derive(Debug)]
-pub struct Error(pub app::CreateError);
-
-impl IntoResponse for Error {
- fn into_response(self) -> response::Response {
- let Self(error) = self;
- match error {
- app::CreateError::DuplicateName(_) => {
- (StatusCode::CONFLICT, error.to_string()).into_response()
- }
- app::CreateError::InvalidName(_) => {
- (StatusCode::BAD_REQUEST, error.to_string()).into_response()
- }
- app::CreateError::Name(_) | app::CreateError::Database(_) => {
- Internal::from(error).into_response()
- }
- }
- }
-}
diff --git a/src/channel/handlers/create/test.rs b/src/channel/handlers/create/test.rs
deleted file mode 100644
index 31bb778..0000000
--- a/src/channel/handlers/create/test.rs
+++ /dev/null
@@ -1,250 +0,0 @@
-use std::future;
-
-use axum::extract::{Json, State};
-use futures::stream::StreamExt as _;
-use itertools::Itertools;
-
-use crate::{
- channel::app,
- name::Name,
- test::fixtures::{self, future::Expect as _},
-};
-
-#[tokio::test]
-async fn new_channel() {
- // Set up the environment
-
- let app = fixtures::scratch_app().await;
- let creator = fixtures::identity::create(&app, &fixtures::now()).await;
- let resume_point = fixtures::boot::resume_point(&app).await;
-
- // Call the endpoint
-
- let name = fixtures::channel::propose();
- let request = super::Request { name: name.clone() };
- let super::Response(response) =
- super::handler(State(app.clone()), creator, fixtures::now(), Json(request))
- .await
- .expect("creating a channel in an empty app succeeds");
-
- // Verify the structure of the response
-
- assert_eq!(name, response.name);
-
- // Verify the semantics
-
- let snapshot = app.boot().snapshot().await.expect("boot always succeeds");
- let created = snapshot
- .events
- .into_iter()
- .filter_map(fixtures::event::channel)
- .filter_map(fixtures::event::channel::created)
- .exactly_one()
- .expect("only one channel has been created");
- assert_eq!(response, created.channel);
-
- let channel = app
- .channels()
- .get(&response.id)
- .await
- .expect("the newly-created channel exists");
- assert_eq!(response, channel);
-
- let mut events = app
- .events()
- .subscribe(resume_point)
- .await
- .expect("subscribing never fails")
- .filter_map(fixtures::event::stream::channel)
- .filter_map(fixtures::event::stream::channel::created)
- .filter(|event| future::ready(event.channel == response));
-
- let event = events.next().expect_some("creation event published").await;
-
- assert_eq!(event.channel, response);
-}
-
-#[tokio::test]
-async fn duplicate_name() {
- // Set up the environment
-
- let app = fixtures::scratch_app().await;
- let creator = fixtures::identity::create(&app, &fixtures::now()).await;
- let channel = fixtures::channel::create(&app, &fixtures::now()).await;
-
- // Call the endpoint
-
- let request = super::Request {
- name: channel.name.clone(),
- };
- let super::Error(error) =
- super::handler(State(app.clone()), creator, fixtures::now(), Json(request))
- .await
- .expect_err("duplicate channel name should fail the request");
-
- // Verify the structure of the response
-
- assert!(matches!(
- error,
- app::CreateError::DuplicateName(name) if channel.name == name
- ));
-}
-
-#[tokio::test]
-async fn conflicting_canonical_name() {
- // Set up the environment
-
- let app = fixtures::scratch_app().await;
- let creator = fixtures::identity::create(&app, &fixtures::now()).await;
-
- let existing_name = Name::from("rijksmuseum");
- app.channels()
- .create(&existing_name, &fixtures::now())
- .await
- .expect("creating a channel in an empty environment succeeds");
-
- let conflicting_name = Name::from("r\u{0133}ksmuseum");
-
- // Call the endpoint
-
- let request = super::Request {
- name: conflicting_name.clone(),
- };
- let super::Error(error) =
- super::handler(State(app.clone()), creator, fixtures::now(), Json(request))
- .await
- .expect_err("duplicate channel name should fail the request");
-
- // Verify the structure of the response
-
- assert!(matches!(
- error,
- app::CreateError::DuplicateName(name) if conflicting_name == name
- ));
-}
-
-#[tokio::test]
-async fn invalid_name() {
- // Set up the environment
-
- let app = fixtures::scratch_app().await;
- let creator = fixtures::identity::create(&app, &fixtures::now()).await;
-
- // Call the endpoint
-
- let name = fixtures::channel::propose_invalid_name();
- let request = super::Request { name: name.clone() };
- let super::Error(error) = crate::channel::handlers::create::handler(
- State(app.clone()),
- creator,
- fixtures::now(),
- Json(request),
- )
- .await
- .expect_err("invalid channel name should fail the request");
-
- // Verify the structure of the response
-
- assert!(matches!(
- error,
- app::CreateError::InvalidName(error_name) if name == error_name
- ));
-}
-
-#[tokio::test]
-async fn name_reusable_after_delete() {
- // Set up the environment
-
- let app = fixtures::scratch_app().await;
- let creator = fixtures::identity::create(&app, &fixtures::now()).await;
- let name = fixtures::channel::propose();
-
- // Call the endpoint (first time)
-
- let request = super::Request { name: name.clone() };
- let super::Response(response) = super::handler(
- State(app.clone()),
- creator.clone(),
- fixtures::now(),
- Json(request),
- )
- .await
- .expect("new channel in an empty app");
-
- // Delete the channel
-
- app.channels()
- .delete(&response.id, &fixtures::now())
- .await
- .expect("deleting a newly-created channel succeeds");
-
- // Call the endpoint (second time)
-
- let request = super::Request { name: name.clone() };
- let super::Response(response) =
- super::handler(State(app.clone()), creator, fixtures::now(), Json(request))
- .await
- .expect("creation succeeds after original channel deleted");
-
- // Verify the structure of the response
-
- assert_eq!(name, response.name);
-
- // Verify the semantics
-
- let channel = app
- .channels()
- .get(&response.id)
- .await
- .expect("the newly-created channel exists");
- assert_eq!(response, channel);
-}
-
-#[tokio::test]
-async fn name_reusable_after_expiry() {
- // Set up the environment
-
- let app = fixtures::scratch_app().await;
- let creator = fixtures::identity::create(&app, &fixtures::ancient()).await;
- let name = fixtures::channel::propose();
-
- // Call the endpoint (first time)
-
- let request = super::Request { name: name.clone() };
- let super::Response(_) = super::handler(
- State(app.clone()),
- creator.clone(),
- fixtures::ancient(),
- Json(request),
- )
- .await
- .expect("new channel in an empty app");
-
- // Delete the channel
-
- app.channels()
- .expire(&fixtures::now())
- .await
- .expect("expiry always succeeds");
-
- // Call the endpoint (second time)
-
- let request = super::Request { name: name.clone() };
- let super::Response(response) =
- super::handler(State(app.clone()), creator, fixtures::now(), Json(request))
- .await
- .expect("creation succeeds after original channel expired");
-
- // Verify the structure of the response
-
- assert_eq!(name, response.name);
-
- // Verify the semantics
-
- let channel = app
- .channels()
- .get(&response.id)
- .await
- .expect("the newly-created channel exists");
- assert_eq!(response, channel);
-}
diff --git a/src/channel/handlers/delete/mod.rs b/src/channel/handlers/delete/mod.rs
deleted file mode 100644
index b986bec..0000000
--- a/src/channel/handlers/delete/mod.rs
+++ /dev/null
@@ -1,59 +0,0 @@
-use axum::{
- extract::{Json, Path, State},
- http::StatusCode,
- response::{self, IntoResponse},
-};
-
-use crate::{
- app::App,
- channel::{self, app, handlers::PathInfo},
- clock::RequestedAt,
- error::{Internal, NotFound},
- token::extract::Identity,
-};
-
-#[cfg(test)]
-mod test;
-
-pub async fn handler(
- State(app): State<App>,
- Path(channel): Path<PathInfo>,
- RequestedAt(deleted_at): RequestedAt,
- _: Identity,
-) -> Result<Response, Error> {
- app.channels().delete(&channel, &deleted_at).await?;
-
- Ok(Response { id: channel })
-}
-
-#[derive(Debug, serde::Serialize)]
-pub struct Response {
- pub id: channel::Id,
-}
-
-impl IntoResponse for Response {
- fn into_response(self) -> response::Response {
- (StatusCode::ACCEPTED, Json(self)).into_response()
- }
-}
-
-#[derive(Debug, thiserror::Error)]
-#[error(transparent)]
-pub struct Error(#[from] pub app::DeleteError);
-
-impl IntoResponse for Error {
- fn into_response(self) -> response::Response {
- let Self(error) = self;
- match error {
- app::DeleteError::NotFound(_) | app::DeleteError::Deleted(_) => {
- NotFound(error).into_response()
- }
- app::DeleteError::NotEmpty(_) => {
- (StatusCode::CONFLICT, error.to_string()).into_response()
- }
- app::DeleteError::Name(_) | app::DeleteError::Database(_) => {
- Internal::from(error).into_response()
- }
- }
- }
-}
diff --git a/src/channel/handlers/delete/test.rs b/src/channel/handlers/delete/test.rs
deleted file mode 100644
index 99c19db..0000000
--- a/src/channel/handlers/delete/test.rs
+++ /dev/null
@@ -1,184 +0,0 @@
-use axum::extract::{Path, State};
-use itertools::Itertools;
-
-use crate::{channel::app, test::fixtures};
-
-#[tokio::test]
-pub async fn valid_channel() {
- // Set up the environment
-
- let app = fixtures::scratch_app().await;
- let channel = fixtures::channel::create(&app, &fixtures::now()).await;
-
- // Send the request
-
- let deleter = fixtures::identity::create(&app, &fixtures::now()).await;
- let response = super::handler(
- State(app.clone()),
- Path(channel.id.clone()),
- fixtures::now(),
- deleter,
- )
- .await
- .expect("deleting a valid channel succeeds");
-
- // Verify the response
-
- assert_eq!(channel.id, response.id);
-
- // Verify the semantics
-
- let snapshot = app.boot().snapshot().await.expect("boot always succeeds");
- let created = snapshot
- .events
- .into_iter()
- .filter_map(fixtures::event::channel)
- .filter_map(fixtures::event::channel::created)
- .exactly_one()
- .expect("only one channel has been created");
- // We don't expect `channel` to match the event exactly, as the name will have been
- // tombstoned and the channel given a `deleted_at` date.
- assert_eq!(channel.id, created.channel.id);
-}
-
-#[tokio::test]
-pub async fn invalid_channel_id() {
- // Set up the environment
-
- let app = fixtures::scratch_app().await;
-
- // Send the request
-
- let deleter = fixtures::identity::create(&app, &fixtures::now()).await;
- let channel = fixtures::channel::fictitious();
- let super::Error(error) = super::handler(
- State(app.clone()),
- Path(channel.clone()),
- fixtures::now(),
- deleter,
- )
- .await
- .expect_err("deleting a nonexistent channel fails");
-
- // Verify the response
-
- assert!(matches!(error, app::DeleteError::NotFound(id) if id == channel));
-}
-
-#[tokio::test]
-pub async fn channel_deleted() {
- // Set up the environment
-
- let app = fixtures::scratch_app().await;
- let channel = fixtures::channel::create(&app, &fixtures::now()).await;
-
- app.channels()
- .delete(&channel.id, &fixtures::now())
- .await
- .expect("deleting a recently-sent channel succeeds");
-
- // Send the request
-
- let deleter = fixtures::identity::create(&app, &fixtures::now()).await;
- let super::Error(error) = super::handler(
- State(app.clone()),
- Path(channel.id.clone()),
- fixtures::now(),
- deleter,
- )
- .await
- .expect_err("deleting a deleted channel fails");
-
- // Verify the response
-
- assert!(matches!(error, app::DeleteError::Deleted(id) if id == channel.id));
-}
-
-#[tokio::test]
-pub async fn channel_expired() {
- // Set up the environment
-
- let app = fixtures::scratch_app().await;
- let channel = fixtures::channel::create(&app, &fixtures::ancient()).await;
-
- app.channels()
- .expire(&fixtures::now())
- .await
- .expect("expiring channels always succeeds");
-
- // Send the request
-
- let deleter = fixtures::identity::create(&app, &fixtures::now()).await;
- let super::Error(error) = super::handler(
- State(app.clone()),
- Path(channel.id.clone()),
- fixtures::now(),
- deleter,
- )
- .await
- .expect_err("deleting an expired channel fails");
-
- // Verify the response
-
- assert!(matches!(error, app::DeleteError::Deleted(id) if id == channel.id));
-}
-
-#[tokio::test]
-pub async fn channel_purged() {
- // Set up the environment
-
- let app = fixtures::scratch_app().await;
- let channel = fixtures::channel::create(&app, &fixtures::ancient()).await;
-
- app.channels()
- .expire(&fixtures::old())
- .await
- .expect("expiring channels always succeeds");
-
- app.channels()
- .purge(&fixtures::now())
- .await
- .expect("purging channels always succeeds");
-
- // Send the request
-
- let deleter = fixtures::identity::create(&app, &fixtures::now()).await;
- let super::Error(error) = super::handler(
- State(app.clone()),
- Path(channel.id.clone()),
- fixtures::now(),
- deleter,
- )
- .await
- .expect_err("deleting a purged channel fails");
-
- // Verify the response
-
- assert!(matches!(error, app::DeleteError::NotFound(id) if id == channel.id));
-}
-
-#[tokio::test]
-pub async fn channel_not_empty() {
- // Set up the environment
-
- let app = fixtures::scratch_app().await;
- let channel = fixtures::channel::create(&app, &fixtures::now()).await;
- let sender = fixtures::user::create(&app, &fixtures::now()).await;
- fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await;
-
- // Send the request
-
- let deleter = fixtures::identity::create(&app, &fixtures::now()).await;
- let super::Error(error) = super::handler(
- State(app.clone()),
- Path(channel.id.clone()),
- fixtures::now(),
- deleter,
- )
- .await
- .expect_err("deleting a channel with messages fails");
-
- // Verify the response
-
- assert!(matches!(error, app::DeleteError::NotEmpty(id) if id == channel.id));
-}
diff --git a/src/channel/handlers/mod.rs b/src/channel/handlers/mod.rs
deleted file mode 100644
index f2ffd0d..0000000
--- a/src/channel/handlers/mod.rs
+++ /dev/null
@@ -1,9 +0,0 @@
-mod create;
-mod delete;
-mod send;
-
-pub use create::handler as create;
-pub use delete::handler as delete;
-pub use send::handler as send;
-
-type PathInfo = crate::channel::Id;
diff --git a/src/channel/handlers/send/mod.rs b/src/channel/handlers/send/mod.rs
deleted file mode 100644
index bde39e5..0000000
--- a/src/channel/handlers/send/mod.rs
+++ /dev/null
@@ -1,63 +0,0 @@
-use axum::{
- extract::{Json, Path, State},
- http::StatusCode,
- response::{self, IntoResponse},
-};
-
-use crate::channel::handlers::PathInfo;
-use crate::{
- app::App,
- clock::RequestedAt,
- error::{Internal, NotFound},
- message::{Body, Message, app::SendError},
- token::extract::Identity,
-};
-
-#[cfg(test)]
-mod test;
-
-pub async fn handler(
- State(app): State<App>,
- Path(channel): Path<PathInfo>,
- RequestedAt(sent_at): RequestedAt,
- identity: Identity,
- Json(request): Json<Request>,
-) -> Result<Response, Error> {
- let message = app
- .messages()
- .send(&channel, &identity.user, &sent_at, &request.body)
- .await?;
-
- Ok(Response(message))
-}
-
-#[derive(serde::Deserialize)]
-pub struct Request {
- pub body: Body,
-}
-
-#[derive(Debug)]
-pub struct Response(pub Message);
-
-impl IntoResponse for Response {
- fn into_response(self) -> response::Response {
- let Self(message) = self;
- (StatusCode::ACCEPTED, Json(message)).into_response()
- }
-}
-
-#[derive(Debug, thiserror::Error)]
-#[error(transparent)]
-pub struct Error(#[from] pub SendError);
-
-impl IntoResponse for Error {
- fn into_response(self) -> response::Response {
- let Self(error) = self;
- match error {
- SendError::ChannelNotFound(_) | SendError::ChannelDeleted(_) => {
- NotFound(error).into_response()
- }
- SendError::Name(_) | SendError::Database(_) => Internal::from(error).into_response(),
- }
- }
-}
diff --git a/src/channel/handlers/send/test.rs b/src/channel/handlers/send/test.rs
deleted file mode 100644
index 70d45eb..0000000
--- a/src/channel/handlers/send/test.rs
+++ /dev/null
@@ -1,130 +0,0 @@
-use axum::extract::{Json, Path, State};
-use futures::stream::{self, StreamExt as _};
-
-use crate::{
- channel,
- event::Sequenced,
- message::app::SendError,
- test::fixtures::{self, future::Expect as _},
-};
-
-#[tokio::test]
-async fn messages_in_order() {
- // Set up the environment
-
- let app = fixtures::scratch_app().await;
- let sender = fixtures::identity::create(&app, &fixtures::now()).await;
- let channel = fixtures::channel::create(&app, &fixtures::now()).await;
- let resume_point = fixtures::boot::resume_point(&app).await;
-
- // Call the endpoint (twice)
-
- let requests = vec![
- (fixtures::now(), fixtures::message::propose()),
- (fixtures::now(), fixtures::message::propose()),
- ];
-
- for (sent_at, body) in &requests {
- let request = super::Request { body: body.clone() };
-
- let _ = super::handler(
- State(app.clone()),
- Path(channel.id.clone()),
- sent_at.clone(),
- sender.clone(),
- Json(request),
- )
- .await
- .expect("sending to a valid channel succeeds");
- }
-
- // Verify the semantics
-
- let mut events = app
- .events()
- .subscribe(resume_point)
- .await
- .expect("subscribing to a valid channel succeeds")
- .filter_map(fixtures::event::stream::message)
- .filter_map(fixtures::event::stream::message::sent)
- .zip(stream::iter(requests));
-
- while let Some((event, (sent_at, body))) = events
- .next()
- .expect_ready("an event should be ready for each message")
- .await
- {
- assert_eq!(*sent_at, event.at());
- assert_eq!(sender.user.id, event.message.sender);
- assert_eq!(body, event.message.body);
- }
-}
-
-#[tokio::test]
-async fn nonexistent_channel() {
- // Set up the environment
-
- let app = fixtures::scratch_app().await;
- let sender = fixtures::identity::create(&app, &fixtures::now()).await;
-
- // Call the endpoint
-
- let sent_at = fixtures::now();
- let channel = channel::Id::generate();
- let request = super::Request {
- body: fixtures::message::propose(),
- };
- let super::Error(error) = super::handler(
- State(app),
- Path(channel.clone()),
- sent_at,
- sender,
- Json(request),
- )
- .await
- .expect_err("sending to a nonexistent channel fails");
-
- // Verify the structure of the response
-
- assert!(matches!(
- error,
- SendError::ChannelNotFound(error_channel) if channel == error_channel
- ));
-}
-
-#[tokio::test]
-async fn deleted_channel() {
- // Set up the environment
-
- let app = fixtures::scratch_app().await;
- let sender = fixtures::identity::create(&app, &fixtures::now()).await;
- let channel = fixtures::channel::create(&app, &fixtures::now()).await;
-
- app.channels()
- .delete(&channel.id, &fixtures::now())
- .await
- .expect("deleting a new channel succeeds");
-
- // Call the endpoint
-
- let sent_at = fixtures::now();
- let request = super::Request {
- body: fixtures::message::propose(),
- };
- let super::Error(error) = super::handler(
- State(app),
- Path(channel.id.clone()),
- sent_at,
- sender,
- Json(request),
- )
- .await
- .expect_err("sending to a deleted channel fails");
-
- // Verify the structure of the response
-
- assert!(matches!(
- error,
- SendError::ChannelDeleted(error_channel) if channel.id == error_channel
- ));
-}
diff --git a/src/channel/history.rs b/src/channel/history.rs
deleted file mode 100644
index 85da5a5..0000000
--- a/src/channel/history.rs
+++ /dev/null
@@ -1,69 +0,0 @@
-use itertools::Itertools as _;
-
-use super::{
- Channel, Id,
- event::{Created, Deleted, Event},
-};
-use crate::event::{Instant, Sequence};
-
-#[derive(Clone, Debug, Eq, PartialEq)]
-pub struct History {
- pub channel: Channel,
- pub deleted: Option<Instant>,
-}
-
-// State interface
-impl History {
- pub fn id(&self) -> &Id {
- &self.channel.id
- }
-
- // Snapshot of this channel as it was when created. (Note to the future: it's
- // okay if this returns a redacted or modified version of the channel. If we
- // implement renames by redacting the original name, then this should return the
- // renamed channel, not the original, even if that's not how it was "as
- // created.")
- pub fn as_created(&self) -> Channel {
- self.channel.clone()
- }
-
- pub fn as_of<S>(&self, sequence: S) -> Option<Channel>
- where
- S: Into<Sequence>,
- {
- self.events()
- .filter(Sequence::up_to(sequence.into()))
- .collect()
- }
-
- // Snapshot of this channel as of all events recorded in this history.
- pub fn as_snapshot(&self) -> Option<Channel> {
- self.events().collect()
- }
-}
-
-// Event factories
-impl History {
- pub fn events(&self) -> impl Iterator<Item = Event> + use<> {
- [self.created()]
- .into_iter()
- .merge_by(self.deleted(), Sequence::merge)
- }
-
- fn created(&self) -> Event {
- Created {
- channel: self.channel.clone(),
- }
- .into()
- }
-
- fn deleted(&self) -> Option<Event> {
- self.deleted.map(|instant| {
- Deleted {
- instant,
- id: self.channel.id.clone(),
- }
- .into()
- })
- }
-}
diff --git a/src/channel/id.rs b/src/channel/id.rs
deleted file mode 100644
index 22a2700..0000000
--- a/src/channel/id.rs
+++ /dev/null
@@ -1,38 +0,0 @@
-use std::fmt;
-
-use crate::id::Id as BaseId;
-
-// Stable identifier for a [Channel]. Prefixed with `C`.
-#[derive(
- Clone,
- Debug,
- Eq,
- Hash,
- Ord,
- PartialEq,
- PartialOrd,
- sqlx::Type,
- serde::Deserialize,
- serde::Serialize,
-)]
-#[sqlx(transparent)]
-#[serde(transparent)]
-pub struct Id(BaseId);
-
-impl From<BaseId> for Id {
- fn from(id: BaseId) -> Self {
- Self(id)
- }
-}
-
-impl Id {
- pub fn generate() -> Self {
- BaseId::generate("C")
- }
-}
-
-impl fmt::Display for Id {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- self.0.fmt(f)
- }
-}
diff --git a/src/channel/mod.rs b/src/channel/mod.rs
deleted file mode 100644
index bbaf33e..0000000
--- a/src/channel/mod.rs
+++ /dev/null
@@ -1,10 +0,0 @@
-pub mod app;
-pub mod event;
-pub mod handlers;
-mod history;
-mod id;
-pub mod repo;
-mod snapshot;
-mod validate;
-
-pub use self::{event::Event, history::History, id::Id, snapshot::Channel};
diff --git a/src/channel/repo.rs b/src/channel/repo.rs
deleted file mode 100644
index fd2173a..0000000
--- a/src/channel/repo.rs
+++ /dev/null
@@ -1,336 +0,0 @@
-use futures::stream::{StreamExt as _, TryStreamExt as _};
-use sqlx::{SqliteConnection, Transaction, sqlite::Sqlite};
-
-use crate::{
- channel::{Channel, History, Id},
- clock::DateTime,
- db::NotFound,
- event::{Instant, Sequence},
- name::{self, Name},
-};
-
-pub trait Provider {
- fn channels(&mut self) -> Channels;
-}
-
-impl Provider for Transaction<'_, Sqlite> {
- fn channels(&mut self) -> Channels {
- Channels(self)
- }
-}
-
-pub struct Channels<'t>(&'t mut SqliteConnection);
-
-impl Channels<'_> {
- pub async fn create(&mut self, name: &Name, created: &Instant) -> Result<History, sqlx::Error> {
- let id = Id::generate();
- let name = name.clone();
- let display_name = name.display();
- let canonical_name = name.canonical();
- let created = *created;
-
- sqlx::query!(
- r#"
- insert into conversation (id, created_at, created_sequence, last_sequence)
- values ($1, $2, $3, $4)
- "#,
- id,
- created.at,
- created.sequence,
- created.sequence,
- )
- .execute(&mut *self.0)
- .await?;
-
- sqlx::query!(
- r#"
- insert into conversation_name (id, display_name, canonical_name)
- values ($1, $2, $3)
- "#,
- id,
- display_name,
- canonical_name,
- )
- .execute(&mut *self.0)
- .await?;
-
- let channel = History {
- channel: Channel {
- created,
- id,
- name: name.clone(),
- deleted_at: None,
- },
- deleted: None,
- };
-
- Ok(channel)
- }
-
- pub async fn by_id(&mut self, channel: &Id) -> Result<History, LoadError> {
- let channel = sqlx::query!(
- r#"
- select
- id as "id: Id",
- name.display_name as "display_name?: String",
- name.canonical_name as "canonical_name?: String",
- conversation.created_at as "created_at: DateTime",
- conversation.created_sequence as "created_sequence: Sequence",
- deleted.deleted_at as "deleted_at?: DateTime",
- deleted.deleted_sequence as "deleted_sequence?: Sequence"
- from conversation
- left join conversation_name as name
- using (id)
- left join conversation_deleted as deleted
- using (id)
- where id = $1
- "#,
- channel,
- )
- .map(|row| {
- Ok::<_, name::Error>(History {
- channel: Channel {
- created: Instant::new(row.created_at, row.created_sequence),
- id: row.id,
- name: Name::optional(row.display_name, row.canonical_name)?.unwrap_or_default(),
- deleted_at: row.deleted_at,
- },
- deleted: Instant::optional(row.deleted_at, row.deleted_sequence),
- })
- })
- .fetch_one(&mut *self.0)
- .await??;
-
- Ok(channel)
- }
-
- pub async fn all(&mut self, resume_at: Sequence) -> Result<Vec<History>, LoadError> {
- let channels = sqlx::query!(
- r#"
- select
- id as "id: Id",
- name.display_name as "display_name?: String",
- name.canonical_name as "canonical_name?: String",
- conversation.created_at as "created_at: DateTime",
- conversation.created_sequence as "created_sequence: Sequence",
- deleted.deleted_at as "deleted_at?: DateTime",
- deleted.deleted_sequence as "deleted_sequence?: Sequence"
- from conversation
- left join conversation_name as name
- using (id)
- left join conversation_deleted as deleted
- using (id)
- where conversation.created_sequence <= $1
- order by name.canonical_name
- "#,
- resume_at,
- )
- .map(|row| {
- Ok::<_, name::Error>(History {
- channel: Channel {
- created: Instant::new(row.created_at, row.created_sequence),
- id: row.id,
- name: Name::optional(row.display_name, row.canonical_name)?.unwrap_or_default(),
- deleted_at: row.deleted_at,
- },
- deleted: Instant::optional(row.deleted_at, row.deleted_sequence),
- })
- })
- .fetch(&mut *self.0)
- .map(|res| Ok::<_, LoadError>(res??))
- .try_collect()
- .await?;
-
- Ok(channels)
- }
-
- pub async fn replay(&mut self, resume_at: Sequence) -> Result<Vec<History>, LoadError> {
- let channels = sqlx::query!(
- r#"
- select
- id as "id: Id",
- name.display_name as "display_name?: String",
- name.canonical_name as "canonical_name?: String",
- conversation.created_at as "created_at: DateTime",
- conversation.created_sequence as "created_sequence: Sequence",
- deleted.deleted_at as "deleted_at?: DateTime",
- deleted.deleted_sequence as "deleted_sequence?: Sequence"
- from conversation
- left join conversation_name as name
- using (id)
- left join conversation_deleted as deleted
- using (id)
- where conversation.last_sequence > $1
- "#,
- resume_at,
- )
- .map(|row| {
- Ok::<_, name::Error>(History {
- channel: Channel {
- created: Instant::new(row.created_at, row.created_sequence),
- id: row.id,
- name: Name::optional(row.display_name, row.canonical_name)?.unwrap_or_default(),
- deleted_at: row.deleted_at,
- },
- deleted: Instant::optional(row.deleted_at, row.deleted_sequence),
- })
- })
- .fetch(&mut *self.0)
- .map(|res| Ok::<_, LoadError>(res??))
- .try_collect()
- .await?;
-
- Ok(channels)
- }
-
- pub async fn delete(
- &mut self,
- channel: &History,
- deleted: &Instant,
- ) -> Result<History, LoadError> {
- let id = channel.id();
- sqlx::query!(
- r#"
- update conversation
- set last_sequence = max(last_sequence, $1)
- where id = $2
- returning id as "id: Id"
- "#,
- deleted.sequence,
- id,
- )
- .fetch_one(&mut *self.0)
- .await?;
-
- sqlx::query!(
- r#"
- insert into conversation_deleted (id, deleted_at, deleted_sequence)
- values ($1, $2, $3)
- "#,
- id,
- deleted.at,
- deleted.sequence,
- )
- .execute(&mut *self.0)
- .await?;
-
- // Small social responsibility hack here: when a conversation is deleted, its
- // name is retconned to have been the empty string. Someone reading the event
- // stream afterwards, or looking at channels via the API, cannot retrieve the
- // "deleted" channel's information by ignoring the deletion event.
- //
- // This also avoids the need for a separate name reservation table to ensure
- // that live channels have unique names, since the `channel` table's name field
- // is unique over non-null values.
- sqlx::query!(
- r#"
- delete from conversation_name
- where id = $1
- "#,
- id,
- )
- .execute(&mut *self.0)
- .await?;
-
- let channel = self.by_id(id).await?;
-
- Ok(channel)
- }
-
- pub async fn purge(&mut self, purge_at: &DateTime) -> Result<(), sqlx::Error> {
- let channels = sqlx::query_scalar!(
- r#"
- with has_messages as (
- select conversation
- from message
- group by conversation
- )
- delete from conversation_deleted
- where deleted_at < $1
- and id not in has_messages
- returning id as "id: Id"
- "#,
- purge_at,
- )
- .fetch_all(&mut *self.0)
- .await?;
-
- for channel in channels {
- // Wanted: a way to batch these up into one query.
- sqlx::query!(
- r#"
- delete from conversation
- where id = $1
- "#,
- channel,
- )
- .execute(&mut *self.0)
- .await?;
- }
-
- Ok(())
- }
-
- pub async fn expired(&mut self, expired_at: &DateTime) -> Result<Vec<History>, LoadError> {
- let channels = sqlx::query!(
- r#"
- select
- conversation.id as "id: Id",
- name.display_name as "display_name?: String",
- name.canonical_name as "canonical_name?: String",
- conversation.created_at as "created_at: DateTime",
- conversation.created_sequence as "created_sequence: Sequence",
- deleted.deleted_at as "deleted_at?: DateTime",
- deleted.deleted_sequence as "deleted_sequence?: Sequence"
- from conversation
- left join conversation_name as name
- using (id)
- left join conversation_deleted as deleted
- using (id)
- left join message
- on conversation.id = message.conversation
- where conversation.created_at < $1
- and message.id is null
- and deleted.id is null
- "#,
- expired_at,
- )
- .map(|row| {
- Ok::<_, name::Error>(History {
- channel: Channel {
- created: Instant::new(row.created_at, row.created_sequence),
- id: row.id,
- name: Name::optional(row.display_name, row.canonical_name)?.unwrap_or_default(),
- deleted_at: row.deleted_at,
- },
- deleted: Instant::optional(row.deleted_at, row.deleted_sequence),
- })
- })
- .fetch(&mut *self.0)
- .map(|res| Ok::<_, LoadError>(res??))
- .try_collect()
- .await?;
-
- Ok(channels)
- }
-}
-
-#[derive(Debug, thiserror::Error)]
-#[error(transparent)]
-pub enum LoadError {
- Database(#[from] sqlx::Error),
- Name(#[from] name::Error),
-}
-
-impl<T> NotFound for Result<T, LoadError> {
- type Ok = T;
- type Error = LoadError;
-
- fn optional(self) -> Result<Option<T>, LoadError> {
- match self {
- Ok(value) => Ok(Some(value)),
- Err(LoadError::Database(sqlx::Error::RowNotFound)) => Ok(None),
- Err(other) => Err(other),
- }
- }
-}
diff --git a/src/channel/snapshot.rs b/src/channel/snapshot.rs
deleted file mode 100644
index 96801b8..0000000
--- a/src/channel/snapshot.rs
+++ /dev/null
@@ -1,43 +0,0 @@
-use super::{
- Id,
- event::{Created, Event},
-};
-use crate::{clock::DateTime, event::Instant, name::Name};
-
-#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
-pub struct Channel {
- #[serde(flatten)]
- pub created: Instant,
- pub id: Id,
- pub name: Name,
- #[serde(skip_serializing_if = "Option::is_none")]
- pub deleted_at: Option<DateTime>,
-}
-
-impl Channel {
- fn apply(state: Option<Self>, event: Event) -> Option<Self> {
- match (state, event) {
- (None, Event::Created(event)) => Some(event.into()),
- (Some(channel), Event::Deleted(event)) if channel.id == event.id => None,
- (state, event) => panic!("invalid channel event {event:#?} for state {state:#?}"),
- }
- }
-}
-
-impl FromIterator<Event> for Option<Channel> {
- fn from_iter<I: IntoIterator<Item = Event>>(events: I) -> Self {
- events.into_iter().fold(None, Channel::apply)
- }
-}
-
-impl From<&Created> for Channel {
- fn from(event: &Created) -> Self {
- event.channel.clone()
- }
-}
-
-impl From<Created> for Channel {
- fn from(event: Created) -> Self {
- event.channel
- }
-}
diff --git a/src/channel/validate.rs b/src/channel/validate.rs
deleted file mode 100644
index 7894e0c..0000000
--- a/src/channel/validate.rs
+++ /dev/null
@@ -1,25 +0,0 @@
-use std::ops::Not as _;
-
-use unicode_segmentation::UnicodeSegmentation as _;
-
-use crate::name::Name;
-
-// Picked out of a hat. The power of two is not meaningful.
-const NAME_TOO_LONG: usize = 64;
-
-pub fn name(name: &Name) -> bool {
- let display = name.display();
-
- [
- display.graphemes(true).count() < NAME_TOO_LONG,
- display.chars().any(char::is_control).not(),
- display.chars().next().is_some_and(|c| !c.is_whitespace()),
- display.chars().last().is_some_and(|c| !c.is_whitespace()),
- display
- .chars()
- .zip(display.chars().skip(1))
- .all(|(a, b)| !(a.is_whitespace() && b.is_whitespace())),
- ]
- .into_iter()
- .all(|value| value)
-}