summaryrefslogtreecommitdiff
path: root/src/events/repo
diff options
context:
space:
mode:
authorOwen Jacobson <owen@grimoire.ca>2024-09-27 18:17:02 -0400
committerOwen Jacobson <owen@grimoire.ca>2024-09-27 19:59:22 -0400
commiteff129bc1f29bcb1b2b9d10c6b49ab886edc83d6 (patch)
treeb82892a6cf40f771998a85e5530012bab80157dc /src/events/repo
parent68e3dce3c2e588376c6510783e908941360ac80e (diff)
Make `/api/events` a firehose endpoint.
It now includes events for all channels. Clients are responsible for filtering. The schema for channel events has changed; it now includes a channel name and ID, in the same format as the sender's name and ID. They also now include a `"type"` field, whose only valid value (as of this writing) is `"message"`. This is groundwork for delivering message deletion (expiry) events to clients, and notifying clients of channel lifecycle events.
Diffstat (limited to 'src/events/repo')
-rw-r--r--src/events/repo/message.rs (renamed from src/events/repo/broadcast.rs)83
-rw-r--r--src/events/repo/mod.rs2
2 files changed, 34 insertions, 51 deletions
diff --git a/src/events/repo/broadcast.rs b/src/events/repo/message.rs
index 6914573..b4724ea 100644
--- a/src/events/repo/broadcast.rs
+++ b/src/events/repo/message.rs
@@ -2,6 +2,7 @@ use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction};
use crate::{
clock::DateTime,
+ events::types::{self, Sequence},
repo::{
channel::Channel,
login::{self, Login},
@@ -10,35 +11,25 @@ use crate::{
};
pub trait Provider {
- fn broadcast(&mut self) -> Broadcast;
+ fn message_events(&mut self) -> Events;
}
impl<'c> Provider for Transaction<'c, Sqlite> {
- fn broadcast(&mut self) -> Broadcast {
- Broadcast(self)
+ fn message_events(&mut self) -> Events {
+ Events(self)
}
}
-pub struct Broadcast<'t>(&'t mut SqliteConnection);
+pub struct Events<'t>(&'t mut SqliteConnection);
-#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
-pub struct Message {
- pub id: message::Id,
- #[serde(skip)]
- pub sequence: Sequence,
- pub sender: Login,
- pub body: String,
- pub sent_at: DateTime,
-}
-
-impl<'c> Broadcast<'c> {
+impl<'c> Events<'c> {
pub async fn create(
&mut self,
sender: &Login,
channel: &Channel,
body: &str,
sent_at: &DateTime,
- ) -> Result<Message, sqlx::Error> {
+ ) -> Result<types::ChannelEvent, sqlx::Error> {
let sequence = self.next_sequence_for(channel).await?;
let id = message::Id::generate();
@@ -62,12 +53,16 @@ impl<'c> Broadcast<'c> {
body,
sent_at,
)
- .map(|row| Message {
- id: row.id,
+ .map(|row| types::ChannelEvent {
sequence: row.sequence,
- sender: sender.clone(),
- body: row.body,
- sent_at: row.sent_at,
+ at: row.sent_at,
+ channel: channel.clone(),
+ data: types::MessageEvent {
+ id: row.id,
+ sender: sender.clone(),
+ body: row.body,
+ }
+ .into(),
})
.fetch_one(&mut *self.0)
.await?;
@@ -76,7 +71,7 @@ impl<'c> Broadcast<'c> {
}
async fn next_sequence_for(&mut self, channel: &Channel) -> Result<Sequence, sqlx::Error> {
- let Sequence(current) = sqlx::query_scalar!(
+ let current = sqlx::query_scalar!(
r#"
-- `max` never returns null, but sqlx can't detect that
select max(sequence) as "sequence!: Sequence"
@@ -88,7 +83,7 @@ impl<'c> Broadcast<'c> {
.fetch_one(&mut *self.0)
.await?;
- Ok(Sequence(current + 1))
+ Ok(current.next())
}
pub async fn expire(&mut self, expire_at: &DateTime) -> Result<(), sqlx::Error> {
@@ -109,8 +104,8 @@ impl<'c> Broadcast<'c> {
&mut self,
channel: &Channel,
resume_at: Option<Sequence>,
- ) -> Result<Vec<Message>, sqlx::Error> {
- let messages = sqlx::query!(
+ ) -> Result<Vec<types::ChannelEvent>, sqlx::Error> {
+ let events = sqlx::query!(
r#"
select
message.id as "id: message::Id",
@@ -128,35 +123,23 @@ impl<'c> Broadcast<'c> {
channel.id,
resume_at,
)
- .map(|row| Message {
- id: row.id,
+ .map(|row| types::ChannelEvent {
sequence: row.sequence,
- sender: Login {
- id: row.sender_id,
- name: row.sender_name,
- },
- body: row.body,
- sent_at: row.sent_at,
+ at: row.sent_at,
+ channel: channel.clone(),
+ data: types::MessageEvent {
+ id: row.id,
+ sender: login::Login {
+ id: row.sender_id,
+ name: row.sender_name,
+ },
+ body: row.body,
+ }
+ .into(),
})
.fetch_all(&mut *self.0)
.await?;
- Ok(messages)
+ Ok(events)
}
}
-
-#[derive(
- Debug,
- Eq,
- Ord,
- PartialEq,
- PartialOrd,
- Clone,
- Copy,
- serde::Serialize,
- serde::Deserialize,
- sqlx::Type,
-)]
-#[serde(transparent)]
-#[sqlx(transparent)]
-pub struct Sequence(i64);
diff --git a/src/events/repo/mod.rs b/src/events/repo/mod.rs
index 2ed3062..e216a50 100644
--- a/src/events/repo/mod.rs
+++ b/src/events/repo/mod.rs
@@ -1 +1 @@
-pub mod broadcast;
+pub mod message;