diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/channel/app.rs | 57 | ||||
| -rw-r--r-- | src/events/repo/broadcast.rs | 52 | ||||
| -rw-r--r-- | src/events/routes.rs | 66 | ||||
| -rw-r--r-- | src/events/routes/test.rs | 197 | ||||
| -rw-r--r-- | src/header.rs | 47 | ||||
| -rw-r--r-- | src/id.rs | 13 | ||||
| -rw-r--r-- | src/repo/channel.rs | 13 |
7 files changed, 364 insertions, 81 deletions
diff --git a/src/channel/app.rs b/src/channel/app.rs index 3c92d76..e314792 100644 --- a/src/channel/app.rs +++ b/src/channel/app.rs @@ -77,16 +77,11 @@ impl<'a> Channels<'a> { &self, channel: &channel::Id, subscribed_at: &DateTime, - resume_at: Option<&str>, + resume_at: Option<broadcast::Sequence>, ) -> Result<impl Stream<Item = broadcast::Message> + std::fmt::Debug, EventsError> { // Somewhat arbitrarily, expire after 90 days. let expire_at = subscribed_at.to_owned() - TimeDelta::days(90); - let resume_at = resume_at - .map(chrono::DateTime::parse_from_rfc3339) - .transpose()? - .map(|resume_at| resume_at.to_utc()); - let mut tx = self.db.begin().await?; let channel = tx .channels() @@ -94,29 +89,59 @@ impl<'a> Channels<'a> { .await .not_found(|| EventsError::ChannelNotFound(channel.clone()))?; - let live_messages = self - .broadcaster - .listen(&channel.id) - .filter(Self::skip_stale(resume_at.as_ref())) - .filter(Self::skip_expired(&expire_at)); + // Subscribe before retrieving, to catch messages broadcast while we're + // querying the DB. We'll prune out duplicates later. + let live_messages = self.broadcaster.listen(&channel.id); tx.broadcast().expire(&expire_at).await?; - let stored_messages = tx.broadcast().replay(&channel, resume_at.as_ref()).await?; + let stored_messages = tx.broadcast().replay(&channel, resume_at).await?; tx.commit().await?; + let resume_broadcast_at = stored_messages + .last() + .map(|message| message.sequence) + .or(resume_at); + + // This should always be the case, up to integer rollover, primarily + // because every message in stored_messages has a sequence not less + // than `resume_at`, or `resume_at` is None. We use the last message + // (if any) to decide when to resume the `live_messages` stream. + // + // It probably simplifies to assert!(resume_at <= resume_broadcast_at), but + // this form captures more of the reasoning. 
+ assert!( + (resume_at.is_none() && resume_broadcast_at.is_none()) + || (stored_messages.is_empty() && resume_at == resume_broadcast_at) + || resume_at < resume_broadcast_at + ); + + // no skip_expired or resume transforms for stored_messages, as it's + // constructed not to contain messages meeting either criterion. + // + // * skip_expired is redundant with the `tx.broadcast().expire(…)` call; + // * resume is redundant with the resume_at argument to + // `tx.broadcast().replay(…)`. let stored_messages = stream::iter(stored_messages); + let live_messages = live_messages + // Sure, it's temporally improbable that we'll ever skip a message + // that's 90 days old, but there's no reason not to be thorough. + .filter(Self::skip_expired(&expire_at)) + // Filtering on the broadcast resume point filters out messages + // before resume_at, and filters out messages duplicated from + // stored_messages. + .filter(Self::resume(resume_broadcast_at)); Ok(stored_messages.chain(live_messages)) } - fn skip_stale( - resume_at: Option<&DateTime>, + fn resume( + resume_at: Option<broadcast::Sequence>, ) -> impl for<'m> FnMut(&'m broadcast::Message) -> future::Ready<bool> { - let resume_at = resume_at.cloned(); + let resume_at = resume_at; move |msg| { future::ready(match resume_at { None => true, - Some(resume_at) => msg.sent_at > resume_at, + Some(resume_at) => msg.sequence > resume_at, }) } } diff --git a/src/events/repo/broadcast.rs b/src/events/repo/broadcast.rs index bffe991..29dab55 100644 --- a/src/events/repo/broadcast.rs +++ b/src/events/repo/broadcast.rs @@ -24,6 +24,7 @@ pub struct Broadcast<'t>(&'t mut SqliteConnection); #[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)] pub struct Message { pub id: message::Id, + pub sequence: Sequence, pub sender: Login, pub body: String, pub sent_at: DateTime, @@ -37,27 +38,32 @@ impl<'c> Broadcast<'c> { body: &str, sent_at: &DateTime, ) -> Result<Message, sqlx::Error> { + let sequence = 
self.next_sequence_for(channel).await?; + let id = message::Id::generate(); let message = sqlx::query!( r#" insert into message - (id, sender, channel, body, sent_at) - values ($1, $2, $3, $4, $5) + (id, channel, sequence, sender, body, sent_at) + values ($1, $2, $3, $4, $5, $6) returning id as "id: message::Id", + sequence as "sequence: Sequence", sender as "sender: login::Id", body, sent_at as "sent_at: DateTime" "#, id, - sender.id, channel.id, + sequence, + sender.id, body, sent_at, ) .map(|row| Message { id: row.id, + sequence: row.sequence, sender: sender.clone(), body: row.body, sent_at: row.sent_at, @@ -68,6 +74,22 @@ impl<'c> Broadcast<'c> { Ok(message) } + async fn next_sequence_for(&mut self, channel: &Channel) -> Result<Sequence, sqlx::Error> { + let Sequence(current) = sqlx::query_scalar!( + r#" + -- `max` never returns null, but sqlx can't detect that + select max(sequence) as "sequence!: Sequence" + from message + where channel = $1 + "#, + channel.id, + ) + .fetch_one(&mut *self.0) + .await?; + + Ok(Sequence(current + 1)) + } + pub async fn expire(&mut self, expire_at: &DateTime) -> Result<(), sqlx::Error> { sqlx::query!( r#" @@ -85,12 +107,13 @@ impl<'c> Broadcast<'c> { pub async fn replay( &mut self, channel: &Channel, - resume_at: Option<&DateTime>, + resume_at: Option<Sequence>, ) -> Result<Vec<Message>, sqlx::Error> { let messages = sqlx::query!( r#" select message.id as "id: message::Id", + sequence as "sequence: Sequence", login.id as "sender_id: login::Id", login.name as sender_name, message.body, @@ -98,14 +121,15 @@ impl<'c> Broadcast<'c> { from message join login on message.sender = login.id where channel = $1 - and coalesce(sent_at > $2, true) - order by sent_at asc + and coalesce(sequence > $2, true) + order by sequence asc "#, channel.id, resume_at, ) .map(|row| Message { id: row.id, + sequence: row.sequence, sender: Login { id: row.sender_id, name: row.sender_name, @@ -119,3 +143,19 @@ impl<'c> Broadcast<'c> { Ok(messages) } } + 
+#[derive( + Debug, + Eq, + Ord, + PartialEq, + PartialOrd, + Clone, + Copy, + serde::Serialize, + serde::Deserialize, + sqlx::Type, +)] +#[serde(transparent)] +#[sqlx(transparent)] +pub struct Sequence(i64); diff --git a/src/events/routes.rs b/src/events/routes.rs index a6bf5d9..7731680 100644 --- a/src/events/routes.rs +++ b/src/events/routes.rs @@ -1,3 +1,5 @@ +use std::collections::{BTreeMap, HashSet}; + use axum::{ extract::State, http::StatusCode, @@ -9,8 +11,10 @@ use axum::{ Router, }; use axum_extra::extract::Query; -use chrono::{self, format::SecondsFormat}; -use futures::stream::{self, Stream, StreamExt as _, TryStreamExt as _}; +use futures::{ + future, + stream::{self, Stream, StreamExt as _, TryStreamExt as _}, +}; use super::repo::broadcast; use crate::{ @@ -25,6 +29,15 @@ use crate::{ #[cfg(test)] mod test; +// For the purposes of event replay, an "event ID" is a vector of per-channel +// sequence numbers. Replay will start with messages whose sequence number in +// its channel is higher than the sequence in the event ID, or if the channel +// is not listed in the event ID, then at the beginning. +// +// Using a sorted map ensures that there is a canonical representation for +// each event ID. 
+type EventId = BTreeMap<channel::Id, broadcast::Sequence>; + pub fn router() -> Router<App> { Router::new().route("/api/events", get(events)) } @@ -32,22 +45,27 @@ pub fn router() -> Router<App> { #[derive(Clone, serde::Deserialize)] struct EventsQuery { #[serde(default, rename = "channel")] - channels: Vec<channel::Id>, + channels: HashSet<channel::Id>, } async fn events( State(app): State<App>, RequestedAt(now): RequestedAt, _: Login, // requires auth, but doesn't actually care who you are - last_event_id: Option<LastEventId>, + last_event_id: Option<LastEventId<EventId>>, Query(query): Query<EventsQuery>, -) -> Result<Events<impl Stream<Item = ChannelEvent> + std::fmt::Debug>, ErrorResponse> { - let resume_at = last_event_id.as_deref(); +) -> Result<Events<impl Stream<Item = ReplayableEvent> + std::fmt::Debug>, ErrorResponse> { + let resume_at = last_event_id + .map(LastEventId::into_inner) + .unwrap_or_default(); let streams = stream::iter(query.channels) .then(|channel| { let app = app.clone(); + let resume_at = resume_at.clone(); async move { + let resume_at = resume_at.get(&channel).copied(); + let events = app .channels() .events(&channel, &now, resume_at) @@ -62,7 +80,18 @@ async fn events( // impl From would take more code; this is used once. .map_err(ErrorResponse)?; - let stream = stream::select_all(streams); + // We resume counting from the provided last-event-id mapping, rather than + // starting from scratch, so that the events in a resumed stream contain + // the full vector of channel IDs for their event IDs right off the bat, + // even before any events are actually delivered. 
+ let stream = stream::select_all(streams).scan(resume_at, |sequences, event| { + let (channel, sequence) = event.event_id(); + sequences.insert(channel, sequence); + + let event = ReplayableEvent(sequences.clone(), event); + + future::ready(Some(event)) + }); Ok(Events(stream)) } @@ -72,7 +101,7 @@ struct Events<S>(S); impl<S> IntoResponse for Events<S> where - S: Stream<Item = ChannelEvent> + Send + 'static, + S: Stream<Item = ReplayableEvent> + Send + 'static, { fn into_response(self) -> Response { let Self(stream) = self; @@ -101,6 +130,9 @@ impl IntoResponse for ErrorResponse { } } +#[derive(Debug)] +struct ReplayableEvent(EventId, ChannelEvent); + #[derive(Debug, serde::Serialize)] struct ChannelEvent { channel: channel::Id, @@ -116,19 +148,21 @@ impl ChannelEvent { } } - fn event_id(&self) -> String { - self.message - .sent_at - .to_rfc3339_opts(SecondsFormat::AutoSi, /* use_z */ true) + fn event_id(&self) -> (channel::Id, broadcast::Sequence) { + (self.channel.clone(), self.message.sequence) } } -impl TryFrom<ChannelEvent> for sse::Event { +impl TryFrom<ReplayableEvent> for sse::Event { type Error = serde_json::Error; - fn try_from(value: ChannelEvent) -> Result<Self, Self::Error> { - let data = serde_json::to_string_pretty(&value)?; - let event = Self::default().id(value.event_id()).data(&data); + fn try_from(value: ReplayableEvent) -> Result<Self, Self::Error> { + let ReplayableEvent(id, data) = value; + + let id = serde_json::to_string(&id)?; + let data = serde_json::to_string_pretty(&data)?; + + let event = Self::default().id(id).data(data); Ok(event) } diff --git a/src/events/routes/test.rs b/src/events/routes/test.rs index df2d5f6..131c751 100644 --- a/src/events/routes/test.rs +++ b/src/events/routes/test.rs @@ -22,7 +22,9 @@ async fn no_subscriptions() { // Call the endpoint let subscribed_at = fixtures::now(); - let query = routes::EventsQuery { channels: vec![] }; + let query = routes::EventsQuery { + channels: [].into(), + }; let 
routes::Events(mut events) = routes::events(State(app), subscribed_at, subscriber, None, Query(query)) .await @@ -47,7 +49,7 @@ async fn includes_historical_message() { let subscriber = fixtures::login::create(&app).await; let subscribed_at = fixtures::now(); let query = routes::EventsQuery { - channels: vec![channel.id.clone()], + channels: [channel.id.clone()].into(), }; let routes::Events(mut events) = routes::events(State(app), subscribed_at, subscriber, None, Query(query)) @@ -56,7 +58,7 @@ async fn includes_historical_message() { // Verify the structure of the response. - let event = events + let routes::ReplayableEvent(_, event) = events .next() .immediately() .await @@ -78,7 +80,7 @@ async fn includes_live_message() { let subscriber = fixtures::login::create(&app).await; let subscribed_at = fixtures::now(); let query = routes::EventsQuery { - channels: vec![channel.id.clone()], + channels: [channel.id.clone()].into(), }; let routes::Events(mut events) = routes::events( State(app.clone()), @@ -95,7 +97,7 @@ async fn includes_live_message() { let sender = fixtures::login::create(&app).await; let message = fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await; - let event = events + let routes::ReplayableEvent(_, event) = events .next() .immediately() .await @@ -121,7 +123,7 @@ async fn excludes_other_channels() { let subscriber = fixtures::login::create(&app).await; let subscribed_at = fixtures::now(); let query = routes::EventsQuery { - channels: vec![subscribed.id.clone()], + channels: [subscribed.id.clone()].into(), }; let routes::Events(mut events) = routes::events(State(app), subscribed_at, subscriber, None, Query(query)) @@ -130,7 +132,7 @@ async fn excludes_other_channels() { // Verify the semantics - let event = events + let routes::ReplayableEvent(_, event) = events .next() .immediately() .await @@ -186,9 +188,9 @@ async fn includes_multiple_channels() { .await; for (channel, message) in messages { - assert!(events - .iter() - 
.any(|event| { event.channel == channel.id && event.message == message })); + assert!(events.iter().any(|routes::ReplayableEvent(_, event)| { + event.channel == channel.id && event.message == message + })); } } @@ -204,7 +206,7 @@ async fn nonexitent_channel() { let subscriber = fixtures::login::create(&app).await; let subscribed_at = fixtures::now(); let query = routes::EventsQuery { - channels: vec![channel.clone()], + channels: [channel.clone()].into(), }; let routes::ErrorResponse(error) = routes::events(State(app), subscribed_at, subscriber, None, Query(query)) @@ -239,7 +241,7 @@ async fn sequential_messages() { let subscriber = fixtures::login::create(&app).await; let subscribed_at = fixtures::now(); let query = routes::EventsQuery { - channels: vec![channel.id.clone()], + channels: [channel.id.clone()].into(), }; let routes::Events(events) = routes::events(State(app), subscribed_at, subscriber, None, Query(query)) @@ -248,11 +250,13 @@ async fn sequential_messages() { // Verify the structure of the response. 
- let mut events = events.filter(|event| future::ready(messages.contains(&event.message))); + let mut events = events.filter(|routes::ReplayableEvent(_, event)| { + future::ready(messages.contains(&event.message)) + }); // Verify delivery in order for message in &messages { - let event = events + let routes::ReplayableEvent(_, event) = events .next() .immediately() .await @@ -283,7 +287,7 @@ async fn resumes_from() { let subscriber = fixtures::login::create(&app).await; let subscribed_at = fixtures::now(); let query = routes::EventsQuery { - channels: vec![channel.id.clone()], + channels: [channel.id.clone()].into(), }; let resume_at = { @@ -298,19 +302,20 @@ async fn resumes_from() { .await .expect("subscribed to a valid channel"); - let event = events.next().immediately().await.expect("delivered events"); + let routes::ReplayableEvent(id, event) = + events.next().immediately().await.expect("delivered events"); assert_eq!(channel.id, event.channel); assert_eq!(initial_message, event.message); - event.event_id() + id }; // Resume after disconnect - let resumed_at = fixtures::now(); + let reconnect_at = fixtures::now(); let routes::Events(resumed) = routes::events( State(app), - resumed_at, + reconnect_at, subscriber, Some(resume_at.into()), Query(query), @@ -327,12 +332,156 @@ async fn resumes_from() { .await; for message in later_messages { - assert!(events - .iter() - .any(|event| event.channel == channel.id && event.message == message)); + assert!(events.iter().any( + |routes::ReplayableEvent(_, event)| event.channel == channel.id + && event.message == message + )); } } +// This test verifies a real bug I hit developing the vector-of-sequences +// approach to resuming events. A small omission caused the event IDs in a +// resumed stream to _omit_ channels that were in the original stream until +// those channels also appeared in the resumed stream. 
+// +// Clients would see something like +// * In the original stream, Cfoo=5,Cbar=8 +// * In the resumed stream, Cfoo=6 (no Cbar sequence number) +// +// Disconnecting and reconnecting a second time, using event IDs from that +// initial period of the first resume attempt, would then cause the second +// resume attempt to restart all other channels from the beginning, and not +// from where the first disconnection happened. +// +// This is a real and valid behaviour for clients! +#[tokio::test] +async fn serial_resume() { + // Set up the environment + + let app = fixtures::scratch_app().await; + let sender = fixtures::login::create(&app).await; + let channel_a = fixtures::channel::create(&app).await; + let channel_b = fixtures::channel::create(&app).await; + + // Call the endpoint + + let subscriber = fixtures::login::create(&app).await; + let query = routes::EventsQuery { + channels: [channel_a.id.clone(), channel_b.id.clone()].into(), + }; + + let resume_at = { + let initial_messages = [ + fixtures::message::send(&app, &sender, &channel_a, &fixtures::now()).await, + fixtures::message::send(&app, &sender, &channel_b, &fixtures::now()).await, + ]; + + // First subscription + let subscribed_at = fixtures::now(); + let routes::Events(events) = routes::events( + State(app.clone()), + subscribed_at, + subscriber.clone(), + None, + Query(query.clone()), + ) + .await + .expect("subscribed to a valid channel"); + + let events = events + .take(initial_messages.len()) + .collect::<Vec<_>>() + .immediately() + .await; + + for message in initial_messages { + assert!(events + .iter() + .any(|routes::ReplayableEvent(_, event)| event.message == message)); + } + + let routes::ReplayableEvent(id, _) = events.last().expect("this vec is non-empty"); + + id.to_owned() + }; + + // Resume after disconnect + let resume_at = { + let resume_messages = [ + // Note that channel_b does not appear here. 
The buggy behaviour + // would be masked if channel_b happened to send a new message + // into the resumed event stream. + fixtures::message::send(&app, &sender, &channel_a, &fixtures::now()).await, + fixtures::message::send(&app, &sender, &channel_a, &fixtures::now()).await, + ]; + + // Second subscription + let resubscribed_at = fixtures::now(); + let routes::Events(events) = routes::events( + State(app.clone()), + resubscribed_at, + subscriber.clone(), + Some(resume_at.into()), + Query(query.clone()), + ) + .await + .expect("subscribed to a valid channel"); + + let events = events + .take(resume_messages.len()) + .collect::<Vec<_>>() + .immediately() + .await; + + for message in resume_messages { + assert!(events + .iter() + .any(|routes::ReplayableEvent(_, event)| event.message == message)); + } + + let routes::ReplayableEvent(id, _) = events.last().expect("this vec is non-empty"); + + id.to_owned() + }; + + // Resume after disconnect a second time + { + // At this point, we can send on either channel and demonstrate the + // problem. The resume point should be before both of these messages, but + // after _all_ prior messages. + let final_messages = [ + fixtures::message::send(&app, &sender, &channel_a, &fixtures::now()).await, + fixtures::message::send(&app, &sender, &channel_b, &fixtures::now()).await, + ]; + + // Third subscription + let resubscribed_at = fixtures::now(); + let routes::Events(events) = routes::events( + State(app.clone()), + resubscribed_at, + subscriber.clone(), + Some(resume_at.into()), + Query(query.clone()), + ) + .await + .expect("subscribed to a valid channel"); + + let events = events + .take(final_messages.len()) + .collect::<Vec<_>>() + .immediately() + .await; + + // This set of messages, in particular, _should not_ include any prior + // messages from `initial_messages` or `resume_messages`. 
+ for message in final_messages { + assert!(events + .iter() + .any(|routes::ReplayableEvent(_, event)| event.message == message)); + } + }; +} + #[tokio::test] async fn removes_expired_messages() { // Set up the environment @@ -348,7 +497,7 @@ async fn removes_expired_messages() { let subscriber = fixtures::login::create(&app).await; let subscribed_at = fixtures::now(); let query = routes::EventsQuery { - channels: vec![channel.id.clone()], + channels: [channel.id.clone()].into_iter().collect(), }; let routes::Events(mut events) = routes::events(State(app), subscribed_at, subscriber, None, Query(query)) @@ -357,7 +506,7 @@ async fn removes_expired_messages() { // Verify the semantics - let event = events + let routes::ReplayableEvent(_, event) = events .next() .immediately() .await diff --git a/src/header.rs b/src/header.rs index 61cc561..683c1f9 100644 --- a/src/header.rs +++ b/src/header.rs @@ -1,16 +1,22 @@ +use std::ops::Deref; + use axum::{ extract::FromRequestParts, http::{request::Parts, HeaderName, HeaderValue}, }; use axum_extra::typed_header::TypedHeader; +use serde::{de::DeserializeOwned, Serialize}; /// A typed header. When used as a bare extractor, reads from the /// `Last-Event-Id` HTTP header. 
-pub struct LastEventId(pub String); +pub struct LastEventId<T>(pub T); static LAST_EVENT_ID: HeaderName = HeaderName::from_static("last-event-id"); -impl headers::Header for LastEventId { +impl<T> headers::Header for LastEventId<T> +where + T: Serialize + DeserializeOwned, +{ fn name() -> &'static HeaderName { &LAST_EVENT_ID } @@ -20,11 +26,9 @@ impl headers::Header for LastEventId { I: Iterator<Item = &'i HeaderValue>, { let value = values.next().ok_or_else(headers::Error::invalid)?; - if let Ok(value) = value.to_str() { - Ok(Self(value.into())) - } else { - Err(headers::Error::invalid()) - } + let value = value.to_str().map_err(|_| headers::Error::invalid())?; + let value = serde_json::from_str(value).map_err(|_| headers::Error::invalid())?; + Ok(Self(value)) } fn encode<E>(&self, values: &mut E) @@ -33,16 +37,18 @@ impl headers::Header for LastEventId { { let Self(value) = self; // Must panic or suppress; the trait provides no other options. - let value = HeaderValue::from_str(value).expect("LastEventId is a valid header value"); + let value = serde_json::to_string(value).expect("value can be encoded as JSON"); + let value = HeaderValue::from_str(&value).expect("LastEventId is a valid header value"); values.extend(std::iter::once(value)); } } #[async_trait::async_trait] -impl<S> FromRequestParts<S> for LastEventId +impl<S, T> FromRequestParts<S> for LastEventId<T> where S: Send + Sync, + T: Serialize + DeserializeOwned, { type Rejection = <TypedHeader<Self> as FromRequestParts<S>>::Rejection; @@ -57,17 +63,24 @@ where } } -impl From<String> for LastEventId { - fn from(header: String) -> Self { - Self(header) - } -} - -impl std::ops::Deref for LastEventId { - type Target = str; +impl<T> Deref for LastEventId<T> { + type Target = T; fn deref(&self) -> &Self::Target { let Self(header) = self; header } } + +impl<T> From<T> for LastEventId<T> { + fn from(value: T) -> Self { + Self(value) + } +} + +impl<T> LastEventId<T> { + pub fn into_inner(self) -> T { + let 
Self(value) = self; + value + } +} @@ -27,7 +27,18 @@ pub const ID_SIZE: usize = 15; // // By convention, the prefix should be UPPERCASE - note that the alphabet for this // is entirely lowercase. -#[derive(Clone, Debug, Hash, Eq, PartialEq, sqlx::Type, serde::Deserialize, serde::Serialize)] +#[derive( + Clone, + Debug, + Hash, + Eq, + Ord, + PartialEq, + PartialOrd, + sqlx::Type, + serde::Deserialize, + serde::Serialize, +)] #[sqlx(transparent)] #[serde(transparent)] pub struct Id(String); diff --git a/src/repo/channel.rs b/src/repo/channel.rs index 8f089e8..da63b45 100644 --- a/src/repo/channel.rs +++ b/src/repo/channel.rs @@ -79,7 +79,18 @@ impl<'c> Channels<'c> { } /// Stable identifier for a [Channel]. Prefixed with `C`. -#[derive(Clone, Debug, Eq, Hash, PartialEq, sqlx::Type, serde::Deserialize, serde::Serialize)] +#[derive( + Clone, + Debug, + Eq, + Hash, + Ord, + PartialEq, + PartialOrd, + sqlx::Type, + serde::Deserialize, + serde::Serialize, +)] #[sqlx(transparent)] #[serde(transparent)] pub struct Id(BaseId); |
