From 410f4d740cfc3d9b5a53ac237667ed03b7f19381 Mon Sep 17 00:00:00 2001 From: Owen Jacobson Date: Tue, 24 Sep 2024 19:51:01 -0400 Subject: Use a vector of sequence numbers, not timestamps, to restart /api/events streams. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The timestamp-based approach had some formal problems. In particular, it assumed that time always went forwards, which isn't necessarily the case: * Alice calls `/api/channels/Cfoo` to send a message. * The server assigns time T to the request. * The server stalls somewhere in send() for a while, before storing and broadcasting the message. If it helps, imagine blocking on `tx.begin().await?` for a while. * In this interval, Bob calls `/api/events?channel=Cfoo`, receives historical messages up to time U (after T), and disconnects. * The server resumes Alice's request and finishes it. * Bob reconnects, setting his Last-Event-Id header to timestamp U. In this scenario, Bob never sees Alice's message unless he starts over. It wasn't in the original stream, since it wasn't broadcast while Bob was subscribed, and it's not in the new stream, since Bob's resume point is after the timestamp on Alice's message. The new approach avoids this. Each message is assigned a _sequence number_ when it's stored. Bob can be sure that his stream included every event, since the resume point is identified by sequence number even if the server processes them out of chronological order: * Alice calls `/api/channels/Cfoo` to send a message. * The server assigns time T to the request. * The server stalls somewhere in send() for a while, before storing and broadcasting. * In this interval, Bob calls `/api/events?channel=Cfoo`, receives historical messages up to sequence Cfoo=N, and disconnects. * The server resumes Alice's request, assigns her message sequence M (after N), and finishes it. * Bob resumes his subscription at Cfoo=N. * Bob receives Alice's message at Cfoo=M. 
There's a natural mutual exclusion on sequence numbers, enforced by sqlite, which ensures that no two messages have the same sequence number. Since sqlite promises that transactions are serializable by default (and enforces this with a whole-DB write lock), we can be confident that sequence numbers are monotonic, as well. This scenario is, to put it mildly, contrived and unlikely - which is what motivated me to fix it. These kinds of bugs are fiendishly hard to identify, let alone reproduce or understand. I wonder how costly cloning a map is going to turn out to be… A note on database migrations: sqlite3 really, truly has no `alter table … alter column` statement. The only way to modify an existing column is to add the column to a new table. If `alter column` existed, I would create the new `sequence` column in `message` in a much less roundabout way. Fortunately, these migrations assume that they are being run _offline_, so operations like "replace the whole table" are reasonable. --- ...8f227b09c3e4b0c9c0202c7cbe3fba93213ea100cf.json | 44 +++++ ...edfa884d8a00f692eba656d3daa8119011b703cfcd.json | 20 +++ ...8cb132479c6e7a2301d576af298da570f3effdc106.json | 50 ++++++ ...3be4c1f4499b6caf8d556050fca8036326f904e36e.json | 38 ---- ...02d475d33061ffc50b6bd65ac9a950ad6d715ba0bc.json | 44 ----- docs/api.md | 12 +- .../20240924232919_message_serial_numbers.sql | 38 ++++ src/channel/app.rs | 57 ++++-- src/events/repo/broadcast.rs | 52 +++++- src/events/routes.rs | 66 +++++-- src/events/routes/test.rs | 197 ++++++++++++++++++--- src/header.rs | 47 +++-- src/id.rs | 13 +- src/repo/channel.rs | 13 +- 14 files changed, 521 insertions(+), 170 deletions(-) create mode 100644 .sqlx/query-2310fe5b8e88e314eb200d8f227b09c3e4b0c9c0202c7cbe3fba93213ea100cf.json create mode 100644 .sqlx/query-54fe04ade0a01cfd0f3ddbedfa884d8a00f692eba656d3daa8119011b703cfcd.json create mode 100644 .sqlx/query-760d3532e1613fd9f79ac98cb132479c6e7a2301d576af298da570f3effdc106.json delete mode 100644 
.sqlx/query-aeb970faead693939834df3be4c1f4499b6caf8d556050fca8036326f904e36e.json delete mode 100644 .sqlx/query-f29a6accbc6b4cca6a3b5702d475d33061ffc50b6bd65ac9a950ad6d715ba0bc.json create mode 100644 migrations/20240924232919_message_serial_numbers.sql diff --git a/.sqlx/query-2310fe5b8e88e314eb200d8f227b09c3e4b0c9c0202c7cbe3fba93213ea100cf.json b/.sqlx/query-2310fe5b8e88e314eb200d8f227b09c3e4b0c9c0202c7cbe3fba93213ea100cf.json new file mode 100644 index 0000000..1bd4116 --- /dev/null +++ b/.sqlx/query-2310fe5b8e88e314eb200d8f227b09c3e4b0c9c0202c7cbe3fba93213ea100cf.json @@ -0,0 +1,44 @@ +{ + "db_name": "SQLite", + "query": "\n\t\t\t\tinsert into message\n\t\t\t\t\t(id, channel, sequence, sender, body, sent_at)\n\t\t\t\tvalues ($1, $2, $3, $4, $5, $6)\n\t\t\t\treturning\n\t\t\t\t\tid as \"id: message::Id\",\n sequence as \"sequence: Sequence\",\n\t\t\t\t\tsender as \"sender: login::Id\",\n\t\t\t\t\tbody,\n\t\t\t\t\tsent_at as \"sent_at: DateTime\"\n\t\t\t", + "describe": { + "columns": [ + { + "name": "id: message::Id", + "ordinal": 0, + "type_info": "Text" + }, + { + "name": "sequence: Sequence", + "ordinal": 1, + "type_info": "Integer" + }, + { + "name": "sender: login::Id", + "ordinal": 2, + "type_info": "Text" + }, + { + "name": "body", + "ordinal": 3, + "type_info": "Text" + }, + { + "name": "sent_at: DateTime", + "ordinal": 4, + "type_info": "Text" + } + ], + "parameters": { + "Right": 6 + }, + "nullable": [ + false, + false, + false, + false, + false + ] + }, + "hash": "2310fe5b8e88e314eb200d8f227b09c3e4b0c9c0202c7cbe3fba93213ea100cf" +} diff --git a/.sqlx/query-54fe04ade0a01cfd0f3ddbedfa884d8a00f692eba656d3daa8119011b703cfcd.json b/.sqlx/query-54fe04ade0a01cfd0f3ddbedfa884d8a00f692eba656d3daa8119011b703cfcd.json new file mode 100644 index 0000000..a739207 --- /dev/null +++ b/.sqlx/query-54fe04ade0a01cfd0f3ddbedfa884d8a00f692eba656d3daa8119011b703cfcd.json @@ -0,0 +1,20 @@ +{ + "db_name": "SQLite", + "query": "\n -- `max` never returns null, but sqlx can't 
detect that\n select max(sequence) as \"sequence!: Sequence\"\n from message\n where channel = $1\n ", + "describe": { + "columns": [ + { + "name": "sequence!: Sequence", + "ordinal": 0, + "type_info": "Null" + } + ], + "parameters": { + "Right": 1 + }, + "nullable": [ + true + ] + }, + "hash": "54fe04ade0a01cfd0f3ddbedfa884d8a00f692eba656d3daa8119011b703cfcd" +} diff --git a/.sqlx/query-760d3532e1613fd9f79ac98cb132479c6e7a2301d576af298da570f3effdc106.json b/.sqlx/query-760d3532e1613fd9f79ac98cb132479c6e7a2301d576af298da570f3effdc106.json new file mode 100644 index 0000000..beb9234 --- /dev/null +++ b/.sqlx/query-760d3532e1613fd9f79ac98cb132479c6e7a2301d576af298da570f3effdc106.json @@ -0,0 +1,50 @@ +{ + "db_name": "SQLite", + "query": "\n\t\t\t\tselect\n\t\t\t\t\tmessage.id as \"id: message::Id\",\n sequence as \"sequence: Sequence\",\n\t\t\t\t\tlogin.id as \"sender_id: login::Id\",\n\t\t\t\t\tlogin.name as sender_name,\n\t\t\t\t\tmessage.body,\n\t\t\t\t\tmessage.sent_at as \"sent_at: DateTime\"\n\t\t\t\tfrom message\n\t\t\t\t\tjoin login on message.sender = login.id\n\t\t\t\twhere channel = $1\n\t\t\t\t\tand coalesce(sequence > $2, true)\n\t\t\t\torder by sequence asc\n\t\t\t", + "describe": { + "columns": [ + { + "name": "id: message::Id", + "ordinal": 0, + "type_info": "Text" + }, + { + "name": "sequence: Sequence", + "ordinal": 1, + "type_info": "Integer" + }, + { + "name": "sender_id: login::Id", + "ordinal": 2, + "type_info": "Text" + }, + { + "name": "sender_name", + "ordinal": 3, + "type_info": "Text" + }, + { + "name": "body", + "ordinal": 4, + "type_info": "Text" + }, + { + "name": "sent_at: DateTime", + "ordinal": 5, + "type_info": "Text" + } + ], + "parameters": { + "Right": 2 + }, + "nullable": [ + false, + false, + false, + false, + false, + false + ] + }, + "hash": "760d3532e1613fd9f79ac98cb132479c6e7a2301d576af298da570f3effdc106" +} diff --git a/.sqlx/query-aeb970faead693939834df3be4c1f4499b6caf8d556050fca8036326f904e36e.json 
b/.sqlx/query-aeb970faead693939834df3be4c1f4499b6caf8d556050fca8036326f904e36e.json deleted file mode 100644 index 9234a9b..0000000 --- a/.sqlx/query-aeb970faead693939834df3be4c1f4499b6caf8d556050fca8036326f904e36e.json +++ /dev/null @@ -1,38 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n\t\t\t\tinsert into message\n\t\t\t\t\t(id, sender, channel, body, sent_at)\n\t\t\t\tvalues ($1, $2, $3, $4, $5)\n\t\t\t\treturning\n\t\t\t\t\tid as \"id: message::Id\",\n\t\t\t\t\tsender as \"sender: login::Id\",\n\t\t\t\t\tbody,\n\t\t\t\t\tsent_at as \"sent_at: DateTime\"\n\t\t\t", - "describe": { - "columns": [ - { - "name": "id: message::Id", - "ordinal": 0, - "type_info": "Text" - }, - { - "name": "sender: login::Id", - "ordinal": 1, - "type_info": "Text" - }, - { - "name": "body", - "ordinal": 2, - "type_info": "Text" - }, - { - "name": "sent_at: DateTime", - "ordinal": 3, - "type_info": "Text" - } - ], - "parameters": { - "Right": 5 - }, - "nullable": [ - false, - false, - false, - false - ] - }, - "hash": "aeb970faead693939834df3be4c1f4499b6caf8d556050fca8036326f904e36e" -} diff --git a/.sqlx/query-f29a6accbc6b4cca6a3b5702d475d33061ffc50b6bd65ac9a950ad6d715ba0bc.json b/.sqlx/query-f29a6accbc6b4cca6a3b5702d475d33061ffc50b6bd65ac9a950ad6d715ba0bc.json deleted file mode 100644 index e2b6d74..0000000 --- a/.sqlx/query-f29a6accbc6b4cca6a3b5702d475d33061ffc50b6bd65ac9a950ad6d715ba0bc.json +++ /dev/null @@ -1,44 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n\t\t\t\tselect\n\t\t\t\t\tmessage.id as \"id: message::Id\",\n\t\t\t\t\tlogin.id as \"sender_id: login::Id\",\n\t\t\t\t\tlogin.name as sender_name,\n\t\t\t\t\tmessage.body,\n\t\t\t\t\tmessage.sent_at as \"sent_at: DateTime\"\n\t\t\t\tfrom message\n\t\t\t\t\tjoin login on message.sender = login.id\n\t\t\t\twhere channel = $1\n\t\t\t\t\tand coalesce(sent_at > $2, true)\n\t\t\t\torder by sent_at asc\n\t\t\t", - "describe": { - "columns": [ - { - "name": "id: message::Id", - "ordinal": 0, - "type_info": "Text" - }, - { - 
"name": "sender_id: login::Id", - "ordinal": 1, - "type_info": "Text" - }, - { - "name": "sender_name", - "ordinal": 2, - "type_info": "Text" - }, - { - "name": "body", - "ordinal": 3, - "type_info": "Text" - }, - { - "name": "sent_at: DateTime", - "ordinal": 4, - "type_info": "Text" - } - ], - "parameters": { - "Right": 2 - }, - "nullable": [ - false, - false, - false, - false, - false - ] - }, - "hash": "f29a6accbc6b4cca6a3b5702d475d33061ffc50b6bd65ac9a950ad6d715ba0bc" -} diff --git a/docs/api.md b/docs/api.md index 969e939..8bb3c0b 100644 --- a/docs/api.md +++ b/docs/api.md @@ -156,20 +156,22 @@ The returned stream may terminate, to limit the number of outstanding messages h This endpoint accepts the following query parameters: -* `channel`: a channel ID. Events for this channel will be included in the response. This parameter may be provided multiple times. +* `channel`: a channel ID to subscribe to. Events for this channel will be included in the response. This parameter may be provided multiple times. Clients should not subscribe to the same channel more than once in a single request. Browsers generally limit the number of open connections, often to embarrassingly low limits. Clients should subscribe to multiple streams in a single request, and should not subscribe to each stream individually. -Requests without parameters will be successful, but will return an empty stream. +Requests without a subscription return an empty stream. (If you're wondering: it has to be query parameters or something equivalent to it, since `EventSource` can only issue `GET` requests.) #### Request headers -This endpoint accepts an optional `Last-Event-Id` header for resuming an interrupted stream. If this header is provided, it must be set to the `id` of the last event processed by the client. The new stream will resume immediately after that event. If this header is omitted, then the stream will start from the beginning. 
+This endpoint accepts an optional `Last-Event-Id` header for resuming an interrupted stream. If this header is provided, it must be set to the `id` field sent with the last event the client has processed. When `Last-Event-Id` is sent, the response will resume immediately after the corresponding event. If this header is omitted, then the stream will start from the beginning. If you're using a browser's `EventSource` API, this is handled for you automatically. +The event IDs `hi` sends in `application/event-stream` encoding are ephemeral, and can only be reused within the brief intervals required to reconnect to the event stream. Do not store them, and do not parse them. The message data's `"id"` field is the durable identifier for each message. + #### On success The returned event stream is a sequence of events: @@ -187,7 +189,3 @@ data: "body": "my amazing thoughts, by bob", data: "sent_at": "2024-09-19T02:30:50.915462Z" data: } ``` - -The event `id` (`1234`, in the example above) is used to support resuming the stream after an interruption. See the "Request headers" section, above, for details. Event IDs are ephemeral, and can only be reused within the brief intervals required to reconnect to the event stream. Do not store them, and do not parse them. - -The `"id"` field uniquely identifies the message in related API requests, but is not used to resume the stream. 
diff --git a/migrations/20240924232919_message_serial_numbers.sql b/migrations/20240924232919_message_serial_numbers.sql new file mode 100644 index 0000000..a53e4a2 --- /dev/null +++ b/migrations/20240924232919_message_serial_numbers.sql @@ -0,0 +1,38 @@ +create table sequenced_message ( + id text + not null + primary key, + sequence bigint + not null, + channel text + not null + references channel (id), + sender text + not null + references login (id), + body text + not null, + sent_at text + not null, + unique (channel, sequence) +); + +insert into sequenced_message +select + id, + rank() over ( + partition by channel + order by sent_at + ) as sequence, + channel, + sender, + body, + sent_at +from message; + +drop table message; + +alter table sequenced_message +rename to message; + +create index message_sent_at on message (channel, sent_at); diff --git a/src/channel/app.rs b/src/channel/app.rs index 3c92d76..e314792 100644 --- a/src/channel/app.rs +++ b/src/channel/app.rs @@ -77,16 +77,11 @@ impl<'a> Channels<'a> { &self, channel: &channel::Id, subscribed_at: &DateTime, - resume_at: Option<&str>, + resume_at: Option, ) -> Result + std::fmt::Debug, EventsError> { // Somewhat arbitrarily, expire after 90 days. let expire_at = subscribed_at.to_owned() - TimeDelta::days(90); - let resume_at = resume_at - .map(chrono::DateTime::parse_from_rfc3339) - .transpose()? - .map(|resume_at| resume_at.to_utc()); - let mut tx = self.db.begin().await?; let channel = tx .channels() @@ -94,29 +89,59 @@ impl<'a> Channels<'a> { .await .not_found(|| EventsError::ChannelNotFound(channel.clone()))?; - let live_messages = self - .broadcaster - .listen(&channel.id) - .filter(Self::skip_stale(resume_at.as_ref())) - .filter(Self::skip_expired(&expire_at)); + // Subscribe before retrieving, to catch messages broadcast while we're + // querying the DB. We'll prune out duplicates later. 
+ let live_messages = self.broadcaster.listen(&channel.id); tx.broadcast().expire(&expire_at).await?; - let stored_messages = tx.broadcast().replay(&channel, resume_at.as_ref()).await?; + let stored_messages = tx.broadcast().replay(&channel, resume_at).await?; tx.commit().await?; + let resume_broadcast_at = stored_messages + .last() + .map(|message| message.sequence) + .or(resume_at); + + // This should always be the case, up to integer rollover, primarily + // because every message in stored_messages has a sequence not less + // than `resume_at`, or `resume_at` is None. We use the last message + // (if any) to decide when to resume the `live_messages` stream. + // + // It probably simplifies to assert!(resume_at <= resume_broadcast_at), but + // this form captures more of the reasoning. + assert!( + (resume_at.is_none() && resume_broadcast_at.is_none()) + || (stored_messages.is_empty() && resume_at == resume_broadcast_at) + || resume_at < resume_broadcast_at + ); + + // no skip_expired or resume transforms for stored_messages, as it's + // constructed not to contain messages meeting either criterion. + // + // * skip_expired is redundant with the `tx.broadcasts().expire(…)` call; + // * resume is redundant with the resume_at argument to + // `tx.broadcasts().replay(…)`. let stored_messages = stream::iter(stored_messages); + let live_messages = live_messages + // Sure, it's temporally improbable that we'll ever skip a message + // that's 90 days old, but there's no reason not to be thorough. + .filter(Self::skip_expired(&expire_at)) + // Filtering on the broadcast resume point filters out messages + // before resume_at, and filters out messages duplicated from + // stored_messages. 
+ .filter(Self::resume(resume_broadcast_at)); Ok(stored_messages.chain(live_messages)) } - fn skip_stale( - resume_at: Option<&DateTime>, + fn resume( + resume_at: Option, ) -> impl for<'m> FnMut(&'m broadcast::Message) -> future::Ready { - let resume_at = resume_at.cloned(); + let resume_at = resume_at; move |msg| { future::ready(match resume_at { None => true, - Some(resume_at) => msg.sent_at > resume_at, + Some(resume_at) => msg.sequence > resume_at, }) } } diff --git a/src/events/repo/broadcast.rs b/src/events/repo/broadcast.rs index bffe991..29dab55 100644 --- a/src/events/repo/broadcast.rs +++ b/src/events/repo/broadcast.rs @@ -24,6 +24,7 @@ pub struct Broadcast<'t>(&'t mut SqliteConnection); #[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] pub struct Message { pub id: message::Id, + pub sequence: Sequence, pub sender: Login, pub body: String, pub sent_at: DateTime, @@ -37,27 +38,32 @@ impl<'c> Broadcast<'c> { body: &str, sent_at: &DateTime, ) -> Result { + let sequence = self.next_sequence_for(channel).await?; + let id = message::Id::generate(); let message = sqlx::query!( r#" insert into message - (id, sender, channel, body, sent_at) - values ($1, $2, $3, $4, $5) + (id, channel, sequence, sender, body, sent_at) + values ($1, $2, $3, $4, $5, $6) returning id as "id: message::Id", + sequence as "sequence: Sequence", sender as "sender: login::Id", body, sent_at as "sent_at: DateTime" "#, id, - sender.id, channel.id, + sequence, + sender.id, body, sent_at, ) .map(|row| Message { id: row.id, + sequence: row.sequence, sender: sender.clone(), body: row.body, sent_at: row.sent_at, @@ -68,6 +74,22 @@ impl<'c> Broadcast<'c> { Ok(message) } + async fn next_sequence_for(&mut self, channel: &Channel) -> Result { + let Sequence(current) = sqlx::query_scalar!( + r#" + -- `max` never returns null, but sqlx can't detect that + select max(sequence) as "sequence!: Sequence" + from message + where channel = $1 + "#, + channel.id, + ) + .fetch_one(&mut *self.0) + 
.await?; + + Ok(Sequence(current + 1)) + } + pub async fn expire(&mut self, expire_at: &DateTime) -> Result<(), sqlx::Error> { sqlx::query!( r#" @@ -85,12 +107,13 @@ impl<'c> Broadcast<'c> { pub async fn replay( &mut self, channel: &Channel, - resume_at: Option<&DateTime>, + resume_at: Option, ) -> Result, sqlx::Error> { let messages = sqlx::query!( r#" select message.id as "id: message::Id", + sequence as "sequence: Sequence", login.id as "sender_id: login::Id", login.name as sender_name, message.body, @@ -98,14 +121,15 @@ impl<'c> Broadcast<'c> { from message join login on message.sender = login.id where channel = $1 - and coalesce(sent_at > $2, true) - order by sent_at asc + and coalesce(sequence > $2, true) + order by sequence asc "#, channel.id, resume_at, ) .map(|row| Message { id: row.id, + sequence: row.sequence, sender: Login { id: row.sender_id, name: row.sender_name, @@ -119,3 +143,19 @@ impl<'c> Broadcast<'c> { Ok(messages) } } + +#[derive( + Debug, + Eq, + Ord, + PartialEq, + PartialOrd, + Clone, + Copy, + serde::Serialize, + serde::Deserialize, + sqlx::Type, +)] +#[serde(transparent)] +#[sqlx(transparent)] +pub struct Sequence(i64); diff --git a/src/events/routes.rs b/src/events/routes.rs index a6bf5d9..7731680 100644 --- a/src/events/routes.rs +++ b/src/events/routes.rs @@ -1,3 +1,5 @@ +use std::collections::{BTreeMap, HashSet}; + use axum::{ extract::State, http::StatusCode, @@ -9,8 +11,10 @@ use axum::{ Router, }; use axum_extra::extract::Query; -use chrono::{self, format::SecondsFormat}; -use futures::stream::{self, Stream, StreamExt as _, TryStreamExt as _}; +use futures::{ + future, + stream::{self, Stream, StreamExt as _, TryStreamExt as _}, +}; use super::repo::broadcast; use crate::{ @@ -25,6 +29,15 @@ use crate::{ #[cfg(test)] mod test; +// For the purposes of event replay, an "event ID" is a vector of per-channel +// sequence numbers. 
Replay will start with messages whose sequence number in +// its channel is higher than the sequence in the event ID, or if the channel +// is not listed in the event ID, then at the beginning. +// +// Using a sorted map ensures that there is a canonical representation for +// each event ID. +type EventId = BTreeMap; + pub fn router() -> Router { Router::new().route("/api/events", get(events)) } @@ -32,22 +45,27 @@ pub fn router() -> Router { #[derive(Clone, serde::Deserialize)] struct EventsQuery { #[serde(default, rename = "channel")] - channels: Vec, + channels: HashSet, } async fn events( State(app): State, RequestedAt(now): RequestedAt, _: Login, // requires auth, but doesn't actually care who you are - last_event_id: Option, + last_event_id: Option>, Query(query): Query, -) -> Result + std::fmt::Debug>, ErrorResponse> { - let resume_at = last_event_id.as_deref(); +) -> Result + std::fmt::Debug>, ErrorResponse> { + let resume_at = last_event_id + .map(LastEventId::into_inner) + .unwrap_or_default(); let streams = stream::iter(query.channels) .then(|channel| { let app = app.clone(); + let resume_at = resume_at.clone(); async move { + let resume_at = resume_at.get(&channel).copied(); + let events = app .channels() .events(&channel, &now, resume_at) @@ -62,7 +80,18 @@ async fn events( // impl From would take more code; this is used once. .map_err(ErrorResponse)?; - let stream = stream::select_all(streams); + // We resume counting from the provided last-event-id mapping, rather than + // starting from scratch, so that the events in a resumed stream contain + // the full vector of channel IDs for their event IDs right off the bat, + // even before any events are actually delivered. 
+ let stream = stream::select_all(streams).scan(resume_at, |sequences, event| { + let (channel, sequence) = event.event_id(); + sequences.insert(channel, sequence); + + let event = ReplayableEvent(sequences.clone(), event); + + future::ready(Some(event)) + }); Ok(Events(stream)) } @@ -72,7 +101,7 @@ struct Events(S); impl IntoResponse for Events where - S: Stream + Send + 'static, + S: Stream + Send + 'static, { fn into_response(self) -> Response { let Self(stream) = self; @@ -101,6 +130,9 @@ impl IntoResponse for ErrorResponse { } } +#[derive(Debug)] +struct ReplayableEvent(EventId, ChannelEvent); + #[derive(Debug, serde::Serialize)] struct ChannelEvent { channel: channel::Id, @@ -116,19 +148,21 @@ impl ChannelEvent { } } - fn event_id(&self) -> String { - self.message - .sent_at - .to_rfc3339_opts(SecondsFormat::AutoSi, /* use_z */ true) + fn event_id(&self) -> (channel::Id, broadcast::Sequence) { + (self.channel.clone(), self.message.sequence) } } -impl TryFrom for sse::Event { +impl TryFrom for sse::Event { type Error = serde_json::Error; - fn try_from(value: ChannelEvent) -> Result { - let data = serde_json::to_string_pretty(&value)?; - let event = Self::default().id(value.event_id()).data(&data); + fn try_from(value: ReplayableEvent) -> Result { + let ReplayableEvent(id, data) = value; + + let id = serde_json::to_string(&id)?; + let data = serde_json::to_string_pretty(&data)?; + + let event = Self::default().id(id).data(data); Ok(event) } diff --git a/src/events/routes/test.rs b/src/events/routes/test.rs index df2d5f6..131c751 100644 --- a/src/events/routes/test.rs +++ b/src/events/routes/test.rs @@ -22,7 +22,9 @@ async fn no_subscriptions() { // Call the endpoint let subscribed_at = fixtures::now(); - let query = routes::EventsQuery { channels: vec![] }; + let query = routes::EventsQuery { + channels: [].into(), + }; let routes::Events(mut events) = routes::events(State(app), subscribed_at, subscriber, None, Query(query)) .await @@ -47,7 +49,7 @@ async fn 
includes_historical_message() { let subscriber = fixtures::login::create(&app).await; let subscribed_at = fixtures::now(); let query = routes::EventsQuery { - channels: vec![channel.id.clone()], + channels: [channel.id.clone()].into(), }; let routes::Events(mut events) = routes::events(State(app), subscribed_at, subscriber, None, Query(query)) @@ -56,7 +58,7 @@ async fn includes_historical_message() { // Verify the structure of the response. - let event = events + let routes::ReplayableEvent(_, event) = events .next() .immediately() .await @@ -78,7 +80,7 @@ async fn includes_live_message() { let subscriber = fixtures::login::create(&app).await; let subscribed_at = fixtures::now(); let query = routes::EventsQuery { - channels: vec![channel.id.clone()], + channels: [channel.id.clone()].into(), }; let routes::Events(mut events) = routes::events( State(app.clone()), @@ -95,7 +97,7 @@ async fn includes_live_message() { let sender = fixtures::login::create(&app).await; let message = fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await; - let event = events + let routes::ReplayableEvent(_, event) = events .next() .immediately() .await @@ -121,7 +123,7 @@ async fn excludes_other_channels() { let subscriber = fixtures::login::create(&app).await; let subscribed_at = fixtures::now(); let query = routes::EventsQuery { - channels: vec![subscribed.id.clone()], + channels: [subscribed.id.clone()].into(), }; let routes::Events(mut events) = routes::events(State(app), subscribed_at, subscriber, None, Query(query)) @@ -130,7 +132,7 @@ async fn excludes_other_channels() { // Verify the semantics - let event = events + let routes::ReplayableEvent(_, event) = events .next() .immediately() .await @@ -186,9 +188,9 @@ async fn includes_multiple_channels() { .await; for (channel, message) in messages { - assert!(events - .iter() - .any(|event| { event.channel == channel.id && event.message == message })); + assert!(events.iter().any(|routes::ReplayableEvent(_, event)| { 
+ event.channel == channel.id && event.message == message + })); } } @@ -204,7 +206,7 @@ async fn nonexitent_channel() { let subscriber = fixtures::login::create(&app).await; let subscribed_at = fixtures::now(); let query = routes::EventsQuery { - channels: vec![channel.clone()], + channels: [channel.clone()].into(), }; let routes::ErrorResponse(error) = routes::events(State(app), subscribed_at, subscriber, None, Query(query)) @@ -239,7 +241,7 @@ async fn sequential_messages() { let subscriber = fixtures::login::create(&app).await; let subscribed_at = fixtures::now(); let query = routes::EventsQuery { - channels: vec![channel.id.clone()], + channels: [channel.id.clone()].into(), }; let routes::Events(events) = routes::events(State(app), subscribed_at, subscriber, None, Query(query)) @@ -248,11 +250,13 @@ async fn sequential_messages() { // Verify the structure of the response. - let mut events = events.filter(|event| future::ready(messages.contains(&event.message))); + let mut events = events.filter(|routes::ReplayableEvent(_, event)| { + future::ready(messages.contains(&event.message)) + }); // Verify delivery in order for message in &messages { - let event = events + let routes::ReplayableEvent(_, event) = events .next() .immediately() .await @@ -283,7 +287,7 @@ async fn resumes_from() { let subscriber = fixtures::login::create(&app).await; let subscribed_at = fixtures::now(); let query = routes::EventsQuery { - channels: vec![channel.id.clone()], + channels: [channel.id.clone()].into(), }; let resume_at = { @@ -298,19 +302,20 @@ async fn resumes_from() { .await .expect("subscribed to a valid channel"); - let event = events.next().immediately().await.expect("delivered events"); + let routes::ReplayableEvent(id, event) = + events.next().immediately().await.expect("delivered events"); assert_eq!(channel.id, event.channel); assert_eq!(initial_message, event.message); - event.event_id() + id }; // Resume after disconnect - let resumed_at = fixtures::now(); + let 
reconnect_at = fixtures::now(); let routes::Events(resumed) = routes::events( State(app), - resumed_at, + reconnect_at, subscriber, Some(resume_at.into()), Query(query), @@ -327,12 +332,156 @@ async fn resumes_from() { .await; for message in later_messages { - assert!(events - .iter() - .any(|event| event.channel == channel.id && event.message == message)); + assert!(events.iter().any( + |routes::ReplayableEvent(_, event)| event.channel == channel.id + && event.message == message + )); } } +// This test verifies a real bug I hit developing the vector-of-sequences +// approach to resuming events. A small omission caused the event IDs in a +// resumed stream to _omit_ channels that were in the original stream until +// those channels also appeared in the resumed stream. +// +// Clients would see something like +// * In the original stream, Cfoo=5,Cbar=8 +// * In the resumed stream, Cfoo=6 (no Cbar sequence number) +// +// Disconnecting and reconnecting a second time, using event IDs from that +// initial period of the first resume attempt, would then cause the second +// resume attempt to restart all other channels from the beginning, and not +// from where the first disconnection happened. +// +// This is a real and valid behaviour for clients! 
+#[tokio::test] +async fn serial_resume() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let sender = fixtures::login::create(&app).await; + let channel_a = fixtures::channel::create(&app).await; + let channel_b = fixtures::channel::create(&app).await; + + // Call the endpoint + + let subscriber = fixtures::login::create(&app).await; + let query = routes::EventsQuery { + channels: [channel_a.id.clone(), channel_b.id.clone()].into(), + }; + + let resume_at = { + let initial_messages = [ + fixtures::message::send(&app, &sender, &channel_a, &fixtures::now()).await, + fixtures::message::send(&app, &sender, &channel_b, &fixtures::now()).await, + ]; + + // First subscription + let subscribed_at = fixtures::now(); + let routes::Events(events) = routes::events( + State(app.clone()), + subscribed_at, + subscriber.clone(), + None, + Query(query.clone()), + ) + .await + .expect("subscribed to a valid channel"); + + let events = events + .take(initial_messages.len()) + .collect::>() + .immediately() + .await; + + for message in initial_messages { + assert!(events + .iter() + .any(|routes::ReplayableEvent(_, event)| event.message == message)); + } + + let routes::ReplayableEvent(id, _) = events.last().expect("this vec is non-empty"); + + id.to_owned() + }; + + // Resume after disconnect + let resume_at = { + let resume_messages = [ + // Note that channel_b does not appear here. The buggy behaviour + // would be masked if channel_b happened to send a new message + // into the resumed event stream. 
+ fixtures::message::send(&app, &sender, &channel_a, &fixtures::now()).await, + fixtures::message::send(&app, &sender, &channel_a, &fixtures::now()).await, + ]; + + // Second subscription + let resubscribed_at = fixtures::now(); + let routes::Events(events) = routes::events( + State(app.clone()), + resubscribed_at, + subscriber.clone(), + Some(resume_at.into()), + Query(query.clone()), + ) + .await + .expect("subscribed to a valid channel"); + + let events = events + .take(resume_messages.len()) + .collect::>() + .immediately() + .await; + + for message in resume_messages { + assert!(events + .iter() + .any(|routes::ReplayableEvent(_, event)| event.message == message)); + } + + let routes::ReplayableEvent(id, _) = events.last().expect("this vec is non-empty"); + + id.to_owned() + }; + + // Resume after disconnect a second time + { + // At this point, we can send on either channel and demonstrate the + // problem. The resume point should be before both of these messages, but + // after _all_ prior messages. + let final_messages = [ + fixtures::message::send(&app, &sender, &channel_a, &fixtures::now()).await, + fixtures::message::send(&app, &sender, &channel_b, &fixtures::now()).await, + ]; + + // Third subscription + let resubscribed_at = fixtures::now(); + let routes::Events(events) = routes::events( + State(app.clone()), + resubscribed_at, + subscriber.clone(), + Some(resume_at.into()), + Query(query.clone()), + ) + .await + .expect("subscribed to a valid channel"); + + let events = events + .take(final_messages.len()) + .collect::>() + .immediately() + .await; + + // This set of messages, in particular, _should not_ include any prior + // messages from `initial_messages` or `resume_messages`. 
+ for message in final_messages { + assert!(events + .iter() + .any(|routes::ReplayableEvent(_, event)| event.message == message)); + } + }; +} + #[tokio::test] async fn removes_expired_messages() { // Set up the environment @@ -348,7 +497,7 @@ async fn removes_expired_messages() { let subscriber = fixtures::login::create(&app).await; let subscribed_at = fixtures::now(); let query = routes::EventsQuery { - channels: vec![channel.id.clone()], + channels: [channel.id.clone()].into_iter().collect(), }; let routes::Events(mut events) = routes::events(State(app), subscribed_at, subscriber, None, Query(query)) @@ -357,7 +506,7 @@ async fn removes_expired_messages() { // Verify the semantics - let event = events + let routes::ReplayableEvent(_, event) = events .next() .immediately() .await diff --git a/src/header.rs b/src/header.rs index 61cc561..683c1f9 100644 --- a/src/header.rs +++ b/src/header.rs @@ -1,16 +1,22 @@ +use std::ops::Deref; + use axum::{ extract::FromRequestParts, http::{request::Parts, HeaderName, HeaderValue}, }; use axum_extra::typed_header::TypedHeader; +use serde::{de::DeserializeOwned, Serialize}; /// A typed header. When used as a bare extractor, reads from the /// `Last-Event-Id` HTTP header. 
-pub struct LastEventId(pub String);
+pub struct LastEventId<T>(pub T);
 
 static LAST_EVENT_ID: HeaderName = HeaderName::from_static("last-event-id");
 
-impl headers::Header for LastEventId {
+impl<T> headers::Header for LastEventId<T>
+where
+    T: Serialize + DeserializeOwned,
+{
     fn name() -> &'static HeaderName {
         &LAST_EVENT_ID
     }
@@ -20,11 +26,9 @@ impl headers::Header for LastEventId {
         I: Iterator<Item = &'i HeaderValue>,
     {
         let value = values.next().ok_or_else(headers::Error::invalid)?;
-        if let Ok(value) = value.to_str() {
-            Ok(Self(value.into()))
-        } else {
-            Err(headers::Error::invalid())
-        }
+        let value = value.to_str().map_err(|_| headers::Error::invalid())?;
+        let value = serde_json::from_str(value).map_err(|_| headers::Error::invalid())?;
+        Ok(Self(value))
     }
 
     fn encode<E>(&self, values: &mut E)
@@ -33,16 +37,18 @@ impl headers::Header for LastEventId {
     {
         let Self(value) = self;
         // Must panic or suppress; the trait provides no other options.
-        let value = HeaderValue::from_str(value).expect("LastEventId is a valid header value");
+        let value = serde_json::to_string(value).expect("value can be encoded as JSON");
+        let value = HeaderValue::from_str(&value).expect("LastEventId is a valid header value");
 
         values.extend(std::iter::once(value));
     }
 }
 
 #[async_trait::async_trait]
-impl<S> FromRequestParts<S> for LastEventId
+impl<S, T> FromRequestParts<S> for LastEventId<T>
 where
     S: Send + Sync,
+    T: Serialize + DeserializeOwned,
 {
     type Rejection = <TypedHeader<Self> as FromRequestParts<S>>::Rejection;
 
@@ -57,17 +63,24 @@ where
     }
 }
 
-impl From<String> for LastEventId {
-    fn from(header: String) -> Self {
-        Self(header)
-    }
-}
-
-impl std::ops::Deref for LastEventId {
-    type Target = str;
+impl<T> Deref for LastEventId<T> {
+    type Target = T;
 
     fn deref(&self) -> &Self::Target {
         let Self(header) = self;
         header
     }
 }
+
+impl<T> From<T> for LastEventId<T> {
+    fn from(value: T) -> Self {
+        Self(value)
+    }
+}
+
+impl<T> LastEventId<T> {
+    pub fn into_inner(self) -> T {
+        let Self(value) = self;
+        value
+    }
+}
diff --git a/src/id.rs b/src/id.rs
index 22add08..2fb5e0e 100644
---
a/src/id.rs +++ b/src/id.rs @@ -27,7 +27,18 @@ pub const ID_SIZE: usize = 15; // // By convention, the prefix should be UPPERCASE - note that the alphabet for this // is entirely lowercase. -#[derive(Clone, Debug, Hash, Eq, PartialEq, sqlx::Type, serde::Deserialize, serde::Serialize)] +#[derive( + Clone, + Debug, + Hash, + Eq, + Ord, + PartialEq, + PartialOrd, + sqlx::Type, + serde::Deserialize, + serde::Serialize, +)] #[sqlx(transparent)] #[serde(transparent)] pub struct Id(String); diff --git a/src/repo/channel.rs b/src/repo/channel.rs index 8f089e8..da63b45 100644 --- a/src/repo/channel.rs +++ b/src/repo/channel.rs @@ -79,7 +79,18 @@ impl<'c> Channels<'c> { } /// Stable identifier for a [Channel]. Prefixed with `C`. -#[derive(Clone, Debug, Eq, Hash, PartialEq, sqlx::Type, serde::Deserialize, serde::Serialize)] +#[derive( + Clone, + Debug, + Eq, + Hash, + Ord, + PartialEq, + PartialOrd, + sqlx::Type, + serde::Deserialize, + serde::Serialize, +)] #[sqlx(transparent)] #[serde(transparent)] pub struct Id(BaseId); -- cgit v1.2.3