summaryrefslogtreecommitdiff
path: root/src/message/app.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/message/app.rs')
-rw-r--r--src/message/app.rs178
1 files changed, 119 insertions, 59 deletions
diff --git a/src/message/app.rs b/src/message/app.rs
index b82fa83..8200650 100644
--- a/src/message/app.rs
+++ b/src/message/app.rs
@@ -1,6 +1,7 @@
use chrono::TimeDelta;
use itertools::Itertools;
use sqlx::sqlite::SqlitePool;
+use web_push::WebPushError;
use super::{Body, History, Id, Message, history, repo::Provider as _};
use crate::{
@@ -11,72 +12,24 @@ use crate::{
error::failed::{Failed, ResultExt as _},
event::{Broadcaster, Sequence, repo::Provider as _},
login::Login,
+ push::{Publish, repo::Provider as _},
user::{self, repo::Provider as _},
+ vapid::repo::Provider as _,
};
-pub struct Messages {
+pub struct Messages<P> {
db: SqlitePool,
events: Broadcaster,
+ publisher: P,
}
-impl Messages {
- pub const fn new(db: SqlitePool, events: Broadcaster) -> Self {
- Self { db, events }
- }
-
- pub async fn send(
- &self,
- conversation: &conversation::Id,
- sender: &Login,
- sent_at: &DateTime,
- body: &Body,
- ) -> Result<Message, SendError> {
- let conversation_not_found = || SendError::ConversationNotFound(conversation.clone());
- let conversation_deleted = || SendError::ConversationDeleted(conversation.clone());
- let sender_not_found = || SendError::SenderNotFound(sender.id.clone().into());
- let sender_deleted = || SendError::SenderDeleted(sender.id.clone().into());
-
- let mut tx = self.db.begin().await.fail(db::failed::BEGIN)?;
- let conversation = tx
- .conversations()
- .by_id(conversation)
- .await
- .optional()
- .fail("Failed to load conversation")?
- .ok_or_else(conversation_not_found)?;
- let sender = tx
- .users()
- .by_login(sender)
- .await
- .optional()
- .fail("Failed to load sending user")?
- .ok_or_else(sender_not_found)?;
-
- // Ordering: don't bother allocating a sequence number before we know the channel might
- // exist.
- let sent = tx
- .sequence()
- .next(sent_at)
- .await
- .fail("Failed to find event sequence number")?;
- let conversation = conversation.as_of(sent).ok_or_else(conversation_deleted)?;
- let sender = sender.as_of(sent).ok_or_else(sender_deleted)?;
- let message = History::begin(&conversation, &sender, body, sent);
-
- // This filter technically includes every event in the history, but it's easier to follow if
- // the various event-manipulating app methods are consistent, and it's harmless to have an
- // always-satisfied filter.
- let events = message.events().filter(Sequence::start_from(sent));
- tx.messages()
- .record_events(events.clone())
- .await
- .fail("Failed to store events")?;
-
- tx.commit().await.fail(db::failed::COMMIT)?;
-
- self.events.broadcast_from(events);
-
- Ok(message.as_sent())
+impl<P> Messages<P> {
+ pub const fn new(db: SqlitePool, events: Broadcaster, publisher: P) -> Self {
+ Self {
+ db,
+ events,
+ publisher,
+ }
}
pub async fn delete(
@@ -163,6 +116,113 @@ impl Messages {
}
}
+impl<P> Messages<P>
+where
+ P: Publish,
+{
+ pub async fn send(
+ &self,
+ conversation: &conversation::Id,
+ sender: &Login,
+ sent_at: &DateTime,
+ body: &Body,
+ ) -> Result<Message, SendError> {
+ let conversation_not_found = || SendError::ConversationNotFound(conversation.clone());
+ let conversation_deleted = || SendError::ConversationDeleted(conversation.clone());
+ let sender_not_found = || SendError::SenderNotFound(sender.id.clone().into());
+ let sender_deleted = || SendError::SenderDeleted(sender.id.clone().into());
+
+ let mut tx = self.db.begin().await.fail(db::failed::BEGIN)?;
+
+ let signer = tx
+ .vapid()
+ .signer()
+ .await
+ .fail("Failed to load VAPID signer")?;
+ let push_recipients = tx
+ .push()
+ .broadcast_from(sender)
+ .await
+ .fail("Failed to load push recipients")?;
+
+ let conversation = tx
+ .conversations()
+ .by_id(conversation)
+ .await
+ .optional()
+ .fail("Failed to load conversation")?
+ .ok_or_else(conversation_not_found)?;
+ let sender = tx
+ .users()
+ .by_login(sender)
+ .await
+ .optional()
+ .fail("Failed to load sending user")?
+ .ok_or_else(sender_not_found)?;
+
+ // Ordering: don't bother allocating a sequence number before we know the channel might
+ // exist.
+ let sent = tx
+ .sequence()
+ .next(sent_at)
+ .await
+ .fail("Failed to find event sequence number")?;
+ let conversation = conversation.as_of(sent).ok_or_else(conversation_deleted)?;
+ let sender = sender.as_of(sent).ok_or_else(sender_deleted)?;
+ let message = History::begin(&conversation, &sender, body, sent);
+
+ // This filter technically includes every event in the history, but it's easier to follow if
+ // the various event-manipulating app methods are consistent, and it's harmless to have an
+ // always-satisfied filter.
+ let events = message.events().filter(Sequence::start_from(sent));
+ tx.messages()
+ .record_events(events.clone())
+ .await
+ .fail("Failed to store events")?;
+
+ tx.commit().await.fail(db::failed::COMMIT)?;
+
+ self.events.broadcast_from(events.clone());
+ for event in events {
+ let failures = self
+ .publisher
+ .publish(event, &signer, &push_recipients)
+ .await
+ .fail("Failed to publish push events")?;
+
+ if !failures.is_empty() {
+ let mut tx = self.db.begin().await.fail(db::failed::BEGIN)?;
+ // Note that data integrity guarantees from the original transaction to read
+ // subscriptions may no longer be valid now. Time has passed. Depending on how slow
+ // delivering push notifications is, potentially a _lot_ of time has passed.
+
+ for (sub, err) in failures {
+ match err {
+ // I _think_ this is the complete set of permanent failures. See
+ // <https://docs.rs/web-push/latest/web_push/enum.WebPushError.html> for a complete
+ // list.
+ WebPushError::Unauthorized(_)
+ | WebPushError::InvalidUri
+ | WebPushError::EndpointNotValid(_)
+ | WebPushError::EndpointNotFound(_)
+ | WebPushError::InvalidCryptoKeys
+ | WebPushError::MissingCryptoKeys => {
+ tx.push().unsubscribe(sub).await.fail(
+ "Failed to unsubscribe after permanent push message rejection",
+ )?;
+ }
+ _ => (),
+ }
+ }
+
+ tx.commit().await.fail(db::failed::COMMIT)?;
+ }
+ }
+
+ Ok(message.as_sent())
+ }
+}
+
#[derive(Debug, thiserror::Error)]
pub enum SendError {
#[error("conversation {0} not found")]