summaryrefslogtreecommitdiff
path: root/src/channel
diff options
context:
space:
mode:
authorOwen Jacobson <owen@grimoire.ca>2024-10-03 20:44:07 -0400
committerOwen Jacobson <owen@grimoire.ca>2024-10-03 21:03:02 -0400
commit617172576b95bbb935a75f98a98787da5a4e9a9d (patch)
treeae72fea2e81d023960c93d4efbf7e137c3705c48 /src/channel
parent0a5599c60d20ccc2223779eeba5dc91a95ea0fe5 (diff)
List messages per channel.
Diffstat (limited to 'src/channel')
-rw-r--r--src/channel/app.rs44
-rw-r--r--src/channel/repo.rs7
-rw-r--r--src/channel/routes.rs76
3 files changed, 103 insertions, 24 deletions
diff --git a/src/channel/app.rs b/src/channel/app.rs
index 24be2ff..b3bfbee 100644
--- a/src/channel/app.rs
+++ b/src/channel/app.rs
@@ -7,7 +7,7 @@ use crate::{
clock::DateTime,
db::NotFound,
event::{broadcaster::Broadcaster, repo::Provider as _, Event, Sequence, Sequenced},
- message::repo::Provider as _,
+ message::{repo::Provider as _, Message},
};
pub struct Channels<'a> {
@@ -54,22 +54,52 @@ impl<'a> Channels<'a> {
Ok(channels)
}
- pub async fn delete(&self, channel: &Id, deleted_at: &DateTime) -> Result<(), DeleteError> {
+ pub async fn messages(
+ &self,
+ channel: &Id,
+ resume_point: Option<Sequence>,
+ ) -> Result<Vec<Message>, Error> {
+ let mut tx = self.db.begin().await?;
+ let channel = tx
+ .channels()
+ .by_id(channel)
+ .await
+ .not_found(|| Error::NotFound(channel.clone()))?
+ .snapshot();
+
+ let messages = tx
+ .messages()
+ .in_channel(&channel, resume_point)
+ .await?
+ .into_iter()
+ .filter_map(|message| {
+ message
+ .events()
+ .filter(Sequence::up_to(resume_point))
+ .collect()
+ })
+ .collect();
+
+ Ok(messages)
+ }
+
+ pub async fn delete(&self, channel: &Id, deleted_at: &DateTime) -> Result<(), Error> {
let mut tx = self.db.begin().await?;
let channel = tx
.channels()
.by_id(channel)
.await
- .not_found(|| DeleteError::NotFound(channel.clone()))?
+ .not_found(|| Error::NotFound(channel.clone()))?
.snapshot();
let mut events = Vec::new();
- let messages = tx.messages().in_channel(&channel).await?;
+ let messages = tx.messages().in_channel(&channel, None).await?;
for message in messages {
+ let message = message.snapshot();
let deleted = tx.sequence().next(deleted_at).await?;
- let message = tx.messages().delete(&message, &deleted).await?;
+ let message = tx.messages().delete(&message.id, &deleted).await?;
events.extend(
message
.events()
@@ -117,7 +147,7 @@ impl<'a> Channels<'a> {
self.events.broadcast(
events
.into_iter()
- .kmerge_by(|a, b| a.sequence() < b.sequence())
+ .kmerge_by(Sequence::merge)
.map(Event::from)
.collect::<Vec<_>>(),
);
@@ -135,7 +165,7 @@ pub enum CreateError {
}
#[derive(Debug, thiserror::Error)]
-pub enum DeleteError {
+pub enum Error {
#[error("channel {0} not found")]
NotFound(Id),
#[error(transparent)]
diff --git a/src/channel/repo.rs b/src/channel/repo.rs
index 8bb761b..2b48436 100644
--- a/src/channel/repo.rs
+++ b/src/channel/repo.rs
@@ -84,10 +84,7 @@ impl<'c> Channels<'c> {
Ok(channel)
}
- pub async fn all(
- &mut self,
- resume_point: Option<Sequence>,
- ) -> Result<Vec<History>, sqlx::Error> {
+ pub async fn all(&mut self, resume_at: Option<Sequence>) -> Result<Vec<History>, sqlx::Error> {
let channels = sqlx::query!(
r#"
select
@@ -99,7 +96,7 @@ impl<'c> Channels<'c> {
where coalesce(created_sequence <= $1, true)
order by channel.name
"#,
- resume_point,
+ resume_at,
)
.map(|row| History {
channel: Channel {
diff --git a/src/channel/routes.rs b/src/channel/routes.rs
index bce634e..23c0602 100644
--- a/src/channel/routes.rs
+++ b/src/channel/routes.rs
@@ -7,13 +7,14 @@ use axum::{
};
use axum_extra::extract::Query;
-use super::{
- app::{self, DeleteError},
- Channel, Id,
-};
+use super::{app, Channel, Id};
use crate::{
- app::App, clock::RequestedAt, error::Internal, event::Sequence, login::Login,
- message::app::SendError,
+ app::App,
+ clock::RequestedAt,
+ error::Internal,
+ event::{Instant, Sequence},
+ login::Login,
+ message::{self, app::SendError},
};
#[cfg(test)]
@@ -25,17 +26,18 @@ pub fn router() -> Router<App> {
.route("/api/channels", post(on_create))
.route("/api/channels/:channel", post(on_send))
.route("/api/channels/:channel", delete(on_delete))
+ .route("/api/channels/:channel/messages", get(messages))
}
#[derive(Default, serde::Deserialize)]
-struct ListQuery {
+struct ResumeQuery {
resume_point: Option<Sequence>,
}
async fn list(
State(app): State<App>,
_: Login,
- Query(query): Query<ListQuery>,
+ Query(query): Query<ResumeQuery>,
) -> Result<Channels, Internal> {
let channels = app.channels().all(query.resume_point).await?;
let response = Channels(channels);
@@ -127,7 +129,7 @@ async fn on_delete(
Path(channel): Path<Id>,
RequestedAt(deleted_at): RequestedAt,
_: Login,
-) -> Result<StatusCode, DeleteErrorResponse> {
+) -> Result<StatusCode, ErrorResponse> {
app.channels().delete(&channel, &deleted_at).await?;
Ok(StatusCode::ACCEPTED)
@@ -135,16 +137,66 @@ async fn on_delete(
#[derive(Debug, thiserror::Error)]
#[error(transparent)]
-struct DeleteErrorResponse(#[from] DeleteError);
+struct ErrorResponse(#[from] app::Error);
-impl IntoResponse for DeleteErrorResponse {
+impl IntoResponse for ErrorResponse {
fn into_response(self) -> Response {
let Self(error) = self;
match error {
- not_found @ DeleteError::NotFound(_) => {
+ not_found @ app::Error::NotFound(_) => {
(StatusCode::NOT_FOUND, not_found.to_string()).into_response()
}
other => Internal::from(other).into_response(),
}
}
}
+
+async fn messages(
+ State(app): State<App>,
+ Path(channel): Path<Id>,
+ _: Login,
+ Query(query): Query<ResumeQuery>,
+) -> Result<Messages, ErrorResponse> {
+ let messages = app
+ .channels()
+ .messages(&channel, query.resume_point)
+ .await?;
+ let response = Messages(
+ messages
+ .into_iter()
+ .map(|message| MessageView {
+ sent: message.sent,
+ sender: message.sender,
+ message: MessageInner {
+ id: message.id,
+ body: message.body,
+ },
+ })
+ .collect(),
+ );
+
+ Ok(response)
+}
+
+struct Messages(Vec<MessageView>);
+
+#[derive(serde::Serialize)]
+struct MessageView {
+ #[serde(flatten)]
+ sent: Instant,
+ sender: Login,
+ message: MessageInner,
+}
+
+#[derive(serde::Serialize)]
+struct MessageInner {
+ id: message::Id,
+ body: String,
+}
+
+impl IntoResponse for Messages {
+ fn into_response(self) -> Response {
+ let Self(messages) = self;
+ Json(messages).into_response()
+ }
+}