diff options
Diffstat (limited to 'src/message/app.rs')
| -rw-r--r-- | src/message/app.rs | 178 |
1 files changed, 119 insertions, 59 deletions
diff --git a/src/message/app.rs b/src/message/app.rs index b82fa83..8200650 100644 --- a/src/message/app.rs +++ b/src/message/app.rs @@ -1,6 +1,7 @@ use chrono::TimeDelta; use itertools::Itertools; use sqlx::sqlite::SqlitePool; +use web_push::WebPushError; use super::{Body, History, Id, Message, history, repo::Provider as _}; use crate::{ @@ -11,72 +12,24 @@ use crate::{ error::failed::{Failed, ResultExt as _}, event::{Broadcaster, Sequence, repo::Provider as _}, login::Login, + push::{Publish, repo::Provider as _}, user::{self, repo::Provider as _}, + vapid::repo::Provider as _, }; -pub struct Messages { +pub struct Messages<P> { db: SqlitePool, events: Broadcaster, + publisher: P, } -impl Messages { - pub const fn new(db: SqlitePool, events: Broadcaster) -> Self { - Self { db, events } - } - - pub async fn send( - &self, - conversation: &conversation::Id, - sender: &Login, - sent_at: &DateTime, - body: &Body, - ) -> Result<Message, SendError> { - let conversation_not_found = || SendError::ConversationNotFound(conversation.clone()); - let conversation_deleted = || SendError::ConversationDeleted(conversation.clone()); - let sender_not_found = || SendError::SenderNotFound(sender.id.clone().into()); - let sender_deleted = || SendError::SenderDeleted(sender.id.clone().into()); - - let mut tx = self.db.begin().await.fail(db::failed::BEGIN)?; - let conversation = tx - .conversations() - .by_id(conversation) - .await - .optional() - .fail("Failed to load conversation")? - .ok_or_else(conversation_not_found)?; - let sender = tx - .users() - .by_login(sender) - .await - .optional() - .fail("Failed to load sending user")? - .ok_or_else(sender_not_found)?; - - // Ordering: don't bother allocating a sequence number before we know the channel might - // exist. - let sent = tx - .sequence() - .next(sent_at) - .await - .fail("Failed to find event sequence number")?; - let conversation = conversation.as_of(sent).ok_or_else(conversation_deleted)?; - let sender = sender.as_of(sent).ok_or_else(sender_deleted)?; - let message = History::begin(&conversation, &sender, body, sent); - - // This filter technically includes every event in the history, but it's easier to follow if - // the various event-manipulating app methods are consistent, and it's harmless to have an - // always-satisfied filter. - let events = message.events().filter(Sequence::start_from(sent)); - tx.messages() - .record_events(events.clone()) - .await - .fail("Failed to store events")?; - - tx.commit().await.fail(db::failed::COMMIT)?; - - self.events.broadcast_from(events); - - Ok(message.as_sent()) +impl<P> Messages<P> { + pub const fn new(db: SqlitePool, events: Broadcaster, publisher: P) -> Self { + Self { + db, + events, + publisher, + } } pub async fn delete( @@ -163,6 +116,113 @@ impl Messages { } } +impl<P> Messages<P> +where + P: Publish, +{ + pub async fn send( + &self, + conversation: &conversation::Id, + sender: &Login, + sent_at: &DateTime, + body: &Body, + ) -> Result<Message, SendError> { + let conversation_not_found = || SendError::ConversationNotFound(conversation.clone()); + let conversation_deleted = || SendError::ConversationDeleted(conversation.clone()); + let sender_not_found = || SendError::SenderNotFound(sender.id.clone().into()); + let sender_deleted = || SendError::SenderDeleted(sender.id.clone().into()); + + let mut tx = self.db.begin().await.fail(db::failed::BEGIN)?; + + let signer = tx + .vapid() + .signer() + .await + .fail("Failed to load VAPID signer")?; + let push_recipients = tx + .push() + .broadcast_from(sender) + .await + .fail("Failed to load push recipients")?; + + let conversation = tx + .conversations() + .by_id(conversation) + .await + .optional() + .fail("Failed to load conversation")? + .ok_or_else(conversation_not_found)?; + let sender = tx + .users() + .by_login(sender) + .await + .optional() + .fail("Failed to load sending user")? + .ok_or_else(sender_not_found)?; + + // Ordering: don't bother allocating a sequence number before we know the channel might + // exist. + let sent = tx + .sequence() + .next(sent_at) + .await + .fail("Failed to find event sequence number")?; + let conversation = conversation.as_of(sent).ok_or_else(conversation_deleted)?; + let sender = sender.as_of(sent).ok_or_else(sender_deleted)?; + let message = History::begin(&conversation, &sender, body, sent); + + // This filter technically includes every event in the history, but it's easier to follow if + // the various event-manipulating app methods are consistent, and it's harmless to have an + // always-satisfied filter. + let events = message.events().filter(Sequence::start_from(sent)); + tx.messages() + .record_events(events.clone()) + .await + .fail("Failed to store events")?; + + tx.commit().await.fail(db::failed::COMMIT)?; + + self.events.broadcast_from(events.clone()); + for event in events { + let failures = self + .publisher + .publish(event, &signer, &push_recipients) + .await + .fail("Failed to publish push events")?; + + if !failures.is_empty() { + let mut tx = self.db.begin().await.fail(db::failed::BEGIN)?; + // Note that data integrity guarantees from the original transaction to read + // subscriptions may no longer be valid now. Time has passed. Depending on how slow + // delivering push notifications is, potentially a _lot_ of time has passed. + + for (sub, err) in failures { + match err { + // I _think_ this is the complete set of permanent failures. See + // <https://docs.rs/web-push/latest/web_push/enum.WebPushError.html> for a complete + // list. + WebPushError::Unauthorized(_) + | WebPushError::InvalidUri + | WebPushError::EndpointNotValid(_) + | WebPushError::EndpointNotFound(_) + | WebPushError::InvalidCryptoKeys + | WebPushError::MissingCryptoKeys => { + tx.push().unsubscribe(sub).await.fail( + "Failed to unsubscribe after permanent push message rejection", + )?; + } + _ => (), + } + } + + tx.commit().await.fail(db::failed::COMMIT)?; + } + } + + Ok(message.as_sent()) + } +} + #[derive(Debug, thiserror::Error)] pub enum SendError { #[error("conversation {0} not found")] |
