summaryrefslogtreecommitdiff
path: root/src/events/app.rs
blob: 134e86a94cb6c1dcd699236196ce22f8f1c23a37 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
use std::collections::BTreeMap;

use chrono::TimeDelta;
use futures::{
    future,
    stream::{self, StreamExt as _},
    Stream,
};
use sqlx::sqlite::SqlitePool;

use super::{
    broadcaster::Broadcaster,
    repo::message::Provider as _,
    types::{self, ChannelEvent, ResumePoint},
};
use crate::{
    clock::DateTime,
    repo::{
        channel::{self, Provider as _},
        error::NotFound as _,
        login::Login,
    },
};

/// Application-level event service: persists channel message events in
/// SQLite and fans them out to live subscribers via a [`Broadcaster`].
pub struct Events<'a> {
    // Pool used to open transactions for event reads/writes.
    db: &'a SqlitePool,
    // In-process fan-out hub for delivering events to live subscribers.
    broadcaster: &'a Broadcaster,
}

impl<'a> Events<'a> {
    /// Creates an `Events` service over the given pool and broadcaster.
    pub const fn new(db: &'a SqlitePool, broadcaster: &'a Broadcaster) -> Self {
        Self { db, broadcaster }
    }

    /// Stores a message event for `channel`, then broadcasts it to live
    /// subscribers.
    ///
    /// The lookup and insert run in a single transaction; the broadcast
    /// happens only after a successful commit, so subscribers never see an
    /// event that was rolled back.
    ///
    /// # Errors
    ///
    /// Returns [`EventsError::ChannelNotFound`] if `channel` does not exist,
    /// or [`EventsError::DatabaseError`] for any other database failure.
    pub async fn send(
        &self,
        login: &Login,
        channel: &channel::Id,
        body: &str,
        sent_at: &DateTime,
    ) -> Result<types::ChannelEvent, EventsError> {
        let mut tx = self.db.begin().await?;
        let channel = tx
            .channels()
            .by_id(channel)
            .await
            .not_found(|| EventsError::ChannelNotFound(channel.clone()))?;
        let event = tx
            .message_events()
            .create(login, &channel, body, sent_at)
            .await?;
        tx.commit().await?;

        self.broadcaster.broadcast(&event);
        Ok(event)
    }

    /// Builds a resumable event stream: channel-created events, then a
    /// per-channel replay of stored messages after `resume_at`, then live
    /// broadcast messages — deduplicated against the replay and tagged with
    /// an updated [`ResumePoint`] per event.
    ///
    /// # Errors
    ///
    /// Returns any database error raised while listing channels, expiring
    /// old events, or replaying stored events.
    pub async fn subscribe(
        &self,
        subscribed_at: &DateTime,
        resume_at: ResumePoint,
    ) -> Result<impl Stream<Item = types::ResumableEvent> + std::fmt::Debug, sqlx::Error> {
        // Somewhat arbitrarily, expire after 90 days.
        let expire_at = subscribed_at.to_owned() - TimeDelta::days(90);

        // NOTE(review): this transaction is never committed, so sqlx rolls
        // it back on drop — the expire() delete below only affects reads
        // made through this transaction, not the stored data. Confirm that
        // is intentional (vs. a missing `tx.commit()`).
        let mut tx = self.db.begin().await?;
        let channels = tx.channels().all().await?;

        // Synthesize a "channel created" event for each channel the
        // subscriber has not yet seen (per its resume point).
        let created_events = {
            let resume_at = resume_at.clone();
            let channels = channels.clone();
            stream::iter(
                channels
                    .into_iter()
                    .map(ChannelEvent::created)
                    .filter(move |event| resume_at.not_after(event)),
            )
        };

        // Subscribe before retrieving, to catch messages broadcast while we're
        // querying the DB. We'll prune out duplicates later.
        let live_messages = self.broadcaster.subscribe();

        tx.message_events().expire(&expire_at).await?;

        // Replay stored messages per channel, advancing resume_live_at past
        // each channel's last replayed sequence so the live stream can skip
        // anything already covered by the replay.
        let mut replays = BTreeMap::new();
        let mut resume_live_at = resume_at.clone();
        for channel in channels {
            let replay = tx
                .message_events()
                .replay(&channel, resume_at.get(&channel.id))
                .await?;

            if let Some(last) = replay.last() {
                resume_live_at.advance(&channel.id, last.sequence);
            }

            replays.insert(channel.id.clone(), replay);
        }

        let replay = stream::select_all(replays.into_values().map(stream::iter));

        // No skip_expired or resume transforms for `replay`, as it's
        // constructed not to contain messages meeting either criterion:
        //
        // * skip_expired is redundant with the `tx.message_events().expire(…)`
        //   call above;
        // * resume is redundant with the resume_at argument to
        //   `tx.message_events().replay(…)`.
        let live_messages = live_messages
            // Sure, it's temporally improbable that we'll ever skip a message
            // that's 90 days old, but there's no reason not to be thorough.
            .filter(Self::skip_expired(&expire_at))
            // Filtering on the broadcast resume point filters out messages
            // before resume_at, and filters out messages duplicated from
            // the replay stream.
            .filter(Self::resume(resume_live_at));

        // Stitch the three stages together and attach to each event the
        // resume point a client would use to pick up after that event.
        Ok(created_events.chain(replay).chain(live_messages).scan(
            resume_at,
            |resume_point, event| {
                let channel = &event.channel.id;
                let sequence = event.sequence;
                resume_point.advance(channel, sequence);

                let event = types::ResumableEvent(resume_point.clone(), event);

                future::ready(Some(event))
            },
        ))
    }

    // Stream-filter predicate: keep only events not already covered by
    // `resume_at` (drops duplicates and anything before the resume point).
    fn resume(
        resume_at: ResumePoint,
    ) -> impl for<'m> FnMut(&'m types::ChannelEvent) -> future::Ready<bool> {
        move |event| future::ready(resume_at.not_after(event))
    }
    // Stream-filter predicate: keep only events strictly newer than
    // `expire_at`.
    fn skip_expired(
        expire_at: &DateTime,
    ) -> impl for<'m> FnMut(&'m types::ChannelEvent) -> future::Ready<bool> {
        let expire_at = expire_at.to_owned();
        move |event| future::ready(expire_at < event.at)
    }
}

/// Errors returned by [`Events`] operations.
#[derive(Debug, thiserror::Error)]
pub enum EventsError {
    /// The requested channel id does not exist in the database.
    #[error("channel {0} not found")]
    ChannelNotFound(channel::Id),
    /// Any underlying database failure, forwarded transparently.
    #[error(transparent)]
    DatabaseError(#[from] sqlx::Error),
}