diff options
| author | ojacobson <ojacobson@noreply.codeberg.org> | 2025-08-27 06:10:29 +0200 |
|---|---|---|
| committer | ojacobson <ojacobson@noreply.codeberg.org> | 2025-08-27 06:10:29 +0200 |
| commit | 8712c3a19c279d664ce75e8e90d6dde1bda56cb4 (patch) | |
| tree | 93c95548126eea048cd8962345b720b883d391c1 /src/broadcast.rs | |
| parent | 7b131e35fdea1a68aaf9230d157bafb200557ef8 (diff) | |
| parent | f839449d5505b5352bd0da931b980a7a0305234f (diff) | |
Implement storage of synchronized entities in terms of events, not state.
Conversations, users, messages, and all other "synchronized" entities now have an in-memory implementation of their lifecycle, rather than a database-backed one. These operations take a history, apply one lifecycle change to that history, and emit a new history. Storage is then implemented by applying the events in this new history to the database.
The storage methods in repo types, which process these events by emitting SQL statements, make necessary assumptions that the events being passed to them are coherent with the data already in storage. For example, the code to handle a conversation's delete event is allowed to assume that the database already contains a row for that conversation, inserted in response to a prior conversation creation event.
Data retrieval is not modified in this commit, and probably never will be without a more thorough storage rewrite. The whole intention of the data modelling approach I've been using is that a single row per entity represents its entire history, in turn so that the data in the database should be legible to people approaching it using normal SQL tools.
Developed as an aesthetic response to increasing unease with the lack of an ORM versus the boring-ness of our actual queries.
Merges event-based-storage into main.
Diffstat (limited to 'src/broadcast.rs')
| -rw-r--r-- | src/broadcast.rs | 43 |
1 files changed, 24 insertions, 19 deletions
diff --git a/src/broadcast.rs b/src/broadcast.rs index 6e1f04d..dae7641 100644 --- a/src/broadcast.rs +++ b/src/broadcast.rs @@ -1,16 +1,11 @@ -use std::sync::{Arc, Mutex}; - use futures::{Stream, future, stream::StreamExt as _}; use tokio::sync::broadcast::{Sender, channel}; use tokio_stream::wrappers::{BroadcastStream, errors::BroadcastStreamRecvError}; -// Clones will share the same sender. +// Clones will share the same channel. #[derive(Clone)] pub struct Broadcaster<M> { - // The use of std::sync::Mutex, and not tokio::sync::Mutex, follows Tokio's - // own advice: <https://tokio.rs/tokio/tutorial/shared-state>. Methods that - // lock it must be sync. - senders: Arc<Mutex<Sender<M>>>, + sender: Sender<M>, } impl<M> Default for Broadcaster<M> @@ -20,9 +15,7 @@ where fn default() -> Self { let sender = Self::make_sender(); - Self { - senders: Arc::new(Mutex::new(sender)), - } + Self { sender } } } @@ -30,9 +23,7 @@ impl<M> Broadcaster<M> where M: Clone + Send + std::fmt::Debug + 'static, { - pub fn broadcast(&self, message: impl Into<M>) { - let tx = self.sender(); - + pub fn broadcast(&self, message: M) { // Per the Tokio docs, the returned error is only used to indicate that // there are no receivers. In this use case, that's fine; a lack of // listening consumers (chat clients) when a message is sent isn't an @@ -40,11 +31,29 @@ where // // The successful return value, which includes the number of active // receivers, also isn't that interesting to us. - let _ = tx.send(message.into()); + let _ = self.sender.send(message); + } + + // If `M` is a type that can be obtained from an iterator, such as a `Vec`, and if `I` is an + // iterable of items that can be collected into `M`, then this will construct an `M` from the + // passed event iterator, converting each element as it goes. This emits one message (as `M`), + // containing whatever we collect out of `messages`. + // + // This is mostly meant for handling synchronized entity events, which tend to be generated as + // iterables of domain-specific event types, like `user::Event`, but broadcast as `Vec<event::Event>` + // for consumption by outside clients. + pub fn broadcast_from<I, E>(&self, messages: I) + where + I: IntoIterator, + M: FromIterator<E>, + E: From<I::Item>, + { + let message = messages.into_iter().map(Into::into).collect(); + self.broadcast(message); } pub fn subscribe(&self) -> impl Stream<Item = M> + std::fmt::Debug + use<M> { - let rx = self.sender().subscribe(); + let rx = self.sender.subscribe(); BroadcastStream::from(rx).scan((), |(), r| { // The following could technically be `r.ok()`, and is exactly @@ -65,10 +74,6 @@ where }) } - fn sender(&self) -> Sender<M> { - self.senders.lock().unwrap().clone() - } - fn make_sender() -> Sender<M> { // Queue depth of 16 chosen entirely arbitrarily. Don't read too much // into it. |
