summaryrefslogtreecommitdiff
path: root/src/events/app/events.rs
blob: a8814c9b82c0f34b8569aeddc0277766b6abae4c (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
use chrono::TimeDelta;
use futures::{
    future,
    stream::{self, StreamExt as _},
    Stream,
};
use sqlx::sqlite::SqlitePool;

use super::Broadcaster;
use crate::{
    clock::DateTime,
    events::repo::broadcast::{self, Provider as _},
    repo::{
        channel::{self, Provider as _},
        error::NotFound as _,
        login::Login,
    },
};

/// Application-level handle for channel events: sending messages to a channel
/// and subscribing to a channel's message stream.
///
/// Holds only borrowed state (two references), so it can be constructed on
/// demand with [`Events::new`].
pub struct Events<'a> {
    /// Database pool; `send` and `subscribe` each begin a transaction on it.
    db: &'a SqlitePool,
    /// Used to publish newly sent messages and to subscribe to live ones.
    broadcaster: &'a Broadcaster,
}

impl<'a> Events<'a> {
    pub const fn new(db: &'a SqlitePool, broadcaster: &'a Broadcaster) -> Self {
        Self { db, broadcaster }
    }

    pub async fn send(
        &self,
        login: &Login,
        channel: &channel::Id,
        body: &str,
        sent_at: &DateTime,
    ) -> Result<broadcast::Message, EventsError> {
        let mut tx = self.db.begin().await?;
        let channel = tx
            .channels()
            .by_id(channel)
            .await
            .not_found(|| EventsError::ChannelNotFound(channel.clone()))?;
        let message = tx
            .broadcast()
            .create(login, &channel, body, sent_at)
            .await?;
        tx.commit().await?;

        self.broadcaster.broadcast(&channel.id, &message);
        Ok(message)
    }

    pub async fn subscribe(
        &self,
        channel: &channel::Id,
        subscribed_at: &DateTime,
        resume_at: Option<broadcast::Sequence>,
    ) -> Result<impl Stream<Item = broadcast::Message> + std::fmt::Debug, EventsError> {
        // Somewhat arbitrarily, expire after 90 days.
        let expire_at = subscribed_at.to_owned() - TimeDelta::days(90);

        let mut tx = self.db.begin().await?;
        let channel = tx
            .channels()
            .by_id(channel)
            .await
            .not_found(|| EventsError::ChannelNotFound(channel.clone()))?;

        // Subscribe before retrieving, to catch messages broadcast while we're
        // querying the DB. We'll prune out duplicates later.
        let live_messages = self.broadcaster.subscribe(&channel.id);

        tx.broadcast().expire(&expire_at).await?;
        let stored_messages = tx.broadcast().replay(&channel, resume_at).await?;
        tx.commit().await?;

        let resume_broadcast_at = stored_messages
            .last()
            .map(|message| message.sequence)
            .or(resume_at);

        // This should always be the case, up to integer rollover, primarily
        // because every message in stored_messages has a sequence not less
        // than `resume_at`, or `resume_at` is None. We use the last message
        // (if any) to decide when to resume the `live_messages` stream.
        //
        // It probably simplifies to assert!(resume_at <= resume_broadcast_at), but
        // this form captures more of the reasoning.
        assert!(
            (resume_at.is_none() && resume_broadcast_at.is_none())
                || (stored_messages.is_empty() && resume_at == resume_broadcast_at)
                || resume_at < resume_broadcast_at
        );

        // no skip_expired or resume transforms for stored_messages, as it's
        // constructed not to contain messages meeting either criterion.
        //
        // * skip_expired is redundant with the `tx.broadcasts().expire(…)` call;
        // * resume is redundant with the resume_at argument to
        //   `tx.broadcasts().replay(…)`.
        let stored_messages = stream::iter(stored_messages);
        let live_messages = live_messages
            // Sure, it's temporally improbable that we'll ever skip a message
            // that's 90 days old, but there's no reason not to be thorough.
            .filter(Self::skip_expired(&expire_at))
            // Filtering on the broadcast resume point filters out messages
            // before resume_at, and filters out messages duplicated from
            // stored_messages.
            .filter(Self::resume(resume_broadcast_at));

        Ok(stored_messages.chain(live_messages))
    }

    fn resume(
        resume_at: Option<broadcast::Sequence>,
    ) -> impl for<'m> FnMut(&'m broadcast::Message) -> future::Ready<bool> {
        move |msg| {
            future::ready(match resume_at {
                None => true,
                Some(resume_at) => msg.sequence > resume_at,
            })
        }
    }
    fn skip_expired(
        expire_at: &DateTime,
    ) -> impl for<'m> FnMut(&'m broadcast::Message) -> future::Ready<bool> {
        let expire_at = expire_at.to_owned();
        move |msg| future::ready(msg.sent_at > expire_at)
    }
}

/// Errors returned by [`Events::send`] and [`Events::subscribe`].
#[derive(Debug, thiserror::Error)]
pub enum EventsError {
    /// The channel lookup reported that the requested channel does not exist;
    /// carries the id that was asked for.
    #[error("channel {0} not found")]
    ChannelNotFound(channel::Id),
    /// Any underlying `sqlx` failure. Converted automatically via `From`, so
    /// database calls can use `?`; display and source are forwarded unchanged
    /// from the wrapped error.
    #[error(transparent)]
    DatabaseError(#[from] sqlx::Error),
}