use chrono::TimeDelta;
use futures::{
future,
stream::{self, StreamExt as _},
Stream,
};
use sqlx::sqlite::SqlitePool;
use super::{
broadcaster::Broadcaster,
repo::message::Provider as _,
types::{self, ChannelEvent},
};
use crate::{
channel::{self, repo::Provider as _},
clock::DateTime,
db::NotFound as _,
event::{repo::Provider as _, Sequence},
login::Login,
};
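
/// Channel message operations: sending, expiring, and subscribing to the
/// event stream, backed by a [`SqlitePool`] and broadcast to live
/// subscribers via a [`Broadcaster`].
///
/// A minimal usage sketch (illustrative only; assumes a configured pool,
/// broadcaster, login, channel id, and timestamp):
///
/// ```ignore
/// let events = Events::new(&pool, &broadcaster);
/// let event = events.send(&login, &channel_id, "hello", &now).await?;
/// ```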
pub struct Events<'a> {
db: &'a SqlitePool,
events: &'a Broadcaster,
}
impl<'a> Events<'a> {
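    /// Creates an `Events` handle over the given database pool and broadcaster.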
pub const fn new(db: &'a SqlitePool, events: &'a Broadcaster) -> Self {
Self { db, events }
}
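
    /// Records a message sent by `login` to `channel`, assigning it the next
    /// sequence number, and broadcasts the resulting event to live subscribers.
    ///
    /// Returns [`EventsError::ChannelNotFound`] if the channel does not exist.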
pub async fn send(
&self,
login: &Login,
channel: &channel::Id,
body: &str,
sent_at: &DateTime,
) -> Result<types::ChannelEvent, EventsError> {
let mut tx = self.db.begin().await?;
let channel = tx
.channels()
.by_id(channel)
.await
.not_found(|| EventsError::ChannelNotFound(channel.clone()))?;
let sent = tx.sequence().next(sent_at).await?;
let event = tx
.message_events()
.create(login, &channel, &sent, body)
.await?;
tx.commit().await?;
self.events.broadcast(&event);
Ok(event)
}
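
    /// Expires messages older than 90 days relative to `relative_to`: each
    /// expired message is deleted under a fresh sequence number, and the
    /// resulting deletion events are broadcast to live subscribers.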
pub async fn expire(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> {
// Somewhat arbitrarily, expire after 90 days.
let expire_at = relative_to.to_owned() - TimeDelta::days(90);
let mut tx = self.db.begin().await?;
let expired = tx.message_events().expired(&expire_at).await?;
let mut events = Vec::with_capacity(expired.len());
for (channel, message) in expired {
let deleted = tx.sequence().next(relative_to).await?;
let event = tx
.message_events()
.delete(&channel, &message, &deleted)
.await?;
events.push(event);
}
tx.commit().await?;
for event in events {
self.events.broadcast(&event);
}
Ok(())
}
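
    /// Returns a stream of channel events: stored events after `resume_at`
    /// (all stored events when `resume_at` is `None`) are replayed in sequence
    /// order, then live broadcasts follow, with duplicates of replayed events
    /// filtered out.
    ///
    /// A usage sketch (illustrative only; `StreamExt::next` comes from the
    /// `futures` crate):
    ///
    /// ```ignore
    /// let mut stream = Box::pin(events.subscribe(None).await?);
    /// while let Some(event) = stream.next().await {
    ///     // Replayed events arrive first, followed by live ones.
    /// }
    /// ```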
pub async fn subscribe(
&self,
resume_at: Option<Sequence>,
) -> Result<impl Stream<Item = types::ChannelEvent> + std::fmt::Debug, sqlx::Error> {
// Subscribe before retrieving, to catch messages broadcast while we're
// querying the DB. We'll prune out duplicates later.
let live_messages = self.events.subscribe();
let mut tx = self.db.begin().await?;
let channels = tx.channels().replay(resume_at).await?;
let channel_events = channels
.into_iter()
.map(ChannelEvent::created)
.filter(move |event| {
resume_at.map_or(true, |resume_at| Sequence::from(event) > resume_at)
});
let message_events = tx.message_events().replay(resume_at).await?;
        let mut replay_events = channel_events
            .chain(message_events)
            .collect::<Vec<_>>();
replay_events.sort_by_key(|event| Sequence::from(event));
let resume_live_at = replay_events.last().map(Sequence::from);
let replay = stream::iter(replay_events);
        // No expiry or resume filtering is applied to the replayed events, as
        // the replay is constructed not to contain events meeting either
        // criterion:
        //
        // * expiry filtering is redundant with `expire`, which already removes
        //   expired messages from the store;
        // * resume filtering is redundant with the `resume_at` argument passed
        //   to the `replay(…)` calls above.
let live_messages = live_messages
            // Filtering on the last replayed sequence number drops live events
            // from before `resume_at` as well as events already delivered in
            // the replay, so subscribers see no duplicates.
.filter(Self::resume(resume_live_at));
Ok(replay.chain(live_messages))
}
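
    /// Builds a filter predicate that passes only events sequenced strictly
    /// after `resume_at`; when `resume_at` is `None`, every event passes,
    /// since `None` orders before any `Some` sequence.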
fn resume(
resume_at: Option<Sequence>,
) -> impl for<'m> FnMut(&'m types::ChannelEvent) -> future::Ready<bool> {
move |event| future::ready(resume_at < Some(Sequence::from(event)))
}
}
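
/// Errors returned by [`Events`] operations.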
#[derive(Debug, thiserror::Error)]
pub enum EventsError {
#[error("channel {0} not found")]
ChannelNotFound(channel::Id),
#[error(transparent)]
DatabaseError(#[from] sqlx::Error),
}