summaryrefslogtreecommitdiff
path: root/src/event
diff options
context:
space:
mode:
Diffstat (limited to 'src/event')
-rw-r--r--src/event/app.rs9
-rw-r--r--src/event/extract.rs4
-rw-r--r--src/event/repo.rs6
-rw-r--r--src/event/routes/get.rs5
-rw-r--r--src/event/routes/mod.rs2
-rw-r--r--src/event/routes/test/resume.rs26
6 files changed, 28 insertions, 24 deletions
diff --git a/src/event/app.rs b/src/event/app.rs
index b309245..8661c90 100644
--- a/src/event/app.rs
+++ b/src/event/app.rs
@@ -1,12 +1,11 @@
use futures::{
- future,
+ Stream, future,
stream::{self, StreamExt as _},
- Stream,
};
use itertools::Itertools as _;
use sqlx::sqlite::SqlitePool;
-use super::{broadcaster::Broadcaster, Event, Sequence, Sequenced};
+use super::{Event, Sequence, Sequenced, broadcaster::Broadcaster};
use crate::{
channel::{self, repo::Provider as _},
login::{self, repo::Provider as _},
@@ -27,7 +26,7 @@ impl<'a> Events<'a> {
pub async fn subscribe(
&self,
resume_at: Sequence,
- ) -> Result<impl Stream<Item = Event> + std::fmt::Debug, Error> {
+ ) -> Result<impl Stream<Item = Event> + std::fmt::Debug + use<>, Error> {
// Subscribe before retrieving, to catch messages broadcast while we're
// querying the DB. We'll prune out duplicates later.
let live_messages = self.events.subscribe();
@@ -76,7 +75,7 @@ impl<'a> Events<'a> {
Ok(replay.chain(live_messages))
}
- fn resume(resume_at: Sequence) -> impl for<'m> FnMut(&'m Event) -> future::Ready<bool> {
+ fn resume(resume_at: Sequence) -> impl for<'m> FnMut(&'m Event) -> future::Ready<bool> + use<> {
let filter = Sequence::after(resume_at);
move |event| future::ready(filter(event))
}
diff --git a/src/event/extract.rs b/src/event/extract.rs
index 4a35937..8fde1d5 100644
--- a/src/event/extract.rs
+++ b/src/event/extract.rs
@@ -2,10 +2,10 @@ use std::ops::Deref;
use axum::{
extract::{FromRequestParts, OptionalFromRequestParts},
- http::{request::Parts, HeaderName, HeaderValue},
+ http::{HeaderName, HeaderValue, request::Parts},
};
use axum_extra::typed_header::TypedHeader;
-use serde::{de::DeserializeOwned, Serialize};
+use serde::{Serialize, de::DeserializeOwned};
// A typed header. When used as a bare extractor, reads from the
// `Last-Event-Id` HTTP header.
diff --git a/src/event/repo.rs b/src/event/repo.rs
index 56beeea..ab3c449 100644
--- a/src/event/repo.rs
+++ b/src/event/repo.rs
@@ -1,4 +1,4 @@
-use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction};
+use sqlx::{SqliteConnection, Transaction, sqlite::Sqlite};
use crate::{
clock::DateTime,
@@ -9,7 +9,7 @@ pub trait Provider {
fn sequence(&mut self) -> Sequences;
}
-impl<'c> Provider for Transaction<'c, Sqlite> {
+impl Provider for Transaction<'_, Sqlite> {
fn sequence(&mut self) -> Sequences {
Sequences(self)
}
@@ -17,7 +17,7 @@ impl<'c> Provider for Transaction<'c, Sqlite> {
pub struct Sequences<'t>(&'t mut SqliteConnection);
-impl<'c> Sequences<'c> {
+impl Sequences<'_> {
pub async fn next(&mut self, at: &DateTime) -> Result<Instant, sqlx::Error> {
let next = sqlx::query_scalar!(
r#"
diff --git a/src/event/routes/get.rs b/src/event/routes/get.rs
index ceebcc9..2ca8991 100644
--- a/src/event/routes/get.rs
+++ b/src/event/routes/get.rs
@@ -1,9 +1,8 @@
use axum::{
extract::State,
response::{
- self,
+ self, IntoResponse,
sse::{self, Sse},
- IntoResponse,
},
};
use axum_extra::extract::Query;
@@ -12,7 +11,7 @@ use futures::stream::{Stream, StreamExt as _};
use crate::{
app::App,
error::{Internal, Unauthorized},
- event::{app, extract::LastEventId, Event, Sequence, Sequenced as _},
+ event::{Event, Sequence, Sequenced as _, app, extract::LastEventId},
token::{app::ValidateError, extract::Identity},
};
diff --git a/src/event/routes/mod.rs b/src/event/routes/mod.rs
index 57ab9db..742d397 100644
--- a/src/event/routes/mod.rs
+++ b/src/event/routes/mod.rs
@@ -1,4 +1,4 @@
-use axum::{routing::get, Router};
+use axum::{Router, routing::get};
use crate::app::App;
diff --git a/src/event/routes/test/resume.rs b/src/event/routes/test/resume.rs
index fabda0c..dc27691 100644
--- a/src/event/routes/test/resume.rs
+++ b/src/event/routes/test/resume.rs
@@ -5,7 +5,7 @@ use axum_extra::extract::Query;
use futures::stream::{self, StreamExt as _};
use crate::{
- event::{routes::get, Sequenced as _},
+ event::{Sequenced as _, routes::get},
test::fixtures::{self, future::Expect as _},
};
@@ -132,9 +132,11 @@ async fn serial_resume() {
.expect_ready("zipping a finite list of events is ready immediately")
.await;
- assert!(events
- .iter()
- .all(|(event, message)| message == &event.message));
+ assert!(
+ events
+ .iter()
+ .all(|(event, message)| message == &event.message)
+ );
let (event, _) = events.last().expect("this vec is non-empty");
@@ -173,9 +175,11 @@ async fn serial_resume() {
.expect_ready("zipping a finite list of events is ready immediately")
.await;
- assert!(events
- .iter()
- .all(|(event, message)| message == &event.message));
+ assert!(
+ events
+ .iter()
+ .all(|(event, message)| message == &event.message)
+ );
let (event, _) = events.last().expect("this vec is non-empty");
@@ -214,8 +218,10 @@ async fn serial_resume() {
.expect_ready("zipping a finite list of events is ready immediately")
.await;
- assert!(events
- .iter()
- .all(|(event, message)| message == &event.message));
+ assert!(
+ events
+ .iter()
+ .all(|(event, message)| message == &event.message)
+ );
};
}