// path: root/src/channel/repo/broadcast.rs
// blob: 3ca739644a699f3ecb7b1e3410d1a527574d15c1
use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction};

use crate::{
    clock::DateTime,
    repo::{
        channel,
        login::{self, Login, Logins},
        message,
    },
};

/// Implemented by types that can hand out a [`Broadcast`] repository.
pub trait Provider {
    /// Returns a [`Broadcast`] that mutably borrows `self` for as long as the
    /// returned value is alive.
    fn broadcast(&mut self) -> Broadcast<'_>;
}

impl Provider for Transaction<'_, Sqlite> {
    /// Builds a [`Broadcast`] over the connection that backs this transaction.
    fn broadcast(&mut self) -> Broadcast<'_> {
        // The transaction dereferences to its SQLite connection; reborrow that
        // connection explicitly instead of relying on deref coercion.
        Broadcast(&mut **self)
    }
}

/// Repository handle for storing and replaying channel messages.
///
/// Wraps a mutable borrow of a SQLite connection that lasts for `'t`; every
/// query issued through this handle runs on that connection.
pub struct Broadcast<'t>(&'t mut SqliteConnection);

/// A channel message as returned by [`Broadcast::create`] and
/// [`Broadcast::replay`], with the sender carried as a full [`Login`] rather
/// than a bare id.
#[derive(Clone, Debug, serde::Serialize)]
pub struct Message {
    /// Identifier of the message row.
    pub id: message::Id,
    /// Login of the user who sent the message.
    pub sender: Login,
    /// Text content of the message.
    pub body: String,
    /// Timestamp stored in the message's `sent_at` column.
    pub sent_at: DateTime,
}

impl Broadcast<'_> {
    /// Stores a new message in `channel` and returns it with the sender's
    /// login attached.
    ///
    /// The message id is generated here, then the sender's login is looked up
    /// by `sender`, and finally the row is inserted with the given `body` and
    /// `sent_at`.
    ///
    /// # Errors
    ///
    /// Propagates any error raised by the login lookup or by the insert.
    pub async fn create(
        &mut self,
        sender: &login::Id,
        channel: &channel::Id,
        body: &str,
        sent_at: &DateTime,
    ) -> Result<Message, sqlx::Error> {
        let id = message::Id::generate();
        let author = Logins::from(&mut *self.0).by_id(sender).await?;

        let row = sqlx::query!(
            r#"
				insert into message
					(id, sender, channel, body, sent_at)
				values ($1, $2, $3, $4, $5)
				returning
					id as "id: message::Id",
					sender as "sender: login::Id",
					body,
					sent_at as "sent_at: DateTime"
			"#,
            id,
            author.id,
            channel,
            body,
            sent_at,
        )
        .fetch_one(&mut *self.0)
        .await?;

        // The database must echo back the sender id that was just inserted.
        debug_assert!(row.sender == author.id);

        Ok(Message {
            id: row.id,
            sender: author,
            body: row.body,
            sent_at: row.sent_at,
        })
    }

    /// Loads the messages of `channel`, oldest first.
    ///
    /// When `resume_at` is `Some`, only messages whose `sent_at` is strictly
    /// later than that timestamp are returned. When it is `None`, the bound
    /// parameter is NULL, the comparison evaluates to NULL, and `coalesce`
    /// turns it into `true`, so every message of the channel is returned.
    ///
    /// # Errors
    ///
    /// Propagates any error raised while running the query.
    pub async fn replay(
        &mut self,
        channel: &channel::Id,
        resume_at: Option<&DateTime>,
    ) -> Result<Vec<Message>, sqlx::Error> {
        let rows = sqlx::query!(
            r#"
				select
					message.id as "id: message::Id",
					login.id as "sender_id: login::Id",
					login.name as sender_name,
					message.body,
					message.sent_at as "sent_at: DateTime"
				from message
					join login on message.sender = login.id
				where channel = $1
					and coalesce(sent_at > $2, true)
				order by sent_at asc
			"#,
            channel,
            resume_at,
        )
        .fetch_all(&mut *self.0)
        .await?;

        // Rebuild each sender's `Login` from the joined login columns.
        let history = rows
            .into_iter()
            .map(|row| {
                let sender = Login {
                    id: row.sender_id,
                    name: row.sender_name,
                };
                Message {
                    id: row.id,
                    sender,
                    body: row.body,
                    sent_at: row.sent_at,
                }
            })
            .collect();

        Ok(history)
    }
}