diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/boot/app.rs | 42 | ||||
| -rw-r--r-- | src/boot/mod.rs | 63 | ||||
| -rw-r--r-- | src/message/repo.rs | 35 |
3 files changed, 51 insertions, 89 deletions
diff --git a/src/boot/app.rs b/src/boot/app.rs index 03e7230..ef48b2f 100644 --- a/src/boot/app.rs +++ b/src/boot/app.rs @@ -1,6 +1,6 @@ use sqlx::sqlite::SqlitePool; -use super::{Channel, Snapshot}; +use super::Snapshot; use crate::{ channel::repo::Provider as _, event::repo::Provider as _, login::repo::Provider as _, message::repo::Provider as _, @@ -20,43 +20,31 @@ impl<'a> Boot<'a> { let resume_point = tx.sequence().current().await?; let logins = tx.logins().all(resume_point.into()).await?; + let channels = tx.channels().all(resume_point.into()).await?; + let messages = tx.messages().all(resume_point.into()).await?; + + tx.commit().await?; + let logins = logins .into_iter() .filter_map(|login| login.as_of(resume_point)) .collect(); - let channels = tx.channels().all(resume_point.into()).await?; - let channels = { - let mut snapshots = Vec::with_capacity(channels.len()); - - let channels = channels.into_iter().filter_map(|channel| { - channel - .as_of(resume_point) - .map(|snapshot| (channel, snapshot)) - }); - - for (channel, snapshot) in channels { - let messages = tx - .messages() - .in_channel(&channel, resume_point.into()) - .await?; - - let messages = messages - .into_iter() - .filter_map(|message| message.as_of(resume_point)); - - snapshots.push(Channel::new(snapshot, messages)); - } - - snapshots - }; + let channels = channels + .into_iter() + .filter_map(|channel| channel.as_of(resume_point)) + .collect(); - tx.commit().await?; + let messages = messages + .into_iter() + .filter_map(|message| message.as_of(resume_point)) + .collect(); Ok(Snapshot { resume_point, logins, channels, + messages, }) } } diff --git a/src/boot/mod.rs b/src/boot/mod.rs index 1f94106..ed4764a 100644 --- a/src/boot/mod.rs +++ b/src/boot/mod.rs @@ -1,12 +1,7 @@ pub mod app; mod routes; -use crate::{ - channel, - event::{Instant, Sequence}, - login::{self, Login}, - message, -}; +use crate::{channel::Channel, event::Sequence, login::Login, message::Message}; pub use self::routes::router; @@ -15,61 +10,5 @@ pub struct Snapshot { pub resume_point: Sequence, pub logins: Vec<Login>, pub channels: Vec<Channel>, -} - -#[derive(serde::Serialize)] -pub struct Channel { - pub id: channel::Id, - pub name: String, pub messages: Vec<Message>, } - -impl Channel { - fn new( - channel: channel::Channel, - messages: impl IntoIterator<Item = message::Message>, - ) -> Self { - // The declarations are like this to guarantee that we aren't omitting any important fields from the corresponding types. - let channel::Channel { id, name } = channel; - - Self { - id, - name, - messages: messages.into_iter().map(Message::from).collect(), - } - } -} - -#[derive(serde::Serialize)] -pub struct Message { - #[serde(flatten)] - pub sent: Instant, - pub sender: login::Id, - pub id: message::Id, - pub body: String, -} - -impl From<message::Message> for Message { - fn from(message: message::Message) -> Self { - let message::Message { - sent, - channel: _, - sender, - id, - body, - } = message; - - Self { - sent, - sender, - id, - body, - } - } -} - -#[derive(serde::Serialize)] -pub struct Body { - id: message::Id, - body: String, -} diff --git a/src/message/repo.rs b/src/message/repo.rs index 0560f4a..71c6d10 100644 --- a/src/message/repo.rs +++ b/src/message/repo.rs @@ -112,6 +112,41 @@ impl<'c> Messages<'c> { Ok(messages) } + pub async fn all(&mut self, resume_at: ResumePoint) -> Result<Vec<History>, sqlx::Error> { + let messages = sqlx::query!( + r#" + select + channel as "channel: channel::Id", + sender as "sender: login::Id", + id as "id: Id", + body, + sent_at as "sent_at: DateTime", + sent_sequence as "sent_sequence: Sequence" + from message + where coalesce(sent_sequence <= $2, true) + order by sent_sequence + "#, + resume_at, + ) + .map(|row| History { + message: Message { + sent: Instant { + at: row.sent_at, + sequence: row.sent_sequence, + }, + channel: row.channel, + sender: row.sender, + id: row.id, + body: row.body, + }, + deleted: None, + }) + .fetch_all(&mut *self.0) + .await?; + + Ok(messages) + } + async fn by_id(&mut self, message: &Id) -> Result<History, sqlx::Error> { let message = sqlx::query!( r#" |
