use futures::{
    Stream, future,
    stream::{self, StreamExt as _},
};
use itertools::Itertools as _;
use sqlx::sqlite::SqlitePool;

use super::{Event, Sequence, Sequenced, broadcaster::Broadcaster};
use crate::{
    conversation::{self, repo::Provider as _},
    db::NotFound,
    message::{self, repo::Provider as _},
    name,
    user::{self, repo::Provider as _},
    vapid::{self, repo::Provider as _},
};

/// Builds event streams that replay persisted history and then follow live
/// broadcasts, without gaps or duplicates.
pub struct Events {
    db: SqlitePool,
    events: Broadcaster,
}

impl Events {
    pub const fn new(db: SqlitePool, events: Broadcaster) -> Self {
        Self { db, events }
    }

    /// Returns every event after `resume_at`: first events replayed from the
    /// database, in sequence order, then live events as they are broadcast.
    pub async fn subscribe(
        &self,
        resume_at: Sequence,
    ) -> Result<impl Stream<Item = Event> + std::fmt::Debug + use<>, Error> {
        // Subscribe before retrieving, to catch messages broadcast while we're
        // querying the DB. We'll prune out duplicates later.
        let live_messages = self.events.subscribe();

        let mut tx = self.db.begin().await?;

        let users = tx.users().replay(resume_at).await?;
        let user_events = users
            .iter()
            .map(user::History::events)
            .kmerge_by(Sequence::merge)
            .filter(Sequence::after(resume_at))
            .map(Event::from);

        let conversations = tx.conversations().replay(resume_at).await?;
        let conversation_events = conversations
            .iter()
            .map(conversation::History::events)
            .kmerge_by(Sequence::merge)
            .filter(Sequence::after(resume_at))
            .map(Event::from);

        let messages = tx.messages().replay(resume_at).await?;
        let message_events = messages
            .iter()
            .map(message::History::events)
            .kmerge_by(Sequence::merge)
            .filter(Sequence::after(resume_at))
            .map(Event::from);

        let vapid = tx.vapid().current().await.optional()?;
        let vapid_events = vapid
            .iter()
            .flat_map(vapid::History::events)
            .filter(Sequence::after(resume_at))
            .map(Event::from);

        // Each per-kind iterator is already in sequence order, so pairwise
        // merging keeps the combined replay sorted by sequence number.
        let replay_events = user_events
            .merge_by(conversation_events, Sequence::merge)
            .merge_by(message_events, Sequence::merge)
            .merge_by(vapid_events, Sequence::merge)
            .collect::<Vec<_>>();

        let resume_live_at = replay_events.last().map_or(resume_at, Sequenced::sequence);
        let replay = stream::iter(replay_events);
        let live_messages = live_messages
            // Filtering on the broadcast resume point filters out messages
            // before resume_at, and filters out messages duplicated from
            // `replay_events`.
            .flat_map(stream::iter)
            .filter(Self::resume(resume_live_at));

        Ok(replay.chain(live_messages))
    }

    /// Adapts the plain `Sequence::after` predicate into the
    /// `future::Ready`-returning form that `StreamExt::filter` expects.
    fn resume(resume_at: Sequence) -> impl for<'m> FnMut(&'m Event) -> future::Ready<bool> + use<> {
        let filter = Sequence::after(resume_at);
        move |event| future::ready(filter(event))
    }
}
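
// A hypothetical caller might drive `subscribe` roughly like this (sketch
// only; `pool`, `broadcaster`, `client_resume_point`, and `send_to_client`
// are assumptions, not items in this crate):
//
//     let events = Events::new(pool, broadcaster);
//     let stream = events.subscribe(client_resume_point).await?;
//     let mut stream = std::pin::pin!(stream);
//     while let Some(event) = stream.next().await {
//         send_to_client(event).await?;
//     }
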
#[derive(Debug, thiserror::Error)]
#[error(transparent)]
pub enum Error {
    Database(#[from] sqlx::Error),
    Name(#[from] name::Error),
    Ecdsa(#[from] p256::ecdsa::Error),
    Pkcs8(#[from] p256::pkcs8::Error),
    WebPush(#[from] web_push::WebPushError),
}

impl From<user::repo::LoadError> for Error {
    fn from(error: user::repo::LoadError) -> Self {
        use user::repo::LoadError;

        match error {
            LoadError::Database(error) => error.into(),
            LoadError::Name(error) => error.into(),
        }
    }
}

impl From<conversation::repo::LoadError> for Error {
    fn from(error: conversation::repo::LoadError) -> Self {
        use conversation::repo::LoadError;

        match error {
            LoadError::Database(error) => error.into(),
            LoadError::Name(error) => error.into(),
        }
    }
}

impl From<vapid::repo::Error> for Error {
    fn from(error: vapid::repo::Error) -> Self {
        // Shadows this module's `Error` inside the function body: the match
        // below is over `vapid::repo::Error`, converting each cause into Self.
        use vapid::repo::Error;

        match error {
            Error::Database(error) => error.into(),
            Error::Ecdsa(error) => error.into(),
            Error::Pkcs8(error) => error.into(),
            Error::WebPush(error) => error.into(),
        }
    }
}
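
// A minimal sketch of the replay-then-live splice used in `subscribe`, with
// plain integers standing in for `Event` and `Sequence` (hypothetical test,
// not part of the original module): the live feed overlaps the replayed
// history, and filtering on the last replayed sequence removes duplicates.
#[cfg(test)]
mod splice_sketch {
    use futures::{StreamExt as _, stream};

    #[test]
    fn live_events_resume_where_replay_ended() {
        futures::executor::block_on(async {
            // Events 1..=3 were "replayed" from storage; the live feed
            // overlaps at 2 and 3 before continuing with new events.
            let replay = stream::iter([1, 2, 3]);
            let resume_live_at = 3;
            let live = stream::iter([2, 3, 4, 5])
                .filter(move |&seq| futures::future::ready(seq > resume_live_at));

            // Chaining yields the replayed history followed by only the live
            // events past the resume point: no gaps, no duplicates.
            let spliced: Vec<_> = replay.chain(live).collect().await;
            assert_eq!(spliced, [1, 2, 3, 4, 5]);
        });
    }
}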