From 410f4d740cfc3d9b5a53ac237667ed03b7f19381 Mon Sep 17 00:00:00 2001 From: Owen Jacobson Date: Tue, 24 Sep 2024 19:51:01 -0400 Subject: Use a vector of sequence numbers, not timestamps, to restart /api/events streams. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The timestamp-based approach had some formal problems. In particular, it assumed that time always went forwards, which isn't necessarily the case: * Alice calls `/api/channels/Cfoo` to send a message. * The server assigns time T to the request. * The server stalls somewhere in send() for a while, before storing and broadcasting the message. If it helps, imagine blocking on `tx.begin().await?` for a while. * In this interval, Bob calls `/api/events?channel=Cfoo`, receives historical messages up to time U (after T), and disconnects. * The server resumes Alice's request and finishes it. * Bob reconnects, setting his Last-Event-Id header to timestamp U. In this scenario, Bob never sees Alice's message unless he starts over. It wasn't in the original stream, since it wasn't broadcast while Bob was subscribed, and it's not in the new stream, since Bob's resume point is after the timestamp on Alice's message. The new approach avoids this. Each message is assigned a _sequence number_ when it's stored. Bob can be sure that his stream included every event, since the resume point is identified by sequence number even if the server processes them out of chronological order: * Alice calls `/api/channels/Cfoo` to send a message. * The server assigns time T to the request. * The server stalls somewhere in send() for a while, before storing and broadcasting. * In this interval, Bob calls `/api/events?channel=Cfoo`, receives historical messages up to sequence Cfoo=N, and disconnects. * The server resumes Alice's request, assigns her message sequence M (after N), and finishes it. * Bob resumes his subscription at Cfoo=N. * Bob receives Alice's message at Cfoo=M. 
There's a natural mutual exclusion on sequence numbers, enforced by sqlite, which ensures that no two messages have the same sequence number. Since sqlite promises that transactions are serializable by default (and enforces this with a whole-DB write lock), we can be confident that sequence numbers are monotonic, as well. This scenario is, to put it mildly, contrived and unlikely — which is what motivated me to fix it. These kinds of bugs are fiendishly hard to identify, let alone reproduce or understand. I wonder how costly cloning a map is going to turn out to be… A note on database migrations: sqlite3 really, truly has no `alter table … alter column` statement. The only way to modify an existing column is to recreate the whole table with the new column definition and copy the data over. If `alter column` existed, I would create the new `sequence` column in `message` in a much less roundabout way. Fortunately, these migrations assume that they are being run _offline_, so operations like "replace the whole table" are reasonable. --- src/events/repo/broadcast.rs | 52 +++++++++++++++++++++++++++++++++++++++----- 1 file changed, 46 insertions(+), 6 deletions(-) (limited to 'src/events/repo/broadcast.rs') diff --git a/src/events/repo/broadcast.rs b/src/events/repo/broadcast.rs index bffe991..29dab55 100644 --- a/src/events/repo/broadcast.rs +++ b/src/events/repo/broadcast.rs @@ -24,6 +24,7 @@ pub struct Broadcast<'t>(&'t mut SqliteConnection); #[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] pub struct Message { pub id: message::Id, + pub sequence: Sequence, pub sender: Login, pub body: String, pub sent_at: DateTime, @@ -37,27 +38,32 @@ impl<'c> Broadcast<'c> { body: &str, sent_at: &DateTime, ) -> Result { + let sequence = self.next_sequence_for(channel).await?; + let id = message::Id::generate(); let message = sqlx::query!( r#" insert into message - (id, sender, channel, body, sent_at) - values ($1, $2, $3, $4, $5) + (id, channel, sequence, sender, body, sent_at) + values ($1, $2, $3, $4, $5, $6) returning 
id as "id: message::Id", + sequence as "sequence: Sequence", sender as "sender: login::Id", body, sent_at as "sent_at: DateTime" "#, id, - sender.id, channel.id, + sequence, + sender.id, body, sent_at, ) .map(|row| Message { id: row.id, + sequence: row.sequence, sender: sender.clone(), body: row.body, sent_at: row.sent_at, @@ -68,6 +74,22 @@ impl<'c> Broadcast<'c> { Ok(message) } + async fn next_sequence_for(&mut self, channel: &Channel) -> Result { + let Sequence(current) = sqlx::query_scalar!( + r#" + -- `max` never returns null, but sqlx can't detect that + select max(sequence) as "sequence!: Sequence" + from message + where channel = $1 + "#, + channel.id, + ) + .fetch_one(&mut *self.0) + .await?; + + Ok(Sequence(current + 1)) + } + pub async fn expire(&mut self, expire_at: &DateTime) -> Result<(), sqlx::Error> { sqlx::query!( r#" @@ -85,12 +107,13 @@ impl<'c> Broadcast<'c> { pub async fn replay( &mut self, channel: &Channel, - resume_at: Option<&DateTime>, + resume_at: Option, ) -> Result, sqlx::Error> { let messages = sqlx::query!( r#" select message.id as "id: message::Id", + sequence as "sequence: Sequence", login.id as "sender_id: login::Id", login.name as sender_name, message.body, @@ -98,14 +121,15 @@ impl<'c> Broadcast<'c> { from message join login on message.sender = login.id where channel = $1 - and coalesce(sent_at > $2, true) - order by sent_at asc + and coalesce(sequence > $2, true) + order by sequence asc "#, channel.id, resume_at, ) .map(|row| Message { id: row.id, + sequence: row.sequence, sender: Login { id: row.sender_id, name: row.sender_name, @@ -119,3 +143,19 @@ impl<'c> Broadcast<'c> { Ok(messages) } } + +#[derive( + Debug, + Eq, + Ord, + PartialEq, + PartialOrd, + Clone, + Copy, + serde::Serialize, + serde::Deserialize, + sqlx::Type, +)] +#[serde(transparent)] +#[sqlx(transparent)] +pub struct Sequence(i64); -- cgit v1.2.3