From 410f4d740cfc3d9b5a53ac237667ed03b7f19381 Mon Sep 17 00:00:00 2001 From: Owen Jacobson Date: Tue, 24 Sep 2024 19:51:01 -0400 Subject: Use a vector of sequence numbers, not timestamps, to restart /api/events streams. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The timestamp-based approach had some formal problems. In particular, it assumed that time always went forwards, which isn't necessarily the case: * Alice calls `/api/channels/Cfoo` to send a message. * The server assigns time T to the request. * The server stalls somewhere in send() for a while, before storing and broadcasting the message. If it helps, imagine blocking on `tx.begin().await?` for a while. * In this interval, Bob calls `/api/events?channel=Cfoo`, receives historical messages up to time U (after T), and disconnects. * The server resumes Alice's request and finishes it. * Bob reconnects, setting his Last-Event-Id header to timestamp U. In this scenario, Bob never sees Alice's message unless he starts over. It wasn't in the original stream, since it wasn't broadcast while Bob was subscribed, and it's not in the new stream, since Bob's resume point is after the timestamp on Alice's message. The new approach avoids this. Each message is assigned a _sequence number_ when it's stored. Bob can be sure that his stream included every event, since the resume point is identified by sequence number even if the server processes them out of chronological order: * Alice calls `/api/channels/Cfoo` to send a message. * The server assigns time T to the request. * The server stalls somewhere in send() for a while, before storing and broadcasting. * In this interval, Bob calls `/api/events?channel=Cfoo`, receives historical messages up to sequence Cfoo=N, and disconnects. * The server resumes Alice's request, assigns her message sequence M (after N), and finishes it. * Bob resumes his subscription at Cfoo=N. * Bob receives Alice's message at Cfoo=M. There's a natural mutual exclusion on sequence numbers, enforced by sqlite, which ensures that no two messages have the same sequence number. Since sqlite promises that transactions are serializable by default (and enforces this with a whole-DB write lock), we can be confident that sequence numbers are monotonic, as well. This scenario is, to put it mildly, contrived and unlikely - which is what motivated me to fix it. These kinds of bugs are fiendishly hard to identify, let alone reproduce or understand. I wonder how costly cloning a map is going to turn out to be… A note on database migrations: sqlite3 really, truly has no `alter table … alter column` statement. The only way to modify an existing column is to add the column to a new table. If `alter column` existed, I would create the new `sequence` column in `message` in a much less roundabout way. Fortunately, these migrations assume that they are being run _offline_, so operations like "replace the whole table" are reasonable. --- src/channel/app.rs | 57 +++++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 41 insertions(+), 16 deletions(-) (limited to 'src/channel/app.rs') diff --git a/src/channel/app.rs b/src/channel/app.rs index 3c92d76..e314792 100644 --- a/src/channel/app.rs +++ b/src/channel/app.rs @@ -77,16 +77,11 @@ impl<'a> Channels<'a> { &self, channel: &channel::Id, subscribed_at: &DateTime, - resume_at: Option<&str>, + resume_at: Option, ) -> Result + std::fmt::Debug, EventsError> { // Somewhat arbitrarily, expire after 90 days. let expire_at = subscribed_at.to_owned() - TimeDelta::days(90); - let resume_at = resume_at - .map(chrono::DateTime::parse_from_rfc3339) - .transpose()? - .map(|resume_at| resume_at.to_utc()); - let mut tx = self.db.begin().await?; let channel = tx .channels() @@ -94,29 +89,59 @@ impl<'a> Channels<'a> { .await .not_found(|| EventsError::ChannelNotFound(channel.clone()))?; - let live_messages = self - .broadcaster - .listen(&channel.id) - .filter(Self::skip_stale(resume_at.as_ref())) - .filter(Self::skip_expired(&expire_at)); + // Subscribe before retrieving, to catch messages broadcast while we're + // querying the DB. We'll prune out duplicates later. + let live_messages = self.broadcaster.listen(&channel.id); tx.broadcast().expire(&expire_at).await?; - let stored_messages = tx.broadcast().replay(&channel, resume_at.as_ref()).await?; + let stored_messages = tx.broadcast().replay(&channel, resume_at).await?; tx.commit().await?; + let resume_broadcast_at = stored_messages + .last() + .map(|message| message.sequence) + .or(resume_at); + + // This should always be the case, up to integer rollover, primarily + // because every message in stored_messages has a sequence not less + // than `resume_at`, or `resume_at` is None. We use the last message + // (if any) to decide when to resume the `live_messages` stream. + // + // It probably simplifies to assert!(resume_at <= resume_broadcast_at), but + // this form captures more of the reasoning. + assert!( + (resume_at.is_none() && resume_broadcast_at.is_none()) + || (stored_messages.is_empty() && resume_at == resume_broadcast_at) + || resume_at < resume_broadcast_at + ); + + // no skip_expired or resume transforms for stored_messages, as it's + // constructed not to contain messages meeting either criterion. + // + // * skip_expired is redundant with the `tx.broadcasts().expire(…)` call; + // * resume is redundant with the resume_at argument to + // `tx.broadcasts().replay(…)`. let stored_messages = stream::iter(stored_messages); + let live_messages = live_messages + // Sure, it's temporally improbable that we'll ever skip a message + // that's 90 days old, but there's no reason not to be thorough. + .filter(Self::skip_expired(&expire_at)) + // Filtering on the broadcast resume point filters out messages + // before resume_at, and filters out messages duplicated from + // stored_messages. + .filter(Self::resume(resume_broadcast_at)); Ok(stored_messages.chain(live_messages)) } - fn skip_stale( - resume_at: Option<&DateTime>, + fn resume( + resume_at: Option, ) -> impl for<'m> FnMut(&'m broadcast::Message) -> future::Ready { - let resume_at = resume_at.cloned(); + let resume_at = resume_at; move |msg| { future::ready(match resume_at { None => true, - Some(resume_at) => msg.sent_at > resume_at, + Some(resume_at) => msg.sequence > resume_at, }) } } -- cgit v1.2.3