summaryrefslogtreecommitdiff
path: root/src/events
diff options
context:
space:
mode:
Diffstat (limited to 'src/events')
-rw-r--r--src/events/app.rs11
-rw-r--r--src/events/expire.rs18
-rw-r--r--src/events/mod.rs1
-rw-r--r--src/events/repo/message.rs11
-rw-r--r--src/events/types.rs55
5 files changed, 58 insertions, 38 deletions
diff --git a/src/events/app.rs b/src/events/app.rs
index 03f3ee6..5162c67 100644
--- a/src/events/app.rs
+++ b/src/events/app.rs
@@ -64,9 +64,10 @@ impl<'a> Events<'a> {
let mut events = Vec::with_capacity(expired.len());
for (channel, message) in expired {
+ let sequence = tx.message_events().assign_sequence(&channel).await?;
let event = tx
.message_events()
- .delete_expired(&channel, &message, relative_to)
+ .delete_expired(&channel, &message, sequence, relative_to)
.await?;
events.push(event);
}
@@ -134,9 +135,11 @@ impl<'a> Events<'a> {
Ok(created_events.chain(replay).chain(live_messages).scan(
resume_at,
|resume_point, event| {
- let channel = &event.channel.id;
- let sequence = event.sequence;
- resume_point.advance(channel, sequence);
+ let channel = &event.channel_id();
+ match event.data {
+ types::ChannelEventData::Deleted(_) => resume_point.forget(channel),
+ _ => resume_point.advance(channel, event.sequence),
+ }
let event = types::ResumableEvent(resume_point.clone(), event);
diff --git a/src/events/expire.rs b/src/events/expire.rs
deleted file mode 100644
index d92142d..0000000
--- a/src/events/expire.rs
+++ /dev/null
@@ -1,18 +0,0 @@
-use axum::{
- extract::{Request, State},
- middleware::Next,
- response::Response,
-};
-
-use crate::{app::App, clock::RequestedAt, error::Internal};
-
-// Expires messages and channels before each request.
-pub async fn middleware(
- State(app): State<App>,
- RequestedAt(expired_at): RequestedAt,
- req: Request,
- next: Next,
-) -> Result<Response, Internal> {
- app.events().expire(&expired_at).await?;
- Ok(next.run(req).await)
-}
diff --git a/src/events/mod.rs b/src/events/mod.rs
index 86bc5e9..711ae64 100644
--- a/src/events/mod.rs
+++ b/src/events/mod.rs
@@ -1,6 +1,5 @@
pub mod app;
pub mod broadcaster;
-pub mod expire;
mod extract;
pub mod repo;
mod routes;
diff --git a/src/events/repo/message.rs b/src/events/repo/message.rs
index 32419d5..f8bae2b 100644
--- a/src/events/repo/message.rs
+++ b/src/events/repo/message.rs
@@ -56,8 +56,8 @@ impl<'c> Events<'c> {
.map(|row| types::ChannelEvent {
sequence: row.sequence,
at: row.sent_at,
- channel: channel.clone(),
data: types::MessageEvent {
+ channel: channel.clone(),
sender: sender.clone(),
message: Message {
id: row.id,
@@ -72,7 +72,7 @@ impl<'c> Events<'c> {
Ok(message)
}
- async fn assign_sequence(&mut self, channel: &Channel) -> Result<Sequence, sqlx::Error> {
+ pub async fn assign_sequence(&mut self, channel: &Channel) -> Result<Sequence, sqlx::Error> {
let next = sqlx::query_scalar!(
r#"
update channel
@@ -92,10 +92,9 @@ impl<'c> Events<'c> {
&mut self,
channel: &Channel,
message: &message::Id,
+ sequence: Sequence,
deleted_at: &DateTime,
) -> Result<types::ChannelEvent, sqlx::Error> {
- let sequence = self.assign_sequence(channel).await?;
-
sqlx::query_scalar!(
r#"
delete from message
@@ -110,8 +109,8 @@ impl<'c> Events<'c> {
Ok(types::ChannelEvent {
sequence,
at: *deleted_at,
- channel: channel.clone(),
data: types::MessageDeletedEvent {
+ channel: channel.clone(),
message: message.clone(),
}
.into(),
@@ -178,8 +177,8 @@ impl<'c> Events<'c> {
.map(|row| types::ChannelEvent {
sequence: row.sequence,
at: row.sent_at,
- channel: channel.clone(),
data: types::MessageEvent {
+ channel: channel.clone(),
sender: login::Login {
id: row.sender_id,
name: row.sender_name,
diff --git a/src/events/types.rs b/src/events/types.rs
index 9a65207..966842d 100644
--- a/src/events/types.rs
+++ b/src/events/types.rs
@@ -56,6 +56,11 @@ impl ResumePoint {
elements.insert(channel.clone(), sequence);
}
+ pub fn forget(&mut self, channel: &channel::Id) {
+ let Self(elements) = self;
+ elements.remove(channel);
+ }
+
pub fn get(&self, channel: &channel::Id) -> Option<Sequence> {
let Self(elements) = self;
elements.get(channel).copied()
@@ -92,7 +97,6 @@ pub struct ChannelEvent {
#[serde(skip)]
pub sequence: Sequence,
pub at: DateTime,
- pub channel: Channel,
#[serde(flatten)]
pub data: ChannelEventData,
}
@@ -102,45 +106,78 @@ impl ChannelEvent {
Self {
at: channel.created_at,
sequence: Sequence::default(),
- channel,
- data: ChannelEventData::Created,
+ data: CreatedEvent { channel }.into(),
+ }
+ }
+
+ pub fn channel_id(&self) -> &channel::Id {
+ match &self.data {
+ ChannelEventData::Created(event) => &event.channel.id,
+ ChannelEventData::Message(event) => &event.channel.id,
+ ChannelEventData::MessageDeleted(event) => &event.channel.id,
+ ChannelEventData::Deleted(event) => &event.channel,
}
}
}
impl ResumeElement for ChannelEvent {
fn element(&self) -> (&channel::Id, Sequence) {
- (&self.channel.id, self.sequence)
+ (self.channel_id(), self.sequence)
}
}
#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum ChannelEventData {
- Created,
+ Created(CreatedEvent),
Message(MessageEvent),
MessageDeleted(MessageDeletedEvent),
+ Deleted(DeletedEvent),
+}
+
+#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
+pub struct CreatedEvent {
+ pub channel: Channel,
+}
+
+impl From<CreatedEvent> for ChannelEventData {
+ fn from(event: CreatedEvent) -> Self {
+ Self::Created(event)
+ }
}
#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
pub struct MessageEvent {
+ pub channel: Channel,
pub sender: Login,
pub message: message::Message,
}
impl From<MessageEvent> for ChannelEventData {
- fn from(message: MessageEvent) -> Self {
- Self::Message(message)
+ fn from(event: MessageEvent) -> Self {
+ Self::Message(event)
}
}
#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
pub struct MessageDeletedEvent {
+ pub channel: Channel,
pub message: message::Id,
}
impl From<MessageDeletedEvent> for ChannelEventData {
- fn from(message: MessageDeletedEvent) -> Self {
- Self::MessageDeleted(message)
+ fn from(event: MessageDeletedEvent) -> Self {
+ Self::MessageDeleted(event)
+ }
+}
+
+#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
+pub struct DeletedEvent {
+ pub channel: channel::Id,
+}
+
+impl From<DeletedEvent> for ChannelEventData {
+ fn from(event: DeletedEvent) -> Self {
+ Self::Deleted(event)
}
}