summaryrefslogtreecommitdiff
path: root/src/events/repo/broadcast.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/events/repo/broadcast.rs')
-rw-r--r--src/events/repo/broadcast.rs52
1 files changed, 46 insertions, 6 deletions
diff --git a/src/events/repo/broadcast.rs b/src/events/repo/broadcast.rs
index bffe991..29dab55 100644
--- a/src/events/repo/broadcast.rs
+++ b/src/events/repo/broadcast.rs
@@ -24,6 +24,7 @@ pub struct Broadcast<'t>(&'t mut SqliteConnection);
#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
pub struct Message {
pub id: message::Id,
+ pub sequence: Sequence,
pub sender: Login,
pub body: String,
pub sent_at: DateTime,
@@ -37,27 +38,32 @@ impl<'c> Broadcast<'c> {
body: &str,
sent_at: &DateTime,
) -> Result<Message, sqlx::Error> {
+ let sequence = self.next_sequence_for(channel).await?;
+
let id = message::Id::generate();
let message = sqlx::query!(
r#"
insert into message
- (id, sender, channel, body, sent_at)
- values ($1, $2, $3, $4, $5)
+ (id, channel, sequence, sender, body, sent_at)
+ values ($1, $2, $3, $4, $5, $6)
returning
id as "id: message::Id",
+ sequence as "sequence: Sequence",
sender as "sender: login::Id",
body,
sent_at as "sent_at: DateTime"
"#,
id,
- sender.id,
channel.id,
+ sequence,
+ sender.id,
body,
sent_at,
)
.map(|row| Message {
id: row.id,
+ sequence: row.sequence,
sender: sender.clone(),
body: row.body,
sent_at: row.sent_at,
@@ -68,6 +74,22 @@ impl<'c> Broadcast<'c> {
Ok(message)
}
+ async fn next_sequence_for(&mut self, channel: &Channel) -> Result<Sequence, sqlx::Error> {
+ let Sequence(current) = sqlx::query_scalar!(
+ r#"
+ -- `max` never returns null, but sqlx can't detect that
+ select max(sequence) as "sequence!: Sequence"
+ from message
+ where channel = $1
+ "#,
+ channel.id,
+ )
+ .fetch_one(&mut *self.0)
+ .await?;
+
+ Ok(Sequence(current + 1))
+ }
+
pub async fn expire(&mut self, expire_at: &DateTime) -> Result<(), sqlx::Error> {
sqlx::query!(
r#"
@@ -85,12 +107,13 @@ impl<'c> Broadcast<'c> {
pub async fn replay(
&mut self,
channel: &Channel,
- resume_at: Option<&DateTime>,
+ resume_at: Option<Sequence>,
) -> Result<Vec<Message>, sqlx::Error> {
let messages = sqlx::query!(
r#"
select
message.id as "id: message::Id",
+ sequence as "sequence: Sequence",
login.id as "sender_id: login::Id",
login.name as sender_name,
message.body,
@@ -98,14 +121,15 @@ impl<'c> Broadcast<'c> {
from message
join login on message.sender = login.id
where channel = $1
- and coalesce(sent_at > $2, true)
- order by sent_at asc
+ and coalesce(sequence > $2, true)
+ order by sequence asc
"#,
channel.id,
resume_at,
)
.map(|row| Message {
id: row.id,
+ sequence: row.sequence,
sender: Login {
id: row.sender_id,
name: row.sender_name,
@@ -119,3 +143,19 @@ impl<'c> Broadcast<'c> {
Ok(messages)
}
}
+
+#[derive(
+ Debug,
+ Eq,
+ Ord,
+ PartialEq,
+ PartialOrd,
+ Clone,
+ Copy,
+ serde::Serialize,
+ serde::Deserialize,
+ sqlx::Type,
+)]
+#[serde(transparent)]
+#[sqlx(transparent)]
+pub struct Sequence(i64);