1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
|
mod backup;
use std::str::FromStr;
use hex_literal::hex;
use sqlx::{
migrate::{Migrate as _, MigrateDatabase as _},
sqlite::{Sqlite, SqliteConnectOptions, SqlitePool, SqlitePoolOptions},
};
pub async fn prepare(url: &str, backup_url: &str) -> Result<SqlitePool, Error> {
if backup_url != "sqlite::memory:" && Sqlite::database_exists(backup_url).await? {
return Err(Error::BackupExists(backup_url.into()));
}
let pool = create(url).await?;
// First migration of original migration series, from commit
// 9bd6d9862b1c243def02200bca2cfbf578ad2a2f or earlier.
reject_migration(&pool, "20240831024047", "login", &hex!("9949D238C4099295EC4BEE734BFDA8D87513B2973DFB895352A11AB01DD46CB95314B7F1B3431B77E3444A165FE3DC28")).await?;
let backup_pool = create(backup_url).await?;
backup::Backup::from(&pool)
.to(&backup_pool)
.backup()
.await?;
if let Err(migrate_error) = sqlx::migrate!().run(&pool).await {
if let Err(restore_error) = backup::Backup::from(&backup_pool).to(&pool).backup().await {
Err(Error::Restore(restore_error, migrate_error))?;
} else if let Err(drop_error) = Sqlite::drop_database(backup_url).await {
Err(Error::Drop(drop_error, migrate_error))?;
} else {
Err(migrate_error)?;
};
}
Sqlite::drop_database(backup_url).await?;
Ok(pool)
}
/// Opens a connection pool for `database_url`, creating the database if it is
/// missing.
///
/// Connections are configured with `optimize_on_close` enabled and no
/// analysis limit.
///
/// # Errors
///
/// Returns a [`sqlx::Error`] if `database_url` cannot be parsed into connect
/// options or if the pool cannot connect.
async fn create(database_url: &str) -> sqlx::Result<SqlitePool> {
    let connect_options = SqliteConnectOptions::from_str(database_url)?
        .create_if_missing(true)
        .optimize_on_close(true, /* analysis_limit */ None);

    SqlitePoolOptions::new().connect_with(connect_options).await
}
async fn reject_migration(
pool: &SqlitePool,
version: &str,
description: &str,
checksum: &[u8],
) -> Result<(), Error> {
let mut conn = pool.acquire().await?;
conn.ensure_migrations_table().await?;
let applied = conn.list_applied_migrations().await?;
for migration in applied {
if migration.checksum == checksum {
return Err(Error::Rejected(version.into(), description.into()));
}
}
Ok(())
}
/// Errors occurring during database setup.
#[derive(Debug, thiserror::Error)]
pub enum Error {
/// Failure due to a database error. See [`sqlx::Error`].
#[error(transparent)]
Database(#[from] sqlx::Error),
/// Failure because an existing database backup already exists.
#[error("backup from a previous failed migration already exists: {0}")]
BackupExists(String),
/// Failure due to a database backup error. See [`backup::Error`].
#[error(transparent)]
Backup(#[from] backup::Error),
#[error("migration failed: {1}\nrestoring backup failed: {0}")]
Restore(backup::Error, sqlx::migrate::MigrateError),
#[error(
"migration failed: {1}\nrestoring from backup succeeded, but deleting backup failed: {0}"
)]
Drop(sqlx::Error, sqlx::migrate::MigrateError),
/// Failure due to a database migration error. See
/// [`sqlx::migrate::MigrateError`].
#[error(transparent)]
Migration(#[from] sqlx::migrate::MigrateError),
/// Failure because the database contains a migration from an unsupported
/// schema version.
#[error("database contains rejected migration {0}:{1}, move it aside")]
Rejected(String, String),
}
pub trait NotFound {
type Ok;
fn not_found<E, F>(self, map: F) -> Result<Self::Ok, E>
where
E: From<sqlx::Error>,
F: FnOnce() -> E;
}
impl<T> NotFound for Result<T, sqlx::Error> {
type Ok = T;
fn not_found<E, F>(self, map: F) -> Result<T, E>
where
E: From<sqlx::Error>,
F: FnOnce() -> E,
{
match self {
Err(sqlx::Error::RowNotFound) => Err(map()),
Err(other) => Err(other.into()),
Ok(value) => Ok(value),
}
}
}
|