summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorOwen Jacobson <owen@grimoire.ca>2024-09-17 19:06:20 -0400
committerOwen Jacobson <owen@grimoire.ca>2024-09-17 19:06:20 -0400
commit921f38a73e5d58a5a6077477a8b52d2705798f55 (patch)
tree895eee6c31a9e6003e4d199e32eb04bcb32f8f11
parente6be82157fe718570aa13ab12803ee39083b8dff (diff)
Express record dependencies through types.
This provides a convenient place to _stick_ "not found" errors, though actually introducing them will come in a later commit.
-rw-r--r--.sqlx/query-8e407aa645ceb5a1010aaa469d57a5956342b185d001c94179d4172936554829.json26
-rw-r--r--src/channel/app.rs13
-rw-r--r--src/channel/repo/broadcast.rs29
-rw-r--r--src/index/app.rs2
-rw-r--r--src/index/routes.rs2
-rw-r--r--src/login/app.rs2
-rw-r--r--src/repo/channel.rs2
-rw-r--r--src/repo/login/mod.rs2
-rw-r--r--src/repo/login/store.rs18
-rw-r--r--src/repo/token.rs4
10 files changed, 27 insertions, 73 deletions
diff --git a/.sqlx/query-8e407aa645ceb5a1010aaa469d57a5956342b185d001c94179d4172936554829.json b/.sqlx/query-8e407aa645ceb5a1010aaa469d57a5956342b185d001c94179d4172936554829.json
deleted file mode 100644
index aa35e54..0000000
--- a/.sqlx/query-8e407aa645ceb5a1010aaa469d57a5956342b185d001c94179d4172936554829.json
+++ /dev/null
@@ -1,26 +0,0 @@
-{
- "db_name": "SQLite",
- "query": "\n select\n id as \"id: Id\",\n name\n from login\n where id = $1\n ",
- "describe": {
- "columns": [
- {
- "name": "id: Id",
- "ordinal": 0,
- "type_info": "Text"
- },
- {
- "name": "name",
- "ordinal": 1,
- "type_info": "Text"
- }
- ],
- "parameters": {
- "Right": 1
- },
- "nullable": [
- false,
- false
- ]
- },
- "hash": "8e407aa645ceb5a1010aaa469d57a5956342b185d001c94179d4172936554829"
-}
diff --git a/src/channel/app.rs b/src/channel/app.rs
index b0b63b3..29d9c09 100644
--- a/src/channel/app.rs
+++ b/src/channel/app.rs
@@ -55,13 +55,14 @@ impl<'a> Channels<'a> {
sent_at: &DateTime,
) -> Result<(), BoxedError> {
let mut tx = self.db.begin().await?;
+ let channel = tx.channels().by_id(channel).await?;
let message = tx
.broadcast()
- .create(&login.id, channel, body, sent_at)
+ .create(login, &channel, body, sent_at)
.await?;
tx.commit().await?;
- self.broadcaster.broadcast(channel, message);
+ self.broadcaster.broadcast(&channel.id, message);
Ok(())
}
@@ -71,6 +72,9 @@ impl<'a> Channels<'a> {
resume_at: Option<&DateTime>,
) -> Result<impl Stream<Item = Result<broadcast::Message, BoxedError>> + 'static, BoxedError>
{
+ let mut tx = self.db.begin().await?;
+ let channel = tx.channels().by_id(channel).await?;
+
fn skip_stale<E>(
resume_at: Option<&DateTime>,
) -> impl for<'m> FnMut(&'m broadcast::Message) -> future::Ready<Result<bool, E>> {
@@ -85,12 +89,11 @@ impl<'a> Channels<'a> {
let live_messages = self
.broadcaster
- .listen(channel)
+ .listen(&channel.id)
.map_err(BoxedError::from)
.try_skip_while(skip_stale(resume_at));
- let mut tx = self.db.begin().await?;
- let stored_messages = tx.broadcast().replay(channel, resume_at).await?;
+ let stored_messages = tx.broadcast().replay(&channel, resume_at).await?;
tx.commit().await?;
let stored_messages = stream::iter(stored_messages).map(Ok);
diff --git a/src/channel/repo/broadcast.rs b/src/channel/repo/broadcast.rs
index 3ca7396..ff16cd0 100644
--- a/src/channel/repo/broadcast.rs
+++ b/src/channel/repo/broadcast.rs
@@ -3,8 +3,8 @@ use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction};
use crate::{
clock::DateTime,
repo::{
- channel,
- login::{self, Login, Logins},
+ channel::Channel,
+ login::{self, Login},
message,
},
};
@@ -32,15 +32,13 @@ pub struct Message {
impl<'c> Broadcast<'c> {
pub async fn create(
&mut self,
- sender: &login::Id,
- channel: &channel::Id,
+ sender: &Login,
+ channel: &Channel,
body: &str,
sent_at: &DateTime,
) -> Result<Message, sqlx::Error> {
let id = message::Id::generate();
- let sender = Logins::from(&mut *self.0).by_id(sender).await?;
-
let message = sqlx::query!(
r#"
insert into message
@@ -54,18 +52,15 @@ impl<'c> Broadcast<'c> {
"#,
id,
sender.id,
- channel,
+ channel.id,
body,
sent_at,
)
- .map(|row| {
- debug_assert!(row.sender == sender.id);
- Message {
- id: row.id,
- sender: sender.clone(),
- body: row.body,
- sent_at: row.sent_at,
- }
+ .map(|row| Message {
+ id: row.id,
+ sender: sender.clone(),
+ body: row.body,
+ sent_at: row.sent_at,
})
.fetch_one(&mut *self.0)
.await?;
@@ -75,7 +70,7 @@ impl<'c> Broadcast<'c> {
pub async fn replay(
&mut self,
- channel: &channel::Id,
+ channel: &Channel,
resume_at: Option<&DateTime>,
) -> Result<Vec<Message>, sqlx::Error> {
let messages = sqlx::query!(
@@ -92,7 +87,7 @@ impl<'c> Broadcast<'c> {
and coalesce(sent_at > $2, true)
order by sent_at asc
"#,
- channel,
+ channel.id,
resume_at,
)
.map(|row| Message {
diff --git a/src/index/app.rs b/src/index/app.rs
index 41b12fa..a4ef57f 100644
--- a/src/index/app.rs
+++ b/src/index/app.rs
@@ -14,7 +14,7 @@ impl<'a> Index<'a> {
Self { db }
}
- pub async fn channel(&self, channel: channel::Id) -> Result<Channel, BoxedError> {
+ pub async fn channel(&self, channel: &channel::Id) -> Result<Channel, BoxedError> {
let mut tx = self.db.begin().await?;
let channel = tx.channels().by_id(channel).await?;
tx.commit().await?;
diff --git a/src/index/routes.rs b/src/index/routes.rs
index ef46cc1..32c7f12 100644
--- a/src/index/routes.rs
+++ b/src/index/routes.rs
@@ -50,7 +50,7 @@ async fn channel(
_: Login,
Path(channel): Path<channel::Id>,
) -> Result<Markup, InternalError> {
- let channel = app.index().channel(channel).await?;
+ let channel = app.index().channel(&channel).await?;
Ok(templates::channel(&channel))
}
diff --git a/src/login/app.rs b/src/login/app.rs
index c82da1a..aec072c 100644
--- a/src/login/app.rs
+++ b/src/login/app.rs
@@ -46,7 +46,7 @@ impl<'a> Logins<'a> {
// authenticating an existing one succeeded, and we must reject the
// login attempt.
let token = if let Some(login) = login {
- Some(tx.tokens().issue(&login.id, login_at).await?)
+ Some(tx.tokens().issue(&login, login_at).await?)
} else {
None
};
diff --git a/src/repo/channel.rs b/src/repo/channel.rs
index ab7489c..8e3a471 100644
--- a/src/repo/channel.rs
+++ b/src/repo/channel.rs
@@ -43,7 +43,7 @@ impl<'c> Channels<'c> {
Ok(channel)
}
- pub async fn by_id(&mut self, channel: Id) -> Result<Channel, sqlx::Error> {
+ pub async fn by_id(&mut self, channel: &Id) -> Result<Channel, sqlx::Error> {
let channel = sqlx::query_as!(
Channel,
r#"
diff --git a/src/repo/login/mod.rs b/src/repo/login/mod.rs
index e23a7b7..a1b4c6f 100644
--- a/src/repo/login/mod.rs
+++ b/src/repo/login/mod.rs
@@ -1,4 +1,4 @@
mod extract;
mod store;
-pub use self::store::{Id, Login, Logins, Provider};
+pub use self::store::{Id, Login, Provider};
diff --git a/src/repo/login/store.rs b/src/repo/login/store.rs
index 24dd744..d979579 100644
--- a/src/repo/login/store.rs
+++ b/src/repo/login/store.rs
@@ -54,24 +54,6 @@ impl<'c> Logins<'c> {
Ok(login)
}
-
- pub async fn by_id(&mut self, id: &Id) -> Result<Login, sqlx::Error> {
- let login = sqlx::query_as!(
- Login,
- r#"
- select
- id as "id: Id",
- name
- from login
- where id = $1
- "#,
- id,
- )
- .fetch_one(&mut *self.0)
- .await?;
-
- Ok(login)
- }
}
impl<'t> From<&'t mut SqliteConnection> for Logins<'t> {
diff --git a/src/repo/token.rs b/src/repo/token.rs
index e7eb273..01a982e 100644
--- a/src/repo/token.rs
+++ b/src/repo/token.rs
@@ -22,7 +22,7 @@ impl<'c> Tokens<'c> {
/// be used to control expiry, until the token is actually used.
pub async fn issue(
&mut self,
- login: &login::Id,
+ login: &Login,
issued_at: DateTime,
) -> Result<String, sqlx::Error> {
let secret = Uuid::new_v4().to_string();
@@ -35,7 +35,7 @@ impl<'c> Tokens<'c> {
returning secret as "secret!"
"#,
secret,
- login,
+ login.id,
issued_at,
)
.fetch_one(&mut *self.0)