diff options
| author | Owen Jacobson <owen@grimoire.ca> | 2025-11-25 21:02:25 -0500 |
|---|---|---|
| committer | Owen Jacobson <owen@grimoire.ca> | 2025-11-25 21:02:25 -0500 |
| commit | 664e3beba053aee50fc6b3cdcc6ee0dfe5e0fe1f (patch) | |
| tree | 096b997d56959dd88d099f4f96a383daa4dbc39a /src/event | |
| parent | 91c33501a315abe04aeed54aa27388ce0ad241ce (diff) | |
| parent | 33601ef703a640b57e5bd0bf7dbd6d7ffa7377bf (diff) | |
Merge branch 'house-of-failed'
Diffstat (limited to 'src/event')
| -rw-r--r-- | src/event/app.rs | 75 | ||||
| -rw-r--r-- | src/event/handlers/stream/mod.rs | 10 |
2 files changed, 31 insertions, 54 deletions
diff --git a/src/event/app.rs b/src/event/app.rs index e422de9..386d749 100644 --- a/src/event/app.rs +++ b/src/event/app.rs @@ -8,9 +8,9 @@ use sqlx::sqlite::SqlitePool; use super::{Event, Sequence, Sequenced, broadcaster::Broadcaster}; use crate::{ conversation::{self, repo::Provider as _}, - db::NotFound, + db::{self, NotFound as _}, + error::failed::{Failed, ResultExt as _}, message::{self, repo::Provider as _}, - name, user::{self, repo::Provider as _}, vapid, vapid::repo::Provider as _, @@ -29,14 +29,18 @@ impl Events { pub async fn subscribe( &self, resume_at: Sequence, - ) -> Result<impl Stream<Item = Event> + std::fmt::Debug + use<>, Error> { + ) -> Result<impl Stream<Item = Event> + std::fmt::Debug + use<>, Failed> { // Subscribe before retrieving, to catch messages broadcast while we're // querying the DB. We'll prune out duplicates later. let live_messages = self.events.subscribe(); - let mut tx = self.db.begin().await?; + let mut tx = self.db.begin().await.fail(db::failed::BEGIN)?; - let users = tx.users().replay(resume_at).await?; + let users = tx + .users() + .replay(resume_at) + .await + .fail("Failed to load user events")?; let user_events = users .iter() .map(user::History::events) @@ -44,7 +48,11 @@ impl Events { .filter(Sequence::after(resume_at)) .map(Event::from); - let conversations = tx.conversations().replay(resume_at).await?; + let conversations = tx + .conversations() + .replay(resume_at) + .await + .fail("Failed to load conversation events")?; let conversation_events = conversations .iter() .map(conversation::History::events) @@ -52,7 +60,11 @@ impl Events { .filter(Sequence::after(resume_at)) .map(Event::from); - let messages = tx.messages().replay(resume_at).await?; + let messages = tx + .messages() + .replay(resume_at) + .await + .fail("Failed to load message events")?; let message_events = messages .iter() .map(message::History::events) @@ -60,7 +72,12 @@ impl Events { .filter(Sequence::after(resume_at)) .map(Event::from); - let vapid = tx.vapid().current().await.optional()?; + let vapid = tx + .vapid() + .current() + .await + .optional() + .fail("Failed to load VAPID key events")?; let vapid_events = vapid .iter() .flat_map(vapid::History::events) @@ -91,45 +108,3 @@ impl Events { move |event| future::ready(filter(event)) } } - -#[derive(Debug, thiserror::Error)] -#[error(transparent)] -pub enum Error { - Database(#[from] sqlx::Error), - Name(#[from] name::Error), - Ecdsa(#[from] p256::ecdsa::Error), - Pkcs8(#[from] p256::pkcs8::Error), - WebPush(#[from] web_push::WebPushError), -} - -impl From<user::repo::LoadError> for Error { - fn from(error: user::repo::LoadError) -> Self { - use user::repo::LoadError; - match error { - LoadError::Database(error) => error.into(), - LoadError::Name(error) => error.into(), - } - } -} - -impl From<conversation::repo::LoadError> for Error { - fn from(error: conversation::repo::LoadError) -> Self { - use conversation::repo::LoadError; - match error { - LoadError::Database(error) => error.into(), - LoadError::Name(error) => error.into(), - } - } -} - -impl From<vapid::repo::Error> for Error { - fn from(error: vapid::repo::Error) -> Self { - use vapid::repo::Error; - match error { - Error::Database(error) => error.into(), - Error::Ecdsa(error) => error.into(), - Error::Pkcs8(error) => error.into(), - Error::WebPush(error) => error.into(), - } - } -} diff --git a/src/event/handlers/stream/mod.rs b/src/event/handlers/stream/mod.rs index 8b89c31..dde4fae 100644 --- a/src/event/handlers/stream/mod.rs +++ b/src/event/handlers/stream/mod.rs @@ -10,8 +10,8 @@ use futures::stream::{Stream, StreamExt as _}; use crate::{ app::App, - error::{Internal, Unauthorized}, - event::{Event, Heartbeat::Heartbeat, Sequence, Sequenced as _, app, extract::LastEventId}, + error::{Internal, Unauthorized, failed::Failed}, + event::{Event, Heartbeat::Heartbeat, Sequence, Sequenced as _, extract::LastEventId}, token::{app::ValidateError, extract::Identity}, }; @@ -71,7 +71,7 @@ impl TryFrom<Event> for sse::Event { #[derive(Debug, thiserror::Error)] #[error(transparent)] pub enum Error { - Subscribe(#[from] app::Error), + Subscribe(#[from] Failed), Validate(#[from] ValidateError), } @@ -79,7 +79,9 @@ impl IntoResponse for Error { fn into_response(self) -> response::Response { match self { Self::Validate(ValidateError::InvalidToken) => Unauthorized.into_response(), - other => Internal::from(other).into_response(), + Self::Validate(ValidateError::Failed(_)) | Self::Subscribe(_) => { + Internal::from(self).into_response() + } } } } |
