Diffstat (limited to 'src/event/app.rs')
-rw-r--r--  src/event/app.rs  |  75
1 file changed, 25 insertions(+), 50 deletions(-)
diff --git a/src/event/app.rs b/src/event/app.rs
index e422de9..386d749 100644
--- a/src/event/app.rs
+++ b/src/event/app.rs
@@ -8,9 +8,9 @@ use sqlx::sqlite::SqlitePool;
use super::{Event, Sequence, Sequenced, broadcaster::Broadcaster};
use crate::{
conversation::{self, repo::Provider as _},
- db::NotFound,
+ db::{self, NotFound as _},
+ error::failed::{Failed, ResultExt as _},
message::{self, repo::Provider as _},
- name,
user::{self, repo::Provider as _},
vapid,
vapid::repo::Provider as _,
@@ -29,14 +29,18 @@ impl Events {
pub async fn subscribe(
&self,
resume_at: Sequence,
- ) -> Result<impl Stream<Item = Event> + std::fmt::Debug + use<>, Error> {
+ ) -> Result<impl Stream<Item = Event> + std::fmt::Debug + use<>, Failed> {
// Subscribe before retrieving, to catch messages broadcast while we're
// querying the DB. We'll prune out duplicates later.
let live_messages = self.events.subscribe();
- let mut tx = self.db.begin().await?;
+ let mut tx = self.db.begin().await.fail(db::failed::BEGIN)?;
- let users = tx.users().replay(resume_at).await?;
+ let users = tx
+ .users()
+ .replay(resume_at)
+ .await
+ .fail("Failed to load user events")?;
let user_events = users
.iter()
.map(user::History::events)
@@ -44,7 +48,11 @@ impl Events {
.filter(Sequence::after(resume_at))
.map(Event::from);
- let conversations = tx.conversations().replay(resume_at).await?;
+ let conversations = tx
+ .conversations()
+ .replay(resume_at)
+ .await
+ .fail("Failed to load conversation events")?;
let conversation_events = conversations
.iter()
.map(conversation::History::events)
@@ -52,7 +60,11 @@ impl Events {
.filter(Sequence::after(resume_at))
.map(Event::from);
- let messages = tx.messages().replay(resume_at).await?;
+ let messages = tx
+ .messages()
+ .replay(resume_at)
+ .await
+ .fail("Failed to load message events")?;
let message_events = messages
.iter()
.map(message::History::events)
@@ -60,7 +72,12 @@ impl Events {
.filter(Sequence::after(resume_at))
.map(Event::from);
- let vapid = tx.vapid().current().await.optional()?;
+ let vapid = tx
+ .vapid()
+ .current()
+ .await
+ .optional()
+ .fail("Failed to load VAPID key events")?;
let vapid_events = vapid
.iter()
.flat_map(vapid::History::events)
@@ -91,45 +108,3 @@ impl Events {
move |event| future::ready(filter(event))
}
}
-
-#[derive(Debug, thiserror::Error)]
-#[error(transparent)]
-pub enum Error {
- Database(#[from] sqlx::Error),
- Name(#[from] name::Error),
- Ecdsa(#[from] p256::ecdsa::Error),
- Pkcs8(#[from] p256::pkcs8::Error),
- WebPush(#[from] web_push::WebPushError),
-}
-
-impl From<user::repo::LoadError> for Error {
- fn from(error: user::repo::LoadError) -> Self {
- use user::repo::LoadError;
- match error {
- LoadError::Database(error) => error.into(),
- LoadError::Name(error) => error.into(),
- }
- }
-}
-
-impl From<conversation::repo::LoadError> for Error {
- fn from(error: conversation::repo::LoadError) -> Self {
- use conversation::repo::LoadError;
- match error {
- LoadError::Database(error) => error.into(),
- LoadError::Name(error) => error.into(),
- }
- }
-}
-
-impl From<vapid::repo::Error> for Error {
- fn from(error: vapid::repo::Error) -> Self {
- use vapid::repo::Error;
- match error {
- Error::Database(error) => error.into(),
- Error::Ecdsa(error) => error.into(),
- Error::Pkcs8(error) => error.into(),
- Error::WebPush(error) => error.into(),
- }
- }
-}
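
This change drops the module-local Error enum and its per-repository From impls in favour of the shared Failed type and the .fail(context) extension from crate::error::failed. That module is not part of this diff, so the following is only a minimal sketch of the pattern it suggests, assuming Failed pairs a static context message with a boxed source error and that constants such as db::failed::BEGIN are plain &'static str messages:

use std::fmt;

// Sketch only: the real crate::error::failed module is not shown in this diff.
// Failed carries a human-readable context string plus the underlying cause.
#[derive(Debug)]
pub struct Failed {
    context: &'static str,
    source: Box<dyn std::error::Error + Send + Sync + 'static>,
}

impl fmt::Display for Failed {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}: {}", self.context, self.source)
    }
}

impl std::error::Error for Failed {
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        Some(self.source.as_ref())
    }
}

// Extension trait that attaches context to any error, replacing the
// per-module From conversions the old Error enum needed.
pub trait ResultExt<T> {
    fn fail(self, context: &'static str) -> Result<T, Failed>;
}

impl<T, E> ResultExt<T> for Result<T, E>
where
    E: std::error::Error + Send + Sync + 'static,
{
    fn fail(self, context: &'static str) -> Result<T, Failed> {
        self.map_err(|source| Failed {
            context,
            source: Box::new(source),
        })
    }
}

// Shared context strings such as db::failed::BEGIN would then just be
// &'static str constants (an assumption; their definitions are not shown):
pub const BEGIN: &str = "Failed to begin database transaction";

With an extension trait along these lines, a call such as tx.users().replay(resume_at).await.fail("Failed to load user events")? wraps whatever error the repository returns in a single Failed value, which is why subscribe() no longer needs one From impl per repository error type.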