diff options
Diffstat (limited to 'src/channel/repo')
| -rw-r--r-- | src/channel/repo/broadcast.rs | 112 | ||||
| -rw-r--r-- | src/channel/repo/channels.rs | 102 | ||||
| -rw-r--r-- | src/channel/repo/messages.rs | 136 | ||||
| -rw-r--r-- | src/channel/repo/mod.rs | 3 |
4 files changed, 113 insertions, 240 deletions
diff --git a/src/channel/repo/broadcast.rs b/src/channel/repo/broadcast.rs new file mode 100644 index 0000000..3ca7396 --- /dev/null +++ b/src/channel/repo/broadcast.rs @@ -0,0 +1,112 @@ +use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction}; + +use crate::{ + clock::DateTime, + repo::{ + channel, + login::{self, Login, Logins}, + message, + }, +}; + +pub trait Provider { + fn broadcast(&mut self) -> Broadcast; +} + +impl<'c> Provider for Transaction<'c, Sqlite> { + fn broadcast(&mut self) -> Broadcast { + Broadcast(self) + } +} + +pub struct Broadcast<'t>(&'t mut SqliteConnection); + +#[derive(Clone, Debug, serde::Serialize)] +pub struct Message { + pub id: message::Id, + pub sender: Login, + pub body: String, + pub sent_at: DateTime, +} + +impl<'c> Broadcast<'c> { + pub async fn create( + &mut self, + sender: &login::Id, + channel: &channel::Id, + body: &str, + sent_at: &DateTime, + ) -> Result<Message, sqlx::Error> { + let id = message::Id::generate(); + + let sender = Logins::from(&mut *self.0).by_id(sender).await?; + + let message = sqlx::query!( + r#" + insert into message + (id, sender, channel, body, sent_at) + values ($1, $2, $3, $4, $5) + returning + id as "id: message::Id", + sender as "sender: login::Id", + body, + sent_at as "sent_at: DateTime" + "#, + id, + sender.id, + channel, + body, + sent_at, + ) + .map(|row| { + debug_assert!(row.sender == sender.id); + Message { + id: row.id, + sender: sender.clone(), + body: row.body, + sent_at: row.sent_at, + } + }) + .fetch_one(&mut *self.0) + .await?; + + Ok(message) + } + + pub async fn replay( + &mut self, + channel: &channel::Id, + resume_at: Option<&DateTime>, + ) -> Result<Vec<Message>, sqlx::Error> { + let messages = sqlx::query!( + r#" + select + message.id as "id: message::Id", + login.id as "sender_id: login::Id", + login.name as sender_name, + message.body, + message.sent_at as "sent_at: DateTime" + from message + join login on message.sender = login.id + where channel = $1 + and coalesce(sent_at > $2, true) + order by sent_at asc + "#, + channel, + resume_at, + ) + .map(|row| Message { + id: row.id, + sender: Login { + id: row.sender_id, + name: row.sender_name, + }, + body: row.body, + sent_at: row.sent_at, + }) + .fetch_all(&mut *self.0) + .await?; + + Ok(messages) + } +} diff --git a/src/channel/repo/channels.rs b/src/channel/repo/channels.rs deleted file mode 100644 index ab7489c..0000000 --- a/src/channel/repo/channels.rs +++ /dev/null @@ -1,102 +0,0 @@ -use std::fmt; - -use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction}; - -use crate::id::Id as BaseId; - -pub trait Provider { - fn channels(&mut self) -> Channels; -} - -impl<'c> Provider for Transaction<'c, Sqlite> { - fn channels(&mut self) -> Channels { - Channels(self) - } -} - -pub struct Channels<'t>(&'t mut SqliteConnection); - -#[derive(Debug)] -pub struct Channel { - pub id: Id, - pub name: String, -} - -impl<'c> Channels<'c> { - /// Create a new channel. - pub async fn create(&mut self, name: &str) -> Result<Id, sqlx::Error> { - let id = Id::generate(); - - let channel = sqlx::query_scalar!( - r#" - insert - into channel (id, name) - values ($1, $2) - returning id as "id: Id" - "#, - id, - name, - ) - .fetch_one(&mut *self.0) - .await?; - - Ok(channel) - } - - pub async fn by_id(&mut self, channel: Id) -> Result<Channel, sqlx::Error> { - let channel = sqlx::query_as!( - Channel, - r#" - select id as "id: Id", name - from channel - where id = $1 - "#, - channel, - ) - .fetch_one(&mut *self.0) - .await?; - - Ok(channel) - } - - pub async fn all(&mut self) -> Result<Vec<Channel>, sqlx::Error> { - let channels = sqlx::query_as!( - Channel, - r#" - select - channel.id as "id: Id", - channel.name - from channel - order by channel.name - "#, - ) - .fetch_all(&mut *self.0) - .await?; - - Ok(channels) - } -} - -/// Stable identifier for a [Channel]. Prefixed with `C`. -#[derive(Clone, Debug, Eq, Hash, PartialEq, sqlx::Type, serde::Deserialize, serde::Serialize)] -#[sqlx(transparent)] -#[serde(transparent)] -pub struct Id(BaseId); - -impl From<BaseId> for Id { - fn from(id: BaseId) -> Self { - Self(id) - } -} - -impl Id { - pub fn generate() -> Self { - BaseId::generate("C") - } -} - -impl fmt::Display for Id { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - self.0.fmt(f) - } -} diff --git a/src/channel/repo/messages.rs b/src/channel/repo/messages.rs deleted file mode 100644 index a30e6da..0000000 --- a/src/channel/repo/messages.rs +++ /dev/null @@ -1,136 +0,0 @@ -use std::fmt; - -use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction}; - -use super::channels::Id as ChannelId; -use crate::{ - clock::DateTime, - id::Id as BaseId, - login::repo::logins::{Id as LoginId, Login, Logins}, -}; - -pub trait Provider { - fn messages(&mut self) -> Messages; -} - -impl<'c> Provider for Transaction<'c, Sqlite> { - fn messages(&mut self) -> Messages { - Messages(self) - } -} - -pub struct Messages<'t>(&'t mut SqliteConnection); - -#[derive(Clone, Debug, serde::Serialize)] -pub struct BroadcastMessage { - pub id: Id, - pub sender: Login, - pub body: String, - pub sent_at: DateTime, -} - -impl<'c> Messages<'c> { - pub async fn create( - &mut self, - sender: &LoginId, - channel: &ChannelId, - body: &str, - sent_at: &DateTime, - ) -> Result<BroadcastMessage, sqlx::Error> { - let id = Id::generate(); - - let sender = Logins::from(&mut *self.0).by_id(sender).await?; - - let message = sqlx::query!( - r#" - insert into message - (id, sender, channel, body, sent_at) - values ($1, $2, $3, $4, $5) - returning - id as "id: Id", - sender as "sender: LoginId", - body, - sent_at as "sent_at: DateTime" - "#, - id, - sender.id, - channel, - body, - sent_at, - ) - .map(|row| { - debug_assert!(row.sender == sender.id); - BroadcastMessage { - id: row.id, - sender: sender.clone(), - body: row.body, - sent_at: row.sent_at, - } - }) - .fetch_one(&mut *self.0) - .await?; - - Ok(message) - } - - pub async fn for_replay( - &mut self, - channel: &ChannelId, - resume_at: Option<&DateTime>, - ) -> Result<Vec<BroadcastMessage>, sqlx::Error> { - let messages = sqlx::query!( - r#" - select - message.id as "id: Id", - login.id as "sender_id: LoginId", - login.name as sender_name, - message.body, - message.sent_at as "sent_at: DateTime" - from message - join login on message.sender = login.id - where channel = $1 - and coalesce(sent_at > $2, true) - order by sent_at asc - "#, - channel, - resume_at, - ) - .map(|row| BroadcastMessage { - id: row.id, - sender: Login { - id: row.sender_id, - name: row.sender_name, - }, - body: row.body, - sent_at: row.sent_at, - }) - .fetch_all(&mut *self.0) - .await?; - - Ok(messages) - } -} - -/// Stable identifier for a [Message]. Prefixed with `M`. -#[derive(Clone, Debug, Eq, Hash, PartialEq, sqlx::Type, serde::Deserialize, serde::Serialize)] -#[sqlx(transparent)] -#[serde(transparent)] -pub struct Id(BaseId); - -impl From<BaseId> for Id { - fn from(id: BaseId) -> Self { - Self(id) - } -} - -impl Id { - pub fn generate() -> Self { - BaseId::generate("M") - } -} - -impl fmt::Display for Id { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - self.0.fmt(f) - } -} diff --git a/src/channel/repo/mod.rs b/src/channel/repo/mod.rs index 345897d..2ed3062 100644 --- a/src/channel/repo/mod.rs +++ b/src/channel/repo/mod.rs @@ -1,2 +1 @@ -pub mod channels; -pub mod messages; +pub mod broadcast; |
