diff options
| author | Owen Jacobson <owen@grimoire.ca> | 2024-10-03 20:44:07 -0400 |
|---|---|---|
| committer | Owen Jacobson <owen@grimoire.ca> | 2024-10-03 21:03:02 -0400 |
| commit | 617172576b95bbb935a75f98a98787da5a4e9a9d (patch) | |
| tree | ae72fea2e81d023960c93d4efbf7e137c3705c48 /src | |
| parent | 0a5599c60d20ccc2223779eeba5dc91a95ea0fe5 (diff) | |
List messages per channel.
Diffstat (limited to 'src')
| -rw-r--r-- | src/channel/app.rs | 44 | ||||
| -rw-r--r-- | src/channel/repo.rs | 7 | ||||
| -rw-r--r-- | src/channel/routes.rs | 76 | ||||
| -rw-r--r-- | src/event/app.rs | 8 | ||||
| -rw-r--r-- | src/event/mod.rs | 2 | ||||
| -rw-r--r-- | src/event/sequence.rs | 7 | ||||
| -rw-r--r-- | src/message/app.rs | 4 | ||||
| -rw-r--r-- | src/message/event.rs | 27 | ||||
| -rw-r--r-- | src/message/history.rs | 4 | ||||
| -rw-r--r-- | src/message/repo.rs | 57 | ||||
| -rw-r--r-- | src/message/snapshot.rs | 4 |
11 files changed, 189 insertions, 51 deletions
diff --git a/src/channel/app.rs b/src/channel/app.rs index 24be2ff..b3bfbee 100644 --- a/src/channel/app.rs +++ b/src/channel/app.rs @@ -7,7 +7,7 @@ use crate::{ clock::DateTime, db::NotFound, event::{broadcaster::Broadcaster, repo::Provider as _, Event, Sequence, Sequenced}, - message::repo::Provider as _, + message::{repo::Provider as _, Message}, }; pub struct Channels<'a> { @@ -54,22 +54,52 @@ impl<'a> Channels<'a> { Ok(channels) } - pub async fn delete(&self, channel: &Id, deleted_at: &DateTime) -> Result<(), DeleteError> { + pub async fn messages( + &self, + channel: &Id, + resume_point: Option<Sequence>, + ) -> Result<Vec<Message>, Error> { + let mut tx = self.db.begin().await?; + let channel = tx + .channels() + .by_id(channel) + .await + .not_found(|| Error::NotFound(channel.clone()))? + .snapshot(); + + let messages = tx + .messages() + .in_channel(&channel, resume_point) + .await? + .into_iter() + .filter_map(|message| { + message + .events() + .filter(Sequence::up_to(resume_point)) + .collect() + }) + .collect(); + + Ok(messages) + } + + pub async fn delete(&self, channel: &Id, deleted_at: &DateTime) -> Result<(), Error> { let mut tx = self.db.begin().await?; let channel = tx .channels() .by_id(channel) .await - .not_found(|| DeleteError::NotFound(channel.clone()))? + .not_found(|| Error::NotFound(channel.clone()))? .snapshot(); let mut events = Vec::new(); - let messages = tx.messages().in_channel(&channel).await?; + let messages = tx.messages().in_channel(&channel, None).await?; for message in messages { + let message = message.snapshot(); let deleted = tx.sequence().next(deleted_at).await?; - let message = tx.messages().delete(&message, &deleted).await?; + let message = tx.messages().delete(&message.id, &deleted).await?; events.extend( message .events() @@ -117,7 +147,7 @@ impl<'a> Channels<'a> { self.events.broadcast( events .into_iter() - .kmerge_by(|a, b| a.sequence() < b.sequence()) + .kmerge_by(Sequence::merge) .map(Event::from) .collect::<Vec<_>>(), ); @@ -135,7 +165,7 @@ pub enum CreateError { } #[derive(Debug, thiserror::Error)] -pub enum DeleteError { +pub enum Error { #[error("channel {0} not found")] NotFound(Id), #[error(transparent)] diff --git a/src/channel/repo.rs b/src/channel/repo.rs index 8bb761b..2b48436 100644 --- a/src/channel/repo.rs +++ b/src/channel/repo.rs @@ -84,10 +84,7 @@ impl<'c> Channels<'c> { Ok(channel) } - pub async fn all( - &mut self, - resume_point: Option<Sequence>, - ) -> Result<Vec<History>, sqlx::Error> { + pub async fn all(&mut self, resume_at: Option<Sequence>) -> Result<Vec<History>, sqlx::Error> { let channels = sqlx::query!( r#" select @@ -99,7 +96,7 @@ impl<'c> Channels<'c> { where coalesce(created_sequence <= $1, true) order by channel.name "#, - resume_point, + resume_at, ) .map(|row| History { channel: Channel { diff --git a/src/channel/routes.rs b/src/channel/routes.rs index bce634e..23c0602 100644 --- a/src/channel/routes.rs +++ b/src/channel/routes.rs @@ -7,13 +7,14 @@ use axum::{ }; use axum_extra::extract::Query; -use super::{ - app::{self, DeleteError}, - Channel, Id, -}; +use super::{app, Channel, Id}; use crate::{ - app::App, clock::RequestedAt, error::Internal, event::Sequence, login::Login, - message::app::SendError, + app::App, + clock::RequestedAt, + error::Internal, + event::{Instant, Sequence}, + login::Login, + message::{self, app::SendError}, }; #[cfg(test)] @@ -25,17 +26,18 @@ pub fn router() -> Router<App> { .route("/api/channels", post(on_create)) .route("/api/channels/:channel", post(on_send)) .route("/api/channels/:channel", delete(on_delete)) + .route("/api/channels/:channel/messages", get(messages)) } #[derive(Default, serde::Deserialize)] -struct ListQuery { +struct ResumeQuery { resume_point: Option<Sequence>, } async fn list( State(app): State<App>, _: Login, - Query(query): Query<ListQuery>, + Query(query): Query<ResumeQuery>, ) -> Result<Channels, Internal> { let channels = app.channels().all(query.resume_point).await?; let response = Channels(channels); @@ -127,7 +129,7 @@ async fn on_delete( Path(channel): Path<Id>, RequestedAt(deleted_at): RequestedAt, _: Login, -) -> Result<StatusCode, DeleteErrorResponse> { +) -> Result<StatusCode, ErrorResponse> { app.channels().delete(&channel, &deleted_at).await?; Ok(StatusCode::ACCEPTED) @@ -135,16 +137,66 @@ async fn on_delete( #[derive(Debug, thiserror::Error)] #[error(transparent)] -struct DeleteErrorResponse(#[from] DeleteError); +struct ErrorResponse(#[from] app::Error); -impl IntoResponse for DeleteErrorResponse { +impl IntoResponse for ErrorResponse { fn into_response(self) -> Response { let Self(error) = self; match error { - not_found @ DeleteError::NotFound(_) => { + not_found @ app::Error::NotFound(_) => { (StatusCode::NOT_FOUND, not_found.to_string()).into_response() } other => Internal::from(other).into_response(), } } } + +async fn messages( + State(app): State<App>, + Path(channel): Path<Id>, + _: Login, + Query(query): Query<ResumeQuery>, +) -> Result<Messages, ErrorResponse> { + let messages = app + .channels() + .messages(&channel, query.resume_point) + .await?; + let response = Messages( + messages + .into_iter() + .map(|message| MessageView { + sent: message.sent, + sender: message.sender, + message: MessageInner { + id: message.id, + body: message.body, + }, + }) + .collect(), + ); + + Ok(response) +} + +struct Messages(Vec<MessageView>); + +#[derive(serde::Serialize)] +struct MessageView { + #[serde(flatten)] + sent: Instant, + sender: Login, + message: MessageInner, +} + +#[derive(serde::Serialize)] +struct MessageInner { + id: message::Id, + body: String, +} + +impl IntoResponse for Messages { + fn into_response(self) -> Response { + let Self(messages) = self; + Json(messages).into_response() + } +} diff --git a/src/event/app.rs b/src/event/app.rs index 32f0a97..d664ec7 100644 --- a/src/event/app.rs +++ b/src/event/app.rs @@ -36,7 +36,7 @@ impl<'a> Events<'a> { let channel_events = channels .iter() .map(channel::History::events) - .kmerge_by(|a, b| a.instant.sequence < b.instant.sequence) + .kmerge_by(Sequence::merge) .filter(Sequence::after(resume_at)) .map(Event::from); @@ -44,14 +44,12 @@ impl<'a> Events<'a> { let message_events = messages .iter() .map(message::History::events) - .kmerge_by(|a, b| a.instant.sequence < b.instant.sequence) + .kmerge_by(Sequence::merge) .filter(Sequence::after(resume_at)) .map(Event::from); let replay_events = channel_events - .merge_by(message_events, |a, b| { - a.instant.sequence < b.instant.sequence - }) + .merge_by(message_events, Sequence::merge) .collect::<Vec<_>>(); let resume_live_at = replay_events.last().map(Sequenced::sequence); diff --git a/src/event/mod.rs b/src/event/mod.rs index 1503b77..1349fe6 100644 --- a/src/event/mod.rs +++ b/src/event/mod.rs @@ -38,7 +38,7 @@ impl From<channel::Event> for Event { impl From<message::Event> for Event { fn from(event: message::Event) -> Self { Self { - instant: event.instant, + instant: event.instant(), kind: event.kind.into(), } } diff --git a/src/event/sequence.rs b/src/event/sequence.rs index c566156..fbe3711 100644 --- a/src/event/sequence.rs +++ b/src/event/sequence.rs @@ -59,6 +59,13 @@ impl Sequence { { move |event| resume_point <= event.sequence() } + + pub fn merge<E>(a: &E, b: &E) -> bool + where + E: Sequenced, + { + a.sequence() < b.sequence() + } } pub trait Sequenced { diff --git a/src/message/app.rs b/src/message/app.rs index 1d34c14..fd6a334 100644 --- a/src/message/app.rs +++ b/src/message/app.rs @@ -7,7 +7,7 @@ use crate::{ channel::{self, repo::Provider as _}, clock::DateTime, db::NotFound as _, - event::{broadcaster::Broadcaster, repo::Provider as _, Event, Sequence}, + event::{broadcaster::Broadcaster, repo::Provider as _, Event, Sequence, Sequenced}, login::Login, }; @@ -87,7 +87,7 @@ impl<'a> Messages<'a> { self.events.broadcast( events .into_iter() - .kmerge_by(|a, b| a.instant.sequence < b.instant.sequence) + .kmerge_by(Sequence::merge) .map(Event::from) .collect::<Vec<_>>(), ); diff --git a/src/message/event.rs b/src/message/event.rs index bcc2238..66db9b0 100644 --- a/src/message/event.rs +++ b/src/message/event.rs @@ -7,14 +7,12 @@ use crate::{ #[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] pub struct Event { #[serde(flatten)] - pub instant: Instant, - #[serde(flatten)] pub kind: Kind, } impl Sequenced for Event { fn instant(&self) -> Instant { - self.instant + self.kind.instant() } } @@ -25,12 +23,27 @@ pub enum Kind { Deleted(Deleted), } +impl Sequenced for Kind { + fn instant(&self) -> Instant { + match self { + Self::Sent(sent) => sent.instant(), + Self::Deleted(deleted) => deleted.instant(), + } + } +} + #[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] pub struct Sent { #[serde(flatten)] pub message: Message, } +impl Sequenced for Sent { + fn instant(&self) -> Instant { + self.message.sent + } +} + impl From<Sent> for Kind { fn from(event: Sent) -> Self { Self::Sent(event) @@ -39,10 +52,18 @@ impl From<Sent> for Kind { #[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] pub struct Deleted { + #[serde(flatten)] + pub instant: Instant, pub channel: Channel, pub message: Id, } +impl Sequenced for Deleted { + fn instant(&self) -> Instant { + self.instant + } +} + impl From<Deleted> for Kind { fn from(event: Deleted) -> Self { Self::Deleted(event) diff --git a/src/message/history.rs b/src/message/history.rs index 5aca47e..89fc6b1 100644 --- a/src/message/history.rs +++ b/src/message/history.rs @@ -7,14 +7,12 @@ use crate::event::Instant; #[derive(Clone, Debug, Eq, PartialEq)] pub struct History { pub message: Message, - pub sent: Instant, pub deleted: Option<Instant>, } impl History { fn sent(&self) -> Event { Event { - instant: self.sent, kind: Sent { message: self.message.clone(), } @@ -24,8 +22,8 @@ impl History { fn deleted(&self) -> Option<Event> { self.deleted.map(|instant| Event { - instant, kind: Deleted { + instant, channel: self.message.channel.clone(), message: self.message.id.clone(), } diff --git a/src/message/repo.rs b/src/message/repo.rs index ae41736..fc835c8 100644 --- a/src/message/repo.rs +++ b/src/message/repo.rs @@ -48,12 +48,12 @@ impl<'c> Messages<'c> { ) .map(|row| History { message: Message { + sent: *sent, channel: channel.clone(), sender: sender.clone(), id: row.id, body: row.body, }, - sent: *sent, deleted: None, }) .fetch_one(&mut *self.0) @@ -62,18 +62,51 @@ impl<'c> Messages<'c> { Ok(message) } - pub async fn in_channel(&mut self, channel: &Channel) -> Result<Vec<Id>, sqlx::Error> { - let messages = sqlx::query_scalar!( + pub async fn in_channel( + &mut self, + channel: &Channel, + resume_at: Option<Sequence>, + ) -> Result<Vec<History>, sqlx::Error> { + let messages = sqlx::query!( r#" select - message.id as "id: Id" + channel.id as "channel_id: channel::Id", + channel.name as "channel_name", + sender.id as "sender_id: login::Id", + sender.name as "sender_name", + message.id as "id: Id", + message.body, + sent_at as "sent_at: DateTime", + sent_sequence as "sent_sequence: Sequence" from message join channel on message.channel = channel.id + join login as sender on message.sender = sender.id where channel.id = $1 + and coalesce(message.sent_sequence <= $2, true) order by message.sent_sequence "#, channel.id, + resume_at, ) + .map(|row| History { + message: Message { + sent: Instant { + at: row.sent_at, + sequence: row.sent_sequence, + }, + channel: Channel { + id: row.channel_id, + name: row.channel_name, + }, + sender: Login { + id: row.sender_id, + name: row.sender_name, + }, + id: row.id, + body: row.body, + }, + deleted: None, + }) .fetch_all(&mut *self.0) .await?; @@ -101,6 +134,10 @@ impl<'c> Messages<'c> { ) .map(|row| History { message: Message { + sent: Instant { + at: row.sent_at, + sequence: row.sent_sequence, + }, channel: Channel { id: row.channel_id, name: row.channel_name, @@ -112,10 +149,6 @@ impl<'c> Messages<'c> { id: row.id, body: row.body, }, - sent: Instant { - at: row.sent_at, - sequence: row.sent_sequence, - }, deleted: None, }) .fetch_one(&mut *self.0) @@ -189,6 +222,10 @@ impl<'c> Messages<'c> { ) .map(|row| History { message: Message { + sent: Instant { + at: row.sent_at, + sequence: row.sent_sequence, + }, channel: Channel { id: row.channel_id, name: row.channel_name, @@ -200,10 +237,6 @@ impl<'c> Messages<'c> { id: row.id, body: row.body, }, - sent: Instant { - at: row.sent_at, - sequence: row.sent_sequence, - }, deleted: None, }) .fetch_all(&mut *self.0) diff --git a/src/message/snapshot.rs b/src/message/snapshot.rs index 3adccbe..522c1aa 100644 --- a/src/message/snapshot.rs +++ b/src/message/snapshot.rs @@ -2,11 +2,13 @@ use super::{ event::{Event, Kind, Sent}, Id, }; -use crate::{channel::Channel, login::Login}; +use crate::{channel::Channel, event::Instant, login::Login}; #[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] #[serde(into = "self::serialize::Message")] pub struct Message { + #[serde(skip)] + pub sent: Instant, pub channel: Channel, pub sender: Login, pub id: Id, |
