diff options
| author | Kit La Touche <kit@transneptune.net> | 2024-10-03 23:30:42 -0400 |
|---|---|---|
| committer | Kit La Touche <kit@transneptune.net> | 2024-10-03 23:30:42 -0400 |
| commit | d50b1b56c011c03c7d8a95242af404b727e91a80 (patch) | |
| tree | efe3408f6a8ef669981826d1a29d16a24b460d89 /src/channel | |
| parent | 30c13478d61065a512f5bc8824fecbf2ee6afc81 (diff) | |
| parent | 7f12fd41c2941a55a6437f24e4f780104a718790 (diff) | |
Merge branch 'main' into feature-frontend
Diffstat (limited to 'src/channel')
| -rw-r--r-- | src/channel/app.rs | 129 | ||||
| -rw-r--r-- | src/channel/event.rs | 48 | ||||
| -rw-r--r-- | src/channel/history.rs | 42 | ||||
| -rw-r--r-- | src/channel/id.rs | 38 | ||||
| -rw-r--r-- | src/channel/mod.rs | 7 | ||||
| -rw-r--r-- | src/channel/repo.rs | 202 | ||||
| -rw-r--r-- | src/channel/routes.rs | 122 | ||||
| -rw-r--r-- | src/channel/routes/test/list.rs | 7 | ||||
| -rw-r--r-- | src/channel/routes/test/on_create.rs | 13 | ||||
| -rw-r--r-- | src/channel/routes/test/on_send.rs | 23 | ||||
| -rw-r--r-- | src/channel/snapshot.rs | 38 |
11 files changed, 613 insertions, 56 deletions
diff --git a/src/channel/app.rs b/src/channel/app.rs index 70cda47..bb331ec 100644 --- a/src/channel/app.rs +++ b/src/channel/app.rs @@ -1,10 +1,13 @@ use chrono::TimeDelta; +use itertools::Itertools; use sqlx::sqlite::SqlitePool; +use super::{repo::Provider as _, Channel, Id}; use crate::{ clock::DateTime, - events::{broadcaster::Broadcaster, repo::message::Provider as _, types::ChannelEvent}, - repo::channel::{Channel, Provider as _}, + db::NotFound, + event::{broadcaster::Broadcaster, repo::Provider as _, Event, Sequence}, + message::{repo::Provider as _, Message}, }; pub struct Channels<'a> { @@ -19,27 +22,108 @@ impl<'a> Channels<'a> { pub async fn create(&self, name: &str, created_at: &DateTime) -> Result<Channel, CreateError> { let mut tx = self.db.begin().await?; + let created = tx.sequence().next(created_at).await?; let channel = tx .channels() - .create(name, created_at) + .create(name, &created) .await .map_err(|err| CreateError::from_duplicate_name(err, name))?; tx.commit().await?; self.events - .broadcast(&ChannelEvent::created(channel.clone())); + .broadcast(channel.events().map(Event::from).collect::<Vec<_>>()); - Ok(channel) + Ok(channel.snapshot()) } - pub async fn all(&self) -> Result<Vec<Channel>, InternalError> { + pub async fn all(&self, resume_point: Option<Sequence>) -> Result<Vec<Channel>, InternalError> { let mut tx = self.db.begin().await?; - let channels = tx.channels().all().await?; + let channels = tx.channels().all(resume_point).await?; tx.commit().await?; + let channels = channels + .into_iter() + .filter_map(|channel| { + channel + .events() + .filter(Sequence::up_to(resume_point)) + .collect() + }) + .collect(); + Ok(channels) } + pub async fn messages( + &self, + channel: &Id, + resume_point: Option<Sequence>, + ) -> Result<Vec<Message>, Error> { + let mut tx = self.db.begin().await?; + let channel = tx + .channels() + .by_id(channel) + .await + .not_found(|| Error::NotFound(channel.clone()))? + .snapshot(); + + let messages = tx + .messages() + .in_channel(&channel, resume_point) + .await? + .into_iter() + .filter_map(|message| { + message + .events() + .filter(Sequence::up_to(resume_point)) + .collect() + }) + .collect(); + + Ok(messages) + } + + pub async fn delete(&self, channel: &Id, deleted_at: &DateTime) -> Result<(), Error> { + let mut tx = self.db.begin().await?; + + let channel = tx + .channels() + .by_id(channel) + .await + .not_found(|| Error::NotFound(channel.clone()))? + .snapshot(); + + let mut events = Vec::new(); + + let messages = tx.messages().in_channel(&channel, None).await?; + for message in messages { + let message = message.snapshot(); + let deleted = tx.sequence().next(deleted_at).await?; + let message = tx.messages().delete(&message.id, &deleted).await?; + events.extend( + message + .events() + .filter(Sequence::start_from(deleted.sequence)) + .map(Event::from), + ); + } + + let deleted = tx.sequence().next(deleted_at).await?; + let channel = tx.channels().delete(&channel.id, &deleted).await?; + events.extend( + channel + .events() + .filter(Sequence::start_from(deleted.sequence)) + .map(Event::from), + ); + + tx.commit().await?; + + self.events.broadcast(events); + + Ok(()) + } + pub async fn expire(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> { // Somewhat arbitrarily, expire after 90 days. let expire_at = relative_to.to_owned() - TimeDelta::days(90); @@ -49,19 +133,24 @@ impl<'a> Channels<'a> { let mut events = Vec::with_capacity(expired.len()); for channel in expired { - let sequence = tx.message_events().assign_sequence(&channel).await?; - let event = tx - .channels() - .delete_expired(&channel, sequence, relative_to) - .await?; - events.push(event); + let deleted = tx.sequence().next(relative_to).await?; + let channel = tx.channels().delete(&channel, &deleted).await?; + events.push( + channel + .events() + .filter(Sequence::start_from(deleted.sequence)), + ); } tx.commit().await?; - for event in events { - self.events.broadcast(&event); - } + self.events.broadcast( + events + .into_iter() + .kmerge_by(Sequence::merge) + .map(Event::from) + .collect::<Vec<_>>(), + ); Ok(()) } @@ -75,6 +164,14 @@ pub enum CreateError { DatabaseError(#[from] sqlx::Error), } +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error("channel {0} not found")] + NotFound(Id), + #[error(transparent)] + DatabaseError(#[from] sqlx::Error), +} + impl CreateError { fn from_duplicate_name(error: sqlx::Error, name: &str) -> Self { if let Some(error) = error.as_database_error() { diff --git a/src/channel/event.rs b/src/channel/event.rs new file mode 100644 index 0000000..9c54174 --- /dev/null +++ b/src/channel/event.rs @@ -0,0 +1,48 @@ +use super::Channel; +use crate::{ + channel, + event::{Instant, Sequenced}, +}; + +#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] +pub struct Event { + #[serde(flatten)] + pub instant: Instant, + #[serde(flatten)] + pub kind: Kind, +} + +impl Sequenced for Event { + fn instant(&self) -> Instant { + self.instant + } +} + +#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] +#[serde(tag = "type", rename_all = "snake_case")] +pub enum Kind { + Created(Created), + Deleted(Deleted), +} + +#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] +pub struct Created { + pub channel: Channel, +} + +impl From<Created> for Kind { + fn from(event: Created) -> Self { + Self::Created(event) + } +} + +#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] +pub struct Deleted { + pub channel: channel::Id, +} + +impl From<Deleted> for Kind { + fn from(event: Deleted) -> Self { + Self::Deleted(event) + } +} diff --git a/src/channel/history.rs b/src/channel/history.rs new file mode 100644 index 0000000..3cc7d9d --- /dev/null +++ b/src/channel/history.rs @@ -0,0 +1,42 @@ +use super::{ + event::{Created, Deleted, Event}, + Channel, +}; +use crate::event::Instant; + +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct History { + pub channel: Channel, + pub created: Instant, + pub deleted: Option<Instant>, +} + +impl History { + fn created(&self) -> Event { + Event { + instant: self.created, + kind: Created { + channel: self.channel.clone(), + } + .into(), + } + } + + fn deleted(&self) -> Option<Event> { + self.deleted.map(|instant| Event { + instant, + kind: Deleted { + channel: self.channel.id.clone(), + } + .into(), + }) + } + + pub fn events(&self) -> impl Iterator<Item = Event> { + [self.created()].into_iter().chain(self.deleted()) + } + + pub fn snapshot(&self) -> Channel { + self.channel.clone() + } +} diff --git a/src/channel/id.rs b/src/channel/id.rs new file mode 100644 index 0000000..22a2700 --- /dev/null +++ b/src/channel/id.rs @@ -0,0 +1,38 @@ +use std::fmt; + +use crate::id::Id as BaseId; + +// Stable identifier for a [Channel]. Prefixed with `C`. +#[derive( + Clone, + Debug, + Eq, + Hash, + Ord, + PartialEq, + PartialOrd, + sqlx::Type, + serde::Deserialize, + serde::Serialize, +)] +#[sqlx(transparent)] +#[serde(transparent)] +pub struct Id(BaseId); + +impl From<BaseId> for Id { + fn from(id: BaseId) -> Self { + Self(id) + } +} + +impl Id { + pub fn generate() -> Self { + BaseId::generate("C") + } +} + +impl fmt::Display for Id { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.0.fmt(f) + } +} diff --git a/src/channel/mod.rs b/src/channel/mod.rs index 9f79dbb..eb8200b 100644 --- a/src/channel/mod.rs +++ b/src/channel/mod.rs @@ -1,4 +1,9 @@ pub mod app; +pub mod event; +mod history; +mod id; +pub mod repo; mod routes; +mod snapshot; -pub use self::routes::router; +pub use self::{event::Event, history::History, id::Id, routes::router, snapshot::Channel}; diff --git a/src/channel/repo.rs b/src/channel/repo.rs new file mode 100644 index 0000000..2b48436 --- /dev/null +++ b/src/channel/repo.rs @@ -0,0 +1,202 @@ +use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction}; + +use crate::{ + channel::{Channel, History, Id}, + clock::DateTime, + event::{Instant, Sequence}, +}; + +pub trait Provider { + fn channels(&mut self) -> Channels; +} + +impl<'c> Provider for Transaction<'c, Sqlite> { + fn channels(&mut self) -> Channels { + Channels(self) + } +} + +pub struct Channels<'t>(&'t mut SqliteConnection); + +impl<'c> Channels<'c> { + pub async fn create(&mut self, name: &str, created: &Instant) -> Result<History, sqlx::Error> { + let id = Id::generate(); + let channel = sqlx::query!( + r#" + insert + into channel (id, name, created_at, created_sequence) + values ($1, $2, $3, $4) + returning + id as "id: Id", + name, + created_at as "created_at: DateTime", + created_sequence as "created_sequence: Sequence" + "#, + id, + name, + created.at, + created.sequence, + ) + .map(|row| History { + channel: Channel { + id: row.id, + name: row.name, + }, + created: Instant { + at: row.created_at, + sequence: row.created_sequence, + }, + deleted: None, + }) + .fetch_one(&mut *self.0) + .await?; + + Ok(channel) + } + + pub async fn by_id(&mut self, channel: &Id) -> Result<History, sqlx::Error> { + let channel = sqlx::query!( + r#" + select + id as "id: Id", + name, + created_at as "created_at: DateTime", + created_sequence as "created_sequence: Sequence" + from channel + where id = $1 + "#, + channel, + ) + .map(|row| History { + channel: Channel { + id: row.id, + name: row.name, + }, + created: Instant { + at: row.created_at, + sequence: row.created_sequence, + }, + deleted: None, + }) + .fetch_one(&mut *self.0) + .await?; + + Ok(channel) + } + + pub async fn all(&mut self, resume_at: Option<Sequence>) -> Result<Vec<History>, sqlx::Error> { + let channels = sqlx::query!( + r#" + select + id as "id: Id", + name, + created_at as "created_at: DateTime", + created_sequence as "created_sequence: Sequence" + from channel + where coalesce(created_sequence <= $1, true) + order by channel.name + "#, + resume_at, + ) + .map(|row| History { + channel: Channel { + id: row.id, + name: row.name, + }, + created: Instant { + at: row.created_at, + sequence: row.created_sequence, + }, + deleted: None, + }) + .fetch_all(&mut *self.0) + .await?; + + Ok(channels) + } + + pub async fn replay( + &mut self, + resume_at: Option<Sequence>, + ) -> Result<Vec<History>, sqlx::Error> { + let channels = sqlx::query!( + r#" + select + id as "id: Id", + name, + created_at as "created_at: DateTime", + created_sequence as "created_sequence: Sequence" + from channel + where coalesce(created_sequence > $1, true) + "#, + resume_at, + ) + .map(|row| History { + channel: Channel { + id: row.id, + name: row.name, + }, + created: Instant { + at: row.created_at, + sequence: row.created_sequence, + }, + deleted: None, + }) + .fetch_all(&mut *self.0) + .await?; + + Ok(channels) + } + + pub async fn delete( + &mut self, + channel: &Id, + deleted: &Instant, + ) -> Result<History, sqlx::Error> { + let channel = sqlx::query!( + r#" + delete from channel + where id = $1 + returning + id as "id: Id", + name, + created_at as "created_at: DateTime", + created_sequence as "created_sequence: Sequence" + "#, + channel, + ) + .map(|row| History { + channel: Channel { + id: row.id, + name: row.name, + }, + created: Instant { + at: row.created_at, + sequence: row.created_sequence, + }, + deleted: Some(*deleted), + }) + .fetch_one(&mut *self.0) + .await?; + + Ok(channel) + } + + pub async fn expired(&mut self, expired_at: &DateTime) -> Result<Vec<Id>, sqlx::Error> { + let channels = sqlx::query_scalar!( + r#" + select + channel.id as "id: Id" + from channel + left join message + where created_at < $1 + and message.id is null + "#, + expired_at, + ) + .fetch_all(&mut *self.0) + .await?; + + Ok(channels) + } +} diff --git a/src/channel/routes.rs b/src/channel/routes.rs index 1f8db5a..23c0602 100644 --- a/src/channel/routes.rs +++ b/src/channel/routes.rs @@ -2,20 +2,19 @@ use axum::{ extract::{Json, Path, State}, http::StatusCode, response::{IntoResponse, Response}, - routing::{get, post}, + routing::{delete, get, post}, Router, }; +use axum_extra::extract::Query; -use super::app; +use super::{app, Channel, Id}; use crate::{ app::App, clock::RequestedAt, error::Internal, - events::app::EventsError, - repo::{ - channel::{self, Channel}, - login::Login, - }, + event::{Instant, Sequence}, + login::Login, + message::{self, app::SendError}, }; #[cfg(test)] @@ -26,10 +25,21 @@ pub fn router() -> Router<App> { .route("/api/channels", get(list)) .route("/api/channels", post(on_create)) .route("/api/channels/:channel", post(on_send)) + .route("/api/channels/:channel", delete(on_delete)) + .route("/api/channels/:channel/messages", get(messages)) } -async fn list(State(app): State<App>, _: Login) -> Result<Channels, Internal> { - let channels = app.channels().all().await?; +#[derive(Default, serde::Deserialize)] +struct ResumeQuery { + resume_point: Option<Sequence>, +} + +async fn list( + State(app): State<App>, + _: Login, + Query(query): Query<ResumeQuery>, +) -> Result<Channels, Internal> { + let channels = app.channels().all(query.resume_point).await?; let response = Channels(channels); Ok(response) @@ -86,31 +96,107 @@ struct SendRequest { async fn on_send( State(app): State<App>, - Path(channel): Path<channel::Id>, + Path(channel): Path<Id>, RequestedAt(sent_at): RequestedAt, login: Login, Json(request): Json<SendRequest>, +) -> Result<StatusCode, SendErrorResponse> { + app.messages() + .send(&channel, &login, &sent_at, &request.message) + .await?; + + Ok(StatusCode::ACCEPTED) +} + +#[derive(Debug, thiserror::Error)] +#[error(transparent)] +struct SendErrorResponse(#[from] SendError); + +impl IntoResponse for SendErrorResponse { + fn into_response(self) -> Response { + let Self(error) = self; + match error { + not_found @ SendError::ChannelNotFound(_) => { + (StatusCode::NOT_FOUND, not_found.to_string()).into_response() + } + other => Internal::from(other).into_response(), + } + } +} + +async fn on_delete( + State(app): State<App>, + Path(channel): Path<Id>, + RequestedAt(deleted_at): RequestedAt, + _: Login, ) -> Result<StatusCode, ErrorResponse> { - app.events() - .send(&login, &channel, &request.message, &sent_at) - .await - // Could impl `From` here, but it's more code and this is used once. - .map_err(ErrorResponse)?; + app.channels().delete(&channel, &deleted_at).await?; Ok(StatusCode::ACCEPTED) } -#[derive(Debug)] -struct ErrorResponse(EventsError); +#[derive(Debug, thiserror::Error)] +#[error(transparent)] +struct ErrorResponse(#[from] app::Error); impl IntoResponse for ErrorResponse { fn into_response(self) -> Response { let Self(error) = self; match error { - not_found @ EventsError::ChannelNotFound(_) => { + not_found @ app::Error::NotFound(_) => { (StatusCode::NOT_FOUND, not_found.to_string()).into_response() } other => Internal::from(other).into_response(), } } } + +async fn messages( + State(app): State<App>, + Path(channel): Path<Id>, + _: Login, + Query(query): Query<ResumeQuery>, +) -> Result<Messages, ErrorResponse> { + let messages = app + .channels() + .messages(&channel, query.resume_point) + .await?; + let response = Messages( + messages + .into_iter() + .map(|message| MessageView { + sent: message.sent, + sender: message.sender, + message: MessageInner { + id: message.id, + body: message.body, + }, + }) + .collect(), + ); + + Ok(response) +} + +struct Messages(Vec<MessageView>); + +#[derive(serde::Serialize)] +struct MessageView { + #[serde(flatten)] + sent: Instant, + sender: Login, + message: MessageInner, +} + +#[derive(serde::Serialize)] +struct MessageInner { + id: message::Id, + body: String, +} + +impl IntoResponse for Messages { + fn into_response(self) -> Response { + let Self(messages) = self; + Json(messages).into_response() + } +} diff --git a/src/channel/routes/test/list.rs b/src/channel/routes/test/list.rs index bc94024..f15a53c 100644 --- a/src/channel/routes/test/list.rs +++ b/src/channel/routes/test/list.rs @@ -1,4 +1,5 @@ use axum::extract::State; +use axum_extra::extract::Query; use crate::{channel::routes, test::fixtures}; @@ -11,7 +12,7 @@ async fn empty_list() { // Call the endpoint - let routes::Channels(channels) = routes::list(State(app), viewer) + let routes::Channels(channels) = routes::list(State(app), viewer, Query::default()) .await .expect("always succeeds"); @@ -30,7 +31,7 @@ async fn one_channel() { // Call the endpoint - let routes::Channels(channels) = routes::list(State(app), viewer) + let routes::Channels(channels) = routes::list(State(app), viewer, Query::default()) .await .expect("always succeeds"); @@ -52,7 +53,7 @@ async fn multiple_channels() { // Call the endpoint - let routes::Channels(response_channels) = routes::list(State(app), viewer) + let routes::Channels(response_channels) = routes::list(State(app), viewer, Query::default()) .await .expect("always succeeds"); diff --git a/src/channel/routes/test/on_create.rs b/src/channel/routes/test/on_create.rs index e2610a5..5733c9e 100644 --- a/src/channel/routes/test/on_create.rs +++ b/src/channel/routes/test/on_create.rs @@ -3,7 +3,7 @@ use futures::stream::StreamExt as _; use crate::{ channel::{app, routes}, - events::types, + event, test::fixtures::{self, future::Immediately as _}, }; @@ -33,26 +33,25 @@ async fn new_channel() { // Verify the semantics - let channels = app.channels().all().await.expect("always succeeds"); + let channels = app.channels().all(None).await.expect("always succeeds"); assert!(channels.contains(&response_channel)); let mut events = app .events() - .subscribe(types::ResumePoint::default()) + .subscribe(None) .await .expect("subscribing never fails") .filter(fixtures::filter::created()); - let types::ResumableEvent(_, event) = events + let event = events .next() .immediately() .await .expect("creation event published"); - assert_eq!(types::Sequence::default(), event.sequence); assert!(matches!( - event.data, - types::ChannelEventData::Created(event) + event.kind, + event::Kind::ChannelCreated(event) if event.channel == response_channel )); } diff --git a/src/channel/routes/test/on_send.rs b/src/channel/routes/test/on_send.rs index 233518b..3297093 100644 --- a/src/channel/routes/test/on_send.rs +++ b/src/channel/routes/test/on_send.rs @@ -2,9 +2,10 @@ use axum::extract::{Json, Path, State}; use futures::stream::StreamExt; use crate::{ + channel, channel::routes, - events::{app, types}, - repo::channel, + event, + message::app::SendError, test::fixtures::{self, future::Immediately as _}, }; @@ -43,7 +44,7 @@ async fn messages_in_order() { let events = app .events() - .subscribe(types::ResumePoint::default()) + .subscribe(None) .await .expect("subscribing to a valid channel") .filter(fixtures::filter::messages()) @@ -51,13 +52,13 @@ async fn messages_in_order() { let events = events.collect::<Vec<_>>().immediately().await; - for ((sent_at, message), types::ResumableEvent(_, event)) in requests.into_iter().zip(events) { - assert_eq!(*sent_at, event.at); + for ((sent_at, message), event) in requests.into_iter().zip(events) { + assert_eq!(*sent_at, event.instant.at); assert!(matches!( - event.data, - types::ChannelEventData::Message(event_message) - if event_message.sender == sender - && event_message.message.body == message + event.kind, + event::Kind::MessageSent(event) + if event.message.sender == sender + && event.message.body == message )); } } @@ -76,7 +77,7 @@ async fn nonexistent_channel() { let request = routes::SendRequest { message: fixtures::message::propose(), }; - let routes::ErrorResponse(error) = routes::on_send( + let routes::SendErrorResponse(error) = routes::on_send( State(app), Path(channel.clone()), sent_at, @@ -90,6 +91,6 @@ async fn nonexistent_channel() { assert!(matches!( error, - app::EventsError::ChannelNotFound(error_channel) if channel == error_channel + SendError::ChannelNotFound(error_channel) if channel == error_channel )); } diff --git a/src/channel/snapshot.rs b/src/channel/snapshot.rs new file mode 100644 index 0000000..6462f25 --- /dev/null +++ b/src/channel/snapshot.rs @@ -0,0 +1,38 @@ +use super::{ + event::{Created, Event, Kind}, + Id, +}; + +#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] +pub struct Channel { + pub id: Id, + pub name: String, +} + +impl Channel { + fn apply(state: Option<Self>, event: Event) -> Option<Self> { + match (state, event.kind) { + (None, Kind::Created(event)) => Some(event.into()), + (Some(channel), Kind::Deleted(event)) if channel.id == event.channel => None, + (state, event) => panic!("invalid channel event {event:#?} for state {state:#?}"), + } + } +} + +impl FromIterator<Event> for Option<Channel> { + fn from_iter<I: IntoIterator<Item = Event>>(events: I) -> Self { + events.into_iter().fold(None, Channel::apply) + } +} + +impl From<&Created> for Channel { + fn from(event: &Created) -> Self { + event.channel.clone() + } +} + +impl From<Created> for Channel { + fn from(event: Created) -> Self { + event.channel + } +} |
