use chrono::TimeDelta;
use itertools::Itertools;
use sqlx::sqlite::SqlitePool;
use web_push::WebPushError;
use super::{Body, History, Id, Message, history, repo::Provider as _};
use crate::{
clock::DateTime,
conversation::{self, repo::Provider as _},
db,
db::NotFound as _,
error::failed::{Failed, ResultExt as _},
event::{Broadcaster, Sequence, repo::Provider as _},
login::Login,
push::{Publish, repo::Provider as _},
user::{self, repo::Provider as _},
vapid::repo::Provider as _,
};
/// Application-level entry point for operations on messages.
///
/// Holds the database pool that every operation opens its transactions on, the broadcaster that
/// stored events are handed to once they are committed, and a `publisher` of type `P`.
///
/// No bound is placed on `P` here; only the `impl` block that actually uses the publisher
/// requires `P: Publish`.
pub struct Messages<P> {
    db: SqlitePool,
    events: Broadcaster,
    publisher: P,
}
impl<P> Messages<P> {
    /// Builds a `Messages` from its database pool, event broadcaster, and push publisher.
    ///
    /// The arguments are stored as-is; nothing is opened or validated here.
    pub const fn new(db: SqlitePool, events: Broadcaster, publisher: P) -> Self {
        Self {
            db,
            events,
            publisher,
        }
    }

    /// Deletes `message` on behalf of `deleted_by`.
    ///
    /// The message is loaded, the sender is checked, and the resulting deletion events are
    /// recorded inside a single transaction. The events are broadcast only after that
    /// transaction has committed.
    ///
    /// # Errors
    ///
    /// - [`DeleteError::MessageNotFound`] if no message with this id could be loaded.
    /// - [`DeleteError::NotSender`] if `deleted_by` is not the message's sender.
    /// - [`DeleteError::Deleted`] if the message's history refuses the deletion because the
    ///   message is already deleted.
    /// - [`DeleteError::Failed`] if a database operation fails.
    pub async fn delete(
        &self,
        deleted_by: &Login,
        message: &Id,
        deleted_at: &DateTime,
    ) -> Result<(), DeleteError> {
        // Built up front because `message` is shadowed by the loaded history below.
        let message_not_found = || DeleteError::MessageNotFound(message.clone());
        let not_sender = || DeleteError::NotSender(deleted_by.id.clone().into());

        let mut tx = self.db.begin().await.fail(db::failed::BEGIN)?;
        let message = tx
            .messages()
            .by_id(message)
            .await
            .optional()
            .fail("Failed to load message")?
            .ok_or_else(message_not_found)?;

        // Reject before allocating a sequence number; returning here drops the transaction
        // without committing it.
        if message.sender() != &deleted_by.id {
            return Err(not_sender());
        }

        let deleted_at = tx
            .sequence()
            .next(deleted_at)
            .await
            .fail("Failed to find event sequence number")?;
        let message = message.delete(deleted_at)?;

        // Only the events produced by this deletion need to be stored and broadcast.
        let events = message.events().filter(Sequence::start_from(deleted_at));
        tx.messages()
            .record_events(events.clone())
            .await
            .fail("Failed to store events")?;
        tx.commit().await.fail(db::failed::COMMIT)?;

        self.events.broadcast_from(events);
        Ok(())
    }

