use futures::stream::{StreamExt as _, TryStreamExt as _}; use sqlx::{SqliteConnection, Transaction, sqlite::Sqlite}; use super::{Event, History, Id, User, event::Created}; use crate::{ clock::DateTime, db::NotFound, event::{Instant, Sequence}, login::Login, name::{self, Name}, }; pub trait Provider { fn users(&mut self) -> Users<'_>; } impl Provider for Transaction<'_, Sqlite> { fn users(&mut self) -> Users<'_> { Users(self) } } pub struct Users<'t>(&'t mut SqliteConnection); impl Users<'_> { pub async fn record_events( &mut self, events: impl IntoIterator, ) -> Result<(), sqlx::Error> { for event in events { self.record_event(&event).await?; } Ok(()) } pub async fn record_event(&mut self, event: &Event) -> Result<(), sqlx::Error> { match event { Event::Created(created) => self.record_created(created).await, } } async fn record_created(&mut self, created: &Created) -> Result<(), sqlx::Error> { let Created { user, instant } = created; sqlx::query!( r#" insert into user (id, created_at, created_sequence) values ($1, $2, $3) "#, user.id, instant.at, instant.sequence, ) .execute(&mut *self.0) .await?; Ok(()) } pub async fn by_login(&mut self, login: &Login) -> Result { let user = sqlx::query!( r#" select id as "id: Id", login.display_name as "display_name: String", login.canonical_name as "canonical_name: String", user.created_at as "created_at: DateTime", user.created_sequence as "created_sequence: Sequence" from user join login using (id) where id = $1 "#, login.id, ) .map(|row| { Ok::<_, LoadError>(History { user: User { id: row.id, name: Name::new(row.display_name, row.canonical_name)?, }, created: Instant::new(row.created_at, row.created_sequence), }) }) .fetch_one(&mut *self.0) .await??; Ok(user) } pub async fn all(&mut self, resume_at: Sequence) -> Result, LoadError> { let logins = sqlx::query!( r#" select id as "id: Id", login.display_name as "display_name: String", login.canonical_name as "canonical_name: String", user.created_at as "created_at: DateTime", user.created_sequence as "created_sequence: Sequence" from user join login using (id) where user.created_sequence <= $1 "#, resume_at, ) .map(|row| { Ok::<_, LoadError>(History { user: User { id: row.id, name: Name::new(row.display_name, row.canonical_name)?, }, created: Instant::new(row.created_at, row.created_sequence), }) }) .fetch(&mut *self.0) .map(|res| res?) .try_collect() .await?; Ok(logins) } pub async fn replay(&mut self, resume_at: Sequence) -> Result, LoadError> { let logins = sqlx::query!( r#" select id as "id: Id", login.display_name as "display_name: String", login.canonical_name as "canonical_name: String", user.created_at as "created_at: DateTime", user.created_sequence as "created_sequence: Sequence" from user join login using (id) where user.created_sequence > $1 "#, resume_at, ) .map(|row| { Ok::<_, name::Error>(History { user: User { id: row.id, name: Name::new(row.display_name, row.canonical_name)?, }, created: Instant::new(row.created_at, row.created_sequence), }) }) .fetch(&mut *self.0) .map(|res| Ok::<_, LoadError>(res??)) .try_collect() .await?; Ok(logins) } } #[derive(Debug, thiserror::Error)] #[error(transparent)] pub enum LoadError { Database(#[from] sqlx::Error), Name(#[from] name::Error), } impl NotFound for Result { type Ok = T; type Error = LoadError; fn optional(self) -> Result, LoadError> { match self { Ok(value) => Ok(Some(value)), Err(LoadError::Database(sqlx::Error::RowNotFound)) => Ok(None), Err(other) => Err(other), } } }