summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorOwen Jacobson <owen@grimoire.ca>2024-10-03 20:44:07 -0400
committerOwen Jacobson <owen@grimoire.ca>2024-10-03 21:03:02 -0400
commit617172576b95bbb935a75f98a98787da5a4e9a9d (patch)
treeae72fea2e81d023960c93d4efbf7e137c3705c48 /src
parent0a5599c60d20ccc2223779eeba5dc91a95ea0fe5 (diff)
List messages per channel.
Diffstat (limited to 'src')
-rw-r--r--src/channel/app.rs44
-rw-r--r--src/channel/repo.rs7
-rw-r--r--src/channel/routes.rs76
-rw-r--r--src/event/app.rs8
-rw-r--r--src/event/mod.rs2
-rw-r--r--src/event/sequence.rs7
-rw-r--r--src/message/app.rs4
-rw-r--r--src/message/event.rs27
-rw-r--r--src/message/history.rs4
-rw-r--r--src/message/repo.rs57
-rw-r--r--src/message/snapshot.rs4
11 files changed, 189 insertions, 51 deletions
diff --git a/src/channel/app.rs b/src/channel/app.rs
index 24be2ff..b3bfbee 100644
--- a/src/channel/app.rs
+++ b/src/channel/app.rs
@@ -7,7 +7,7 @@ use crate::{
clock::DateTime,
db::NotFound,
event::{broadcaster::Broadcaster, repo::Provider as _, Event, Sequence, Sequenced},
- message::repo::Provider as _,
+ message::{repo::Provider as _, Message},
};
pub struct Channels<'a> {
@@ -54,22 +54,52 @@ impl<'a> Channels<'a> {
Ok(channels)
}
- pub async fn delete(&self, channel: &Id, deleted_at: &DateTime) -> Result<(), DeleteError> {
+ pub async fn messages(
+ &self,
+ channel: &Id,
+ resume_point: Option<Sequence>,
+ ) -> Result<Vec<Message>, Error> {
+ let mut tx = self.db.begin().await?;
+ let channel = tx
+ .channels()
+ .by_id(channel)
+ .await
+ .not_found(|| Error::NotFound(channel.clone()))?
+ .snapshot();
+
+ let messages = tx
+ .messages()
+ .in_channel(&channel, resume_point)
+ .await?
+ .into_iter()
+ .filter_map(|message| {
+ message
+ .events()
+ .filter(Sequence::up_to(resume_point))
+ .collect()
+ })
+ .collect();
+
+ Ok(messages)
+ }
+
+ pub async fn delete(&self, channel: &Id, deleted_at: &DateTime) -> Result<(), Error> {
let mut tx = self.db.begin().await?;
let channel = tx
.channels()
.by_id(channel)
.await
- .not_found(|| DeleteError::NotFound(channel.clone()))?
+ .not_found(|| Error::NotFound(channel.clone()))?
.snapshot();
let mut events = Vec::new();
- let messages = tx.messages().in_channel(&channel).await?;
+ let messages = tx.messages().in_channel(&channel, None).await?;
for message in messages {
+ let message = message.snapshot();
let deleted = tx.sequence().next(deleted_at).await?;
- let message = tx.messages().delete(&message, &deleted).await?;
+ let message = tx.messages().delete(&message.id, &deleted).await?;
events.extend(
message
.events()
@@ -117,7 +147,7 @@ impl<'a> Channels<'a> {
self.events.broadcast(
events
.into_iter()
- .kmerge_by(|a, b| a.sequence() < b.sequence())
+ .kmerge_by(Sequence::merge)
.map(Event::from)
.collect::<Vec<_>>(),
);
@@ -135,7 +165,7 @@ pub enum CreateError {
}
#[derive(Debug, thiserror::Error)]
-pub enum DeleteError {
+pub enum Error {
#[error("channel {0} not found")]
NotFound(Id),
#[error(transparent)]
diff --git a/src/channel/repo.rs b/src/channel/repo.rs
index 8bb761b..2b48436 100644
--- a/src/channel/repo.rs
+++ b/src/channel/repo.rs
@@ -84,10 +84,7 @@ impl<'c> Channels<'c> {
Ok(channel)
}
- pub async fn all(
- &mut self,
- resume_point: Option<Sequence>,
- ) -> Result<Vec<History>, sqlx::Error> {
+ pub async fn all(&mut self, resume_at: Option<Sequence>) -> Result<Vec<History>, sqlx::Error> {
let channels = sqlx::query!(
r#"
select
@@ -99,7 +96,7 @@ impl<'c> Channels<'c> {
where coalesce(created_sequence <= $1, true)
order by channel.name
"#,
- resume_point,
+ resume_at,
)
.map(|row| History {
channel: Channel {
diff --git a/src/channel/routes.rs b/src/channel/routes.rs
index bce634e..23c0602 100644
--- a/src/channel/routes.rs
+++ b/src/channel/routes.rs
@@ -7,13 +7,14 @@ use axum::{
};
use axum_extra::extract::Query;
-use super::{
- app::{self, DeleteError},
- Channel, Id,
-};
+use super::{app, Channel, Id};
use crate::{
- app::App, clock::RequestedAt, error::Internal, event::Sequence, login::Login,
- message::app::SendError,
+ app::App,
+ clock::RequestedAt,
+ error::Internal,
+ event::{Instant, Sequence},
+ login::Login,
+ message::{self, app::SendError},
};
#[cfg(test)]
@@ -25,17 +26,18 @@ pub fn router() -> Router<App> {
.route("/api/channels", post(on_create))
.route("/api/channels/:channel", post(on_send))
.route("/api/channels/:channel", delete(on_delete))
+ .route("/api/channels/:channel/messages", get(messages))
}
#[derive(Default, serde::Deserialize)]
-struct ListQuery {
+struct ResumeQuery {
resume_point: Option<Sequence>,
}
async fn list(
State(app): State<App>,
_: Login,
- Query(query): Query<ListQuery>,
+ Query(query): Query<ResumeQuery>,
) -> Result<Channels, Internal> {
let channels = app.channels().all(query.resume_point).await?;
let response = Channels(channels);
@@ -127,7 +129,7 @@ async fn on_delete(
Path(channel): Path<Id>,
RequestedAt(deleted_at): RequestedAt,
_: Login,
-) -> Result<StatusCode, DeleteErrorResponse> {
+) -> Result<StatusCode, ErrorResponse> {
app.channels().delete(&channel, &deleted_at).await?;
Ok(StatusCode::ACCEPTED)
@@ -135,16 +137,66 @@ async fn on_delete(
#[derive(Debug, thiserror::Error)]
#[error(transparent)]
-struct DeleteErrorResponse(#[from] DeleteError);
+struct ErrorResponse(#[from] app::Error);
-impl IntoResponse for DeleteErrorResponse {
+impl IntoResponse for ErrorResponse {
fn into_response(self) -> Response {
let Self(error) = self;
match error {
- not_found @ DeleteError::NotFound(_) => {
+ not_found @ app::Error::NotFound(_) => {
(StatusCode::NOT_FOUND, not_found.to_string()).into_response()
}
other => Internal::from(other).into_response(),
}
}
}
+
+async fn messages(
+ State(app): State<App>,
+ Path(channel): Path<Id>,
+ _: Login,
+ Query(query): Query<ResumeQuery>,
+) -> Result<Messages, ErrorResponse> {
+ let messages = app
+ .channels()
+ .messages(&channel, query.resume_point)
+ .await?;
+ let response = Messages(
+ messages
+ .into_iter()
+ .map(|message| MessageView {
+ sent: message.sent,
+ sender: message.sender,
+ message: MessageInner {
+ id: message.id,
+ body: message.body,
+ },
+ })
+ .collect(),
+ );
+
+ Ok(response)
+}
+
+struct Messages(Vec<MessageView>);
+
+#[derive(serde::Serialize)]
+struct MessageView {
+ #[serde(flatten)]
+ sent: Instant,
+ sender: Login,
+ message: MessageInner,
+}
+
+#[derive(serde::Serialize)]
+struct MessageInner {
+ id: message::Id,
+ body: String,
+}
+
+impl IntoResponse for Messages {
+ fn into_response(self) -> Response {
+ let Self(messages) = self;
+ Json(messages).into_response()
+ }
+}
diff --git a/src/event/app.rs b/src/event/app.rs
index 32f0a97..d664ec7 100644
--- a/src/event/app.rs
+++ b/src/event/app.rs
@@ -36,7 +36,7 @@ impl<'a> Events<'a> {
let channel_events = channels
.iter()
.map(channel::History::events)
- .kmerge_by(|a, b| a.instant.sequence < b.instant.sequence)
+ .kmerge_by(Sequence::merge)
.filter(Sequence::after(resume_at))
.map(Event::from);
@@ -44,14 +44,12 @@ impl<'a> Events<'a> {
let message_events = messages
.iter()
.map(message::History::events)
- .kmerge_by(|a, b| a.instant.sequence < b.instant.sequence)
+ .kmerge_by(Sequence::merge)
.filter(Sequence::after(resume_at))
.map(Event::from);
let replay_events = channel_events
- .merge_by(message_events, |a, b| {
- a.instant.sequence < b.instant.sequence
- })
+ .merge_by(message_events, Sequence::merge)
.collect::<Vec<_>>();
let resume_live_at = replay_events.last().map(Sequenced::sequence);
diff --git a/src/event/mod.rs b/src/event/mod.rs
index 1503b77..1349fe6 100644
--- a/src/event/mod.rs
+++ b/src/event/mod.rs
@@ -38,7 +38,7 @@ impl From<channel::Event> for Event {
impl From<message::Event> for Event {
fn from(event: message::Event) -> Self {
Self {
- instant: event.instant,
+ instant: event.instant(),
kind: event.kind.into(),
}
}
diff --git a/src/event/sequence.rs b/src/event/sequence.rs
index c566156..fbe3711 100644
--- a/src/event/sequence.rs
+++ b/src/event/sequence.rs
@@ -59,6 +59,13 @@ impl Sequence {
{
move |event| resume_point <= event.sequence()
}
+
+ pub fn merge<E>(a: &E, b: &E) -> bool
+ where
+ E: Sequenced,
+ {
+ a.sequence() < b.sequence()
+ }
}
pub trait Sequenced {
diff --git a/src/message/app.rs b/src/message/app.rs
index 1d34c14..fd6a334 100644
--- a/src/message/app.rs
+++ b/src/message/app.rs
@@ -7,7 +7,7 @@ use crate::{
channel::{self, repo::Provider as _},
clock::DateTime,
db::NotFound as _,
- event::{broadcaster::Broadcaster, repo::Provider as _, Event, Sequence},
+ event::{broadcaster::Broadcaster, repo::Provider as _, Event, Sequence, Sequenced},
login::Login,
};
@@ -87,7 +87,7 @@ impl<'a> Messages<'a> {
self.events.broadcast(
events
.into_iter()
- .kmerge_by(|a, b| a.instant.sequence < b.instant.sequence)
+ .kmerge_by(Sequence::merge)
.map(Event::from)
.collect::<Vec<_>>(),
);
diff --git a/src/message/event.rs b/src/message/event.rs
index bcc2238..66db9b0 100644
--- a/src/message/event.rs
+++ b/src/message/event.rs
@@ -7,14 +7,12 @@ use crate::{
#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
pub struct Event {
#[serde(flatten)]
- pub instant: Instant,
- #[serde(flatten)]
pub kind: Kind,
}
impl Sequenced for Event {
fn instant(&self) -> Instant {
- self.instant
+ self.kind.instant()
}
}
@@ -25,12 +23,27 @@ pub enum Kind {
Deleted(Deleted),
}
+impl Sequenced for Kind {
+ fn instant(&self) -> Instant {
+ match self {
+ Self::Sent(sent) => sent.instant(),
+ Self::Deleted(deleted) => deleted.instant(),
+ }
+ }
+}
+
#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
pub struct Sent {
#[serde(flatten)]
pub message: Message,
}
+impl Sequenced for Sent {
+ fn instant(&self) -> Instant {
+ self.message.sent
+ }
+}
+
impl From<Sent> for Kind {
fn from(event: Sent) -> Self {
Self::Sent(event)
@@ -39,10 +52,18 @@ impl From<Sent> for Kind {
#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
pub struct Deleted {
+ #[serde(flatten)]
+ pub instant: Instant,
pub channel: Channel,
pub message: Id,
}
+impl Sequenced for Deleted {
+ fn instant(&self) -> Instant {
+ self.instant
+ }
+}
+
impl From<Deleted> for Kind {
fn from(event: Deleted) -> Self {
Self::Deleted(event)
diff --git a/src/message/history.rs b/src/message/history.rs
index 5aca47e..89fc6b1 100644
--- a/src/message/history.rs
+++ b/src/message/history.rs
@@ -7,14 +7,12 @@ use crate::event::Instant;
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct History {
pub message: Message,
- pub sent: Instant,
pub deleted: Option<Instant>,
}
impl History {
fn sent(&self) -> Event {
Event {
- instant: self.sent,
kind: Sent {
message: self.message.clone(),
}
@@ -24,8 +22,8 @@ impl History {
fn deleted(&self) -> Option<Event> {
self.deleted.map(|instant| Event {
- instant,
kind: Deleted {
+ instant,
channel: self.message.channel.clone(),
message: self.message.id.clone(),
}
diff --git a/src/message/repo.rs b/src/message/repo.rs
index ae41736..fc835c8 100644
--- a/src/message/repo.rs
+++ b/src/message/repo.rs
@@ -48,12 +48,12 @@ impl<'c> Messages<'c> {
)
.map(|row| History {
message: Message {
+ sent: *sent,
channel: channel.clone(),
sender: sender.clone(),
id: row.id,
body: row.body,
},
- sent: *sent,
deleted: None,
})
.fetch_one(&mut *self.0)
@@ -62,18 +62,51 @@ impl<'c> Messages<'c> {
Ok(message)
}
- pub async fn in_channel(&mut self, channel: &Channel) -> Result<Vec<Id>, sqlx::Error> {
- let messages = sqlx::query_scalar!(
+ pub async fn in_channel(
+ &mut self,
+ channel: &Channel,
+ resume_at: Option<Sequence>,
+ ) -> Result<Vec<History>, sqlx::Error> {
+ let messages = sqlx::query!(
r#"
select
- message.id as "id: Id"
+ channel.id as "channel_id: channel::Id",
+ channel.name as "channel_name",
+ sender.id as "sender_id: login::Id",
+ sender.name as "sender_name",
+ message.id as "id: Id",
+ message.body,
+ sent_at as "sent_at: DateTime",
+ sent_sequence as "sent_sequence: Sequence"
from message
join channel on message.channel = channel.id
+ join login as sender on message.sender = sender.id
where channel.id = $1
+ and coalesce(message.sent_sequence <= $2, true)
order by message.sent_sequence
"#,
channel.id,
+ resume_at,
)
+ .map(|row| History {
+ message: Message {
+ sent: Instant {
+ at: row.sent_at,
+ sequence: row.sent_sequence,
+ },
+ channel: Channel {
+ id: row.channel_id,
+ name: row.channel_name,
+ },
+ sender: Login {
+ id: row.sender_id,
+ name: row.sender_name,
+ },
+ id: row.id,
+ body: row.body,
+ },
+ deleted: None,
+ })
.fetch_all(&mut *self.0)
.await?;
@@ -101,6 +134,10 @@ impl<'c> Messages<'c> {
)
.map(|row| History {
message: Message {
+ sent: Instant {
+ at: row.sent_at,
+ sequence: row.sent_sequence,
+ },
channel: Channel {
id: row.channel_id,
name: row.channel_name,
@@ -112,10 +149,6 @@ impl<'c> Messages<'c> {
id: row.id,
body: row.body,
},
- sent: Instant {
- at: row.sent_at,
- sequence: row.sent_sequence,
- },
deleted: None,
})
.fetch_one(&mut *self.0)
@@ -189,6 +222,10 @@ impl<'c> Messages<'c> {
)
.map(|row| History {
message: Message {
+ sent: Instant {
+ at: row.sent_at,
+ sequence: row.sent_sequence,
+ },
channel: Channel {
id: row.channel_id,
name: row.channel_name,
@@ -200,10 +237,6 @@ impl<'c> Messages<'c> {
id: row.id,
body: row.body,
},
- sent: Instant {
- at: row.sent_at,
- sequence: row.sent_sequence,
- },
deleted: None,
})
.fetch_all(&mut *self.0)
diff --git a/src/message/snapshot.rs b/src/message/snapshot.rs
index 3adccbe..522c1aa 100644
--- a/src/message/snapshot.rs
+++ b/src/message/snapshot.rs
@@ -2,11 +2,13 @@ use super::{
event::{Event, Kind, Sent},
Id,
};
-use crate::{channel::Channel, login::Login};
+use crate::{channel::Channel, event::Instant, login::Login};
#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
#[serde(into = "self::serialize::Message")]
pub struct Message {
+ #[serde(skip)]
+ pub sent: Instant,
pub channel: Channel,
pub sender: Login,
pub id: Id,