    /// Deletes every message the repository reports as expired relative to `relative_to`.
    ///
    /// The cutoff passed to the repository is 30 days before `relative_to`. Each expired message
    /// gets its own sequence number (allocated at `relative_to`) and its deletion events are
    /// recorded in one shared transaction. Messages whose history reports that they are already
    /// deleted are skipped. After the commit, the per-message event streams are merged with
    /// `Sequence::merge` and broadcast together.
    ///
    /// # Errors
    ///
    /// Returns the underlying [`sqlx::Error`] if any database operation fails; in that case the
    /// transaction is never committed and nothing is broadcast.
    pub async fn expire(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> {
        // Somewhat arbitrarily, expire after 30 days.
        let expire_at = relative_to.to_owned() - TimeDelta::days(30);

        let mut tx = self.db.begin().await?;
        let expired = tx.messages().expired(&expire_at).await?;

        let mut events = Vec::with_capacity(expired.len());
        for message in expired {
            let deleted = tx.sequence().next(relative_to).await?;
            match message.delete(deleted) {
                Ok(message) => {
                    let message_events = message
                        .events()
                        .filter(Sequence::start_from(deleted.sequence));
                    tx.messages().record_events(message_events.clone()).await?;
                    events.push(message_events);
                }
                // Already deleted: nothing to record for this message.
                Err(history::DeleteError::Deleted(_)) => {}
            }
        }
        tx.commit().await?;

        self.events
            .broadcast_from(events.into_iter().kmerge_by(Sequence::merge));
        Ok(())
    }

    /// Asks the message repository to purge using a cutoff 6 hours before `relative_to`.
    ///
    /// The purge runs in its own transaction. No events are recorded or broadcast by this method.
    /// NOTE(review): presumably this removes stored data for messages deleted before the cutoff;
    /// confirm against the repository's `purge` implementation.
    ///
    /// # Errors
    ///
    /// Returns the underlying [`sqlx::Error`] if any database operation fails.
    pub async fn purge(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> {
        // Somewhat arbitrarily, purge after 6 hours.
        let purge_at = relative_to.to_owned() - TimeDelta::hours(6);

        let mut tx = self.db.begin().await?;
        tx.messages().purge(&purge_at).await?;
        tx.commit().await?;

        Ok(())
    }
}
impl
Messages
where
P: Publish,
{
pub async fn send(
&self,
conversation: &conversation::Id,
sender: &Login,
sent_at: &DateTime,
body: &Body,
) -> Result {
let conversation_not_found = || SendError::ConversationNotFound(conversation.clone());
let conversation_deleted = || SendError::ConversationDeleted(conversation.clone());
let sender_not_found = || SendError::SenderNotFound(sender.id.clone().into());
let sender_deleted = || SendError::SenderDeleted(sender.id.clone().into());
let mut tx = self.db.begin().await.fail(db::failed::BEGIN)?;
let signer = tx
.vapid()
.signer()
.await
.fail("Failed to load VAPID signer")?;
let push_recipients = tx
.push()
.broadcast_from(sender)
.await
.fail("Failed to load push recipients")?;
let conversation = tx
.conversations()
.by_id(conversation)
.await
.optional()
.fail("Failed to load conversation")?
.ok_or_else(conversation_not_found)?;
let sender = tx
.users()
.by_login(sender)
.await
.optional()
.fail("Failed to load sending user")?
.ok_or_else(sender_not_found)?;
// Ordering: don't bother allocating a sequence number before we know the channel might
// exist.
let sent = tx
.sequence()
.next(sent_at)
.await
.fail("Failed to find event sequence number")?;
let conversation = conversation.as_of(sent).ok_or_else(conversation_deleted)?;
let sender = sender.as_of(sent).ok_or_else(sender_deleted)?;
let message = History::begin(&conversation, &sender, body, sent);
// This filter technically includes every event in the history, but it's easier to follow if
// the various event-manipulating app methods are consistent, and it's harmless to have an
// always-satisfied filter.
let events = message.events().filter(Sequence::start_from(sent));
tx.messages()
.record_events(events.clone())
.await
.fail("Failed to store events")?;
tx.commit().await.fail(db::failed::COMMIT)?;
self.events.broadcast_from(events.clone());
for event in events {
let failures = self
.publisher
.publish(event, &signer, &push_recipients)
.await
.fail("Failed to publish push events")?;
if !failures.is_empty() {
let mut tx = self.db.begin().await.fail(db::failed::BEGIN)?;
// Note that data integrity guarantees from the original transaction to read
// subscriptions may no longer be valid now. Time has passed. Depending on how slow
// delivering push notifications is, potentially a _lot_ of time has passed.
for (sub, err) in failures {
match err {
// I _think_ this is the complete set of permanent failures. See
// for a complete
// list.
WebPushError::Unauthorized(_)
| WebPushError::InvalidUri
| WebPushError::EndpointNotValid(_)
| WebPushError::EndpointNotFound(_)
| WebPushError::InvalidCryptoKeys
| WebPushError::MissingCryptoKeys => {
tx.push().unsubscribe(sub).await.fail(
"Failed to unsubscribe after permanent push message rejection",
)?;
}
_ => (),
}
}
tx.commit().await.fail(db::failed::COMMIT)?;
}
}
Ok(message.as_sent())
}
}
/// Reasons `Messages::send` can fail.
#[derive(Debug, thiserror::Error)]
pub enum SendError {
/// No conversation with the given id could be loaded.
#[error("conversation {0} not found")]
ConversationNotFound(conversation::Id),
/// The conversation was loaded, but has no state as of the point the message would be sent at.
#[error("conversation {0} deleted")]
ConversationDeleted(conversation::Id),
/// No user could be loaded for the sending login.
#[error("user {0} not found")]
SenderNotFound(user::Id),
/// The sending user was loaded, but has no state as of the point the message would be sent at.
#[error("user {0} deleted")]
SenderDeleted(user::Id),
/// An underlying operation (database access or push publishing) failed; the wrapped error's
/// own message is shown unchanged.
#[error(transparent)]
Failed(#[from] Failed),
}
/// Reasons `Messages::delete` can fail.
#[derive(Debug, thiserror::Error)]
pub enum DeleteError {
/// No message with the given id could be loaded.
#[error("message {0} not found")]
MessageNotFound(Id),
/// The login requesting the deletion is not the message's sender.
#[error("user {0} not the message's sender")]
NotSender(user::Id),
/// The message's history rejected the deletion because the message is already deleted.
#[error("message {0} deleted")]
Deleted(Id),
/// An underlying database operation failed; the wrapped error's own message is shown
/// unchanged.
#[error(transparent)]
Failed(#[from] Failed),
}
impl From for DeleteError {
fn from(error: history::DeleteError) -> Self {
match error {
history::DeleteError::Deleted(message) => Self::Deleted(message.id().clone()),
}
}
}