diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/app.rs | 6 | ||||
| -rw-r--r-- | src/boot/app.rs | 16 | ||||
| -rw-r--r-- | src/boot/handlers/boot/test.rs | 81 | ||||
| -rw-r--r-- | src/conversation/app.rs (renamed from src/channel/app.rs) | 90 | ||||
| -rw-r--r-- | src/conversation/event.rs (renamed from src/channel/event.rs) | 10 | ||||
| -rw-r--r-- | src/conversation/handlers/create/mod.rs (renamed from src/channel/handlers/create/mod.rs) | 14 | ||||
| -rw-r--r-- | src/conversation/handlers/create/test.rs (renamed from src/channel/handlers/create/test.rs) | 90 | ||||
| -rw-r--r-- | src/conversation/handlers/delete/mod.rs (renamed from src/channel/handlers/delete/mod.rs) | 12 | ||||
| -rw-r--r-- | src/conversation/handlers/delete/test.rs (renamed from src/channel/handlers/delete/test.rs) | 94 | ||||
| -rw-r--r-- | src/conversation/handlers/mod.rs (renamed from src/channel/handlers/mod.rs) | 2 | ||||
| -rw-r--r-- | src/conversation/handlers/send/mod.rs (renamed from src/channel/handlers/send/mod.rs) | 8 | ||||
| -rw-r--r-- | src/conversation/handlers/send/test.rs (renamed from src/channel/handlers/send/test.rs) | 36 | ||||
| -rw-r--r-- | src/conversation/history.rs (renamed from src/channel/history.rs) | 30 | ||||
| -rw-r--r-- | src/conversation/id.rs (renamed from src/channel/id.rs) | 2 | ||||
| -rw-r--r-- | src/conversation/mod.rs (renamed from src/channel/mod.rs) | 2 | ||||
| -rw-r--r-- | src/conversation/repo.rs (renamed from src/channel/repo.rs) | 68 | ||||
| -rw-r--r-- | src/conversation/snapshot.rs (renamed from src/channel/snapshot.rs) | 20 | ||||
| -rw-r--r-- | src/conversation/validate.rs (renamed from src/channel/validate.rs) | 0 | ||||
| -rw-r--r-- | src/event/app.rs | 16 | ||||
| -rw-r--r-- | src/event/handlers/stream/test/conversation.rs (renamed from src/event/handlers/stream/test/channel.rs) | 130 | ||||
| -rw-r--r-- | src/event/handlers/stream/test/message.rs | 65 | ||||
| -rw-r--r-- | src/event/handlers/stream/test/mod.rs | 2 | ||||
| -rw-r--r-- | src/event/handlers/stream/test/resume.rs | 43 | ||||
| -rw-r--r-- | src/event/handlers/stream/test/token.rs | 24 | ||||
| -rw-r--r-- | src/event/mod.rs | 8 | ||||
| -rw-r--r-- | src/expire.rs | 6 | ||||
| -rw-r--r-- | src/lib.rs | 2 | ||||
| -rw-r--r-- | src/message/app.rs | 36 | ||||
| -rw-r--r-- | src/message/handlers/delete/test.rs | 21 | ||||
| -rw-r--r-- | src/message/repo.rs | 29 | ||||
| -rw-r--r-- | src/message/snapshot.rs | 4 | ||||
| -rw-r--r-- | src/routes.rs | 16 | ||||
| -rw-r--r-- | src/test/fixtures/conversation.rs (renamed from src/test/fixtures/channel.rs) | 12 | ||||
| -rw-r--r-- | src/test/fixtures/event/mod.rs | 6 | ||||
| -rw-r--r-- | src/test/fixtures/event/stream.rs | 14 | ||||
| -rw-r--r-- | src/test/fixtures/message.rs | 13 | ||||
| -rw-r--r-- | src/test/fixtures/mod.rs | 2 | ||||
| -rw-r--r-- | src/ui/handlers/conversation.rs (renamed from src/ui/handlers/channel.rs) | 11 | ||||
| -rw-r--r-- | src/ui/handlers/mod.rs | 4 |
39 files changed, 542 insertions, 503 deletions
@@ -2,7 +2,7 @@ use sqlx::sqlite::SqlitePool; use crate::{ boot::app::Boot, - channel::app::Channels, + conversation::app::Conversations, event::{self, app::Events}, invite::app::Invites, message::app::Messages, @@ -37,8 +37,8 @@ impl App { Boot::new(&self.db) } - pub const fn channels(&self) -> Channels { - Channels::new(&self.db, &self.events) + pub const fn conversations(&self) -> Conversations { + Conversations::new(&self.db, &self.events) } pub const fn events(&self) -> Events { diff --git a/src/boot/app.rs b/src/boot/app.rs index 89eec12..0ed5d1b 100644 --- a/src/boot/app.rs +++ b/src/boot/app.rs @@ -3,7 +3,7 @@ use sqlx::sqlite::SqlitePool; use super::Snapshot; use crate::{ - channel::{self, repo::Provider as _}, + conversation::{self, repo::Provider as _}, event::{Event, Sequence, repo::Provider as _}, message::{self, repo::Provider as _}, name, @@ -24,7 +24,7 @@ impl<'a> Boot<'a> { let resume_point = tx.sequence().current().await?; let users = tx.users().all(resume_point).await?; - let channels = tx.channels().all(resume_point).await?; + let conversations = tx.conversations().all(resume_point).await?; let messages = tx.messages().all(resume_point).await?; tx.commit().await?; @@ -36,9 +36,9 @@ impl<'a> Boot<'a> { .filter(Sequence::up_to(resume_point)) .map(Event::from); - let channel_events = channels + let conversation_events = conversations .iter() - .map(channel::History::events) + .map(conversation::History::events) .kmerge_by(Sequence::merge) .filter(Sequence::up_to(resume_point)) .map(Event::from); @@ -51,7 +51,7 @@ impl<'a> Boot<'a> { .map(Event::from); let events = user_events - .merge_by(channel_events, Sequence::merge) + .merge_by(conversation_events, Sequence::merge) .merge_by(message_events, Sequence::merge) .collect(); @@ -79,9 +79,9 @@ impl From<user::repo::LoadError> for Error { } } -impl From<channel::repo::LoadError> for Error { - fn from(error: channel::repo::LoadError) -> Self { - use channel::repo::LoadError; +impl From<conversation::repo::LoadError> for Error { + fn from(error: conversation::repo::LoadError) -> Self { + use conversation::repo::LoadError; match error { LoadError::Name(error) => error.into(), LoadError::Database(error) => error.into(), diff --git a/src/boot/handlers/boot/test.rs b/src/boot/handlers/boot/test.rs index 1e590a7..c7c511a 100644 --- a/src/boot/handlers/boot/test.rs +++ b/src/boot/handlers/boot/test.rs @@ -37,9 +37,9 @@ async fn includes_users() { } #[tokio::test] -async fn includes_channels() { +async fn includes_conversations() { let app = fixtures::scratch_app().await; - let channel = fixtures::channel::create(&app, &fixtures::now()).await; + let conversation = fixtures::conversation::create(&app, &fixtures::now()).await; let viewer = fixtures::identity::fictitious(); let response = super::handler(State(app), viewer) @@ -50,19 +50,19 @@ async fn includes_channels() { .snapshot .events .into_iter() - .filter_map(fixtures::event::channel) - .filter_map(fixtures::event::channel::created) + .filter_map(fixtures::event::conversation) + .filter_map(fixtures::event::conversation::created) .exactly_one() - .expect("only one channel has been created"); - assert_eq!(channel, created.channel); + .expect("only one conversation has been created"); + assert_eq!(conversation, created.conversation); } #[tokio::test] async fn includes_messages() { let app = fixtures::scratch_app().await; let sender = fixtures::user::create(&app, &fixtures::now()).await; - let channel = fixtures::channel::create(&app, &fixtures::now()).await; - let message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await; + let conversation = fixtures::conversation::create(&app, &fixtures::now()).await; + let message = fixtures::message::send(&app, &conversation, &sender, &fixtures::now()).await; let viewer = fixtures::identity::fictitious(); let response = super::handler(State(app), viewer) @@ -84,9 +84,9 @@ async fn includes_messages() { async fn includes_expired_messages() { let app = fixtures::scratch_app().await; let sender = fixtures::user::create(&app, &fixtures::ancient()).await; - let channel = fixtures::channel::create(&app, &fixtures::ancient()).await; + let conversation = fixtures::conversation::create(&app, &fixtures::ancient()).await; let expired_message = - fixtures::message::send(&app, &channel, &sender, &fixtures::ancient()).await; + fixtures::message::send(&app, &conversation, &sender, &fixtures::ancient()).await; app.messages() .expire(&fixtures::now()) @@ -126,8 +126,9 @@ async fn includes_expired_messages() { async fn includes_deleted_messages() { let app = fixtures::scratch_app().await; let sender = fixtures::user::create(&app, &fixtures::now()).await; - let channel = fixtures::channel::create(&app, &fixtures::now()).await; - let deleted_message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await; + let conversation = fixtures::conversation::create(&app, &fixtures::now()).await; + let deleted_message = + fixtures::message::send(&app, &conversation, &sender, &fixtures::now()).await; app.messages() .delete(&sender, &deleted_message.id, &fixtures::now()) @@ -164,11 +165,11 @@ async fn includes_deleted_messages() { } #[tokio::test] -async fn includes_expired_channels() { +async fn includes_expired_conversations() { let app = fixtures::scratch_app().await; - let expired_channel = fixtures::channel::create(&app, &fixtures::ancient()).await; + let expired_conversation = fixtures::conversation::create(&app, &fixtures::ancient()).await; - app.channels() + app.conversations() .expire(&fixtures::now()) .await .expect("expiry never fails"); @@ -183,34 +184,34 @@ async fn includes_expired_channels() { .events .iter() .cloned() - .filter_map(fixtures::event::channel) - .filter_map(fixtures::event::channel::created) + .filter_map(fixtures::event::conversation) + .filter_map(fixtures::event::conversation::created) .exactly_one() - .expect("only one channel has been created"); - // We don't expect `expired_channel` to match the event exactly, as the name will have been - // tombstoned and the channel given a `deleted_at` date. - assert_eq!(expired_channel.id, created.channel.id); + .expect("only one conversation has been created"); + // We don't expect `expired_conversation` to match the event exactly, as the name will + // have been tombstoned and the conversation given a `deleted_at` date. + assert_eq!(expired_conversation.id, created.conversation.id); let deleted = response .snapshot .events .into_iter() - .filter_map(fixtures::event::channel) - .filter_map(fixtures::event::channel::deleted) + .filter_map(fixtures::event::conversation) + .filter_map(fixtures::event::conversation::deleted) .exactly_one() - .expect("only one channel has expired"); - assert_eq!(expired_channel.id, deleted.id); + .expect("only one conversation has expired"); + assert_eq!(expired_conversation.id, deleted.id); } #[tokio::test] -async fn includes_deleted_channels() { +async fn includes_deleted_conversations() { let app = fixtures::scratch_app().await; - let deleted_channel = fixtures::channel::create(&app, &fixtures::now()).await; + let deleted_conversation = fixtures::conversation::create(&app, &fixtures::now()).await; - app.channels() - .delete(&deleted_channel.id, &fixtures::now()) + app.conversations() + .delete(&deleted_conversation.id, &fixtures::now()) .await - .expect("deleting a valid channel succeeds"); + .expect("deleting a valid conversation succeeds"); let viewer = fixtures::identity::fictitious(); let response = super::handler(State(app), viewer) @@ -222,21 +223,21 @@ async fn includes_deleted_channels() { .events .iter() .cloned() - .filter_map(fixtures::event::channel) - .filter_map(fixtures::event::channel::created) + .filter_map(fixtures::event::conversation) + .filter_map(fixtures::event::conversation::created) .exactly_one() - .expect("only one channel has been created"); - // We don't expect `deleted_channel` to match the event exactly, as the name will have been - // tombstoned and the channel given a `deleted_at` date. - assert_eq!(deleted_channel.id, created.channel.id); + .expect("only one conversation has been created"); + // We don't expect `deleted_conversation` to match the event exactly, as the name will + // have been tombstoned and the conversation given a `deleted_at` date. + assert_eq!(deleted_conversation.id, created.conversation.id); let deleted = response .snapshot .events .into_iter() - .filter_map(fixtures::event::channel) - .filter_map(fixtures::event::channel::deleted) + .filter_map(fixtures::event::conversation) + .filter_map(fixtures::event::conversation::deleted) .exactly_one() - .expect("only one channel has been deleted"); - assert_eq!(deleted_channel.id, deleted.id); + .expect("only one conversation has been deleted"); + assert_eq!(deleted_conversation.id, deleted.id); } diff --git a/src/channel/app.rs b/src/conversation/app.rs index e3b169c..81ccdcf 100644 --- a/src/channel/app.rs +++ b/src/conversation/app.rs @@ -3,7 +3,7 @@ use itertools::Itertools; use sqlx::sqlite::SqlitePool; use super::{ - Channel, Id, + Conversation, Id, repo::{LoadError, Provider as _}, validate, }; @@ -15,76 +15,88 @@ use crate::{ name::{self, Name}, }; -pub struct Channels<'a> { +pub struct Conversations<'a> { db: &'a SqlitePool, events: &'a Broadcaster, } -impl<'a> Channels<'a> { +impl<'a> Conversations<'a> { pub const fn new(db: &'a SqlitePool, events: &'a Broadcaster) -> Self { Self { db, events } } - pub async fn create(&self, name: &Name, created_at: &DateTime) -> Result<Channel, CreateError> { + pub async fn create( + &self, + name: &Name, + created_at: &DateTime, + ) -> Result<Conversation, CreateError> { if !validate::name(name) { return Err(CreateError::InvalidName(name.clone())); } let mut tx = self.db.begin().await?; let created = tx.sequence().next(created_at).await?; - let channel = tx - .channels() + let conversation = tx + .conversations() .create(name, &created) .await .duplicate(|| CreateError::DuplicateName(name.clone()))?; tx.commit().await?; self.events - .broadcast(channel.events().map(Event::from).collect::<Vec<_>>()); + .broadcast(conversation.events().map(Event::from).collect::<Vec<_>>()); - Ok(channel.as_created()) + Ok(conversation.as_created()) } - // This function is careless with respect to time, and gets you the channel as - // it exists in the specific moment when you call it. - pub async fn get(&self, channel: &Id) -> Result<Channel, Error> { - let to_not_found = || Error::NotFound(channel.clone()); - let to_deleted = || Error::Deleted(channel.clone()); + // This function is careless with respect to time, and gets you the + // conversation as it exists in the specific moment when you call it. + pub async fn get(&self, conversation: &Id) -> Result<Conversation, Error> { + let to_not_found = || Error::NotFound(conversation.clone()); + let to_deleted = || Error::Deleted(conversation.clone()); let mut tx = self.db.begin().await?; - let channel = tx.channels().by_id(channel).await.not_found(to_not_found)?; + let conversation = tx + .conversations() + .by_id(conversation) + .await + .not_found(to_not_found)?; tx.commit().await?; - channel.as_snapshot().ok_or_else(to_deleted) + conversation.as_snapshot().ok_or_else(to_deleted) } - pub async fn delete(&self, channel: &Id, deleted_at: &DateTime) -> Result<(), DeleteError> { + pub async fn delete( + &self, + conversation: &Id, + deleted_at: &DateTime, + ) -> Result<(), DeleteError> { let mut tx = self.db.begin().await?; - let channel = tx - .channels() - .by_id(channel) + let conversation = tx + .conversations() + .by_id(conversation) .await - .not_found(|| DeleteError::NotFound(channel.clone()))?; - channel + .not_found(|| DeleteError::NotFound(conversation.clone()))?; + conversation .as_snapshot() - .ok_or_else(|| DeleteError::Deleted(channel.id().clone()))?; + .ok_or_else(|| DeleteError::Deleted(conversation.id().clone()))?; let mut events = Vec::new(); - let messages = tx.messages().live(&channel).await?; + let messages = tx.messages().live(&conversation).await?; let has_messages = messages .iter() .map(message::History::as_snapshot) .any(|message| message.is_some()); if has_messages { - return Err(DeleteError::NotEmpty(channel.id().clone())); + return Err(DeleteError::NotEmpty(conversation.id().clone())); } let deleted = tx.sequence().next(deleted_at).await?; - let channel = tx.channels().delete(&channel, &deleted).await?; + let conversation = tx.conversations().delete(&conversation, &deleted).await?; events.extend( - channel + conversation .events() .filter(Sequence::start_from(deleted.sequence)) .map(Event::from), @@ -98,19 +110,19 @@ impl<'a> Channels<'a> { } pub async fn expire(&self, relative_to: &DateTime) -> Result<(), ExpireError> { - // Somewhat arbitrarily, expire after 7 days. Active channels will not be + // Somewhat arbitrarily, expire after 7 days. Active conversation will not be // expired until their messages expire. let expire_at = relative_to.to_owned() - TimeDelta::days(7); let mut tx = self.db.begin().await?; - let expired = tx.channels().expired(&expire_at).await?; + let expired = tx.conversations().expired(&expire_at).await?; let mut events = Vec::with_capacity(expired.len()); - for channel in expired { + for conversation in expired { let deleted = tx.sequence().next(relative_to).await?; - let channel = tx.channels().delete(&channel, &deleted).await?; + let conversation = tx.conversations().delete(&conversation, &deleted).await?; events.push( - channel + conversation .events() .filter(Sequence::start_from(deleted.sequence)), ); @@ -134,7 +146,7 @@ impl<'a> Channels<'a> { let purge_at = relative_to.to_owned() - TimeDelta::hours(6); let mut tx = self.db.begin().await?; - tx.channels().purge(&purge_at).await?; + tx.conversations().purge(&purge_at).await?; tx.commit().await?; Ok(()) @@ -143,9 +155,9 @@ impl<'a> Channels<'a> { #[derive(Debug, thiserror::Error)] pub enum CreateError { - #[error("channel named {0} already exists")] + #[error("conversation named {0} already exists")] DuplicateName(Name), - #[error("invalid channel name: {0}")] + #[error("invalid conversation name: {0}")] InvalidName(Name), #[error(transparent)] Database(#[from] sqlx::Error), @@ -164,9 +176,9 @@ impl From<LoadError> for CreateError { #[derive(Debug, thiserror::Error)] pub enum Error { - #[error("channel {0} not found")] + #[error("conversation {0} not found")] NotFound(Id), - #[error("channel {0} deleted")] + #[error("conversation {0} deleted")] Deleted(Id), #[error(transparent)] Database(#[from] sqlx::Error), @@ -185,11 +197,11 @@ impl From<LoadError> for Error { #[derive(Debug, thiserror::Error)] pub enum DeleteError { - #[error("channel {0} not found")] + #[error("conversation {0} not found")] NotFound(Id), - #[error("channel {0} deleted")] + #[error("conversation {0} deleted")] Deleted(Id), - #[error("channel {0} not empty")] + #[error("conversation {0} not empty")] NotEmpty(Id), #[error(transparent)] Database(#[from] sqlx::Error), diff --git a/src/channel/event.rs b/src/conversation/event.rs index a5739f9..f5e8a81 100644 --- a/src/channel/event.rs +++ b/src/conversation/event.rs @@ -1,6 +1,6 @@ -use super::Channel; +use super::Conversation; use crate::{ - channel, + conversation, event::{Instant, Sequenced}, }; @@ -14,7 +14,7 @@ pub enum Event { impl Sequenced for Event { fn instant(&self) -> Instant { match self { - Self::Created(event) => event.channel.created, + Self::Created(event) => event.conversation.created, Self::Deleted(event) => event.instant, } } @@ -23,7 +23,7 @@ impl Sequenced for Event { #[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] pub struct Created { #[serde(flatten)] - pub channel: Channel, + pub conversation: Conversation, } impl From<Created> for Event { @@ -36,7 +36,7 @@ impl From<Created> for Event { pub struct Deleted { #[serde(flatten)] pub instant: Instant, - pub id: channel::Id, + pub id: conversation::Id, } impl From<Deleted> for Event { diff --git a/src/channel/handlers/create/mod.rs b/src/conversation/handlers/create/mod.rs index 2c860fc..18eca1f 100644 --- a/src/channel/handlers/create/mod.rs +++ b/src/conversation/handlers/create/mod.rs @@ -6,8 +6,8 @@ use axum::{ use crate::{ app::App, - channel::{Channel, app}, clock::RequestedAt, + conversation::{Conversation, app}, error::Internal, name::Name, token::extract::Identity, @@ -22,13 +22,13 @@ pub async fn handler( RequestedAt(created_at): RequestedAt, Json(request): Json<Request>, ) -> Result<Response, Error> { - let channel = app - .channels() + let conversation = app + .conversations() .create(&request.name, &created_at) .await .map_err(Error)?; - Ok(Response(channel)) + Ok(Response(conversation)) } #[derive(serde::Deserialize)] @@ -37,12 +37,12 @@ pub struct Request { } #[derive(Debug)] -pub struct Response(pub Channel); +pub struct Response(pub Conversation); impl IntoResponse for Response { fn into_response(self) -> response::Response { - let Self(channel) = self; - (StatusCode::ACCEPTED, Json(channel)).into_response() + let Self(conversation) = self; + (StatusCode::ACCEPTED, Json(conversation)).into_response() } } diff --git a/src/channel/handlers/create/test.rs b/src/conversation/handlers/create/test.rs index 31bb778..bc05b00 100644 --- a/src/channel/handlers/create/test.rs +++ b/src/conversation/handlers/create/test.rs @@ -5,13 +5,13 @@ use futures::stream::StreamExt as _; use itertools::Itertools; use crate::{ - channel::app, + conversation::app, name::Name, test::fixtures::{self, future::Expect as _}, }; #[tokio::test] -async fn new_channel() { +async fn new_conversation() { // Set up the environment let app = fixtures::scratch_app().await; @@ -20,12 +20,12 @@ async fn new_channel() { // Call the endpoint - let name = fixtures::channel::propose(); + let name = fixtures::conversation::propose(); let request = super::Request { name: name.clone() }; let super::Response(response) = super::handler(State(app.clone()), creator, fixtures::now(), Json(request)) .await - .expect("creating a channel in an empty app succeeds"); + .expect("creating a conversation in an empty app succeeds"); // Verify the structure of the response @@ -37,31 +37,31 @@ async fn new_channel() { let created = snapshot .events .into_iter() - .filter_map(fixtures::event::channel) - .filter_map(fixtures::event::channel::created) + .filter_map(fixtures::event::conversation) + .filter_map(fixtures::event::conversation::created) .exactly_one() - .expect("only one channel has been created"); - assert_eq!(response, created.channel); + .expect("only one conversation has been created"); + assert_eq!(response, created.conversation); - let channel = app - .channels() + let conversation = app + .conversations() .get(&response.id) .await - .expect("the newly-created channel exists"); - assert_eq!(response, channel); + .expect("the newly-created conversation exists"); + assert_eq!(response, conversation); let mut events = app .events() .subscribe(resume_point) .await .expect("subscribing never fails") - .filter_map(fixtures::event::stream::channel) - .filter_map(fixtures::event::stream::channel::created) - .filter(|event| future::ready(event.channel == response)); + .filter_map(fixtures::event::stream::conversation) + .filter_map(fixtures::event::stream::conversation::created) + .filter(|event| future::ready(event.conversation == response)); let event = events.next().expect_some("creation event published").await; - assert_eq!(event.channel, response); + assert_eq!(event.conversation, response); } #[tokio::test] @@ -70,23 +70,23 @@ async fn duplicate_name() { let app = fixtures::scratch_app().await; let creator = fixtures::identity::create(&app, &fixtures::now()).await; - let channel = fixtures::channel::create(&app, &fixtures::now()).await; + let conversation = fixtures::conversation::create(&app, &fixtures::now()).await; // Call the endpoint let request = super::Request { - name: channel.name.clone(), + name: conversation.name.clone(), }; let super::Error(error) = super::handler(State(app.clone()), creator, fixtures::now(), Json(request)) .await - .expect_err("duplicate channel name should fail the request"); + .expect_err("duplicate conversation name should fail the request"); // Verify the structure of the response assert!(matches!( error, - app::CreateError::DuplicateName(name) if channel.name == name + app::CreateError::DuplicateName(name) if conversation.name == name )); } @@ -98,10 +98,10 @@ async fn conflicting_canonical_name() { let creator = fixtures::identity::create(&app, &fixtures::now()).await; let existing_name = Name::from("rijksmuseum"); - app.channels() + app.conversations() .create(&existing_name, &fixtures::now()) .await - .expect("creating a channel in an empty environment succeeds"); + .expect("creating a conversation in an empty environment succeeds"); let conflicting_name = Name::from("r\u{0133}ksmuseum"); @@ -113,7 +113,7 @@ async fn conflicting_canonical_name() { let super::Error(error) = super::handler(State(app.clone()), creator, fixtures::now(), Json(request)) .await - .expect_err("duplicate channel name should fail the request"); + .expect_err("duplicate conversation name should fail the request"); // Verify the structure of the response @@ -132,16 +132,16 @@ async fn invalid_name() { // Call the endpoint - let name = fixtures::channel::propose_invalid_name(); + let name = fixtures::conversation::propose_invalid_name(); let request = super::Request { name: name.clone() }; - let super::Error(error) = crate::channel::handlers::create::handler( + let super::Error(error) = crate::conversation::handlers::create::handler( State(app.clone()), creator, fixtures::now(), Json(request), ) .await - .expect_err("invalid channel name should fail the request"); + .expect_err("invalid conversation name should fail the request"); // Verify the structure of the response @@ -157,7 +157,7 @@ async fn name_reusable_after_delete() { let app = fixtures::scratch_app().await; let creator = fixtures::identity::create(&app, &fixtures::now()).await; - let name = fixtures::channel::propose(); + let name = fixtures::conversation::propose(); // Call the endpoint (first time) @@ -169,14 +169,14 @@ async fn name_reusable_after_delete() { Json(request), ) .await - .expect("new channel in an empty app"); + .expect("new conversation in an empty app"); - // Delete the channel + // Delete the conversation - app.channels() + app.conversations() .delete(&response.id, &fixtures::now()) .await - .expect("deleting a newly-created channel succeeds"); + .expect("deleting a newly-created conversation succeeds"); // Call the endpoint (second time) @@ -184,7 +184,7 @@ async fn name_reusable_after_delete() { let super::Response(response) = super::handler(State(app.clone()), creator, fixtures::now(), Json(request)) .await - .expect("creation succeeds after original channel deleted"); + .expect("creation succeeds after original conversation deleted"); // Verify the structure of the response @@ -192,12 +192,12 @@ async fn name_reusable_after_delete() { // Verify the semantics - let channel = app - .channels() + let conversation = app + .conversations() .get(&response.id) .await - .expect("the newly-created channel exists"); - assert_eq!(response, channel); + .expect("the newly-created conversation exists"); + assert_eq!(response, conversation); } #[tokio::test] @@ -206,7 +206,7 @@ async fn name_reusable_after_expiry() { let app = fixtures::scratch_app().await; let creator = fixtures::identity::create(&app, &fixtures::ancient()).await; - let name = fixtures::channel::propose(); + let name = fixtures::conversation::propose(); // Call the endpoint (first time) @@ -218,11 +218,11 @@ async fn name_reusable_after_expiry() { Json(request), ) .await - .expect("new channel in an empty app"); + .expect("new conversation in an empty app"); - // Delete the channel + // Expire the conversation - app.channels() + app.conversations() .expire(&fixtures::now()) .await .expect("expiry always succeeds"); @@ -233,7 +233,7 @@ async fn name_reusable_after_expiry() { let super::Response(response) = super::handler(State(app.clone()), creator, fixtures::now(), Json(request)) .await - .expect("creation succeeds after original channel expired"); + .expect("creation succeeds after original conversation expired"); // Verify the structure of the response @@ -241,10 +241,10 @@ async fn name_reusable_after_expiry() { // Verify the semantics - let channel = app - .channels() + let conversation = app + .conversations() .get(&response.id) .await - .expect("the newly-created channel exists"); - assert_eq!(response, channel); + .expect("the newly-created conversation exists"); + assert_eq!(response, conversation); } diff --git a/src/channel/handlers/delete/mod.rs b/src/conversation/handlers/delete/mod.rs index b986bec..272165a 100644 --- a/src/channel/handlers/delete/mod.rs +++ b/src/conversation/handlers/delete/mod.rs @@ -6,8 +6,8 @@ use axum::{ use crate::{ app::App, - channel::{self, app, handlers::PathInfo}, clock::RequestedAt, + conversation::{self, app, handlers::PathInfo}, error::{Internal, NotFound}, token::extract::Identity, }; @@ -17,18 +17,20 @@ mod test; pub async fn handler( State(app): State<App>, - Path(channel): Path<PathInfo>, + Path(conversation): Path<PathInfo>, RequestedAt(deleted_at): RequestedAt, _: Identity, ) -> Result<Response, Error> { - app.channels().delete(&channel, &deleted_at).await?; + app.conversations() + .delete(&conversation, &deleted_at) + .await?; - Ok(Response { id: channel }) + Ok(Response { id: conversation }) } #[derive(Debug, serde::Serialize)] pub struct Response { - pub id: channel::Id, + pub id: conversation::Id, } impl IntoResponse for Response { diff --git a/src/channel/handlers/delete/test.rs b/src/conversation/handlers/delete/test.rs index 99c19db..2718d3b 100644 --- a/src/channel/handlers/delete/test.rs +++ b/src/conversation/handlers/delete/test.rs @@ -1,30 +1,30 @@ use axum::extract::{Path, State}; use itertools::Itertools; -use crate::{channel::app, test::fixtures}; +use crate::{conversation::app, test::fixtures}; #[tokio::test] -pub async fn valid_channel() { +pub async fn valid_conversation() { // Set up the environment let app = fixtures::scratch_app().await; - let channel = fixtures::channel::create(&app, &fixtures::now()).await; + let conversation = fixtures::conversation::create(&app, &fixtures::now()).await; // Send the request let deleter = fixtures::identity::create(&app, &fixtures::now()).await; let response = super::handler( State(app.clone()), - Path(channel.id.clone()), + Path(conversation.id.clone()), fixtures::now(), deleter, ) .await - .expect("deleting a valid channel succeeds"); + .expect("deleting a valid conversation succeeds"); // Verify the response - assert_eq!(channel.id, response.id); + assert_eq!(conversation.id, response.id); // Verify the semantics @@ -32,17 +32,17 @@ pub async fn valid_channel() { let created = snapshot .events .into_iter() - .filter_map(fixtures::event::channel) - .filter_map(fixtures::event::channel::created) + .filter_map(fixtures::event::conversation) + .filter_map(fixtures::event::conversation::created) .exactly_one() - .expect("only one channel has been created"); - // We don't expect `channel` to match the event exactly, as the name will have been - // tombstoned and the channel given a `deleted_at` date. - assert_eq!(channel.id, created.channel.id); + .expect("only one conversation has been created"); + // We don't expect `conversation` to match the event exactly, as the name will have + // been tombstoned and the conversation given a `deleted_at` date. + assert_eq!(conversation.id, created.conversation.id); } #[tokio::test] -pub async fn invalid_channel_id() { +pub async fn invalid_conversation_id() { // Set up the environment let app = fixtures::scratch_app().await; @@ -50,135 +50,135 @@ pub async fn invalid_channel_id() { // Send the request let deleter = fixtures::identity::create(&app, &fixtures::now()).await; - let channel = fixtures::channel::fictitious(); + let conversation = fixtures::conversation::fictitious(); let super::Error(error) = super::handler( State(app.clone()), - Path(channel.clone()), + Path(conversation.clone()), fixtures::now(), deleter, ) .await - .expect_err("deleting a nonexistent channel fails"); + .expect_err("deleting a nonexistent conversation fails"); // Verify the response - assert!(matches!(error, app::DeleteError::NotFound(id) if id == channel)); + assert!(matches!(error, app::DeleteError::NotFound(id) if id == conversation)); } #[tokio::test] -pub async fn channel_deleted() { +pub async fn conversation_deleted() { // Set up the environment let app = fixtures::scratch_app().await; - let channel = fixtures::channel::create(&app, &fixtures::now()).await; + let conversation = fixtures::conversation::create(&app, &fixtures::now()).await; - app.channels() - .delete(&channel.id, &fixtures::now()) + app.conversations() + .delete(&conversation.id, &fixtures::now()) .await - .expect("deleting a recently-sent channel succeeds"); + .expect("deleting a recently-created conversation succeeds"); // Send the request let deleter = fixtures::identity::create(&app, &fixtures::now()).await; let super::Error(error) = super::handler( State(app.clone()), - Path(channel.id.clone()), + Path(conversation.id.clone()), fixtures::now(), deleter, ) .await - .expect_err("deleting a deleted channel fails"); + .expect_err("deleting a deleted conversation fails"); // Verify the response - assert!(matches!(error, app::DeleteError::Deleted(id) if id == channel.id)); + assert!(matches!(error, app::DeleteError::Deleted(id) if id == conversation.id)); } #[tokio::test] -pub async fn channel_expired() { +pub async fn conversation_expired() { // Set up the environment let app = fixtures::scratch_app().await; - let channel = fixtures::channel::create(&app, &fixtures::ancient()).await; + let conversation = fixtures::conversation::create(&app, &fixtures::ancient()).await; - app.channels() + app.conversations() .expire(&fixtures::now()) .await - .expect("expiring channels always succeeds"); + .expect("expiring conversations always succeeds"); // Send the request let deleter = fixtures::identity::create(&app, &fixtures::now()).await; let super::Error(error) = super::handler( State(app.clone()), - Path(channel.id.clone()), + Path(conversation.id.clone()), fixtures::now(), deleter, ) .await - .expect_err("deleting an expired channel fails"); + .expect_err("deleting an expired conversation fails"); // Verify the response - assert!(matches!(error, app::DeleteError::Deleted(id) if id == channel.id)); + assert!(matches!(error, app::DeleteError::Deleted(id) if id == conversation.id)); } #[tokio::test] -pub async fn channel_purged() { +pub async fn conversation_purged() { // Set up the environment let app = fixtures::scratch_app().await; - let channel = fixtures::channel::create(&app, &fixtures::ancient()).await; + let conversation = fixtures::conversation::create(&app, &fixtures::ancient()).await; - app.channels() + app.conversations() .expire(&fixtures::old()) .await - .expect("expiring channels always succeeds"); + .expect("expiring conversations always succeeds"); - app.channels() + app.conversations() .purge(&fixtures::now()) .await - .expect("purging channels always succeeds"); + .expect("purging conversations always succeeds"); // Send the request let deleter = fixtures::identity::create(&app, &fixtures::now()).await; let super::Error(error) = super::handler( State(app.clone()), - Path(channel.id.clone()), + Path(conversation.id.clone()), fixtures::now(), deleter, ) .await - .expect_err("deleting a purged channel fails"); + .expect_err("deleting a purged conversation fails"); // Verify the response - assert!(matches!(error, app::DeleteError::NotFound(id) if id == channel.id)); + assert!(matches!(error, app::DeleteError::NotFound(id) if id == conversation.id)); } #[tokio::test] -pub async fn channel_not_empty() { +pub async fn conversation_not_empty() { // Set up the environment let app = fixtures::scratch_app().await; - let channel = fixtures::channel::create(&app, &fixtures::now()).await; + let conversation = fixtures::conversation::create(&app, &fixtures::now()).await; let sender = fixtures::user::create(&app, &fixtures::now()).await; - fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await; + fixtures::message::send(&app, &conversation, &sender, &fixtures::now()).await; // Send the request let deleter = fixtures::identity::create(&app, &fixtures::now()).await; let super::Error(error) = super::handler( State(app.clone()), - Path(channel.id.clone()), + Path(conversation.id.clone()), fixtures::now(), deleter, ) .await - .expect_err("deleting a channel with messages fails"); + .expect_err("deleting a conversation with messages fails"); // Verify the response - assert!(matches!(error, app::DeleteError::NotEmpty(id) if id == channel.id)); + assert!(matches!(error, app::DeleteError::NotEmpty(id) if id == conversation.id)); } diff --git a/src/channel/handlers/mod.rs b/src/conversation/handlers/mod.rs index f2ffd0d..2fe727c 100644 --- a/src/channel/handlers/mod.rs +++ b/src/conversation/handlers/mod.rs @@ -6,4 +6,4 @@ pub use create::handler as create; pub use delete::handler as delete; pub use send::handler as send; -type PathInfo = crate::channel::Id; +type PathInfo = crate::conversation::Id; diff --git a/src/channel/handlers/send/mod.rs b/src/conversation/handlers/send/mod.rs index bde39e5..9ec020a 100644 --- a/src/channel/handlers/send/mod.rs +++ b/src/conversation/handlers/send/mod.rs @@ -4,7 +4,7 @@ use axum::{ response::{self, IntoResponse}, }; -use crate::channel::handlers::PathInfo; +use crate::conversation::handlers::PathInfo; use crate::{ app::App, clock::RequestedAt, @@ -18,14 +18,14 @@ mod test; pub async fn handler( State(app): State<App>, - Path(channel): Path<PathInfo>, + Path(conversation): Path<PathInfo>, RequestedAt(sent_at): RequestedAt, identity: Identity, Json(request): Json<Request>, ) -> Result<Response, Error> { let message = app .messages() - .send(&channel, &identity.user, &sent_at, &request.body) + .send(&conversation, &identity.user, &sent_at, &request.body) .await?; Ok(Response(message)) @@ -54,7 +54,7 @@ impl IntoResponse for Error { fn into_response(self) -> response::Response { let Self(error) = self; match error { - SendError::ChannelNotFound(_) | SendError::ChannelDeleted(_) => { + SendError::ConversationNotFound(_) | SendError::ConversationDeleted(_) => { NotFound(error).into_response() } SendError::Name(_) | SendError::Database(_) => Internal::from(error).into_response(), diff --git a/src/channel/handlers/send/test.rs b/src/conversation/handlers/send/test.rs index 70d45eb..bd32510 100644 --- a/src/channel/handlers/send/test.rs +++ b/src/conversation/handlers/send/test.rs @@ -2,7 +2,7 @@ use axum::extract::{Json, Path, State}; use futures::stream::{self, StreamExt as _}; use crate::{ - channel, + conversation, event::Sequenced, message::app::SendError, test::fixtures::{self, future::Expect as _}, @@ -14,7 +14,7 @@ async fn messages_in_order() { let app = fixtures::scratch_app().await; let sender = fixtures::identity::create(&app, &fixtures::now()).await; - let channel = fixtures::channel::create(&app, &fixtures::now()).await; + let conversation = fixtures::conversation::create(&app, &fixtures::now()).await; let resume_point = fixtures::boot::resume_point(&app).await; // Call the endpoint (twice) @@ -29,13 +29,13 @@ async fn messages_in_order() { let _ = super::handler( State(app.clone()), - Path(channel.id.clone()), + Path(conversation.id.clone()), sent_at.clone(), sender.clone(), Json(request), ) .await - .expect("sending to a valid channel succeeds"); + .expect("sending to a valid conversation succeeds"); } // Verify the semantics @@ -44,7 +44,7 @@ async fn messages_in_order() { .events() .subscribe(resume_point) .await - .expect("subscribing to a valid channel succeeds") + .expect("subscribing always succeeds") .filter_map(fixtures::event::stream::message) .filter_map(fixtures::event::stream::message::sent) .zip(stream::iter(requests)); @@ -61,7 +61,7 @@ async fn messages_in_order() { } #[tokio::test] -async fn nonexistent_channel() { +async fn nonexistent_conversation() { // Set up the environment let app = fixtures::scratch_app().await; @@ -70,40 +70,40 @@ async fn nonexistent_channel() { // Call the endpoint let sent_at = fixtures::now(); - let channel = channel::Id::generate(); + let conversation = conversation::Id::generate(); let request = super::Request { body: fixtures::message::propose(), }; let super::Error(error) = super::handler( State(app), - Path(channel.clone()), + Path(conversation.clone()), sent_at, sender, Json(request), ) .await - .expect_err("sending to a nonexistent channel fails"); + .expect_err("sending to a nonexistent conversation fails"); // Verify the structure of the response assert!(matches!( error, - SendError::ChannelNotFound(error_channel) if channel == error_channel + SendError::ConversationNotFound(error_conversation) if conversation == error_conversation )); } #[tokio::test] -async fn deleted_channel() { +async fn deleted_conversation() { // Set up the environment let app = fixtures::scratch_app().await; let sender = fixtures::identity::create(&app, &fixtures::now()).await; - let channel = fixtures::channel::create(&app, &fixtures::now()).await; + let conversation = fixtures::conversation::create(&app, &fixtures::now()).await; - app.channels() - .delete(&channel.id, &fixtures::now()) + app.conversations() + .delete(&conversation.id, &fixtures::now()) .await - .expect("deleting a new channel succeeds"); + .expect("deleting a new conversation succeeds"); // Call the endpoint @@ -113,18 +113,18 @@ async fn deleted_channel() { }; let super::Error(error) = super::handler( State(app), - Path(channel.id.clone()), + Path(conversation.id.clone()), sent_at, sender, Json(request), ) .await - .expect_err("sending to a deleted channel fails"); + .expect_err("sending to a deleted conversation fails"); // Verify the structure of the response assert!(matches!( error, - SendError::ChannelDeleted(error_channel) if channel.id == error_channel + SendError::ConversationDeleted(error_conversation) if conversation.id == error_conversation )); } diff --git a/src/channel/history.rs b/src/conversation/history.rs index 85da5a5..601614c 100644 --- a/src/channel/history.rs +++ b/src/conversation/history.rs @@ -1,33 +1,33 @@ use itertools::Itertools as _; use super::{ - Channel, Id, + Conversation, Id, event::{Created, Deleted, Event}, }; use crate::event::{Instant, Sequence}; #[derive(Clone, Debug, Eq, PartialEq)] pub struct History { - pub channel: Channel, + pub conversation: Conversation, pub deleted: Option<Instant>, } // State interface impl History { pub fn id(&self) -> &Id { - &self.channel.id + &self.conversation.id } - // Snapshot of this channel as it was when created. (Note to the future: it's - // okay if this returns a redacted or modified version of the channel. If we - // implement renames by redacting the original name, then this should return the - // renamed channel, not the original, even if that's not how it was "as - // created.") - pub fn as_created(&self) -> Channel { - self.channel.clone() + // Snapshot of this conversation as it was when created. (Note to the future: + // it's okay if this returns a redacted or modified version of the conversation. + // If we implement renames by redacting the original name, then this should + // return the renamed conversation, not the original, even if that's not how + // it was "as created.") + pub fn as_created(&self) -> Conversation { + self.conversation.clone() } - pub fn as_of<S>(&self, sequence: S) -> Option<Channel> + pub fn as_of<S>(&self, sequence: S) -> Option<Conversation> where S: Into<Sequence>, { @@ -36,8 +36,8 @@ impl History { .collect() } - // Snapshot of this channel as of all events recorded in this history. - pub fn as_snapshot(&self) -> Option<Channel> { + // Snapshot of this conversation as of all events recorded in this history. + pub fn as_snapshot(&self) -> Option<Conversation> { self.events().collect() } } @@ -52,7 +52,7 @@ impl History { fn created(&self) -> Event { Created { - channel: self.channel.clone(), + conversation: self.conversation.clone(), } .into() } @@ -61,7 +61,7 @@ impl History { self.deleted.map(|instant| { Deleted { instant, - id: self.channel.id.clone(), + id: self.conversation.id.clone(), } .into() }) diff --git a/src/channel/id.rs b/src/conversation/id.rs index 22a2700..5f37a59 100644 --- a/src/channel/id.rs +++ b/src/conversation/id.rs @@ -2,7 +2,7 @@ use std::fmt; use crate::id::Id as BaseId; -// Stable identifier for a [Channel]. Prefixed with `C`. +// Stable identifier for a [Conversation]. Prefixed with `C`. #[derive( Clone, Debug, diff --git a/src/channel/mod.rs b/src/conversation/mod.rs index bbaf33e..3dfa187 100644 --- a/src/channel/mod.rs +++ b/src/conversation/mod.rs @@ -7,4 +7,4 @@ pub mod repo; mod snapshot; mod validate; -pub use self::{event::Event, history::History, id::Id, snapshot::Channel}; +pub use self::{event::Event, history::History, id::Id, snapshot::Conversation}; diff --git a/src/channel/repo.rs b/src/conversation/repo.rs index fd2173a..82b5f01 100644 --- a/src/channel/repo.rs +++ b/src/conversation/repo.rs @@ -2,26 +2,26 @@ use futures::stream::{StreamExt as _, TryStreamExt as _}; use sqlx::{SqliteConnection, Transaction, sqlite::Sqlite}; use crate::{ - channel::{Channel, History, Id}, clock::DateTime, + conversation::{Conversation, History, Id}, db::NotFound, event::{Instant, Sequence}, name::{self, Name}, }; pub trait Provider { - fn channels(&mut self) -> Channels; + fn conversations(&mut self) -> Conversations; } impl Provider for Transaction<'_, Sqlite> { - fn channels(&mut self) -> Channels { - Channels(self) + fn conversations(&mut self) -> Conversations { + Conversations(self) } } -pub struct Channels<'t>(&'t mut SqliteConnection); +pub struct Conversations<'t>(&'t mut SqliteConnection); -impl Channels<'_> { +impl Conversations<'_> { pub async fn create(&mut self, name: &Name, created: &Instant) -> Result<History, sqlx::Error> { let id = Id::generate(); let name = name.clone(); @@ -54,8 +54,8 @@ impl Channels<'_> { .execute(&mut *self.0) .await?; - let channel = History { - channel: Channel { + let conversation = History { + conversation: Conversation { created, id, name: name.clone(), @@ -64,11 +64,11 @@ impl Channels<'_> { deleted: None, }; - Ok(channel) + Ok(conversation) } - pub async fn by_id(&mut self, channel: &Id) -> Result<History, LoadError> { - let channel = sqlx::query!( + pub async fn by_id(&mut self, conversation: &Id) -> Result<History, LoadError> { + let conversation = sqlx::query!( r#" select id as "id: Id", @@ -85,11 +85,11 @@ impl Channels<'_> { using (id) where id = $1 "#, - channel, + conversation, ) .map(|row| { Ok::<_, name::Error>(History { - channel: Channel { + conversation: Conversation { created: Instant::new(row.created_at, row.created_sequence), id: row.id, name: Name::optional(row.display_name, row.canonical_name)?.unwrap_or_default(), @@ -101,11 +101,11 @@ impl Channels<'_> { .fetch_one(&mut *self.0) .await??; - Ok(channel) + Ok(conversation) } pub async fn all(&mut self, resume_at: Sequence) -> Result<Vec<History>, LoadError> { - let channels = sqlx::query!( + let conversations = sqlx::query!( r#" select id as "id: Id", @@ -127,7 +127,7 @@ impl Channels<'_> { ) .map(|row| { Ok::<_, name::Error>(History { - channel: Channel { + conversation: Conversation { created: Instant::new(row.created_at, row.created_sequence), id: row.id, name: Name::optional(row.display_name, row.canonical_name)?.unwrap_or_default(), @@ -141,11 +141,11 @@ impl Channels<'_> { .try_collect() .await?; - Ok(channels) + Ok(conversations) } pub async fn replay(&mut self, resume_at: Sequence) -> Result<Vec<History>, LoadError> { - let channels = sqlx::query!( + let conversations = sqlx::query!( r#" select id as "id: Id", @@ -166,7 +166,7 @@ impl Channels<'_> { ) .map(|row| { Ok::<_, name::Error>(History { - channel: Channel { + conversation: Conversation { created: Instant::new(row.created_at, row.created_sequence), id: row.id, name: Name::optional(row.display_name, row.canonical_name)?.unwrap_or_default(), @@ -180,15 +180,15 @@ impl Channels<'_> { .try_collect() .await?; - Ok(channels) + Ok(conversations) } pub async fn delete( &mut self, - channel: &History, + conversation: &History, deleted: &Instant, ) -> Result<History, LoadError> { - let id = channel.id(); + let id = conversation.id(); sqlx::query!( r#" update conversation @@ -216,12 +216,8 @@ impl Channels<'_> { // Small social responsibility hack here: when a conversation is deleted, its // name is retconned to have been the empty string. Someone reading the event - // stream afterwards, or looking at channels via the API, cannot retrieve the - // "deleted" channel's information by ignoring the deletion event. - // - // This also avoids the need for a separate name reservation table to ensure - // that live channels have unique names, since the `channel` table's name field - // is unique over non-null values. + // stream afterwards, or looking at conversations via the API, cannot retrieve + // the "deleted" conversation's information by ignoring the deletion event. sqlx::query!( r#" delete from conversation_name @@ -232,13 +228,13 @@ impl Channels<'_> { .execute(&mut *self.0) .await?; - let channel = self.by_id(id).await?; + let conversation = self.by_id(id).await?; - Ok(channel) + Ok(conversation) } pub async fn purge(&mut self, purge_at: &DateTime) -> Result<(), sqlx::Error> { - let channels = sqlx::query_scalar!( + let conversations = sqlx::query_scalar!( r#" with has_messages as ( select conversation @@ -255,14 +251,14 @@ impl Channels<'_> { .fetch_all(&mut *self.0) .await?; - for channel in channels { + for conversation in conversations { // Wanted: a way to batch these up into one query. sqlx::query!( r#" delete from conversation where id = $1 "#, - channel, + conversation, ) .execute(&mut *self.0) .await?; @@ -272,7 +268,7 @@ impl Channels<'_> { } pub async fn expired(&mut self, expired_at: &DateTime) -> Result<Vec<History>, LoadError> { - let channels = sqlx::query!( + let conversations = sqlx::query!( r#" select conversation.id as "id: Id", @@ -297,7 +293,7 @@ impl Channels<'_> { ) .map(|row| { Ok::<_, name::Error>(History { - channel: Channel { + conversation: Conversation { created: Instant::new(row.created_at, row.created_sequence), id: row.id, name: Name::optional(row.display_name, row.canonical_name)?.unwrap_or_default(), @@ -311,7 +307,7 @@ impl Channels<'_> { .try_collect() .await?; - Ok(channels) + Ok(conversations) } } diff --git a/src/channel/snapshot.rs b/src/conversation/snapshot.rs index 96801b8..da9eaae 100644 --- a/src/channel/snapshot.rs +++ b/src/conversation/snapshot.rs @@ -5,7 +5,7 @@ use super::{ use crate::{clock::DateTime, event::Instant, name::Name}; #[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] -pub struct Channel { +pub struct Conversation { #[serde(flatten)] pub created: Instant, pub id: Id, @@ -14,30 +14,30 @@ pub struct Channel { pub deleted_at: Option<DateTime>, } -impl Channel { +impl Conversation { fn apply(state: Option<Self>, event: Event) -> Option<Self> { match (state, event) { (None, Event::Created(event)) => Some(event.into()), - (Some(channel), Event::Deleted(event)) if channel.id == event.id => None, - (state, event) => panic!("invalid channel event {event:#?} for state {state:#?}"), + (Some(conversation), Event::Deleted(event)) if conversation.id == event.id => None, + (state, event) => panic!("invalid conversation event {event:#?} for state {state:#?}"), } } } -impl FromIterator<Event> for Option<Channel> { +impl FromIterator<Event> for Option<Conversation> { fn from_iter<I: IntoIterator<Item = Event>>(events: I) -> Self { - events.into_iter().fold(None, Channel::apply) + events.into_iter().fold(None, Conversation::apply) } } -impl From<&Created> for Channel { +impl From<&Created> for Conversation { fn from(event: &Created) -> Self { - event.channel.clone() + event.conversation.clone() } } -impl From<Created> for Channel { +impl From<Created> for Conversation { fn from(event: Created) -> Self { - event.channel + event.conversation } } diff --git a/src/channel/validate.rs b/src/conversation/validate.rs index 7894e0c..7894e0c 100644 --- a/src/channel/validate.rs +++ b/src/conversation/validate.rs diff --git a/src/event/app.rs b/src/event/app.rs index 45a9099..7359bfb 100644 --- a/src/event/app.rs +++ b/src/event/app.rs @@ -7,7 +7,7 @@ use sqlx::sqlite::SqlitePool; use super::{Event, Sequence, Sequenced, broadcaster::Broadcaster}; use crate::{ - channel::{self, repo::Provider as _}, + conversation::{self, repo::Provider as _}, message::{self, repo::Provider as _}, name, user::{self, repo::Provider as _}, @@ -41,10 +41,10 @@ impl<'a> Events<'a> { .filter(Sequence::after(resume_at)) .map(Event::from); - let channels = tx.channels().replay(resume_at).await?; - let channel_events = channels + let conversations = tx.conversations().replay(resume_at).await?; + let conversation_events = conversations .iter() - .map(channel::History::events) + .map(conversation::History::events) .kmerge_by(Sequence::merge) .filter(Sequence::after(resume_at)) .map(Event::from); @@ -58,7 +58,7 @@ impl<'a> Events<'a> { .map(Event::from); let replay_events = user_events - .merge_by(channel_events, Sequence::merge) + .merge_by(conversation_events, Sequence::merge) .merge_by(message_events, Sequence::merge) .collect::<Vec<_>>(); let resume_live_at = replay_events.last().map_or(resume_at, Sequenced::sequence); @@ -98,9 +98,9 @@ impl From<user::repo::LoadError> for Error { } } -impl From<channel::repo::LoadError> for Error { - fn from(error: channel::repo::LoadError) -> Self { - use channel::repo::LoadError; +impl From<conversation::repo::LoadError> for Error { + fn from(error: conversation::repo::LoadError) -> Self { + use conversation::repo::LoadError; match error { LoadError::Database(error) => error.into(), LoadError::Name(error) => error.into(), diff --git a/src/event/handlers/stream/test/channel.rs b/src/event/handlers/stream/test/conversation.rs index 2b87ce2..5e08075 100644 --- a/src/event/handlers/stream/test/channel.rs +++ b/src/event/handlers/stream/test/conversation.rs @@ -23,23 +23,23 @@ async fn creating() { .await .expect("subscribe never fails"); - // Create a channel + // Create a conversation - let name = fixtures::channel::propose(); - let channel = app - .channels() + let name = fixtures::conversation::propose(); + let conversation = app + .conversations() .create(&name, &fixtures::now()) .await - .expect("creating a channel succeeds"); + .expect("creating a conversation succeeds"); - // Verify channel created event + // Verify conversation created event events - .filter_map(fixtures::event::stream::channel) - .filter_map(fixtures::event::stream::channel::created) - .filter(|event| future::ready(event.channel == channel)) + .filter_map(fixtures::event::stream::conversation) + .filter_map(fixtures::event::stream::conversation::created) + .filter(|event| future::ready(event.conversation == conversation)) .next() - .expect_some("channel created event is delivered") + .expect_some("conversation created event is delivered") .await; } @@ -50,14 +50,14 @@ async fn previously_created() { let app = fixtures::scratch_app().await; let resume_point = fixtures::boot::resume_point(&app).await; - // Create a channel + // Create a conversation - let name = fixtures::channel::propose(); - let channel = app - .channels() + let name = fixtures::conversation::propose(); + let conversation = app + .conversations() .create(&name, &fixtures::now()) .await - .expect("creating a channel succeeds"); + .expect("creating a conversation succeeds"); // Subscribe @@ -71,14 +71,14 @@ async fn previously_created() { .await .expect("subscribe never fails"); - // Verify channel created event + // Verify conversation created event let _ = events - .filter_map(fixtures::event::stream::channel) - .filter_map(fixtures::event::stream::channel::created) - .filter(|event| future::ready(event.channel == channel)) + .filter_map(fixtures::event::stream::conversation) + .filter_map(fixtures::event::stream::conversation::created) + .filter(|event| future::ready(event.conversation == conversation)) .next() - .expect_some("channel created event is delivered") + .expect_some("conversation created event is delivered") .await; } @@ -87,7 +87,7 @@ async fn expiring() { // Set up the environment let app = fixtures::scratch_app().await; - let channel = fixtures::channel::create(&app, &fixtures::ancient()).await; + let conversation = fixtures::conversation::create(&app, &fixtures::ancient()).await; let resume_point = fixtures::boot::resume_point(&app).await; // Subscribe @@ -102,20 +102,20 @@ async fn expiring() { .await .expect("subscribe never fails"); - // Expire channels + // Expire conversations - app.channels() + app.conversations() .expire(&fixtures::now()) .await - .expect("expiring channels always succeeds"); + .expect("expiring conversations always succeeds"); // Check for expiry event let _ = events - .filter_map(fixtures::event::stream::channel) - .filter_map(fixtures::event::stream::channel::deleted) - .filter(|event| future::ready(event.id == channel.id)) + .filter_map(fixtures::event::stream::conversation) + .filter_map(fixtures::event::stream::conversation::deleted) + .filter(|event| future::ready(event.id == conversation.id)) .next() - .expect_some("a deleted channel event will be delivered") + .expect_some("a deleted conversation event will be delivered") .await; } @@ -124,15 +124,15 @@ async fn previously_expired() { // Set up the environment let app = fixtures::scratch_app().await; - let channel = fixtures::channel::create(&app, &fixtures::ancient()).await; + let conversation = fixtures::conversation::create(&app, &fixtures::ancient()).await; let resume_point = fixtures::boot::resume_point(&app).await; - // Expire channels + // Expire conversations - app.channels() + app.conversations() .expire(&fixtures::now()) .await - .expect("expiring channels always succeeds"); + .expect("expiring conversation always succeeds"); // Subscribe @@ -148,11 +148,11 @@ async fn previously_expired() { // Check for expiry event let _ = events - .filter_map(fixtures::event::stream::channel) - .filter_map(fixtures::event::stream::channel::deleted) - .filter(|event| future::ready(event.id == channel.id)) + .filter_map(fixtures::event::stream::conversation) + .filter_map(fixtures::event::stream::conversation::deleted) + .filter(|event| future::ready(event.id == conversation.id)) .next() - .expect_some("a deleted channel event will be delivered") + .expect_some("a deleted conversation event will be delivered") .await; } @@ -161,7 +161,7 @@ async fn deleting() { // Set up the environment let app = fixtures::scratch_app().await; - let channel = fixtures::channel::create(&app, &fixtures::now()).await; + let conversation = fixtures::conversation::create(&app, &fixtures::now()).await; let resume_point = fixtures::boot::resume_point(&app).await; // Subscribe @@ -176,20 +176,20 @@ async fn deleting() { .await .expect("subscribe never fails"); - // Delete the channel + // Delete the conversation - app.channels() - .delete(&channel.id, &fixtures::now()) + app.conversations() + .delete(&conversation.id, &fixtures::now()) .await - .expect("deleting a valid channel succeeds"); + .expect("deleting a valid conversation succeeds"); // Check for delete event let _ = events - .filter_map(fixtures::event::stream::channel) - .filter_map(fixtures::event::stream::channel::deleted) - .filter(|event| future::ready(event.id == channel.id)) + .filter_map(fixtures::event::stream::conversation) + .filter_map(fixtures::event::stream::conversation::deleted) + .filter(|event| future::ready(event.id == conversation.id)) .next() - .expect_some("a deleted channel event will be delivered") + .expect_some("a deleted conversation event will be delivered") .await; } @@ -198,15 +198,15 @@ async fn previously_deleted() { // Set up the environment let app = fixtures::scratch_app().await; - let channel = fixtures::channel::create(&app, &fixtures::now()).await; + let conversation = fixtures::conversation::create(&app, &fixtures::now()).await; let resume_point = fixtures::boot::resume_point(&app).await; - // Delete the channel + // Delete the conversation - app.channels() - .delete(&channel.id, &fixtures::now()) + app.conversations() + .delete(&conversation.id, &fixtures::now()) .await - .expect("deleting a valid channel succeeds"); + .expect("deleting a valid conversation succeeds"); // Subscribe @@ -222,11 +222,11 @@ async fn previously_deleted() { // Check for expiry event let _ = events - .filter_map(fixtures::event::stream::channel) - .filter_map(fixtures::event::stream::channel::deleted) - .filter(|event| future::ready(event.id == channel.id)) + .filter_map(fixtures::event::stream::conversation) + .filter_map(fixtures::event::stream::conversation::deleted) + .filter(|event| future::ready(event.id == conversation.id)) .next() - .expect_some("a deleted channel event will be delivered") + .expect_some("a deleted conversation event will be delivered") .await; } @@ -235,20 +235,20 @@ async fn previously_purged() { // Set up the environment let app = fixtures::scratch_app().await; - let channel = fixtures::channel::create(&app, &fixtures::ancient()).await; + let conversation = fixtures::conversation::create(&app, &fixtures::ancient()).await; let resume_point = fixtures::boot::resume_point(&app).await; - // Delete and purge the channel + // Delete and purge the conversation - app.channels() - .delete(&channel.id, &fixtures::ancient()) + app.conversations() + .delete(&conversation.id, &fixtures::ancient()) .await - .expect("deleting a valid channel succeeds"); + .expect("deleting a valid conversation succeeds"); - app.channels() + app.conversations() .purge(&fixtures::now()) .await - .expect("purging channels always succeeds"); + .expect("purging conversations always succeeds"); // Subscribe @@ -264,10 +264,10 @@ async fn previously_purged() { // Check for expiry event events - .filter_map(fixtures::event::stream::channel) - .filter_map(fixtures::event::stream::channel::deleted) - .filter(|event| future::ready(event.id == channel.id)) + .filter_map(fixtures::event::stream::conversation) + .filter_map(fixtures::event::stream::conversation::deleted) + .filter(|event| future::ready(event.id == conversation.id)) .next() - .expect_wait("deleted channel events not delivered") + .expect_wait("deleted conversation events not delivered") .await; } diff --git a/src/event/handlers/stream/test/message.rs b/src/event/handlers/stream/test/message.rs index 4369996..3fba317 100644 --- a/src/event/handlers/stream/test/message.rs +++ b/src/event/handlers/stream/test/message.rs @@ -12,7 +12,7 @@ async fn sending() { // Set up the environment let app = fixtures::scratch_app().await; - let channel = fixtures::channel::create(&app, &fixtures::now()).await; + let conversation = fixtures::conversation::create(&app, &fixtures::now()).await; let resume_point = fixtures::boot::resume_point(&app).await; // Call the endpoint @@ -33,7 +33,7 @@ async fn sending() { let message = app .messages() .send( - &channel.id, + &conversation.id, &sender, &fixtures::now(), &fixtures::message::propose(), @@ -57,7 +57,7 @@ async fn previously_sent() { // Set up the environment let app = fixtures::scratch_app().await; - let channel = fixtures::channel::create(&app, &fixtures::now()).await; + let conversation = fixtures::conversation::create(&app, &fixtures::now()).await; let resume_point = fixtures::boot::resume_point(&app).await; // Send a message @@ -66,7 +66,7 @@ async fn previously_sent() { let message = app .messages() .send( - &channel.id, + &conversation.id, &sender, &fixtures::now(), &fixtures::message::propose(), @@ -98,27 +98,30 @@ async fn previously_sent() { } #[tokio::test] -async fn sent_in_multiple_channels() { +async fn sent_in_multiple_conversations() { // Set up the environment let app = fixtures::scratch_app().await; let sender = fixtures::user::create(&app, &fixtures::now()).await; let resume_point = fixtures::boot::resume_point(&app).await; - let channels = [ - fixtures::channel::create(&app, &fixtures::now()).await, - fixtures::channel::create(&app, &fixtures::now()).await, + let conversations = [ + fixtures::conversation::create(&app, &fixtures::now()).await, + fixtures::conversation::create(&app, &fixtures::now()).await, ]; - let messages = stream::iter(channels) - .then(|channel| { - let app = app.clone(); - let sender = sender.clone(); - let channel = channel.clone(); - async move { fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await } - }) - .collect::<Vec<_>>() - .await; + let messages = + stream::iter(conversations) + .then(|conversation| { + let app = app.clone(); + let sender = sender.clone(); + let conversation = conversation.clone(); + async move { + fixtures::message::send(&app, &conversation, &sender, &fixtures::now()).await + } + }) + .collect::<Vec<_>>() + .await; // Call the endpoint @@ -152,14 +155,14 @@ async fn sent_sequentially() { // Set up the environment let app = fixtures::scratch_app().await; - let channel = fixtures::channel::create(&app, &fixtures::now()).await; + let conversation = fixtures::conversation::create(&app, &fixtures::now()).await; let sender = fixtures::user::create(&app, &fixtures::now()).await; let resume_point = fixtures::boot::resume_point(&app).await; let messages = vec![ - fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, - fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, - fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &conversation, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &conversation, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &conversation, &sender, &fixtures::now()).await, ]; // Subscribe @@ -196,9 +199,9 @@ async fn expiring() { // Set up the environment let app = fixtures::scratch_app().await; - let channel = fixtures::channel::create(&app, &fixtures::ancient()).await; + let conversation = fixtures::conversation::create(&app, &fixtures::ancient()).await; let sender = fixtures::user::create(&app, &fixtures::ancient()).await; - let message = fixtures::message::send(&app, &channel, &sender, &fixtures::ancient()).await; + let message = fixtures::message::send(&app, &conversation, &sender, &fixtures::ancient()).await; let resume_point = fixtures::boot::resume_point(&app).await; // Subscribe @@ -235,9 +238,9 @@ async fn previously_expired() { // Set up the environment let app = fixtures::scratch_app().await; - let channel = fixtures::channel::create(&app, &fixtures::ancient()).await; + let conversation = fixtures::conversation::create(&app, &fixtures::ancient()).await; let sender = fixtures::user::create(&app, &fixtures::ancient()).await; - let message = fixtures::message::send(&app, &channel, &sender, &fixtures::ancient()).await; + let message = fixtures::message::send(&app, &conversation, &sender, &fixtures::ancient()).await; let resume_point = fixtures::boot::resume_point(&app).await; // Expire messages @@ -274,9 +277,9 @@ async fn deleting() { // Set up the environment let app = fixtures::scratch_app().await; - let channel = fixtures::channel::create(&app, &fixtures::now()).await; + let conversation = fixtures::conversation::create(&app, &fixtures::now()).await; let sender = fixtures::user::create(&app, &fixtures::now()).await; - let message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await; + let message = fixtures::message::send(&app, &conversation, &sender, &fixtures::now()).await; let resume_point = fixtures::boot::resume_point(&app).await; // Subscribe @@ -313,9 +316,9 @@ async fn previously_deleted() { // Set up the environment let app = fixtures::scratch_app().await; - let channel = fixtures::channel::create(&app, &fixtures::now()).await; + let conversation = fixtures::conversation::create(&app, &fixtures::now()).await; let sender = fixtures::user::create(&app, &fixtures::now()).await; - let message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await; + let message = fixtures::message::send(&app, &conversation, &sender, &fixtures::now()).await; let resume_point = fixtures::boot::resume_point(&app).await; // Delete the message @@ -352,9 +355,9 @@ async fn previously_purged() { // Set up the environment let app = fixtures::scratch_app().await; - let channel = fixtures::channel::create(&app, &fixtures::ancient()).await; + let conversation = fixtures::conversation::create(&app, &fixtures::ancient()).await; let sender = fixtures::user::create(&app, &fixtures::ancient()).await; - let message = fixtures::message::send(&app, &channel, &sender, &fixtures::ancient()).await; + let message = fixtures::message::send(&app, &conversation, &sender, &fixtures::ancient()).await; let resume_point = fixtures::boot::resume_point(&app).await; // Purge the message diff --git a/src/event/handlers/stream/test/mod.rs b/src/event/handlers/stream/test/mod.rs index df43deb..3bc634f 100644 --- a/src/event/handlers/stream/test/mod.rs +++ b/src/event/handlers/stream/test/mod.rs @@ -1,4 +1,4 @@ -mod channel; +mod conversation; mod invite; mod message; mod resume; diff --git a/src/event/handlers/stream/test/resume.rs b/src/event/handlers/stream/test/resume.rs index 835d350..a0da692 100644 --- a/src/event/handlers/stream/test/resume.rs +++ b/src/event/handlers/stream/test/resume.rs @@ -14,15 +14,16 @@ async fn resume() { // Set up the environment let app = fixtures::scratch_app().await; - let channel = fixtures::channel::create(&app, &fixtures::now()).await; + let conversation = fixtures::conversation::create(&app, &fixtures::now()).await; let sender = fixtures::user::create(&app, &fixtures::now()).await; let resume_point = fixtures::boot::resume_point(&app).await; - let initial_message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await; + let initial_message = + fixtures::message::send(&app, &conversation, &sender, &fixtures::now()).await; let later_messages = vec![ - fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, - fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &conversation, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &conversation, &sender, &fixtures::now()).await, ]; // Call the endpoint @@ -75,8 +76,8 @@ async fn resume() { // This test verifies a real bug I hit developing the vector-of-sequences // approach to resuming events. A small omission caused the event IDs in a -// resumed stream to _omit_ channels that were in the original stream until -// those channels also appeared in the resumed stream. +// resumed stream to _omit_ conversations that were in the original stream +// until those conversations also appeared in the resumed stream. // // Clients would see something like // * In the original stream, Cfoo=5,Cbar=8 @@ -84,8 +85,8 @@ async fn resume() { // // Disconnecting and reconnecting a second time, using event IDs from that // initial period of the first resume attempt, would then cause the second -// resume attempt to restart all other channels from the beginning, and not -// from where the first disconnection happened. +// resume attempt to restart all other conversations from the beginning, and +// not from where the first disconnection happened. // // As we have switched to a single global event sequence number, this scenario // can no longer arise, but this test is preserved because the actual behaviour @@ -97,8 +98,8 @@ async fn serial_resume() { let app = fixtures::scratch_app().await; let sender = fixtures::user::create(&app, &fixtures::now()).await; - let channel_a = fixtures::channel::create(&app, &fixtures::now()).await; - let channel_b = fixtures::channel::create(&app, &fixtures::now()).await; + let conversation_a = fixtures::conversation::create(&app, &fixtures::now()).await; + let conversation_b = fixtures::conversation::create(&app, &fixtures::now()).await; let resume_point = fixtures::boot::resume_point(&app).await; // Call the endpoint @@ -107,8 +108,8 @@ async fn serial_resume() { let resume_at = { let initial_messages = [ - fixtures::message::send(&app, &channel_a, &sender, &fixtures::now()).await, - fixtures::message::send(&app, &channel_b, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &conversation_a, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &conversation_b, &sender, &fixtures::now()).await, ]; // First subscription @@ -148,11 +149,11 @@ async fn serial_resume() { // Resume after disconnect let resume_at = { let resume_messages = [ - // Note that channel_b does not appear here. The buggy behaviour - // would be masked if channel_b happened to send a new message + // Note that conversation_b does not appear here. The buggy behaviour + // would be masked if conversation_b happened to send a new message // into the resumed event stream. - fixtures::message::send(&app, &channel_a, &sender, &fixtures::now()).await, - fixtures::message::send(&app, &channel_a, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &conversation_a, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &conversation_a, &sender, &fixtures::now()).await, ]; // Second subscription @@ -190,12 +191,12 @@ async fn serial_resume() { // Resume after disconnect a second time { - // At this point, we can send on either channel and demonstrate the - // problem. The resume point should before both of these messages, but - // after _all_ prior messages. + // At this point, we can send on either conversation and demonstrate + // the problem. The resume point should before both of these messages, + // but after _all_ prior messages. let final_messages = [ - fixtures::message::send(&app, &channel_a, &sender, &fixtures::now()).await, - fixtures::message::send(&app, &channel_b, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &conversation_a, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &conversation_b, &sender, &fixtures::now()).await, ]; // Third subscription diff --git a/src/event/handlers/stream/test/token.rs b/src/event/handlers/stream/test/token.rs index e32b489..5af07a0 100644 --- a/src/event/handlers/stream/test/token.rs +++ b/src/event/handlers/stream/test/token.rs @@ -9,7 +9,7 @@ async fn terminates_on_token_expiry() { // Set up the environment let app = fixtures::scratch_app().await; - let channel = fixtures::channel::create(&app, &fixtures::now()).await; + let conversation = fixtures::conversation::create(&app, &fixtures::now()).await; let sender = fixtures::user::create(&app, &fixtures::now()).await; let resume_point = fixtures::boot::resume_point(&app).await; @@ -37,9 +37,9 @@ async fn terminates_on_token_expiry() { // These should not be delivered. let messages = [ - fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, - fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, - fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &conversation, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &conversation, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &conversation, &sender, &fixtures::now()).await, ]; events @@ -56,7 +56,7 @@ async fn terminates_on_logout() { // Set up the environment let app = fixtures::scratch_app().await; - let channel = fixtures::channel::create(&app, &fixtures::now()).await; + let conversation = fixtures::conversation::create(&app, &fixtures::now()).await; let sender = fixtures::user::create(&app, &fixtures::now()).await; let resume_point = fixtures::boot::resume_point(&app).await; @@ -83,9 +83,9 @@ async fn terminates_on_logout() { // These should not be delivered. let messages = [ - fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, - fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, - fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &conversation, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &conversation, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &conversation, &sender, &fixtures::now()).await, ]; events @@ -102,7 +102,7 @@ async fn terminates_on_password_change() { // Set up the environment let app = fixtures::scratch_app().await; - let channel = fixtures::channel::create(&app, &fixtures::now()).await; + let conversation = fixtures::conversation::create(&app, &fixtures::now()).await; let sender = fixtures::user::create(&app, &fixtures::now()).await; let resume_point = fixtures::boot::resume_point(&app).await; @@ -133,9 +133,9 @@ async fn terminates_on_password_change() { // These should not be delivered. let messages = [ - fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, - fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, - fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &conversation, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &conversation, &sender, &fixtures::now()).await, + fixtures::message::send(&app, &conversation, &sender, &fixtures::now()).await, ]; events diff --git a/src/event/mod.rs b/src/event/mod.rs index 6657243..801bcb9 100644 --- a/src/event/mod.rs +++ b/src/event/mod.rs @@ -2,7 +2,7 @@ use std::time::Duration; use axum::response::sse::{self, KeepAlive}; -use crate::{channel, message, user}; +use crate::{conversation, message, user}; pub mod app; mod broadcaster; @@ -20,7 +20,7 @@ pub use self::{ #[serde(tag = "type", rename_all = "snake_case")] pub enum Event { User(user::Event), - Channel(channel::Event), + Channel(conversation::Event), Message(message::Event), } @@ -50,8 +50,8 @@ impl From<user::Event> for Event { } } -impl From<channel::Event> for Event { - fn from(event: channel::Event) -> Self { +impl From<conversation::Event> for Event { + fn from(event: conversation::Event) -> Self { Self::Channel(event) } } diff --git a/src/expire.rs b/src/expire.rs index 1427a8d..4177a53 100644 --- a/src/expire.rs +++ b/src/expire.rs @@ -6,7 +6,7 @@ use axum::{ use crate::{app::App, clock::RequestedAt, error::Internal}; -// Expires messages and channels before each request. +// Expires messages and conversations before each request. pub async fn middleware( State(app): State<App>, RequestedAt(expired_at): RequestedAt, @@ -17,7 +17,7 @@ pub async fn middleware( app.invites().expire(&expired_at).await?; app.messages().expire(&expired_at).await?; app.messages().purge(&expired_at).await?; - app.channels().expire(&expired_at).await?; - app.channels().purge(&expired_at).await?; + app.conversations().expire(&expired_at).await?; + app.conversations().purge(&expired_at).await?; Ok(next.run(req).await) } @@ -5,9 +5,9 @@ mod app; mod boot; mod broadcast; -mod channel; pub mod cli; mod clock; +mod conversation; mod db; mod error; mod event; diff --git a/src/message/app.rs b/src/message/app.rs index 9792c8f..bdc2164 100644 --- a/src/message/app.rs +++ b/src/message/app.rs @@ -4,8 +4,8 @@ use sqlx::sqlite::SqlitePool; use super::{Body, Id, Message, repo::Provider as _}; use crate::{ - channel::{self, repo::Provider as _}, clock::DateTime, + conversation::{self, repo::Provider as _}, db::NotFound as _, event::{Broadcaster, Event, Sequence, repo::Provider as _}, name, @@ -24,23 +24,29 @@ impl<'a> Messages<'a> { pub async fn send( &self, - channel: &channel::Id, + conversation: &conversation::Id, sender: &User, sent_at: &DateTime, body: &Body, ) -> Result<Message, SendError> { - let to_not_found = || SendError::ChannelNotFound(channel.clone()); - let to_deleted = || SendError::ChannelDeleted(channel.clone()); + let to_not_found = || SendError::ConversationNotFound(conversation.clone()); + let to_deleted = || SendError::ConversationDeleted(conversation.clone()); let mut tx = self.db.begin().await?; - let channel = tx.channels().by_id(channel).await.not_found(to_not_found)?; + let conversation = tx + .conversations() + .by_id(conversation) + .await + .not_found(to_not_found)?; // Ordering: don't bother allocating a sequence number before we know the channel might // exist. let sent = tx.sequence().next(sent_at).await?; - let channel = channel.as_of(sent).ok_or_else(to_deleted)?; - - let message = tx.messages().create(&channel, sender, &sent, body).await?; + let conversation = conversation.as_of(sent).ok_or_else(to_deleted)?; + let message = tx + .messages() + .create(&conversation, sender, &sent, body) + .await?; tx.commit().await?; self.events @@ -128,19 +134,19 @@ impl<'a> Messages<'a> { #[derive(Debug, thiserror::Error)] pub enum SendError { - #[error("channel {0} not found")] - ChannelNotFound(channel::Id), - #[error("channel {0} deleted")] - ChannelDeleted(channel::Id), + #[error("conversation {0} not found")] + ConversationNotFound(conversation::Id), + #[error("conversation {0} deleted")] + ConversationDeleted(conversation::Id), #[error(transparent)] Database(#[from] sqlx::Error), #[error(transparent)] Name(#[from] name::Error), } -impl From<channel::repo::LoadError> for SendError { - fn from(error: channel::repo::LoadError) -> Self { - use channel::repo::LoadError; +impl From<conversation::repo::LoadError> for SendError { + fn from(error: conversation::repo::LoadError) -> Self { + use conversation::repo::LoadError; match error { LoadError::Database(error) => error.into(), LoadError::Name(error) => error.into(), diff --git a/src/message/handlers/delete/test.rs b/src/message/handlers/delete/test.rs index f567eb7..371c7bf 100644 --- a/src/message/handlers/delete/test.rs +++ b/src/message/handlers/delete/test.rs @@ -9,8 +9,9 @@ pub async fn delete_message() { let app = fixtures::scratch_app().await; let sender = fixtures::identity::create(&app, &fixtures::now()).await; - let channel = fixtures::channel::create(&app, &fixtures::now()).await; - let message = fixtures::message::send(&app, &channel, &sender.user, &fixtures::now()).await; + let conversation = fixtures::conversation::create(&app, &fixtures::now()).await; + let message = + fixtures::message::send(&app, &conversation, &sender.user, &fixtures::now()).await; // Send the request @@ -70,8 +71,8 @@ pub async fn delete_deleted() { let app = fixtures::scratch_app().await; let sender = fixtures::user::create(&app, &fixtures::now()).await; - let channel = fixtures::channel::create(&app, &fixtures::now()).await; - let message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await; + let conversation = fixtures::conversation::create(&app, &fixtures::now()).await; + let message = fixtures::message::send(&app, &conversation, &sender, &fixtures::now()).await; app.messages() .delete(&sender, &message.id, &fixtures::now()) @@ -101,8 +102,8 @@ pub async fn delete_expired() { let app = fixtures::scratch_app().await; let sender = fixtures::user::create(&app, &fixtures::ancient()).await; - let channel = fixtures::channel::create(&app, &fixtures::ancient()).await; - let message = fixtures::message::send(&app, &channel, &sender, &fixtures::ancient()).await; + let conversation = fixtures::conversation::create(&app, &fixtures::ancient()).await; + let message = fixtures::message::send(&app, &conversation, &sender, &fixtures::ancient()).await; app.messages() .expire(&fixtures::now()) @@ -132,8 +133,8 @@ pub async fn delete_purged() { let app = fixtures::scratch_app().await; let sender = fixtures::user::create(&app, &fixtures::ancient()).await; - let channel = fixtures::channel::create(&app, &fixtures::ancient()).await; - let message = fixtures::message::send(&app, &channel, &sender, &fixtures::ancient()).await; + let conversation = fixtures::conversation::create(&app, &fixtures::ancient()).await; + let message = fixtures::message::send(&app, &conversation, &sender, &fixtures::ancient()).await; app.messages() .expire(&fixtures::old()) @@ -168,8 +169,8 @@ pub async fn delete_not_sender() { let app = fixtures::scratch_app().await; let sender = fixtures::user::create(&app, &fixtures::now()).await; - let channel = fixtures::channel::create(&app, &fixtures::now()).await; - let message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await; + let conversation = fixtures::conversation::create(&app, &fixtures::now()).await; + let message = fixtures::message::send(&app, &conversation, &sender, &fixtures::now()).await; // Send the request diff --git a/src/message/repo.rs b/src/message/repo.rs index 159ce8e..68f6e4a 100644 --- a/src/message/repo.rs +++ b/src/message/repo.rs @@ -2,8 +2,8 @@ use sqlx::{SqliteConnection, Transaction, sqlite::Sqlite}; use super::{Body, History, Id, snapshot::Message}; use crate::{ - channel::{self, Channel}, clock::DateTime, + conversation::{self, Conversation}, event::{Instant, Sequence}, user::{self, User}, }; @@ -23,7 +23,7 @@ pub struct Messages<'t>(&'t mut SqliteConnection); impl Messages<'_> { pub async fn create( &mut self, - channel: &Channel, + conversation: &Conversation, sender: &User, sent: &Instant, body: &Body, @@ -37,14 +37,14 @@ impl Messages<'_> { values ($1, $2, $3, $4, $5, $6, $7) returning id as "id: Id", - conversation as "conversation: channel::Id", + conversation as "conversation: conversation::Id", sender as "sender: user::Id", sent_at as "sent_at: DateTime", sent_sequence as "sent_sequence: Sequence", body as "body: Body" "#, id, - channel.id, + conversation.id, sender.id, sent.at, sent.sequence, @@ -68,12 +68,15 @@ impl Messages<'_> { Ok(message) } - pub async fn live(&mut self, channel: &channel::History) -> Result<Vec<History>, sqlx::Error> { - let channel_id = channel.id(); + pub async fn live( + &mut self, + conversation: &conversation::History, + ) -> Result<Vec<History>, sqlx::Error> { + let conversation_id = conversation.id(); let messages = sqlx::query!( r#" select - message.conversation as "conversation: channel::Id", + message.conversation as "conversation: conversation::Id", message.sender as "sender: user::Id", id as "id: Id", message.body as "body: Body", @@ -87,7 +90,7 @@ impl Messages<'_> { where message.conversation = $1 and deleted.id is null "#, - channel_id, + conversation_id, ) .map(|row| History { message: Message { @@ -110,7 +113,7 @@ impl Messages<'_> { let messages = sqlx::query!( r#" select - message.conversation as "conversation: channel::Id", + message.conversation as "conversation: conversation::Id", message.sender as "sender: user::Id", message.id as "id: Id", message.body as "body: Body", @@ -147,7 +150,7 @@ impl Messages<'_> { let message = sqlx::query!( r#" select - message.conversation as "conversation: channel::Id", + message.conversation as "conversation: conversation::Id", message.sender as "sender: user::Id", id as "id: Id", message.body as "body: Body", @@ -200,7 +203,7 @@ impl Messages<'_> { // Small social responsibility hack here: when a message is deleted, its body is // retconned to have been the empty string. Someone reading the event stream - // afterwards, or looking at messages in the channel, cannot retrieve the + // afterwards, or looking at messages in the conversation, cannot retrieve the // "deleted" message by ignoring the deletion event. sqlx::query!( r#" @@ -252,7 +255,7 @@ impl Messages<'_> { r#" select id as "id: Id", - message.conversation as "conversation: channel::Id", + message.conversation as "conversation: conversation::Id", message.sender as "sender: user::Id", message.sent_at as "sent_at: DateTime", message.sent_sequence as "sent_sequence: Sequence", @@ -289,7 +292,7 @@ impl Messages<'_> { r#" select id as "id: Id", - message.conversation as "conversation: channel::Id", + message.conversation as "conversation: conversation::Id", message.sender as "sender: user::Id", message.sent_at as "sent_at: DateTime", message.sent_sequence as "sent_sequence: Sequence", diff --git a/src/message/snapshot.rs b/src/message/snapshot.rs index ac067f7..0e6e9ae 100644 --- a/src/message/snapshot.rs +++ b/src/message/snapshot.rs @@ -2,13 +2,13 @@ use super::{ Body, Id, event::{Event, Sent}, }; -use crate::{channel, clock::DateTime, event::Instant, user}; +use crate::{clock::DateTime, conversation, event::Instant, user}; #[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] pub struct Message { #[serde(flatten)] pub sent: Instant, - pub channel: channel::Id, + pub channel: conversation::Id, pub sender: user::Id, pub id: Id, pub body: Body, diff --git a/src/routes.rs b/src/routes.rs index 1e66582..ca4c60c 100644 --- a/src/routes.rs +++ b/src/routes.rs @@ -4,7 +4,7 @@ use axum::{ routing::{delete, get, post}, }; -use crate::{app::App, boot, channel, event, expire, invite, message, setup, ui, user}; +use crate::{app::App, boot, conversation, event, expire, invite, message, setup, ui, user}; pub fn routes(app: &App) -> Router<App> { // UI routes that can be accessed before the administrator completes setup. @@ -15,7 +15,7 @@ pub fn routes(app: &App) -> Router<App> { // UI routes that require the administrator to complete setup first. let ui_setup_required = Router::new() .route("/", get(ui::handlers::index)) - .route("/ch/{channel}", get(ui::handlers::channel)) + .route("/ch/{channel}", get(ui::handlers::conversation)) .route("/invite/{invite}", get(ui::handlers::invite)) .route("/login", get(ui::handlers::login)) .route("/me", get(ui::handlers::me)) @@ -29,9 +29,15 @@ pub fn routes(app: &App) -> Router<App> { .route("/api/auth/login", post(user::handlers::login)) .route("/api/auth/logout", post(user::handlers::logout)) .route("/api/boot", get(boot::handlers::boot)) - .route("/api/channels", post(channel::handlers::create)) - .route("/api/channels/{channel}", post(channel::handlers::send)) - .route("/api/channels/{channel}", delete(channel::handlers::delete)) + .route("/api/channels", post(conversation::handlers::create)) + .route( + "/api/channels/{channel}", + post(conversation::handlers::send), + ) + .route( + "/api/channels/{channel}", + delete(conversation::handlers::delete), + ) .route("/api/events", get(event::handlers::stream)) .route("/api/invite", post(invite::handlers::issue)) .route("/api/invite/{invite}", get(invite::handlers::get)) diff --git a/src/test/fixtures/channel.rs b/src/test/fixtures/conversation.rs index 98048f2..fb2f58d 100644 --- a/src/test/fixtures/channel.rs +++ b/src/test/fixtures/conversation.rs @@ -7,17 +7,17 @@ use rand; use crate::{ app::App, - channel::{self, Channel}, clock::RequestedAt, + conversation::{self, Conversation}, name::Name, }; -pub async fn create(app: &App, created_at: &RequestedAt) -> Channel { +pub async fn create(app: &App, created_at: &RequestedAt) -> Conversation { let name = propose(); - app.channels() + app.conversations() .create(&name, created_at) .await - .expect("should always succeed if the channel is actually new") + .expect("should always succeed if the conversation is actually new") } pub fn propose() -> Name { @@ -33,6 +33,6 @@ faker_impl_from_templates! { NameTemplate; "{} {}", CityName, FullName; } -pub fn fictitious() -> channel::Id { - channel::Id::generate() +pub fn fictitious() -> conversation::Id { + conversation::Id::generate() } diff --git a/src/test/fixtures/event/mod.rs b/src/test/fixtures/event/mod.rs index 691cdeb..69c79d8 100644 --- a/src/test/fixtures/event/mod.rs +++ b/src/test/fixtures/event/mod.rs @@ -2,7 +2,7 @@ use crate::event::Event; pub mod stream; -pub fn channel(event: Event) -> Option<crate::channel::Event> { +pub fn conversation(event: Event) -> Option<crate::conversation::Event> { match event { Event::Channel(channel) => Some(channel), _ => None, @@ -23,8 +23,8 @@ pub fn user(event: Event) -> Option<crate::user::Event> { } } -pub mod channel { - use crate::channel::{Event, event}; +pub mod conversation { + use crate::conversation::{Event, event}; pub fn created(event: Event) -> Option<event::Created> { match event { diff --git a/src/test/fixtures/event/stream.rs b/src/test/fixtures/event/stream.rs index 6c2a1bf..5b3621d 100644 --- a/src/test/fixtures/event/stream.rs +++ b/src/test/fixtures/event/stream.rs @@ -2,8 +2,8 @@ use std::future::{self, Ready}; use crate::{event::Event, test::fixtures::event}; -pub fn channel(event: Event) -> Ready<Option<crate::channel::Event>> { - future::ready(event::channel(event)) +pub fn conversation(event: Event) -> Ready<Option<crate::conversation::Event>> { + future::ready(event::conversation(event)) } pub fn message(event: Event) -> Ready<Option<crate::message::Event>> { @@ -14,20 +14,20 @@ pub fn user(event: Event) -> Ready<Option<crate::user::Event>> { future::ready(event::user(event)) } -pub mod channel { +pub mod conversation { use std::future::{self, Ready}; use crate::{ - channel::{Event, event}, - test::fixtures::event::channel, + conversation::{Event, event}, + test::fixtures::event::conversation, }; pub fn created(event: Event) -> Ready<Option<event::Created>> { - future::ready(channel::created(event)) + future::ready(conversation::created(event)) } pub fn deleted(event: Event) -> Ready<Option<event::Deleted>> { - future::ready(channel::deleted(event)) + future::ready(conversation::deleted(event)) } } diff --git a/src/test/fixtures/message.rs b/src/test/fixtures/message.rs index 2254915..03f8072 100644 --- a/src/test/fixtures/message.rs +++ b/src/test/fixtures/message.rs @@ -2,19 +2,24 @@ use faker_rand::lorem::Paragraphs; use crate::{ app::App, - channel::Channel, clock::RequestedAt, + conversation::Conversation, message::{self, Body, Message}, user::User, }; -pub async fn send(app: &App, channel: &Channel, sender: &User, sent_at: &RequestedAt) -> Message { +pub async fn send( + app: &App, + conversation: &Conversation, + sender: &User, + sent_at: &RequestedAt, +) -> Message { let body = propose(); app.messages() - .send(&channel.id, sender, sent_at, &body) + .send(&conversation.id, sender, sent_at, &body) .await - .expect("should succeed if the channel exists") + .expect("should succeed if the conversation exists") } pub fn propose() -> Body { diff --git a/src/test/fixtures/mod.rs b/src/test/fixtures/mod.rs index 418bdb5..87d3fa1 100644 --- a/src/test/fixtures/mod.rs +++ b/src/test/fixtures/mod.rs @@ -3,7 +3,7 @@ use chrono::{TimeDelta, Utc}; use crate::{app::App, clock::RequestedAt, db}; pub mod boot; -pub mod channel; +pub mod conversation; pub mod cookie; pub mod event; pub mod future; diff --git a/src/ui/handlers/channel.rs b/src/ui/handlers/conversation.rs index d3199dd..f1bb319 100644 --- a/src/ui/handlers/channel.rs +++ b/src/ui/handlers/conversation.rs @@ -5,7 +5,7 @@ use axum::{ use crate::{ app::App, - channel::{self, app}, + conversation::{self, app}, error::Internal, token::extract::Identity, ui::{ @@ -17,17 +17,20 @@ use crate::{ pub async fn handler( State(app): State<App>, identity: Option<Identity>, - Path(channel): Path<channel::Id>, + Path(conversation): Path<conversation::Id>, ) -> Result<Asset, Error> { let _ = identity.ok_or(Error::NotLoggedIn)?; - app.channels().get(&channel).await.map_err(Error::from)?; + app.conversations() + .get(&conversation) + .await + .map_err(Error::from)?; Assets::index().map_err(Error::Internal) } #[derive(Debug, thiserror::Error)] pub enum Error { - #[error("channel not found")] + #[error("conversation not found")] NotFound, #[error("not logged in")] NotLoggedIn, diff --git a/src/ui/handlers/mod.rs b/src/ui/handlers/mod.rs index 5bfd0d6..ed0c14e 100644 --- a/src/ui/handlers/mod.rs +++ b/src/ui/handlers/mod.rs @@ -1,5 +1,5 @@ mod asset; -mod channel; +mod conversation; mod index; mod invite; mod login; @@ -7,7 +7,7 @@ mod me; mod setup; pub use asset::handler as asset; -pub use channel::handler as channel; +pub use conversation::handler as conversation; pub use index::handler as index; pub use invite::handler as invite; pub use login::handler as login; |
