summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorOwen Jacobson <owen@grimoire.ca>2024-09-18 02:22:09 -0400
committerOwen Jacobson <owen@grimoire.ca>2024-09-18 12:18:45 -0400
commit2b4cf5c62ff82fa408a4f82bde0b561ff3b15497 (patch)
tree66d1d695a54eef27c4509fb2f5e7e8372a96ba1e /src
parentcce6662d635bb2115f9f2a7bab92cc105166e761 (diff)
Make BoxedError an implementation detail of InternalError.
Diffstat (limited to 'src')
-rw-r--r--src/channel/app.rs44
-rw-r--r--src/cli.rs14
-rw-r--r--src/error.rs5
-rw-r--r--src/events.rs17
4 files changed, 51 insertions, 29 deletions
diff --git a/src/channel/app.rs b/src/channel/app.rs
index e72564d..0a28fb6 100644
--- a/src/channel/app.rs
+++ b/src/channel/app.rs
@@ -3,17 +3,16 @@ use std::sync::{Arc, Mutex, MutexGuard};
use futures::{
future,
- stream::{self, StreamExt as _, TryStreamExt as _},
+ stream::{self, StreamExt as _},
Stream,
};
use sqlx::sqlite::SqlitePool;
use tokio::sync::broadcast::{channel, Sender};
-use tokio_stream::wrappers::BroadcastStream;
+use tokio_stream::wrappers::{errors::BroadcastStreamRecvError, BroadcastStream};
use super::repo::broadcast::{self, Provider as _};
use crate::{
clock::DateTime,
- error::BoxedError,
repo::{
channel::{self, Channel, Provider as _},
error::NotFound as _,
@@ -75,17 +74,16 @@ impl<'a> Channels<'a> {
&self,
channel: &channel::Id,
resume_at: Option<&DateTime>,
- ) -> Result<impl Stream<Item = Result<broadcast::Message, BoxedError>> + 'static, EventsError>
- {
- fn skip_stale<E>(
+ ) -> Result<impl Stream<Item = broadcast::Message> + 'static, EventsError> {
+ fn skip_stale(
resume_at: Option<&DateTime>,
- ) -> impl for<'m> FnMut(&'m broadcast::Message) -> future::Ready<Result<bool, E>> {
+ ) -> impl for<'m> FnMut(&'m broadcast::Message) -> future::Ready<bool> {
let resume_at = resume_at.cloned();
move |msg| {
- future::ready(Ok(match resume_at {
+ future::ready(match resume_at {
None => false,
Some(resume_at) => msg.sent_at <= resume_at,
- }))
+ })
}
}
let mut tx = self
@@ -98,13 +96,12 @@ impl<'a> Channels<'a> {
let live_messages = self
.broadcaster
.listen(&channel.id)
- .map_err(BoxedError::from)
- .try_skip_while(skip_stale(resume_at));
+ .skip_while(skip_stale(resume_at));
let stored_messages = tx.broadcast().replay(&channel, resume_at).await?;
tx.commit().await?;
- let stored_messages = stream::iter(stored_messages).map(Ok);
+ let stored_messages = stream::iter(stored_messages);
Ok(stored_messages.chain(live_messages))
}
@@ -176,15 +173,36 @@ impl Broadcaster {
// there are no receivers. In this use case, that's fine; a lack of
// listening consumers (chat clients) when a message is sent isn't an
// error.
+ //
+ // The successful return value, which includes the number of active
+ // receivers, also isn't that interesting to us.
let _ = tx.send(message);
}
// panic: if ``channel`` has not been previously registered, and was not
// part of the initial set of channels.
- pub fn listen(&self, channel: &channel::Id) -> BroadcastStream<broadcast::Message> {
+ pub fn listen(&self, channel: &channel::Id) -> impl Stream<Item = broadcast::Message> {
let rx = self.sender(channel).subscribe();
BroadcastStream::from(rx)
+ .take_while(|r| {
+ future::ready(match r {
+ Ok(_) => true,
+ // Stop the stream here. This will disconnect SSE clients
+ // (see `routes.rs`), who will then resume from
+ // `Last-Event-ID`, allowing them to catch up by reading
+ // the skipped messages from the database.
+ Err(BroadcastStreamRecvError::Lagged(_)) => false,
+ })
+ })
+ .map(|r| {
+ // Since the previous transform stops at the first error, this
+ // should always hold.
+ //
+ // See also <https://users.rust-lang.org/t/taking-from-stream-while-ok/48854>.
+ debug_assert!(r.is_ok());
+ r.unwrap()
+ })
}
// panic: if ``channel`` has not been previously registered, and was not
diff --git a/src/cli.rs b/src/cli.rs
index 641f99f..308e91f 100644
--- a/src/cli.rs
+++ b/src/cli.rs
@@ -6,9 +6,9 @@ use clap::Parser;
use sqlx::sqlite::{SqliteConnectOptions, SqlitePool, SqlitePoolOptions};
use tokio::net;
-use crate::{app::App, channel, clock, error::BoxedError, events, index, login};
+use crate::{app::App, channel, clock, events, index, login};
-pub type Result<T> = std::result::Result<T, BoxedError>;
+pub type Result<T> = std::result::Result<T, Error>;
#[derive(Parser)]
pub struct Args {
@@ -73,3 +73,13 @@ fn started_msg(listener: &net::TcpListener) -> io::Result<String> {
let local_addr = listener.local_addr()?;
Ok(format!("listening on http://{local_addr}/"))
}
+
+#[derive(Debug, thiserror::Error)]
+pub enum Error {
+ #[error("io error: {0}")]
+ IoError(#[from] io::Error),
+ #[error("database error: {0}")]
+ DatabaseError(#[from] sqlx::Error),
+ #[error("database migration error: {0}")]
+ MigrateError(#[from] sqlx::migrate::MigrateError),
+}
diff --git a/src/error.rs b/src/error.rs
index b700eaa..21caeda 100644
--- a/src/error.rs
+++ b/src/error.rs
@@ -9,10 +9,7 @@ use axum::{
// complex (though very usable). We don't need to be overly careful about
// allocations on errors in this app, so this is fine for most "general
// failure" cases.
-//
-// If that changes, my hope is to use `thiserror` or something with a similar
-// strategy, before resorting to `anyhow`.
-pub type BoxedError = Box<dyn error::Error + Send + Sync>;
+type BoxedError = Box<dyn error::Error + Send + Sync>;
// Returns a 500 Internal Server Error to the client. Meant to be used via the
// `?` operator; _does not_ return the originating error to the client.
diff --git a/src/events.rs b/src/events.rs
index 9b5901e..5d2dcf0 100644
--- a/src/events.rs
+++ b/src/events.rs
@@ -9,15 +9,12 @@ use axum::{
};
use axum_extra::extract::Query;
use chrono::{format::SecondsFormat, DateTime};
-use futures::{
- future,
- stream::{self, StreamExt as _, TryStreamExt as _},
-};
+use futures::stream::{self, StreamExt as _, TryStreamExt as _};
use crate::{
app::App,
- channel::repo::broadcast,
- error::{BoxedError, InternalError},
+ channel::{app::EventsError, repo::broadcast},
+ error::InternalError,
header::LastEventId,
repo::{channel, login::Login},
};
@@ -52,21 +49,21 @@ async fn on_events(
.channels()
.events(&channel, resume_at.as_ref())
.await?
- .map_ok(ChannelEvent::wrap(channel));
+ .map(ChannelEvent::wrap(channel));
- Ok::<_, BoxedError>(events)
+ Ok::<_, EventsError>(events)
}
})
.try_collect::<Vec<_>>()
.await?;
- let stream = stream::select_all(streams).and_then(|msg| future::ready(to_sse_event(msg)));
+ let stream = stream::select_all(streams).map(to_sse_event);
let sse = Sse::new(stream).keep_alive(sse::KeepAlive::default());
Ok(sse)
}
-fn to_sse_event(event: ChannelEvent<broadcast::Message>) -> Result<sse::Event, BoxedError> {
+fn to_sse_event(event: ChannelEvent<broadcast::Message>) -> Result<sse::Event, serde_json::Error> {
let data = serde_json::to_string(&event)?;
let event = sse::Event::default()
.id(event