summaryrefslogtreecommitdiff
path: root/src/channel/app.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/channel/app.rs')
-rw-r--r--src/channel/app.rs129
1 files changed, 113 insertions, 16 deletions
diff --git a/src/channel/app.rs b/src/channel/app.rs
index 70cda47..bb331ec 100644
--- a/src/channel/app.rs
+++ b/src/channel/app.rs
@@ -1,10 +1,13 @@
use chrono::TimeDelta;
+use itertools::Itertools;
use sqlx::sqlite::SqlitePool;
+use super::{repo::Provider as _, Channel, Id};
use crate::{
clock::DateTime,
- events::{broadcaster::Broadcaster, repo::message::Provider as _, types::ChannelEvent},
- repo::channel::{Channel, Provider as _},
+ db::NotFound,
+ event::{broadcaster::Broadcaster, repo::Provider as _, Event, Sequence},
+ message::{repo::Provider as _, Message},
};
pub struct Channels<'a> {
@@ -19,27 +22,108 @@ impl<'a> Channels<'a> {
pub async fn create(&self, name: &str, created_at: &DateTime) -> Result<Channel, CreateError> {
let mut tx = self.db.begin().await?;
+ let created = tx.sequence().next(created_at).await?;
let channel = tx
.channels()
- .create(name, created_at)
+ .create(name, &created)
.await
.map_err(|err| CreateError::from_duplicate_name(err, name))?;
tx.commit().await?;
self.events
- .broadcast(&ChannelEvent::created(channel.clone()));
+ .broadcast(channel.events().map(Event::from).collect::<Vec<_>>());
- Ok(channel)
+ Ok(channel.snapshot())
}
- pub async fn all(&self) -> Result<Vec<Channel>, InternalError> {
+ pub async fn all(&self, resume_point: Option<Sequence>) -> Result<Vec<Channel>, InternalError> {
let mut tx = self.db.begin().await?;
- let channels = tx.channels().all().await?;
+ let channels = tx.channels().all(resume_point).await?;
tx.commit().await?;
+ let channels = channels
+ .into_iter()
+ .filter_map(|channel| {
+ channel
+ .events()
+ .filter(Sequence::up_to(resume_point))
+ .collect()
+ })
+ .collect();
+
Ok(channels)
}
+ pub async fn messages(
+ &self,
+ channel: &Id,
+ resume_point: Option<Sequence>,
+ ) -> Result<Vec<Message>, Error> {
+ let mut tx = self.db.begin().await?;
+ let channel = tx
+ .channels()
+ .by_id(channel)
+ .await
+ .not_found(|| Error::NotFound(channel.clone()))?
+ .snapshot();
+
+ let messages = tx
+ .messages()
+ .in_channel(&channel, resume_point)
+ .await?
+ .into_iter()
+ .filter_map(|message| {
+ message
+ .events()
+ .filter(Sequence::up_to(resume_point))
+ .collect()
+ })
+ .collect();
+
+ Ok(messages)
+ }
+
+ pub async fn delete(&self, channel: &Id, deleted_at: &DateTime) -> Result<(), Error> {
+ let mut tx = self.db.begin().await?;
+
+ let channel = tx
+ .channels()
+ .by_id(channel)
+ .await
+ .not_found(|| Error::NotFound(channel.clone()))?
+ .snapshot();
+
+ let mut events = Vec::new();
+
+ let messages = tx.messages().in_channel(&channel, None).await?;
+ for message in messages {
+ let message = message.snapshot();
+ let deleted = tx.sequence().next(deleted_at).await?;
+ let message = tx.messages().delete(&message.id, &deleted).await?;
+ events.extend(
+ message
+ .events()
+ .filter(Sequence::start_from(deleted.sequence))
+ .map(Event::from),
+ );
+ }
+
+ let deleted = tx.sequence().next(deleted_at).await?;
+ let channel = tx.channels().delete(&channel.id, &deleted).await?;
+ events.extend(
+ channel
+ .events()
+ .filter(Sequence::start_from(deleted.sequence))
+ .map(Event::from),
+ );
+
+ tx.commit().await?;
+
+ self.events.broadcast(events);
+
+ Ok(())
+ }
+
pub async fn expire(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> {
// Somewhat arbitrarily, expire after 90 days.
let expire_at = relative_to.to_owned() - TimeDelta::days(90);
@@ -49,19 +133,24 @@ impl<'a> Channels<'a> {
let mut events = Vec::with_capacity(expired.len());
for channel in expired {
- let sequence = tx.message_events().assign_sequence(&channel).await?;
- let event = tx
- .channels()
- .delete_expired(&channel, sequence, relative_to)
- .await?;
- events.push(event);
+ let deleted = tx.sequence().next(relative_to).await?;
+ let channel = tx.channels().delete(&channel, &deleted).await?;
+ events.push(
+ channel
+ .events()
+ .filter(Sequence::start_from(deleted.sequence)),
+ );
}
tx.commit().await?;
- for event in events {
- self.events.broadcast(&event);
- }
+ self.events.broadcast(
+ events
+ .into_iter()
+ .kmerge_by(Sequence::merge)
+ .map(Event::from)
+ .collect::<Vec<_>>(),
+ );
Ok(())
}
@@ -75,6 +164,14 @@ pub enum CreateError {
DatabaseError(#[from] sqlx::Error),
}
+#[derive(Debug, thiserror::Error)]
+pub enum Error {
+ #[error("channel {0} not found")]
+ NotFound(Id),
+ #[error(transparent)]
+ DatabaseError(#[from] sqlx::Error),
+}
+
impl CreateError {
fn from_duplicate_name(error: sqlx::Error, name: &str) -> Self {
if let Some(error) = error.as_database_error() {