1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
|
use chrono::TimeDelta;
use futures::{
future,
stream::{self, StreamExt as _},
Stream,
};
use sqlx::sqlite::SqlitePool;
use crate::{
clock::DateTime,
events::{
app::Broadcaster,
repo::broadcast::{self, Provider as _},
},
repo::{
channel::{self, Channel, Provider as _},
error::NotFound as _,
login::Login,
},
};
pub struct Channels<'a> {
db: &'a SqlitePool,
broadcaster: &'a Broadcaster,
}
impl<'a> Channels<'a> {
pub const fn new(db: &'a SqlitePool, broadcaster: &'a Broadcaster) -> Self {
Self { db, broadcaster }
}
pub async fn create(&self, name: &str) -> Result<Channel, CreateError> {
let mut tx = self.db.begin().await?;
let channel = tx
.channels()
.create(name)
.await
.map_err(|err| CreateError::from_duplicate_name(err, name))?;
self.broadcaster.register_channel(&channel.id);
tx.commit().await?;
Ok(channel)
}
pub async fn all(&self) -> Result<Vec<Channel>, InternalError> {
let mut tx = self.db.begin().await?;
let channels = tx.channels().all().await?;
tx.commit().await?;
Ok(channels)
}
pub async fn send(
&self,
login: &Login,
channel: &channel::Id,
body: &str,
sent_at: &DateTime,
) -> Result<broadcast::Message, EventsError> {
let mut tx = self.db.begin().await?;
let channel = tx
.channels()
.by_id(channel)
.await
.not_found(|| EventsError::ChannelNotFound(channel.clone()))?;
let message = tx
.broadcast()
.create(login, &channel, body, sent_at)
.await?;
tx.commit().await?;
self.broadcaster.broadcast(&channel.id, &message);
Ok(message)
}
pub async fn events(
&self,
channel: &channel::Id,
subscribed_at: &DateTime,
resume_at: Option<broadcast::Sequence>,
) -> Result<impl Stream<Item = broadcast::Message> + std::fmt::Debug, EventsError> {
// Somewhat arbitrarily, expire after 90 days.
let expire_at = subscribed_at.to_owned() - TimeDelta::days(90);
let mut tx = self.db.begin().await?;
let channel = tx
.channels()
.by_id(channel)
.await
.not_found(|| EventsError::ChannelNotFound(channel.clone()))?;
// Subscribe before retrieving, to catch messages broadcast while we're
// querying the DB. We'll prune out duplicates later.
let live_messages = self.broadcaster.listen(&channel.id);
tx.broadcast().expire(&expire_at).await?;
let stored_messages = tx.broadcast().replay(&channel, resume_at).await?;
tx.commit().await?;
let resume_broadcast_at = stored_messages
.last()
.map(|message| message.sequence)
.or(resume_at);
// This should always be the case, up to integer rollover, primarily
// because every message in stored_messages has a sequence not less
// than `resume_at`, or `resume_at` is None. We use the last message
// (if any) to decide when to resume the `live_messages` stream.
//
// It probably simplifies to assert!(resume_at <= resume_broadcast_at), but
// this form captures more of the reasoning.
assert!(
(resume_at.is_none() && resume_broadcast_at.is_none())
|| (stored_messages.is_empty() && resume_at == resume_broadcast_at)
|| resume_at < resume_broadcast_at
);
// no skip_expired or resume transforms for stored_messages, as it's
// constructed not to contain messages meeting either criterion.
//
// * skip_expired is redundant with the `tx.broadcasts().expire(…)` call;
// * resume is redundant with the resume_at argument to
// `tx.broadcasts().replay(…)`.
let stored_messages = stream::iter(stored_messages);
let live_messages = live_messages
// Sure, it's temporally improbable that we'll ever skip a message
// that's 90 days old, but there's no reason not to be thorough.
.filter(Self::skip_expired(&expire_at))
// Filtering on the broadcast resume point filters out messages
// before resume_at, and filters out messages duplicated from
// stored_messages.
.filter(Self::resume(resume_broadcast_at));
Ok(stored_messages.chain(live_messages))
}
fn resume(
resume_at: Option<broadcast::Sequence>,
) -> impl for<'m> FnMut(&'m broadcast::Message) -> future::Ready<bool> {
move |msg| {
future::ready(match resume_at {
None => true,
Some(resume_at) => msg.sequence > resume_at,
})
}
}
fn skip_expired(
expire_at: &DateTime,
) -> impl for<'m> FnMut(&'m broadcast::Message) -> future::Ready<bool> {
let expire_at = expire_at.to_owned();
move |msg| future::ready(msg.sent_at > expire_at)
}
}
#[derive(Debug, thiserror::Error)]
pub enum CreateError {
#[error("channel named {0} already exists")]
DuplicateName(String),
#[error(transparent)]
DatabaseError(#[from] sqlx::Error),
}
impl CreateError {
fn from_duplicate_name(error: sqlx::Error, name: &str) -> Self {
if let Some(error) = error.as_database_error() {
if error.is_unique_violation() {
return Self::DuplicateName(name.into());
}
}
Self::from(error)
}
}
#[derive(Debug, thiserror::Error)]
pub enum InternalError {
#[error(transparent)]
DatabaseError(#[from] sqlx::Error),
}
#[derive(Debug, thiserror::Error)]
pub enum EventsError {
#[error("channel {0} not found")]
ChannelNotFound(channel::Id),
#[error(transparent)]
ResumeAtError(#[from] chrono::ParseError),
#[error(transparent)]
DatabaseError(#[from] sqlx::Error),
}
|