diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/cli.rs | 15 | ||||
| -rw-r--r-- | src/db.rs | 42 | ||||
| -rw-r--r-- | src/db/backup.rs | 76 | ||||
| -rw-r--r-- | src/db/mod.rs | 114 | ||||
| -rw-r--r-- | src/test/fixtures/mod.rs | 2 |
5 files changed, 199 insertions, 50 deletions
@@ -49,6 +49,10 @@ pub struct Args { /// Sqlite URL or path for the `hi` database #[arg(short, long, env, default_value = "sqlite://.hi")] database_url: String, + + /// Sqlite URL or path for a backup of the `hi` database during upgrades + #[arg(short = 'D', long, env, default_value = "sqlite://.hi.backup")] + backup_database_url: String, } impl Args { @@ -99,8 +103,8 @@ impl Args { (self.address.as_str(), self.port) } - async fn pool(&self) -> sqlx::Result<SqlitePool> { - db::prepare(&self.database_url).await + async fn pool(&self) -> Result<SqlitePool, db::Error> { + db::prepare(&self.database_url, &self.backup_database_url).await } } @@ -127,9 +131,6 @@ fn started_msg(listener: &net::TcpListener) -> io::Result<String> { pub enum Error { /// Failure due to `io::Error`. See [`io::Error`]. IoError(#[from] io::Error), - /// Failure due to a database error. See [`sqlx::Error`]. - DatabaseError(#[from] sqlx::Error), - /// Failure due to a database migration error. See - /// [`sqlx::migrate::MigrateError`]. - MigrateError(#[from] sqlx::migrate::MigrateError), + /// Failure due to a database initialization error. See [`db::Error`]. + Database(#[from] db::Error), } diff --git a/src/db.rs b/src/db.rs deleted file mode 100644 index 93a1169..0000000 --- a/src/db.rs +++ /dev/null @@ -1,42 +0,0 @@ -use std::str::FromStr; - -use sqlx::sqlite::{SqliteConnectOptions, SqlitePool, SqlitePoolOptions}; - -pub async fn prepare(url: &str) -> sqlx::Result<SqlitePool> { - let pool = create(url).await?; - sqlx::migrate!().run(&pool).await?; - Ok(pool) -} - -async fn create(database_url: &str) -> sqlx::Result<SqlitePool> { - let options = SqliteConnectOptions::from_str(database_url)? - .create_if_missing(true) - .optimize_on_close(true, /* analysis_limit */ None); - - let pool = SqlitePoolOptions::new().connect_with(options).await?; - Ok(pool) -} - -pub trait NotFound { - type Ok; - fn not_found<E, F>(self, map: F) -> Result<Self::Ok, E> - where - E: From<sqlx::Error>, - F: FnOnce() -> E; -} - -impl<T> NotFound for Result<T, sqlx::Error> { - type Ok = T; - - fn not_found<E, F>(self, map: F) -> Result<T, E> - where - E: From<sqlx::Error>, - F: FnOnce() -> E, - { - match self { - Err(sqlx::Error::RowNotFound) => Err(map()), - Err(other) => Err(other.into()), - Ok(value) => Ok(value), - } - } -} diff --git a/src/db/backup.rs b/src/db/backup.rs new file mode 100644 index 0000000..bb36aea --- /dev/null +++ b/src/db/backup.rs @@ -0,0 +1,76 @@ +use rusqlite::{ + backup::{self}, + Connection, +}; +use sqlx::sqlite::{LockedSqliteHandle, SqlitePool}; + +pub struct Builder<'p> { + from: &'p SqlitePool, +} + +impl<'p> Builder<'p> { + pub fn to(self, to: &'p SqlitePool) -> Backup<'p> { + Backup { + from: self.from, + to, + } + } +} + +pub struct Backup<'p> { + from: &'p SqlitePool, + to: &'p SqlitePool, +} + +impl<'p> Backup<'p> { + pub fn from(from: &'p SqlitePool) -> Builder<'p> { + Builder { from } + } +} + +impl<'p> Backup<'p> { + pub async fn backup(&mut self) -> Result<(), Error> { + let mut to = self.to.acquire().await?; + let mut to = Self::connection(&mut to.lock_handle().await?)?; + let mut from = self.from.acquire().await?; + let from = Self::connection(&mut from.lock_handle().await?)?; + + let backup = backup::Backup::new(&from, &mut to)?; + loop { + use rusqlite::backup::StepResult::{Busy, Done, Locked, More}; + match backup.step(-1)? { + Done => break Ok(()), + // In the system as actually written, yielding does nothing: this is the + // only task actually in flight at the time. However, that may change; if + // it does, we _definitely_ want to give other tasks an opportunity to + // make progress before we retry. + More => tokio::task::consume_budget().await, + // If another task is actually using the DB, then it _definitely_ needs + // an opportunity to make progress before we retry. + Busy | Locked => tokio::task::yield_now().await, + // As of this writing, this arm is formally unreachable: all of the enum + // constants that actually exist in rusqlite are matched above. However, + // they've reserved the prerogative to add additional arms without a + // breaking change by making the type as non-exhaustive; if we get one, + // stop immediately as there's no way to guess what the correct handling + // will be, a priori. + other => panic!("unexpected backup step result: {other:?}"), + } + } + } + + fn connection(handle: &mut LockedSqliteHandle) -> Result<Connection, rusqlite::Error> { + let handle = handle.as_raw_handle(); + let connection = unsafe { Connection::from_handle(handle.as_ptr()) }?; + + Ok(connection) + } +} + +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error(transparent)] + Sqlx(#[from] sqlx::Error), + #[error(transparent)] + Rusqlite(#[from] rusqlite::Error), +} diff --git a/src/db/mod.rs b/src/db/mod.rs new file mode 100644 index 0000000..bbaec7d --- /dev/null +++ b/src/db/mod.rs @@ -0,0 +1,114 @@ +mod backup; + +use std::str::FromStr; + +use hex_literal::hex; +use sqlx::{ + migrate::{Migrate as _, MigrateDatabase as _}, + sqlite::{Sqlite, SqliteConnectOptions, SqlitePool, SqlitePoolOptions}, +}; + +pub async fn prepare(url: &str, backup_url: &str) -> Result<SqlitePool, Error> { + if backup_url != "sqlite::memory:" && Sqlite::database_exists(backup_url).await? { + return Err(Error::BackupExists(backup_url.into())); + } + + let pool = create(url).await?; + + // First migration of original migration series, from commit + // 9bd6d9862b1c243def02200bca2cfbf578ad2a2f or earlier. + reject_migration(&pool, "20240831024047", "login", &hex!("9949D238C4099295EC4BEE734BFDA8D87513B2973DFB895352A11AB01DD46CB95314B7F1B3431B77E3444A165FE3DC28")).await?; + + let backup_pool = create(backup_url).await?; + backup::Backup::from(&pool) + .to(&backup_pool) + .backup() + .await?; + + if let Err(migrate_error) = sqlx::migrate!().run(&pool).await { + if let Err(restore_error) = backup::Backup::from(&backup_pool).to(&pool).backup().await { + Err(Error::Restore(restore_error, migrate_error))?; + } else { + Err(migrate_error)?; + }; + } + + Sqlite::drop_database(backup_url).await?; + Ok(pool) +} + +async fn create(database_url: &str) -> sqlx::Result<SqlitePool> { + let options = SqliteConnectOptions::from_str(database_url)? + .create_if_missing(true) + .optimize_on_close(true, /* analysis_limit */ None); + + let pool = SqlitePoolOptions::new().connect_with(options).await?; + Ok(pool) +} + +async fn reject_migration( + pool: &SqlitePool, + version: &str, + description: &str, + checksum: &[u8], +) -> Result<(), Error> { + let mut conn = pool.acquire().await?; + conn.ensure_migrations_table().await?; + let applied = conn.list_applied_migrations().await?; + + for migration in applied { + if migration.checksum == checksum { + return Err(Error::Rejected(version.into(), description.into())); + } + } + + Ok(()) +} + +/// Errors occurring during database setup. +#[derive(Debug, thiserror::Error)] +pub enum Error { + /// Failure due to a database error. See [`sqlx::Error`]. + #[error(transparent)] + Database(#[from] sqlx::Error), + /// Failure because an existing database backup already exists. + #[error("backup from a previous failed migration already exists: {0}")] + BackupExists(String), + /// Failure due to a database backup error. See [`backup::Error`]. + #[error(transparent)] + Backup(#[from] backup::Error), + #[error("backing out failed migration also failed: {0} ({1})")] + Restore(backup::Error, sqlx::migrate::MigrateError), + /// Failure due to a database migration error. See + /// [`sqlx::migrate::MigrateError`]. + #[error(transparent)] + Migration(#[from] sqlx::migrate::MigrateError), + /// Failure because the database contains a migration from an unsupported + /// schema version. + #[error("database contains rejected migration {0}:{1}, move it aside")] + Rejected(String, String), +} + +pub trait NotFound { + type Ok; + fn not_found<E, F>(self, map: F) -> Result<Self::Ok, E> + where + E: From<sqlx::Error>, + F: FnOnce() -> E; +} + +impl<T> NotFound for Result<T, sqlx::Error> { + type Ok = T; + + fn not_found<E, F>(self, map: F) -> Result<T, E> + where + E: From<sqlx::Error>, + F: FnOnce() -> E, + { + match self { + Err(sqlx::Error::RowNotFound) => Err(map()), + Err(other) => Err(other.into()), + Ok(value) => Ok(value), + } + } +} diff --git a/src/test/fixtures/mod.rs b/src/test/fixtures/mod.rs index c5efa9b..41f7e13 100644 --- a/src/test/fixtures/mod.rs +++ b/src/test/fixtures/mod.rs @@ -11,7 +11,7 @@ pub mod login; pub mod message; pub async fn scratch_app() -> App { - let pool = db::prepare("sqlite::memory:") + let pool = db::prepare("sqlite::memory:", "sqlite::memory:") .await .expect("setting up in-memory sqlite database"); App::from(pool) |
