diff options
| author | Owen Jacobson <owen@grimoire.ca> | 2024-10-03 20:44:07 -0400 |
|---|---|---|
| committer | Owen Jacobson <owen@grimoire.ca> | 2024-10-03 21:03:02 -0400 |
| commit | 617172576b95bbb935a75f98a98787da5a4e9a9d (patch) | |
| tree | ae72fea2e81d023960c93d4efbf7e137c3705c48 /src/channel | |
| parent | 0a5599c60d20ccc2223779eeba5dc91a95ea0fe5 (diff) | |
List messages per channel.
Diffstat (limited to 'src/channel')
| -rw-r--r-- | src/channel/app.rs | 44 | ||||
| -rw-r--r-- | src/channel/repo.rs | 7 | ||||
| -rw-r--r-- | src/channel/routes.rs | 76 |
3 files changed, 103 insertions, 24 deletions
diff --git a/src/channel/app.rs b/src/channel/app.rs index 24be2ff..b3bfbee 100644 --- a/src/channel/app.rs +++ b/src/channel/app.rs @@ -7,7 +7,7 @@ use crate::{ clock::DateTime, db::NotFound, event::{broadcaster::Broadcaster, repo::Provider as _, Event, Sequence, Sequenced}, - message::repo::Provider as _, + message::{repo::Provider as _, Message}, }; pub struct Channels<'a> { @@ -54,22 +54,52 @@ impl<'a> Channels<'a> { Ok(channels) } - pub async fn delete(&self, channel: &Id, deleted_at: &DateTime) -> Result<(), DeleteError> { + pub async fn messages( + &self, + channel: &Id, + resume_point: Option<Sequence>, + ) -> Result<Vec<Message>, Error> { + let mut tx = self.db.begin().await?; + let channel = tx + .channels() + .by_id(channel) + .await + .not_found(|| Error::NotFound(channel.clone()))? + .snapshot(); + + let messages = tx + .messages() + .in_channel(&channel, resume_point) + .await? + .into_iter() + .filter_map(|message| { + message + .events() + .filter(Sequence::up_to(resume_point)) + .collect() + }) + .collect(); + + Ok(messages) + } + + pub async fn delete(&self, channel: &Id, deleted_at: &DateTime) -> Result<(), Error> { let mut tx = self.db.begin().await?; let channel = tx .channels() .by_id(channel) .await - .not_found(|| DeleteError::NotFound(channel.clone()))? + .not_found(|| Error::NotFound(channel.clone()))? .snapshot(); let mut events = Vec::new(); - let messages = tx.messages().in_channel(&channel).await?; + let messages = tx.messages().in_channel(&channel, None).await?; for message in messages { + let message = message.snapshot(); let deleted = tx.sequence().next(deleted_at).await?; - let message = tx.messages().delete(&message, &deleted).await?; + let message = tx.messages().delete(&message.id, &deleted).await?; events.extend( message .events() @@ -117,7 +147,7 @@ impl<'a> Channels<'a> { self.events.broadcast( events .into_iter() - .kmerge_by(|a, b| a.sequence() < b.sequence()) + .kmerge_by(Sequence::merge) .map(Event::from) .collect::<Vec<_>>(), ); @@ -135,7 +165,7 @@ pub enum CreateError { } #[derive(Debug, thiserror::Error)] -pub enum DeleteError { +pub enum Error { #[error("channel {0} not found")] NotFound(Id), #[error(transparent)] diff --git a/src/channel/repo.rs b/src/channel/repo.rs index 8bb761b..2b48436 100644 --- a/src/channel/repo.rs +++ b/src/channel/repo.rs @@ -84,10 +84,7 @@ impl<'c> Channels<'c> { Ok(channel) } - pub async fn all( - &mut self, - resume_point: Option<Sequence>, - ) -> Result<Vec<History>, sqlx::Error> { + pub async fn all(&mut self, resume_at: Option<Sequence>) -> Result<Vec<History>, sqlx::Error> { let channels = sqlx::query!( r#" select @@ -99,7 +96,7 @@ impl<'c> Channels<'c> { where coalesce(created_sequence <= $1, true) order by channel.name "#, - resume_point, + resume_at, ) .map(|row| History { channel: Channel { diff --git a/src/channel/routes.rs b/src/channel/routes.rs index bce634e..23c0602 100644 --- a/src/channel/routes.rs +++ b/src/channel/routes.rs @@ -7,13 +7,14 @@ use axum::{ }; use axum_extra::extract::Query; -use super::{ - app::{self, DeleteError}, - Channel, Id, -}; +use super::{app, Channel, Id}; use crate::{ - app::App, clock::RequestedAt, error::Internal, event::Sequence, login::Login, - message::app::SendError, + app::App, + clock::RequestedAt, + error::Internal, + event::{Instant, Sequence}, + login::Login, + message::{self, app::SendError}, }; #[cfg(test)] @@ -25,17 +26,18 @@ pub fn router() -> Router<App> { .route("/api/channels", post(on_create)) .route("/api/channels/:channel", post(on_send)) .route("/api/channels/:channel", delete(on_delete)) + .route("/api/channels/:channel/messages", get(messages)) } #[derive(Default, serde::Deserialize)] -struct ListQuery { +struct ResumeQuery { resume_point: Option<Sequence>, } async fn list( State(app): State<App>, _: Login, - Query(query): Query<ListQuery>, + Query(query): Query<ResumeQuery>, ) -> Result<Channels, Internal> { let channels = app.channels().all(query.resume_point).await?; let response = Channels(channels); @@ -127,7 +129,7 @@ async fn on_delete( Path(channel): Path<Id>, RequestedAt(deleted_at): RequestedAt, _: Login, -) -> Result<StatusCode, DeleteErrorResponse> { +) -> Result<StatusCode, ErrorResponse> { app.channels().delete(&channel, &deleted_at).await?; Ok(StatusCode::ACCEPTED) @@ -135,16 +137,66 @@ async fn on_delete( #[derive(Debug, thiserror::Error)] #[error(transparent)] -struct DeleteErrorResponse(#[from] DeleteError); +struct ErrorResponse(#[from] app::Error); -impl IntoResponse for DeleteErrorResponse { +impl IntoResponse for ErrorResponse { fn into_response(self) -> Response { let Self(error) = self; match error { - not_found @ DeleteError::NotFound(_) => { + not_found @ app::Error::NotFound(_) => { (StatusCode::NOT_FOUND, not_found.to_string()).into_response() } other => Internal::from(other).into_response(), } } } + +async fn messages( + State(app): State<App>, + Path(channel): Path<Id>, + _: Login, + Query(query): Query<ResumeQuery>, +) -> Result<Messages, ErrorResponse> { + let messages = app + .channels() + .messages(&channel, query.resume_point) + .await?; + let response = Messages( + messages + .into_iter() + .map(|message| MessageView { + sent: message.sent, + sender: message.sender, + message: MessageInner { + id: message.id, + body: message.body, + }, + }) + .collect(), + ); + + Ok(response) +} + +struct Messages(Vec<MessageView>); + +#[derive(serde::Serialize)] +struct MessageView { + #[serde(flatten)] + sent: Instant, + sender: Login, + message: MessageInner, +} + +#[derive(serde::Serialize)] +struct MessageInner { + id: message::Id, + body: String, +} + +impl IntoResponse for Messages { + fn into_response(self) -> Response { + let Self(messages) = self; + Json(messages).into_response() + } +} |
