use futures::stream::{StreamExt as _, TryStreamExt as _}; use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction}; use crate::{ clock::DateTime, event::{Instant, ResumePoint, Sequence}, login::{password::StoredHash, History, Id, Login}, name::{self, Name}, }; pub trait Provider { fn logins(&mut self) -> Logins; } impl<'c> Provider for Transaction<'c, Sqlite> { fn logins(&mut self) -> Logins { Logins(self) } } pub struct Logins<'t>(&'t mut SqliteConnection); impl<'c> Logins<'c> { pub async fn create( &mut self, name: &Name, password_hash: &StoredHash, created: &Instant, ) -> Result { let id = Id::generate(); let display_name = name.display(); let canonical_name = name.canonical(); sqlx::query!( r#" insert into login (id, display_name, canonical_name, password_hash, created_sequence, created_at) values ($1, $2, $3, $4, $5, $6) "#, id, display_name, canonical_name, password_hash, created.sequence, created.at, ) .execute(&mut *self.0) .await?; let login = History { created: *created, login: Login { id, name: name.clone(), }, }; Ok(login) } pub async fn all(&mut self, resume_at: ResumePoint) -> Result, LoadError> { let logins = sqlx::query!( r#" select id as "id: Id", display_name as "display_name: String", canonical_name as "canonical_name: String", created_sequence as "created_sequence: Sequence", created_at as "created_at: DateTime" from login where coalesce(created_sequence <= $1, true) order by created_sequence "#, resume_at, ) .map(|row| { Ok::<_, LoadError>(History { login: Login { id: row.id, name: Name::new(row.display_name, row.canonical_name)?, }, created: Instant::new(row.created_at, row.created_sequence), }) }) .fetch(&mut *self.0) .map(|res| res?) .try_collect() .await?; Ok(logins) } pub async fn replay(&mut self, resume_at: ResumePoint) -> Result, LoadError> { let logins = sqlx::query!( r#" select id as "id: Id", display_name as "display_name: String", canonical_name as "canonical_name: String", created_sequence as "created_sequence: Sequence", created_at as "created_at: DateTime" from login where coalesce(login.created_sequence > $1, true) "#, resume_at, ) .map(|row| { Ok::<_, name::Error>(History { login: Login { id: row.id, name: Name::new(row.display_name, row.canonical_name)?, }, created: Instant::new(row.created_at, row.created_sequence), }) }) .fetch(&mut *self.0) .map(|res| Ok::<_, LoadError>(res??)) .try_collect() .await?; Ok(logins) } pub async fn recanonicalize(&mut self) -> Result<(), sqlx::Error> { let logins = sqlx::query!( r#" select id as "id: Id", display_name as "display_name: String" from login "#, ) .fetch_all(&mut *self.0) .await?; for login in logins { let name = Name::from(login.display_name); let canonical_name = name.canonical(); sqlx::query!( r#" update login set canonical_name = $1 where id = $2 "#, canonical_name, login.id, ) .execute(&mut *self.0) .await?; } Ok(()) } } #[derive(Debug, thiserror::Error)] #[error(transparent)] pub enum LoadError { Database(#[from] sqlx::Error), Name(#[from] name::Error), }