summaryrefslogtreecommitdiff
path: root/src/channel
diff options
context:
space:
mode:
authorKit La Touche <kit@transneptune.net>2024-10-03 23:30:42 -0400
committerKit La Touche <kit@transneptune.net>2024-10-03 23:30:42 -0400
commitd50b1b56c011c03c7d8a95242af404b727e91a80 (patch)
treeefe3408f6a8ef669981826d1a29d16a24b460d89 /src/channel
parent30c13478d61065a512f5bc8824fecbf2ee6afc81 (diff)
parent7f12fd41c2941a55a6437f24e4f780104a718790 (diff)
Merge branch 'main' into feature-frontend
Diffstat (limited to 'src/channel')
-rw-r--r--src/channel/app.rs129
-rw-r--r--src/channel/event.rs48
-rw-r--r--src/channel/history.rs42
-rw-r--r--src/channel/id.rs38
-rw-r--r--src/channel/mod.rs7
-rw-r--r--src/channel/repo.rs202
-rw-r--r--src/channel/routes.rs122
-rw-r--r--src/channel/routes/test/list.rs7
-rw-r--r--src/channel/routes/test/on_create.rs13
-rw-r--r--src/channel/routes/test/on_send.rs23
-rw-r--r--src/channel/snapshot.rs38
11 files changed, 613 insertions, 56 deletions
diff --git a/src/channel/app.rs b/src/channel/app.rs
index 70cda47..bb331ec 100644
--- a/src/channel/app.rs
+++ b/src/channel/app.rs
@@ -1,10 +1,13 @@
use chrono::TimeDelta;
+use itertools::Itertools;
use sqlx::sqlite::SqlitePool;
+use super::{repo::Provider as _, Channel, Id};
use crate::{
clock::DateTime,
- events::{broadcaster::Broadcaster, repo::message::Provider as _, types::ChannelEvent},
- repo::channel::{Channel, Provider as _},
+ db::NotFound,
+ event::{broadcaster::Broadcaster, repo::Provider as _, Event, Sequence},
+ message::{repo::Provider as _, Message},
};
pub struct Channels<'a> {
@@ -19,27 +22,108 @@ impl<'a> Channels<'a> {
pub async fn create(&self, name: &str, created_at: &DateTime) -> Result<Channel, CreateError> {
let mut tx = self.db.begin().await?;
+ let created = tx.sequence().next(created_at).await?;
let channel = tx
.channels()
- .create(name, created_at)
+ .create(name, &created)
.await
.map_err(|err| CreateError::from_duplicate_name(err, name))?;
tx.commit().await?;
self.events
- .broadcast(&ChannelEvent::created(channel.clone()));
+ .broadcast(channel.events().map(Event::from).collect::<Vec<_>>());
- Ok(channel)
+ Ok(channel.snapshot())
}
- pub async fn all(&self) -> Result<Vec<Channel>, InternalError> {
+ pub async fn all(&self, resume_point: Option<Sequence>) -> Result<Vec<Channel>, InternalError> {
let mut tx = self.db.begin().await?;
- let channels = tx.channels().all().await?;
+ let channels = tx.channels().all(resume_point).await?;
tx.commit().await?;
+ let channels = channels
+ .into_iter()
+ .filter_map(|channel| {
+ channel
+ .events()
+ .filter(Sequence::up_to(resume_point))
+ .collect()
+ })
+ .collect();
+
Ok(channels)
}
+ pub async fn messages(
+ &self,
+ channel: &Id,
+ resume_point: Option<Sequence>,
+ ) -> Result<Vec<Message>, Error> {
+ let mut tx = self.db.begin().await?;
+ let channel = tx
+ .channels()
+ .by_id(channel)
+ .await
+ .not_found(|| Error::NotFound(channel.clone()))?
+ .snapshot();
+
+ let messages = tx
+ .messages()
+ .in_channel(&channel, resume_point)
+ .await?
+ .into_iter()
+ .filter_map(|message| {
+ message
+ .events()
+ .filter(Sequence::up_to(resume_point))
+ .collect()
+ })
+ .collect();
+
+ Ok(messages)
+ }
+
+ pub async fn delete(&self, channel: &Id, deleted_at: &DateTime) -> Result<(), Error> {
+ let mut tx = self.db.begin().await?;
+
+ let channel = tx
+ .channels()
+ .by_id(channel)
+ .await
+ .not_found(|| Error::NotFound(channel.clone()))?
+ .snapshot();
+
+ let mut events = Vec::new();
+
+ let messages = tx.messages().in_channel(&channel, None).await?;
+ for message in messages {
+ let message = message.snapshot();
+ let deleted = tx.sequence().next(deleted_at).await?;
+ let message = tx.messages().delete(&message.id, &deleted).await?;
+ events.extend(
+ message
+ .events()
+ .filter(Sequence::start_from(deleted.sequence))
+ .map(Event::from),
+ );
+ }
+
+ let deleted = tx.sequence().next(deleted_at).await?;
+ let channel = tx.channels().delete(&channel.id, &deleted).await?;
+ events.extend(
+ channel
+ .events()
+ .filter(Sequence::start_from(deleted.sequence))
+ .map(Event::from),
+ );
+
+ tx.commit().await?;
+
+ self.events.broadcast(events);
+
+ Ok(())
+ }
+
pub async fn expire(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> {
// Somewhat arbitrarily, expire after 90 days.
let expire_at = relative_to.to_owned() - TimeDelta::days(90);
@@ -49,19 +133,24 @@ impl<'a> Channels<'a> {
let mut events = Vec::with_capacity(expired.len());
for channel in expired {
- let sequence = tx.message_events().assign_sequence(&channel).await?;
- let event = tx
- .channels()
- .delete_expired(&channel, sequence, relative_to)
- .await?;
- events.push(event);
+ let deleted = tx.sequence().next(relative_to).await?;
+ let channel = tx.channels().delete(&channel, &deleted).await?;
+ events.push(
+ channel
+ .events()
+ .filter(Sequence::start_from(deleted.sequence)),
+ );
}
tx.commit().await?;
- for event in events {
- self.events.broadcast(&event);
- }
+ self.events.broadcast(
+ events
+ .into_iter()
+ .kmerge_by(Sequence::merge)
+ .map(Event::from)
+ .collect::<Vec<_>>(),
+ );
Ok(())
}
@@ -75,6 +164,14 @@ pub enum CreateError {
DatabaseError(#[from] sqlx::Error),
}
+#[derive(Debug, thiserror::Error)]
+pub enum Error {
+ #[error("channel {0} not found")]
+ NotFound(Id),
+ #[error(transparent)]
+ DatabaseError(#[from] sqlx::Error),
+}
+
impl CreateError {
fn from_duplicate_name(error: sqlx::Error, name: &str) -> Self {
if let Some(error) = error.as_database_error() {
diff --git a/src/channel/event.rs b/src/channel/event.rs
new file mode 100644
index 0000000..9c54174
--- /dev/null
+++ b/src/channel/event.rs
@@ -0,0 +1,48 @@
+use super::Channel;
+use crate::{
+ channel,
+ event::{Instant, Sequenced},
+};
+
+#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
+pub struct Event {
+ #[serde(flatten)]
+ pub instant: Instant,
+ #[serde(flatten)]
+ pub kind: Kind,
+}
+
+impl Sequenced for Event {
+ fn instant(&self) -> Instant {
+ self.instant
+ }
+}
+
+#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
+#[serde(tag = "type", rename_all = "snake_case")]
+pub enum Kind {
+ Created(Created),
+ Deleted(Deleted),
+}
+
+#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
+pub struct Created {
+ pub channel: Channel,
+}
+
+impl From<Created> for Kind {
+ fn from(event: Created) -> Self {
+ Self::Created(event)
+ }
+}
+
+#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
+pub struct Deleted {
+ pub channel: channel::Id,
+}
+
+impl From<Deleted> for Kind {
+ fn from(event: Deleted) -> Self {
+ Self::Deleted(event)
+ }
+}
diff --git a/src/channel/history.rs b/src/channel/history.rs
new file mode 100644
index 0000000..3cc7d9d
--- /dev/null
+++ b/src/channel/history.rs
@@ -0,0 +1,42 @@
+use super::{
+ event::{Created, Deleted, Event},
+ Channel,
+};
+use crate::event::Instant;
+
+#[derive(Clone, Debug, Eq, PartialEq)]
+pub struct History {
+ pub channel: Channel,
+ pub created: Instant,
+ pub deleted: Option<Instant>,
+}
+
+impl History {
+ fn created(&self) -> Event {
+ Event {
+ instant: self.created,
+ kind: Created {
+ channel: self.channel.clone(),
+ }
+ .into(),
+ }
+ }
+
+ fn deleted(&self) -> Option<Event> {
+ self.deleted.map(|instant| Event {
+ instant,
+ kind: Deleted {
+ channel: self.channel.id.clone(),
+ }
+ .into(),
+ })
+ }
+
+ pub fn events(&self) -> impl Iterator<Item = Event> {
+ [self.created()].into_iter().chain(self.deleted())
+ }
+
+ pub fn snapshot(&self) -> Channel {
+ self.channel.clone()
+ }
+}
diff --git a/src/channel/id.rs b/src/channel/id.rs
new file mode 100644
index 0000000..22a2700
--- /dev/null
+++ b/src/channel/id.rs
@@ -0,0 +1,38 @@
+use std::fmt;
+
+use crate::id::Id as BaseId;
+
+// Stable identifier for a [Channel]. Prefixed with `C`.
+#[derive(
+ Clone,
+ Debug,
+ Eq,
+ Hash,
+ Ord,
+ PartialEq,
+ PartialOrd,
+ sqlx::Type,
+ serde::Deserialize,
+ serde::Serialize,
+)]
+#[sqlx(transparent)]
+#[serde(transparent)]
+pub struct Id(BaseId);
+
+impl From<BaseId> for Id {
+ fn from(id: BaseId) -> Self {
+ Self(id)
+ }
+}
+
+impl Id {
+ pub fn generate() -> Self {
+ BaseId::generate("C")
+ }
+}
+
+impl fmt::Display for Id {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ self.0.fmt(f)
+ }
+}
diff --git a/src/channel/mod.rs b/src/channel/mod.rs
index 9f79dbb..eb8200b 100644
--- a/src/channel/mod.rs
+++ b/src/channel/mod.rs
@@ -1,4 +1,9 @@
pub mod app;
+pub mod event;
+mod history;
+mod id;
+pub mod repo;
mod routes;
+mod snapshot;
-pub use self::routes::router;
+pub use self::{event::Event, history::History, id::Id, routes::router, snapshot::Channel};
diff --git a/src/channel/repo.rs b/src/channel/repo.rs
new file mode 100644
index 0000000..2b48436
--- /dev/null
+++ b/src/channel/repo.rs
@@ -0,0 +1,202 @@
+use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction};
+
+use crate::{
+ channel::{Channel, History, Id},
+ clock::DateTime,
+ event::{Instant, Sequence},
+};
+
+pub trait Provider {
+ fn channels(&mut self) -> Channels;
+}
+
+impl<'c> Provider for Transaction<'c, Sqlite> {
+ fn channels(&mut self) -> Channels {
+ Channels(self)
+ }
+}
+
+pub struct Channels<'t>(&'t mut SqliteConnection);
+
+impl<'c> Channels<'c> {
+ pub async fn create(&mut self, name: &str, created: &Instant) -> Result<History, sqlx::Error> {
+ let id = Id::generate();
+ let channel = sqlx::query!(
+ r#"
+ insert
+ into channel (id, name, created_at, created_sequence)
+ values ($1, $2, $3, $4)
+ returning
+ id as "id: Id",
+ name,
+ created_at as "created_at: DateTime",
+ created_sequence as "created_sequence: Sequence"
+ "#,
+ id,
+ name,
+ created.at,
+ created.sequence,
+ )
+ .map(|row| History {
+ channel: Channel {
+ id: row.id,
+ name: row.name,
+ },
+ created: Instant {
+ at: row.created_at,
+ sequence: row.created_sequence,
+ },
+ deleted: None,
+ })
+ .fetch_one(&mut *self.0)
+ .await?;
+
+ Ok(channel)
+ }
+
+ pub async fn by_id(&mut self, channel: &Id) -> Result<History, sqlx::Error> {
+ let channel = sqlx::query!(
+ r#"
+ select
+ id as "id: Id",
+ name,
+ created_at as "created_at: DateTime",
+ created_sequence as "created_sequence: Sequence"
+ from channel
+ where id = $1
+ "#,
+ channel,
+ )
+ .map(|row| History {
+ channel: Channel {
+ id: row.id,
+ name: row.name,
+ },
+ created: Instant {
+ at: row.created_at,
+ sequence: row.created_sequence,
+ },
+ deleted: None,
+ })
+ .fetch_one(&mut *self.0)
+ .await?;
+
+ Ok(channel)
+ }
+
+ pub async fn all(&mut self, resume_at: Option<Sequence>) -> Result<Vec<History>, sqlx::Error> {
+ let channels = sqlx::query!(
+ r#"
+ select
+ id as "id: Id",
+ name,
+ created_at as "created_at: DateTime",
+ created_sequence as "created_sequence: Sequence"
+ from channel
+ where coalesce(created_sequence <= $1, true)
+ order by channel.name
+ "#,
+ resume_at,
+ )
+ .map(|row| History {
+ channel: Channel {
+ id: row.id,
+ name: row.name,
+ },
+ created: Instant {
+ at: row.created_at,
+ sequence: row.created_sequence,
+ },
+ deleted: None,
+ })
+ .fetch_all(&mut *self.0)
+ .await?;
+
+ Ok(channels)
+ }
+
+ pub async fn replay(
+ &mut self,
+ resume_at: Option<Sequence>,
+ ) -> Result<Vec<History>, sqlx::Error> {
+ let channels = sqlx::query!(
+ r#"
+ select
+ id as "id: Id",
+ name,
+ created_at as "created_at: DateTime",
+ created_sequence as "created_sequence: Sequence"
+ from channel
+ where coalesce(created_sequence > $1, true)
+ "#,
+ resume_at,
+ )
+ .map(|row| History {
+ channel: Channel {
+ id: row.id,
+ name: row.name,
+ },
+ created: Instant {
+ at: row.created_at,
+ sequence: row.created_sequence,
+ },
+ deleted: None,
+ })
+ .fetch_all(&mut *self.0)
+ .await?;
+
+ Ok(channels)
+ }
+
+ pub async fn delete(
+ &mut self,
+ channel: &Id,
+ deleted: &Instant,
+ ) -> Result<History, sqlx::Error> {
+ let channel = sqlx::query!(
+ r#"
+ delete from channel
+ where id = $1
+ returning
+ id as "id: Id",
+ name,
+ created_at as "created_at: DateTime",
+ created_sequence as "created_sequence: Sequence"
+ "#,
+ channel,
+ )
+ .map(|row| History {
+ channel: Channel {
+ id: row.id,
+ name: row.name,
+ },
+ created: Instant {
+ at: row.created_at,
+ sequence: row.created_sequence,
+ },
+ deleted: Some(*deleted),
+ })
+ .fetch_one(&mut *self.0)
+ .await?;
+
+ Ok(channel)
+ }
+
+ pub async fn expired(&mut self, expired_at: &DateTime) -> Result<Vec<Id>, sqlx::Error> {
+ let channels = sqlx::query_scalar!(
+ r#"
+ select
+ channel.id as "id: Id"
+ from channel
+ left join message
+ where created_at < $1
+ and message.id is null
+ "#,
+ expired_at,
+ )
+ .fetch_all(&mut *self.0)
+ .await?;
+
+ Ok(channels)
+ }
+}
diff --git a/src/channel/routes.rs b/src/channel/routes.rs
index 1f8db5a..23c0602 100644
--- a/src/channel/routes.rs
+++ b/src/channel/routes.rs
@@ -2,20 +2,19 @@ use axum::{
extract::{Json, Path, State},
http::StatusCode,
response::{IntoResponse, Response},
- routing::{get, post},
+ routing::{delete, get, post},
Router,
};
+use axum_extra::extract::Query;
-use super::app;
+use super::{app, Channel, Id};
use crate::{
app::App,
clock::RequestedAt,
error::Internal,
- events::app::EventsError,
- repo::{
- channel::{self, Channel},
- login::Login,
- },
+ event::{Instant, Sequence},
+ login::Login,
+ message::{self, app::SendError},
};
#[cfg(test)]
@@ -26,10 +25,21 @@ pub fn router() -> Router<App> {
.route("/api/channels", get(list))
.route("/api/channels", post(on_create))
.route("/api/channels/:channel", post(on_send))
+ .route("/api/channels/:channel", delete(on_delete))
+ .route("/api/channels/:channel/messages", get(messages))
}
-async fn list(State(app): State<App>, _: Login) -> Result<Channels, Internal> {
- let channels = app.channels().all().await?;
+#[derive(Default, serde::Deserialize)]
+struct ResumeQuery {
+ resume_point: Option<Sequence>,
+}
+
+async fn list(
+ State(app): State<App>,
+ _: Login,
+ Query(query): Query<ResumeQuery>,
+) -> Result<Channels, Internal> {
+ let channels = app.channels().all(query.resume_point).await?;
let response = Channels(channels);
Ok(response)
@@ -86,31 +96,107 @@ struct SendRequest {
async fn on_send(
State(app): State<App>,
- Path(channel): Path<channel::Id>,
+ Path(channel): Path<Id>,
RequestedAt(sent_at): RequestedAt,
login: Login,
Json(request): Json<SendRequest>,
+) -> Result<StatusCode, SendErrorResponse> {
+ app.messages()
+ .send(&channel, &login, &sent_at, &request.message)
+ .await?;
+
+ Ok(StatusCode::ACCEPTED)
+}
+
+#[derive(Debug, thiserror::Error)]
+#[error(transparent)]
+struct SendErrorResponse(#[from] SendError);
+
+impl IntoResponse for SendErrorResponse {
+ fn into_response(self) -> Response {
+ let Self(error) = self;
+ match error {
+ not_found @ SendError::ChannelNotFound(_) => {
+ (StatusCode::NOT_FOUND, not_found.to_string()).into_response()
+ }
+ other => Internal::from(other).into_response(),
+ }
+ }
+}
+
+async fn on_delete(
+ State(app): State<App>,
+ Path(channel): Path<Id>,
+ RequestedAt(deleted_at): RequestedAt,
+ _: Login,
) -> Result<StatusCode, ErrorResponse> {
- app.events()
- .send(&login, &channel, &request.message, &sent_at)
- .await
- // Could impl `From` here, but it's more code and this is used once.
- .map_err(ErrorResponse)?;
+ app.channels().delete(&channel, &deleted_at).await?;
Ok(StatusCode::ACCEPTED)
}
-#[derive(Debug)]
-struct ErrorResponse(EventsError);
+#[derive(Debug, thiserror::Error)]
+#[error(transparent)]
+struct ErrorResponse(#[from] app::Error);
impl IntoResponse for ErrorResponse {
fn into_response(self) -> Response {
let Self(error) = self;
match error {
- not_found @ EventsError::ChannelNotFound(_) => {
+ not_found @ app::Error::NotFound(_) => {
(StatusCode::NOT_FOUND, not_found.to_string()).into_response()
}
other => Internal::from(other).into_response(),
}
}
}
+
+async fn messages(
+ State(app): State<App>,
+ Path(channel): Path<Id>,
+ _: Login,
+ Query(query): Query<ResumeQuery>,
+) -> Result<Messages, ErrorResponse> {
+ let messages = app
+ .channels()
+ .messages(&channel, query.resume_point)
+ .await?;
+ let response = Messages(
+ messages
+ .into_iter()
+ .map(|message| MessageView {
+ sent: message.sent,
+ sender: message.sender,
+ message: MessageInner {
+ id: message.id,
+ body: message.body,
+ },
+ })
+ .collect(),
+ );
+
+ Ok(response)
+}
+
+struct Messages(Vec<MessageView>);
+
+#[derive(serde::Serialize)]
+struct MessageView {
+ #[serde(flatten)]
+ sent: Instant,
+ sender: Login,
+ message: MessageInner,
+}
+
+#[derive(serde::Serialize)]
+struct MessageInner {
+ id: message::Id,
+ body: String,
+}
+
+impl IntoResponse for Messages {
+ fn into_response(self) -> Response {
+ let Self(messages) = self;
+ Json(messages).into_response()
+ }
+}
diff --git a/src/channel/routes/test/list.rs b/src/channel/routes/test/list.rs
index bc94024..f15a53c 100644
--- a/src/channel/routes/test/list.rs
+++ b/src/channel/routes/test/list.rs
@@ -1,4 +1,5 @@
use axum::extract::State;
+use axum_extra::extract::Query;
use crate::{channel::routes, test::fixtures};
@@ -11,7 +12,7 @@ async fn empty_list() {
// Call the endpoint
- let routes::Channels(channels) = routes::list(State(app), viewer)
+ let routes::Channels(channels) = routes::list(State(app), viewer, Query::default())
.await
.expect("always succeeds");
@@ -30,7 +31,7 @@ async fn one_channel() {
// Call the endpoint
- let routes::Channels(channels) = routes::list(State(app), viewer)
+ let routes::Channels(channels) = routes::list(State(app), viewer, Query::default())
.await
.expect("always succeeds");
@@ -52,7 +53,7 @@ async fn multiple_channels() {
// Call the endpoint
- let routes::Channels(response_channels) = routes::list(State(app), viewer)
+ let routes::Channels(response_channels) = routes::list(State(app), viewer, Query::default())
.await
.expect("always succeeds");
diff --git a/src/channel/routes/test/on_create.rs b/src/channel/routes/test/on_create.rs
index e2610a5..5733c9e 100644
--- a/src/channel/routes/test/on_create.rs
+++ b/src/channel/routes/test/on_create.rs
@@ -3,7 +3,7 @@ use futures::stream::StreamExt as _;
use crate::{
channel::{app, routes},
- events::types,
+ event,
test::fixtures::{self, future::Immediately as _},
};
@@ -33,26 +33,25 @@ async fn new_channel() {
// Verify the semantics
- let channels = app.channels().all().await.expect("always succeeds");
+ let channels = app.channels().all(None).await.expect("always succeeds");
assert!(channels.contains(&response_channel));
let mut events = app
.events()
- .subscribe(types::ResumePoint::default())
+ .subscribe(None)
.await
.expect("subscribing never fails")
.filter(fixtures::filter::created());
- let types::ResumableEvent(_, event) = events
+ let event = events
.next()
.immediately()
.await
.expect("creation event published");
- assert_eq!(types::Sequence::default(), event.sequence);
assert!(matches!(
- event.data,
- types::ChannelEventData::Created(event)
+ event.kind,
+ event::Kind::ChannelCreated(event)
if event.channel == response_channel
));
}
diff --git a/src/channel/routes/test/on_send.rs b/src/channel/routes/test/on_send.rs
index 233518b..3297093 100644
--- a/src/channel/routes/test/on_send.rs
+++ b/src/channel/routes/test/on_send.rs
@@ -2,9 +2,10 @@ use axum::extract::{Json, Path, State};
use futures::stream::StreamExt;
use crate::{
+ channel,
channel::routes,
- events::{app, types},
- repo::channel,
+ event,
+ message::app::SendError,
test::fixtures::{self, future::Immediately as _},
};
@@ -43,7 +44,7 @@ async fn messages_in_order() {
let events = app
.events()
- .subscribe(types::ResumePoint::default())
+ .subscribe(None)
.await
.expect("subscribing to a valid channel")
.filter(fixtures::filter::messages())
@@ -51,13 +52,13 @@ async fn messages_in_order() {
let events = events.collect::<Vec<_>>().immediately().await;
- for ((sent_at, message), types::ResumableEvent(_, event)) in requests.into_iter().zip(events) {
- assert_eq!(*sent_at, event.at);
+ for ((sent_at, message), event) in requests.into_iter().zip(events) {
+ assert_eq!(*sent_at, event.instant.at);
assert!(matches!(
- event.data,
- types::ChannelEventData::Message(event_message)
- if event_message.sender == sender
- && event_message.message.body == message
+ event.kind,
+ event::Kind::MessageSent(event)
+ if event.message.sender == sender
+ && event.message.body == message
));
}
}
@@ -76,7 +77,7 @@ async fn nonexistent_channel() {
let request = routes::SendRequest {
message: fixtures::message::propose(),
};
- let routes::ErrorResponse(error) = routes::on_send(
+ let routes::SendErrorResponse(error) = routes::on_send(
State(app),
Path(channel.clone()),
sent_at,
@@ -90,6 +91,6 @@ async fn nonexistent_channel() {
assert!(matches!(
error,
- app::EventsError::ChannelNotFound(error_channel) if channel == error_channel
+ SendError::ChannelNotFound(error_channel) if channel == error_channel
));
}
diff --git a/src/channel/snapshot.rs b/src/channel/snapshot.rs
new file mode 100644
index 0000000..6462f25
--- /dev/null
+++ b/src/channel/snapshot.rs
@@ -0,0 +1,38 @@
+use super::{
+ event::{Created, Event, Kind},
+ Id,
+};
+
+#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
+pub struct Channel {
+ pub id: Id,
+ pub name: String,
+}
+
+impl Channel {
+ fn apply(state: Option<Self>, event: Event) -> Option<Self> {
+ match (state, event.kind) {
+ (None, Kind::Created(event)) => Some(event.into()),
+ (Some(channel), Kind::Deleted(event)) if channel.id == event.channel => None,
+ (state, event) => panic!("invalid channel event {event:#?} for state {state:#?}"),
+ }
+ }
+}
+
+impl FromIterator<Event> for Option<Channel> {
+ fn from_iter<I: IntoIterator<Item = Event>>(events: I) -> Self {
+ events.into_iter().fold(None, Channel::apply)
+ }
+}
+
+impl From<&Created> for Channel {
+ fn from(event: &Created) -> Self {
+ event.channel.clone()
+ }
+}
+
+impl From<Created> for Channel {
+ fn from(event: Created) -> Self {
+ event.channel
+ }
+}