1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
|
mod backup;
use std::str::FromStr;
use sqlx::{
migrate::MigrateDatabase as _,
sqlite::{Sqlite, SqliteConnectOptions, SqlitePool, SqlitePoolOptions},
};
pub async fn prepare(url: &str, backup_url: &str) -> Result<SqlitePool, Error> {
if backup_url != "sqlite::memory:" && Sqlite::database_exists(backup_url).await? {
return Err(Error::BackupExists(backup_url.into()));
}
let pool = create(url).await?;
// First migration of original migration series, from commit
// 9bd6d9862b1c243def02200bca2cfbf578ad2a2f or earlier.
reject_migration(&pool, "20240831024047", "login", "9949D238C4099295EC4BEE734BFDA8D87513B2973DFB895352A11AB01DD46CB95314B7F1B3431B77E3444A165FE3DC28").await?;
let backup_pool = create(backup_url).await?;
backup::Backup::from(&pool)
.await?
.to(&backup_pool)
.await?
.backup()
.await?;
if let Err(migrate_error) = sqlx::migrate!().run(&pool).await {
if let Err(restore_error) = backup::Backup::from(&backup_pool)
.await?
.to(&pool)
.await?
.backup()
.await
{
Err(Error::Restore(restore_error, migrate_error))?;
} else {
Err(migrate_error)?;
};
}
Sqlite::drop_database(backup_url).await?;
Ok(pool)
}
/// Connects a pool to the SQLite database at `database_url`.
///
/// The connection options are parsed from the URL, then configured with
/// `create_if_missing(true)` and `optimize_on_close(true, None)`.
///
/// # Errors
///
/// Returns the [`sqlx::Error`] produced by parsing the URL or by connecting.
async fn create(database_url: &str) -> sqlx::Result<SqlitePool> {
    let connect_options = SqliteConnectOptions::from_str(database_url)?
        .create_if_missing(true)
        .optimize_on_close(true, /* analysis_limit */ None);

    SqlitePoolOptions::new()
        .connect_with(connect_options)
        .await
}
async fn reject_migration(
pool: &SqlitePool,
version: &str,
description: &str,
checksum_hex: &str,
) -> Result<(), Error> {
if !sqlx::query_scalar!(
r#"
select count(*) as "exists: bool"
from sqlite_master
where name = '_sqlx_migrations'
"#
)
.fetch_one(pool)
.await?
{
// No migrations table; this is a fresh DB.
return Ok(());
}
if !sqlx::query_scalar!(
r#"
select count(*) as "exists: bool"
from _sqlx_migrations
where version = $1
and description = $2
and hex(checksum) = $3
"#,
version,
description,
checksum_hex,
)
.fetch_one(pool)
.await?
{
// Rejected migration does not exist; this DB never ran it.
return Ok(());
}
Err(Error::Rejected(version.into(), description.into()))
}
/// Errors occurring during database setup.
#[derive(Debug, thiserror::Error)]
pub enum Error {
/// Failure due to a database error. See [`sqlx::Error`].
#[error(transparent)]
Database(#[from] sqlx::Error),
/// Failure because a database already exists at the backup URL. Holds that
/// URL.
#[error("backup from a previous failed migration already exists: {0}")]
BackupExists(String),
/// Failure due to a database backup error. See [`backup::Error`].
#[error(transparent)]
Backup(#[from] backup::Error),
/// Failure because a migration failed and backing it out from the backup
/// failed as well. Holds the restore error first, then the migration error
/// that triggered the restore.
#[error("backing out failed migration also failed: {0} ({1})")]
Restore(backup::Error, sqlx::migrate::MigrateError),
/// Failure due to a database migration error. See
/// [`sqlx::migrate::MigrateError`].
#[error(transparent)]
Migration(#[from] sqlx::migrate::MigrateError),
/// Failure because the database contains a migration from an unsupported
/// schema version. Holds the migration's version and description.
#[error("database contains rejected migration {0}:{1}, move it aside")]
Rejected(String, String),
}
pub trait NotFound {
type Ok;
fn not_found<E, F>(self, map: F) -> Result<Self::Ok, E>
where
E: From<sqlx::Error>,
F: FnOnce() -> E;
}
impl<T> NotFound for Result<T, sqlx::Error> {
type Ok = T;
fn not_found<E, F>(self, map: F) -> Result<T, E>
where
E: From<sqlx::Error>,
F: FnOnce() -> E,
{
match self {
Err(sqlx::Error::RowNotFound) => Err(map()),
Err(other) => Err(other.into()),
Ok(value) => Ok(value),
}
}
}
|