summaryrefslogtreecommitdiff
path: root/src/channel/app.rs
diff options
context:
space:
mode:
authorOwen Jacobson <owen@grimoire.ca>2024-10-03 20:44:07 -0400
committerOwen Jacobson <owen@grimoire.ca>2024-10-03 21:03:02 -0400
commit617172576b95bbb935a75f98a98787da5a4e9a9d (patch)
treeae72fea2e81d023960c93d4efbf7e137c3705c48 /src/channel/app.rs
parent0a5599c60d20ccc2223779eeba5dc91a95ea0fe5 (diff)
List messages per channel.
Diffstat (limited to 'src/channel/app.rs')
-rw-r--r--src/channel/app.rs44
1 files changed, 37 insertions, 7 deletions
diff --git a/src/channel/app.rs b/src/channel/app.rs
index 24be2ff..b3bfbee 100644
--- a/src/channel/app.rs
+++ b/src/channel/app.rs
@@ -7,7 +7,7 @@ use crate::{
clock::DateTime,
db::NotFound,
event::{broadcaster::Broadcaster, repo::Provider as _, Event, Sequence, Sequenced},
- message::repo::Provider as _,
+ message::{repo::Provider as _, Message},
};
pub struct Channels<'a> {
@@ -54,22 +54,52 @@ impl<'a> Channels<'a> {
Ok(channels)
}
- pub async fn delete(&self, channel: &Id, deleted_at: &DateTime) -> Result<(), DeleteError> {
+ pub async fn messages(
+ &self,
+ channel: &Id,
+ resume_point: Option<Sequence>,
+ ) -> Result<Vec<Message>, Error> {
+ let mut tx = self.db.begin().await?;
+ let channel = tx
+ .channels()
+ .by_id(channel)
+ .await
+ .not_found(|| Error::NotFound(channel.clone()))?
+ .snapshot();
+
+ let messages = tx
+ .messages()
+ .in_channel(&channel, resume_point)
+ .await?
+ .into_iter()
+ .filter_map(|message| {
+ message
+ .events()
+ .filter(Sequence::up_to(resume_point))
+ .collect()
+ })
+ .collect();
+
+ Ok(messages)
+ }
+
+ pub async fn delete(&self, channel: &Id, deleted_at: &DateTime) -> Result<(), Error> {
let mut tx = self.db.begin().await?;
let channel = tx
.channels()
.by_id(channel)
.await
- .not_found(|| DeleteError::NotFound(channel.clone()))?
+ .not_found(|| Error::NotFound(channel.clone()))?
.snapshot();
let mut events = Vec::new();
- let messages = tx.messages().in_channel(&channel).await?;
+ let messages = tx.messages().in_channel(&channel, None).await?;
for message in messages {
+ let message = message.snapshot();
let deleted = tx.sequence().next(deleted_at).await?;
- let message = tx.messages().delete(&message, &deleted).await?;
+ let message = tx.messages().delete(&message.id, &deleted).await?;
events.extend(
message
.events()
@@ -117,7 +147,7 @@ impl<'a> Channels<'a> {
self.events.broadcast(
events
.into_iter()
- .kmerge_by(|a, b| a.sequence() < b.sequence())
+ .kmerge_by(Sequence::merge)
.map(Event::from)
.collect::<Vec<_>>(),
);
@@ -135,7 +165,7 @@ pub enum CreateError {
}
#[derive(Debug, thiserror::Error)]
-pub enum DeleteError {
+pub enum Error {
#[error("channel {0} not found")]
NotFound(Id),
#[error(transparent)]