summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/channel/app.rs57
-rw-r--r--src/events/repo/broadcast.rs52
-rw-r--r--src/events/routes.rs66
-rw-r--r--src/events/routes/test.rs197
-rw-r--r--src/header.rs47
-rw-r--r--src/id.rs13
-rw-r--r--src/repo/channel.rs13
7 files changed, 364 insertions, 81 deletions
diff --git a/src/channel/app.rs b/src/channel/app.rs
index 3c92d76..e314792 100644
--- a/src/channel/app.rs
+++ b/src/channel/app.rs
@@ -77,16 +77,11 @@ impl<'a> Channels<'a> {
&self,
channel: &channel::Id,
subscribed_at: &DateTime,
- resume_at: Option<&str>,
+ resume_at: Option<broadcast::Sequence>,
) -> Result<impl Stream<Item = broadcast::Message> + std::fmt::Debug, EventsError> {
// Somewhat arbitrarily, expire after 90 days.
let expire_at = subscribed_at.to_owned() - TimeDelta::days(90);
- let resume_at = resume_at
- .map(chrono::DateTime::parse_from_rfc3339)
- .transpose()?
- .map(|resume_at| resume_at.to_utc());
-
let mut tx = self.db.begin().await?;
let channel = tx
.channels()
@@ -94,29 +89,59 @@ impl<'a> Channels<'a> {
.await
.not_found(|| EventsError::ChannelNotFound(channel.clone()))?;
- let live_messages = self
- .broadcaster
- .listen(&channel.id)
- .filter(Self::skip_stale(resume_at.as_ref()))
- .filter(Self::skip_expired(&expire_at));
+ // Subscribe before retrieving, to catch messages broadcast while we're
+ // querying the DB. We'll prune out duplicates later.
+ let live_messages = self.broadcaster.listen(&channel.id);
tx.broadcast().expire(&expire_at).await?;
- let stored_messages = tx.broadcast().replay(&channel, resume_at.as_ref()).await?;
+ let stored_messages = tx.broadcast().replay(&channel, resume_at).await?;
tx.commit().await?;
+ let resume_broadcast_at = stored_messages
+ .last()
+ .map(|message| message.sequence)
+ .or(resume_at);
+
+ // This should always be the case, up to integer rollover, primarily
+ // because every message in stored_messages has a sequence not less
+ // than `resume_at`, or `resume_at` is None. We use the last message
+ // (if any) to decide when to resume the `live_messages` stream.
+ //
+ // It probably simplifies to assert!(resume_at <= resume_broadcast_at), but
+ // this form captures more of the reasoning.
+ assert!(
+ (resume_at.is_none() && resume_broadcast_at.is_none())
+ || (stored_messages.is_empty() && resume_at == resume_broadcast_at)
+ || resume_at < resume_broadcast_at
+ );
+
+ // no skip_expired or resume transforms for stored_messages, as it's
+ // constructed not to contain messages meeting either criterion.
+ //
+ // * skip_expired is redundant with the `tx.broadcasts().expire(…)` call;
+ // * resume is redundant with the resume_at argument to
+ // `tx.broadcasts().replay(…)`.
let stored_messages = stream::iter(stored_messages);
+ let live_messages = live_messages
+ // Sure, it's temporally improbable that we'll ever skip a message
+ // that's 90 days old, but there's no reason not to be thorough.
+ .filter(Self::skip_expired(&expire_at))
+ // Filtering on the broadcast resume point filters out messages
+ // before resume_at, and filters out messages duplicated from
+ // stored_messages.
+ .filter(Self::resume(resume_broadcast_at));
Ok(stored_messages.chain(live_messages))
}
- fn skip_stale(
- resume_at: Option<&DateTime>,
+ fn resume(
+ resume_at: Option<broadcast::Sequence>,
) -> impl for<'m> FnMut(&'m broadcast::Message) -> future::Ready<bool> {
- let resume_at = resume_at.cloned();
+ let resume_at = resume_at;
move |msg| {
future::ready(match resume_at {
None => true,
- Some(resume_at) => msg.sent_at > resume_at,
+ Some(resume_at) => msg.sequence > resume_at,
})
}
}
diff --git a/src/events/repo/broadcast.rs b/src/events/repo/broadcast.rs
index bffe991..29dab55 100644
--- a/src/events/repo/broadcast.rs
+++ b/src/events/repo/broadcast.rs
@@ -24,6 +24,7 @@ pub struct Broadcast<'t>(&'t mut SqliteConnection);
#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
pub struct Message {
pub id: message::Id,
+ pub sequence: Sequence,
pub sender: Login,
pub body: String,
pub sent_at: DateTime,
@@ -37,27 +38,32 @@ impl<'c> Broadcast<'c> {
body: &str,
sent_at: &DateTime,
) -> Result<Message, sqlx::Error> {
+ let sequence = self.next_sequence_for(channel).await?;
+
let id = message::Id::generate();
let message = sqlx::query!(
r#"
insert into message
- (id, sender, channel, body, sent_at)
- values ($1, $2, $3, $4, $5)
+ (id, channel, sequence, sender, body, sent_at)
+ values ($1, $2, $3, $4, $5, $6)
returning
id as "id: message::Id",
+ sequence as "sequence: Sequence",
sender as "sender: login::Id",
body,
sent_at as "sent_at: DateTime"
"#,
id,
- sender.id,
channel.id,
+ sequence,
+ sender.id,
body,
sent_at,
)
.map(|row| Message {
id: row.id,
+ sequence: row.sequence,
sender: sender.clone(),
body: row.body,
sent_at: row.sent_at,
@@ -68,6 +74,22 @@ impl<'c> Broadcast<'c> {
Ok(message)
}
+ async fn next_sequence_for(&mut self, channel: &Channel) -> Result<Sequence, sqlx::Error> {
+ let Sequence(current) = sqlx::query_scalar!(
+ r#"
+ -- `max` never returns null, but sqlx can't detect that
+ select max(sequence) as "sequence!: Sequence"
+ from message
+ where channel = $1
+ "#,
+ channel.id,
+ )
+ .fetch_one(&mut *self.0)
+ .await?;
+
+ Ok(Sequence(current + 1))
+ }
+
pub async fn expire(&mut self, expire_at: &DateTime) -> Result<(), sqlx::Error> {
sqlx::query!(
r#"
@@ -85,12 +107,13 @@ impl<'c> Broadcast<'c> {
pub async fn replay(
&mut self,
channel: &Channel,
- resume_at: Option<&DateTime>,
+ resume_at: Option<Sequence>,
) -> Result<Vec<Message>, sqlx::Error> {
let messages = sqlx::query!(
r#"
select
message.id as "id: message::Id",
+ sequence as "sequence: Sequence",
login.id as "sender_id: login::Id",
login.name as sender_name,
message.body,
@@ -98,14 +121,15 @@ impl<'c> Broadcast<'c> {
from message
join login on message.sender = login.id
where channel = $1
- and coalesce(sent_at > $2, true)
- order by sent_at asc
+ and coalesce(sequence > $2, true)
+ order by sequence asc
"#,
channel.id,
resume_at,
)
.map(|row| Message {
id: row.id,
+ sequence: row.sequence,
sender: Login {
id: row.sender_id,
name: row.sender_name,
@@ -119,3 +143,19 @@ impl<'c> Broadcast<'c> {
Ok(messages)
}
}
+
+#[derive(
+ Debug,
+ Eq,
+ Ord,
+ PartialEq,
+ PartialOrd,
+ Clone,
+ Copy,
+ serde::Serialize,
+ serde::Deserialize,
+ sqlx::Type,
+)]
+#[serde(transparent)]
+#[sqlx(transparent)]
+pub struct Sequence(i64);
diff --git a/src/events/routes.rs b/src/events/routes.rs
index a6bf5d9..7731680 100644
--- a/src/events/routes.rs
+++ b/src/events/routes.rs
@@ -1,3 +1,5 @@
+use std::collections::{BTreeMap, HashSet};
+
use axum::{
extract::State,
http::StatusCode,
@@ -9,8 +11,10 @@ use axum::{
Router,
};
use axum_extra::extract::Query;
-use chrono::{self, format::SecondsFormat};
-use futures::stream::{self, Stream, StreamExt as _, TryStreamExt as _};
+use futures::{
+ future,
+ stream::{self, Stream, StreamExt as _, TryStreamExt as _},
+};
use super::repo::broadcast;
use crate::{
@@ -25,6 +29,15 @@ use crate::{
#[cfg(test)]
mod test;
+// For the purposes of event replay, an "event ID" is a vector of per-channel
+// sequence numbers. Replay will start with messages whose sequence number in
+// its channel is higher than the sequence in the event ID, or if the channel
+// is not listed in the event ID, then at the beginning.
+//
+// Using a sorted map ensures that there is a canonical representation for
+// each event ID.
+type EventId = BTreeMap<channel::Id, broadcast::Sequence>;
+
pub fn router() -> Router<App> {
Router::new().route("/api/events", get(events))
}
@@ -32,22 +45,27 @@ pub fn router() -> Router<App> {
#[derive(Clone, serde::Deserialize)]
struct EventsQuery {
#[serde(default, rename = "channel")]
- channels: Vec<channel::Id>,
+ channels: HashSet<channel::Id>,
}
async fn events(
State(app): State<App>,
RequestedAt(now): RequestedAt,
_: Login, // requires auth, but doesn't actually care who you are
- last_event_id: Option<LastEventId>,
+ last_event_id: Option<LastEventId<EventId>>,
Query(query): Query<EventsQuery>,
-) -> Result<Events<impl Stream<Item = ChannelEvent> + std::fmt::Debug>, ErrorResponse> {
- let resume_at = last_event_id.as_deref();
+) -> Result<Events<impl Stream<Item = ReplayableEvent> + std::fmt::Debug>, ErrorResponse> {
+ let resume_at = last_event_id
+ .map(LastEventId::into_inner)
+ .unwrap_or_default();
let streams = stream::iter(query.channels)
.then(|channel| {
let app = app.clone();
+ let resume_at = resume_at.clone();
async move {
+ let resume_at = resume_at.get(&channel).copied();
+
let events = app
.channels()
.events(&channel, &now, resume_at)
@@ -62,7 +80,18 @@ async fn events(
// impl From would take more code; this is used once.
.map_err(ErrorResponse)?;
- let stream = stream::select_all(streams);
+ // We resume counting from the provided last-event-id mapping, rather than
+ // starting from scratch, so that the events in a resumed stream contain
+ // the full vector of channel IDs for their event IDs right off the bat,
+ // even before any events are actually delivered.
+ let stream = stream::select_all(streams).scan(resume_at, |sequences, event| {
+ let (channel, sequence) = event.event_id();
+ sequences.insert(channel, sequence);
+
+ let event = ReplayableEvent(sequences.clone(), event);
+
+ future::ready(Some(event))
+ });
Ok(Events(stream))
}
@@ -72,7 +101,7 @@ struct Events<S>(S);
impl<S> IntoResponse for Events<S>
where
- S: Stream<Item = ChannelEvent> + Send + 'static,
+ S: Stream<Item = ReplayableEvent> + Send + 'static,
{
fn into_response(self) -> Response {
let Self(stream) = self;
@@ -101,6 +130,9 @@ impl IntoResponse for ErrorResponse {
}
}
+#[derive(Debug)]
+struct ReplayableEvent(EventId, ChannelEvent);
+
#[derive(Debug, serde::Serialize)]
struct ChannelEvent {
channel: channel::Id,
@@ -116,19 +148,21 @@ impl ChannelEvent {
}
}
- fn event_id(&self) -> String {
- self.message
- .sent_at
- .to_rfc3339_opts(SecondsFormat::AutoSi, /* use_z */ true)
+ fn event_id(&self) -> (channel::Id, broadcast::Sequence) {
+ (self.channel.clone(), self.message.sequence)
}
}
-impl TryFrom<ChannelEvent> for sse::Event {
+impl TryFrom<ReplayableEvent> for sse::Event {
type Error = serde_json::Error;
- fn try_from(value: ChannelEvent) -> Result<Self, Self::Error> {
- let data = serde_json::to_string_pretty(&value)?;
- let event = Self::default().id(value.event_id()).data(&data);
+ fn try_from(value: ReplayableEvent) -> Result<Self, Self::Error> {
+ let ReplayableEvent(id, data) = value;
+
+ let id = serde_json::to_string(&id)?;
+ let data = serde_json::to_string_pretty(&data)?;
+
+ let event = Self::default().id(id).data(data);
Ok(event)
}
diff --git a/src/events/routes/test.rs b/src/events/routes/test.rs
index df2d5f6..131c751 100644
--- a/src/events/routes/test.rs
+++ b/src/events/routes/test.rs
@@ -22,7 +22,9 @@ async fn no_subscriptions() {
// Call the endpoint
let subscribed_at = fixtures::now();
- let query = routes::EventsQuery { channels: vec![] };
+ let query = routes::EventsQuery {
+ channels: [].into(),
+ };
let routes::Events(mut events) =
routes::events(State(app), subscribed_at, subscriber, None, Query(query))
.await
@@ -47,7 +49,7 @@ async fn includes_historical_message() {
let subscriber = fixtures::login::create(&app).await;
let subscribed_at = fixtures::now();
let query = routes::EventsQuery {
- channels: vec![channel.id.clone()],
+ channels: [channel.id.clone()].into(),
};
let routes::Events(mut events) =
routes::events(State(app), subscribed_at, subscriber, None, Query(query))
@@ -56,7 +58,7 @@ async fn includes_historical_message() {
// Verify the structure of the response.
- let event = events
+ let routes::ReplayableEvent(_, event) = events
.next()
.immediately()
.await
@@ -78,7 +80,7 @@ async fn includes_live_message() {
let subscriber = fixtures::login::create(&app).await;
let subscribed_at = fixtures::now();
let query = routes::EventsQuery {
- channels: vec![channel.id.clone()],
+ channels: [channel.id.clone()].into(),
};
let routes::Events(mut events) = routes::events(
State(app.clone()),
@@ -95,7 +97,7 @@ async fn includes_live_message() {
let sender = fixtures::login::create(&app).await;
let message = fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await;
- let event = events
+ let routes::ReplayableEvent(_, event) = events
.next()
.immediately()
.await
@@ -121,7 +123,7 @@ async fn excludes_other_channels() {
let subscriber = fixtures::login::create(&app).await;
let subscribed_at = fixtures::now();
let query = routes::EventsQuery {
- channels: vec![subscribed.id.clone()],
+ channels: [subscribed.id.clone()].into(),
};
let routes::Events(mut events) =
routes::events(State(app), subscribed_at, subscriber, None, Query(query))
@@ -130,7 +132,7 @@ async fn excludes_other_channels() {
// Verify the semantics
- let event = events
+ let routes::ReplayableEvent(_, event) = events
.next()
.immediately()
.await
@@ -186,9 +188,9 @@ async fn includes_multiple_channels() {
.await;
for (channel, message) in messages {
- assert!(events
- .iter()
- .any(|event| { event.channel == channel.id && event.message == message }));
+ assert!(events.iter().any(|routes::ReplayableEvent(_, event)| {
+ event.channel == channel.id && event.message == message
+ }));
}
}
@@ -204,7 +206,7 @@ async fn nonexitent_channel() {
let subscriber = fixtures::login::create(&app).await;
let subscribed_at = fixtures::now();
let query = routes::EventsQuery {
- channels: vec![channel.clone()],
+ channels: [channel.clone()].into(),
};
let routes::ErrorResponse(error) =
routes::events(State(app), subscribed_at, subscriber, None, Query(query))
@@ -239,7 +241,7 @@ async fn sequential_messages() {
let subscriber = fixtures::login::create(&app).await;
let subscribed_at = fixtures::now();
let query = routes::EventsQuery {
- channels: vec![channel.id.clone()],
+ channels: [channel.id.clone()].into(),
};
let routes::Events(events) =
routes::events(State(app), subscribed_at, subscriber, None, Query(query))
@@ -248,11 +250,13 @@ async fn sequential_messages() {
// Verify the structure of the response.
- let mut events = events.filter(|event| future::ready(messages.contains(&event.message)));
+ let mut events = events.filter(|routes::ReplayableEvent(_, event)| {
+ future::ready(messages.contains(&event.message))
+ });
// Verify delivery in order
for message in &messages {
- let event = events
+ let routes::ReplayableEvent(_, event) = events
.next()
.immediately()
.await
@@ -283,7 +287,7 @@ async fn resumes_from() {
let subscriber = fixtures::login::create(&app).await;
let subscribed_at = fixtures::now();
let query = routes::EventsQuery {
- channels: vec![channel.id.clone()],
+ channels: [channel.id.clone()].into(),
};
let resume_at = {
@@ -298,19 +302,20 @@ async fn resumes_from() {
.await
.expect("subscribed to a valid channel");
- let event = events.next().immediately().await.expect("delivered events");
+ let routes::ReplayableEvent(id, event) =
+ events.next().immediately().await.expect("delivered events");
assert_eq!(channel.id, event.channel);
assert_eq!(initial_message, event.message);
- event.event_id()
+ id
};
// Resume after disconnect
- let resumed_at = fixtures::now();
+ let reconnect_at = fixtures::now();
let routes::Events(resumed) = routes::events(
State(app),
- resumed_at,
+ reconnect_at,
subscriber,
Some(resume_at.into()),
Query(query),
@@ -327,12 +332,156 @@ async fn resumes_from() {
.await;
for message in later_messages {
- assert!(events
- .iter()
- .any(|event| event.channel == channel.id && event.message == message));
+ assert!(events.iter().any(
+ |routes::ReplayableEvent(_, event)| event.channel == channel.id
+ && event.message == message
+ ));
}
}
+// This test verifies a real bug I hit developing the vector-of-sequences
+// approach to resuming events. A small omission caused the event IDs in a
+// resumed stream to _omit_ channels that were in the original stream until
+// those channels also appeared in the resumed stream.
+//
+// Clients would see something like
+// * In the original stream, Cfoo=5,Cbar=8
+// * In the resumed stream, Cfoo=6 (no Cbar sequence number)
+//
+// Disconnecting and reconnecting a second time, using event IDs from that
+// initial period of the first resume attempt, would then cause the second
+// resume attempt to restart all other channels from the beginning, and not
+// from where the first disconnection happened.
+//
+// This is a real and valid behaviour for clients!
+#[tokio::test]
+async fn serial_resume() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let sender = fixtures::login::create(&app).await;
+ let channel_a = fixtures::channel::create(&app).await;
+ let channel_b = fixtures::channel::create(&app).await;
+
+ // Call the endpoint
+
+ let subscriber = fixtures::login::create(&app).await;
+ let query = routes::EventsQuery {
+ channels: [channel_a.id.clone(), channel_b.id.clone()].into(),
+ };
+
+ let resume_at = {
+ let initial_messages = [
+ fixtures::message::send(&app, &sender, &channel_a, &fixtures::now()).await,
+ fixtures::message::send(&app, &sender, &channel_b, &fixtures::now()).await,
+ ];
+
+ // First subscription
+ let subscribed_at = fixtures::now();
+ let routes::Events(events) = routes::events(
+ State(app.clone()),
+ subscribed_at,
+ subscriber.clone(),
+ None,
+ Query(query.clone()),
+ )
+ .await
+ .expect("subscribed to a valid channel");
+
+ let events = events
+ .take(initial_messages.len())
+ .collect::<Vec<_>>()
+ .immediately()
+ .await;
+
+ for message in initial_messages {
+ assert!(events
+ .iter()
+ .any(|routes::ReplayableEvent(_, event)| event.message == message));
+ }
+
+ let routes::ReplayableEvent(id, _) = events.last().expect("this vec is non-empty");
+
+ id.to_owned()
+ };
+
+ // Resume after disconnect
+ let resume_at = {
+ let resume_messages = [
+ // Note that channel_b does not appear here. The buggy behaviour
+ // would be masked if channel_b happened to send a new message
+ // into the resumed event stream.
+ fixtures::message::send(&app, &sender, &channel_a, &fixtures::now()).await,
+ fixtures::message::send(&app, &sender, &channel_a, &fixtures::now()).await,
+ ];
+
+ // Second subscription
+ let resubscribed_at = fixtures::now();
+ let routes::Events(events) = routes::events(
+ State(app.clone()),
+ resubscribed_at,
+ subscriber.clone(),
+ Some(resume_at.into()),
+ Query(query.clone()),
+ )
+ .await
+ .expect("subscribed to a valid channel");
+
+ let events = events
+ .take(resume_messages.len())
+ .collect::<Vec<_>>()
+ .immediately()
+ .await;
+
+ for message in resume_messages {
+ assert!(events
+ .iter()
+ .any(|routes::ReplayableEvent(_, event)| event.message == message));
+ }
+
+ let routes::ReplayableEvent(id, _) = events.last().expect("this vec is non-empty");
+
+ id.to_owned()
+ };
+
+ // Resume after disconnect a second time
+ {
+ // At this point, we can send on either channel and demonstrate the
+ // problem. The resume point should before both of these messages, but
+ // after _all_ prior messages.
+ let final_messages = [
+ fixtures::message::send(&app, &sender, &channel_a, &fixtures::now()).await,
+ fixtures::message::send(&app, &sender, &channel_b, &fixtures::now()).await,
+ ];
+
+ // Second subscription
+ let resubscribed_at = fixtures::now();
+ let routes::Events(events) = routes::events(
+ State(app.clone()),
+ resubscribed_at,
+ subscriber.clone(),
+ Some(resume_at.into()),
+ Query(query.clone()),
+ )
+ .await
+ .expect("subscribed to a valid channel");
+
+ let events = events
+ .take(final_messages.len())
+ .collect::<Vec<_>>()
+ .immediately()
+ .await;
+
+ // This set of messages, in particular, _should not_ include any prior
+ // messages from `initial_messages` or `resume_messages`.
+ for message in final_messages {
+ assert!(events
+ .iter()
+ .any(|routes::ReplayableEvent(_, event)| event.message == message));
+ }
+ };
+}
+
#[tokio::test]
async fn removes_expired_messages() {
// Set up the environment
@@ -348,7 +497,7 @@ async fn removes_expired_messages() {
let subscriber = fixtures::login::create(&app).await;
let subscribed_at = fixtures::now();
let query = routes::EventsQuery {
- channels: vec![channel.id.clone()],
+ channels: [channel.id.clone()].into_iter().collect(),
};
let routes::Events(mut events) =
routes::events(State(app), subscribed_at, subscriber, None, Query(query))
@@ -357,7 +506,7 @@ async fn removes_expired_messages() {
// Verify the semantics
- let event = events
+ let routes::ReplayableEvent(_, event) = events
.next()
.immediately()
.await
diff --git a/src/header.rs b/src/header.rs
index 61cc561..683c1f9 100644
--- a/src/header.rs
+++ b/src/header.rs
@@ -1,16 +1,22 @@
+use std::ops::Deref;
+
use axum::{
extract::FromRequestParts,
http::{request::Parts, HeaderName, HeaderValue},
};
use axum_extra::typed_header::TypedHeader;
+use serde::{de::DeserializeOwned, Serialize};
/// A typed header. When used as a bare extractor, reads from the
/// `Last-Event-Id` HTTP header.
-pub struct LastEventId(pub String);
+pub struct LastEventId<T>(pub T);
static LAST_EVENT_ID: HeaderName = HeaderName::from_static("last-event-id");
-impl headers::Header for LastEventId {
+impl<T> headers::Header for LastEventId<T>
+where
+ T: Serialize + DeserializeOwned,
+{
fn name() -> &'static HeaderName {
&LAST_EVENT_ID
}
@@ -20,11 +26,9 @@ impl headers::Header for LastEventId {
I: Iterator<Item = &'i HeaderValue>,
{
let value = values.next().ok_or_else(headers::Error::invalid)?;
- if let Ok(value) = value.to_str() {
- Ok(Self(value.into()))
- } else {
- Err(headers::Error::invalid())
- }
+ let value = value.to_str().map_err(|_| headers::Error::invalid())?;
+ let value = serde_json::from_str(value).map_err(|_| headers::Error::invalid())?;
+ Ok(Self(value))
}
fn encode<E>(&self, values: &mut E)
@@ -33,16 +37,18 @@ impl headers::Header for LastEventId {
{
let Self(value) = self;
// Must panic or suppress; the trait provides no other options.
- let value = HeaderValue::from_str(value).expect("LastEventId is a valid header value");
+ let value = serde_json::to_string(value).expect("value can be encoded as JSON");
+ let value = HeaderValue::from_str(&value).expect("LastEventId is a valid header value");
values.extend(std::iter::once(value));
}
}
#[async_trait::async_trait]
-impl<S> FromRequestParts<S> for LastEventId
+impl<S, T> FromRequestParts<S> for LastEventId<T>
where
S: Send + Sync,
+ T: Serialize + DeserializeOwned,
{
type Rejection = <TypedHeader<Self> as FromRequestParts<S>>::Rejection;
@@ -57,17 +63,24 @@ where
}
}
-impl From<String> for LastEventId {
- fn from(header: String) -> Self {
- Self(header)
- }
-}
-
-impl std::ops::Deref for LastEventId {
- type Target = str;
+impl<T> Deref for LastEventId<T> {
+ type Target = T;
fn deref(&self) -> &Self::Target {
let Self(header) = self;
header
}
}
+
+impl<T> From<T> for LastEventId<T> {
+ fn from(value: T) -> Self {
+ Self(value)
+ }
+}
+
+impl<T> LastEventId<T> {
+ pub fn into_inner(self) -> T {
+ let Self(value) = self;
+ value
+ }
+}
diff --git a/src/id.rs b/src/id.rs
index 22add08..2fb5e0e 100644
--- a/src/id.rs
+++ b/src/id.rs
@@ -27,7 +27,18 @@ pub const ID_SIZE: usize = 15;
//
// By convention, the prefix should be UPPERCASE - note that the alphabet for this
// is entirely lowercase.
-#[derive(Clone, Debug, Hash, Eq, PartialEq, sqlx::Type, serde::Deserialize, serde::Serialize)]
+#[derive(
+ Clone,
+ Debug,
+ Hash,
+ Eq,
+ Ord,
+ PartialEq,
+ PartialOrd,
+ sqlx::Type,
+ serde::Deserialize,
+ serde::Serialize,
+)]
#[sqlx(transparent)]
#[serde(transparent)]
pub struct Id(String);
diff --git a/src/repo/channel.rs b/src/repo/channel.rs
index 8f089e8..da63b45 100644
--- a/src/repo/channel.rs
+++ b/src/repo/channel.rs
@@ -79,7 +79,18 @@ impl<'c> Channels<'c> {
}
/// Stable identifier for a [Channel]. Prefixed with `C`.
-#[derive(Clone, Debug, Eq, Hash, PartialEq, sqlx::Type, serde::Deserialize, serde::Serialize)]
+#[derive(
+ Clone,
+ Debug,
+ Eq,
+ Hash,
+ Ord,
+ PartialEq,
+ PartialOrd,
+ sqlx::Type,
+ serde::Deserialize,
+ serde::Serialize,
+)]
#[sqlx(transparent)]
#[serde(transparent)]
pub struct Id(BaseId);