summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorOwen Jacobson <owen@grimoire.ca>2024-10-25 00:33:16 -0400
committerOwen Jacobson <owen@grimoire.ca>2024-10-25 00:56:48 -0400
commit5423ec3937a4e28f3958a71b3db7498a4c427dc1 (patch)
tree8fc8531086c1691b0a9fc0a5ddb615d913dc6448
parenteae0edb57e9ade7c73affb78baf2ae267b6290b8 (diff)
Tests for purged channels and messages.
This required a re-think of the `.immediately()` combinator, to generalize it to cases where a message is _not_ expected. That (more or less immediately) suggested some mixed combinators, particularly for stream futures (futures of `Option<T>`).
-rw-r--r--Cargo.lock21
-rw-r--r--Cargo.toml1
-rw-r--r--src/channel/app.rs10
-rw-r--r--src/channel/routes/channel/test/post.rs27
-rw-r--r--src/channel/routes/test.rs11
-rw-r--r--src/event/routes/test/channel.rs69
-rw-r--r--src/event/routes/test/invite.rs12
-rw-r--r--src/event/routes/test/message.rs77
-rw-r--r--src/event/routes/test/resume.rs15
-rw-r--r--src/event/routes/test/setup.rs7
-rw-r--r--src/event/routes/test/token.rs16
-rw-r--r--src/test/fixtures/future.rs240
-rw-r--r--src/ui/mime.rs5
-rw-r--r--src/ui/routes/ch/channel.rs19
14 files changed, 386 insertions, 144 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 2b8eb5e..f5ba5ff 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -815,6 +815,7 @@ dependencies = [
"itertools",
"mime",
"password-hash",
+ "pin-project",
"rand",
"rand_core",
"rusqlite",
@@ -1261,6 +1262,26 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e"
[[package]]
+name = "pin-project"
+version = "1.1.7"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "be57f64e946e500c8ee36ef6331845d40a93055567ec57e8fae13efd33759b95"
+dependencies = [
+ "pin-project-internal",
+]
+
+[[package]]
+name = "pin-project-internal"
+version = "1.1.7"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3c0f5fad0874fc7abcd4d750e76917eaebbecaa2c20bde22e1dbeeba8beb758c"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
+[[package]]
name = "pin-project-lite"
version = "0.2.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
diff --git a/Cargo.toml b/Cargo.toml
index e9c9616..989f8cc 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -39,4 +39,5 @@ uuid = { version = "1.11.0", features = ["v4"] }
[dev-dependencies]
faker_rand = "0.1.1"
+pin-project = "1.1.7"
rand = "0.8.5"
diff --git a/src/channel/app.rs b/src/channel/app.rs
index 7bfa0f7..8359277 100644
--- a/src/channel/app.rs
+++ b/src/channel/app.rs
@@ -4,7 +4,7 @@ use sqlx::sqlite::SqlitePool;
use super::{
repo::{LoadError, Provider as _},
- Channel, History, Id,
+ Channel, Id,
};
use crate::{
clock::DateTime,
@@ -42,12 +42,14 @@ impl<'a> Channels<'a> {
// This function is careless with respect to time, and gets you the channel as
// it exists in the specific moment when you call it.
- pub async fn get(&self, channel: &Id) -> Result<Option<Channel>, Error> {
+ pub async fn get(&self, channel: &Id) -> Result<Channel, Error> {
+ let not_found = || Error::NotFound(channel.clone());
+
let mut tx = self.db.begin().await?;
- let channel = tx.channels().by_id(channel).await.optional()?;
+ let channel = tx.channels().by_id(channel).await.not_found(not_found)?;
tx.commit().await?;
- Ok(channel.as_ref().and_then(History::as_snapshot))
+ channel.as_snapshot().ok_or_else(not_found)
}
pub async fn delete(&self, channel: &Id, deleted_at: &DateTime) -> Result<(), Error> {
diff --git a/src/channel/routes/channel/test/post.rs b/src/channel/routes/channel/test/post.rs
index d81715f..111a703 100644
--- a/src/channel/routes/channel/test/post.rs
+++ b/src/channel/routes/channel/test/post.rs
@@ -1,11 +1,11 @@
use axum::extract::{Json, Path, State};
-use futures::stream::StreamExt;
+use futures::stream::{self, StreamExt as _};
use crate::{
channel::{self, routes::channel::post},
event::Sequenced,
- message::{self, app::SendError},
- test::fixtures::{self, future::Immediately as _},
+ message::app::SendError,
+ test::fixtures::{self, future::Expect as _},
};
#[tokio::test]
@@ -39,24 +39,23 @@ async fn messages_in_order() {
// Verify the semantics
- let events = app
+ let mut events = app
.events()
.subscribe(None)
.await
.expect("subscribing to a valid channel succeeds")
.filter_map(fixtures::event::message)
- .take(requests.len());
+ .filter_map(fixtures::event::message::sent)
+ .zip(stream::iter(requests));
- let events = events.collect::<Vec<_>>().immediately().await;
-
- for ((sent_at, message), event) in requests.into_iter().zip(events) {
+ while let Some((event, (sent_at, body))) = events
+ .next()
+ .expect_ready("an event should be ready for each message")
+ .await
+ {
assert_eq!(*sent_at, event.at());
- assert!(matches!(
- event,
- message::Event::Sent(event)
- if event.message.sender == sender.login.id
- && event.message.body == message
- ));
+ assert_eq!(sender.login.id, event.message.sender);
+ assert_eq!(body, event.message.body);
}
}
diff --git a/src/channel/routes/test.rs b/src/channel/routes/test.rs
index 46c58b0..10b1e8d 100644
--- a/src/channel/routes/test.rs
+++ b/src/channel/routes/test.rs
@@ -7,7 +7,7 @@ use super::post;
use crate::{
channel::app,
name::Name,
- test::fixtures::{self, future::Immediately as _},
+ test::fixtures::{self, future::Expect as _},
};
#[tokio::test]
@@ -39,7 +39,6 @@ async fn new_channel() {
.channels()
.get(&response.id)
.await
- .expect("searching for channels by ID never fails")
.expect("the newly-created channel exists");
assert_eq!(response, channel);
@@ -52,11 +51,7 @@ async fn new_channel() {
.filter_map(fixtures::event::channel::created)
.filter(|event| future::ready(event.channel == response));
- let event = events
- .next()
- .immediately()
- .await
- .expect("creation event published");
+ let event = events.next().expect_some("creation event published").await;
assert_eq!(event.channel, response);
}
@@ -165,7 +160,6 @@ async fn name_reusable_after_delete() {
.channels()
.get(&response.id)
.await
- .expect("searching for channels by ID never fails")
.expect("the newly-created channel exists");
assert_eq!(response, channel);
}
@@ -215,7 +209,6 @@ async fn name_reusable_after_expiry() {
.channels()
.get(&response.id)
.await
- .expect("searching for channels by ID never fails")
.expect("the newly-created channel exists");
assert_eq!(response, channel);
}
diff --git a/src/event/routes/test/channel.rs b/src/event/routes/test/channel.rs
index 0ab28c4..6a0a803 100644
--- a/src/event/routes/test/channel.rs
+++ b/src/event/routes/test/channel.rs
@@ -4,7 +4,7 @@ use futures::{future, stream::StreamExt as _};
use crate::{
event::routes::get,
- test::fixtures::{self, future::Immediately as _},
+ test::fixtures::{self, future::Expect as _},
};
#[tokio::test]
@@ -32,14 +32,13 @@ async fn creating() {
// Verify channel created event
- let _ = events
+ events
.filter_map(fixtures::event::channel)
.filter_map(fixtures::event::channel::created)
.filter(|event| future::ready(event.channel == channel))
.next()
- .immediately()
- .await
- .expect("channel created event is delivered");
+ .expect_some("channel created event is delivered")
+ .await;
}
#[tokio::test]
@@ -72,9 +71,8 @@ async fn previously_created() {
.filter_map(fixtures::event::channel::created)
.filter(|event| future::ready(event.channel == channel))
.next()
- .immediately()
- .await
- .expect("channel created event is delivered");
+ .expect_some("channel created event is delivered")
+ .await;
}
#[tokio::test]
@@ -105,9 +103,8 @@ async fn expiring() {
.filter_map(fixtures::event::channel::deleted)
.filter(|event| future::ready(event.id == channel.id))
.next()
- .immediately()
- .await
- .expect("a deleted channel event will be delivered");
+ .expect_some("a deleted channel event will be delivered")
+ .await;
}
#[tokio::test]
@@ -138,9 +135,8 @@ async fn previously_expired() {
.filter_map(fixtures::event::channel::deleted)
.filter(|event| future::ready(event.id == channel.id))
.next()
- .immediately()
- .await
- .expect("a deleted channel event will be delivered");
+ .expect_some("a deleted channel event will be delivered")
+ .await;
}
#[tokio::test]
@@ -171,9 +167,8 @@ async fn deleting() {
.filter_map(fixtures::event::channel::deleted)
.filter(|event| future::ready(event.id == channel.id))
.next()
- .immediately()
- .await
- .expect("a deleted channel event will be delivered");
+ .expect_some("a deleted channel event will be delivered")
+ .await;
}
#[tokio::test]
@@ -204,7 +199,43 @@ async fn previously_deleted() {
.filter_map(fixtures::event::channel::deleted)
.filter(|event| future::ready(event.id == channel.id))
.next()
- .immediately()
+ .expect_some("a deleted channel event will be delivered")
+ .await;
+}
+
+#[tokio::test]
+async fn previously_purged() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let channel = fixtures::channel::create(&app, &fixtures::ancient()).await;
+
+ // Delete and purge the channel
+
+ app.channels()
+ .delete(&channel.id, &fixtures::ancient())
+ .await
+ .expect("deleting a valid channel succeeds");
+
+ app.channels()
+ .purge(&fixtures::now())
.await
- .expect("a deleted channel event will be delivered");
+ .expect("purging channels always succeeds");
+
+ // Subscribe
+
+ let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
+ let get::Response(events) =
+ get::handler(State(app.clone()), subscriber, None, Query::default())
+ .await
+ .expect("subscribe never fails");
+
+ // Check for expiry event
+ events
+ .filter_map(fixtures::event::channel)
+ .filter_map(fixtures::event::channel::deleted)
+ .filter(|event| future::ready(event.id == channel.id))
+ .next()
+ .expect_wait("deleted channel events not delivered")
+ .await;
}
diff --git a/src/event/routes/test/invite.rs b/src/event/routes/test/invite.rs
index afd3aeb..d24f474 100644
--- a/src/event/routes/test/invite.rs
+++ b/src/event/routes/test/invite.rs
@@ -4,7 +4,7 @@ use futures::{future, stream::StreamExt as _};
use crate::{
event::routes::get,
- test::fixtures::{self, future::Immediately as _},
+ test::fixtures::{self, future::Expect as _},
};
#[tokio::test]
@@ -39,9 +39,8 @@ async fn accepting_invite() {
.filter_map(fixtures::event::login::created)
.filter(|event| future::ready(event.login == joiner))
.next()
- .immediately()
- .await
- .expect("a login created event is sent");
+ .expect_some("a login created event is sent")
+ .await;
}
#[tokio::test]
@@ -76,7 +75,6 @@ async fn previously_accepted_invite() {
.filter_map(fixtures::event::login::created)
.filter(|event| future::ready(event.login == joiner))
.next()
- .immediately()
- .await
- .expect("a login created event is sent");
+ .expect_some("a login created event is sent")
+ .await;
}
diff --git a/src/event/routes/test/message.rs b/src/event/routes/test/message.rs
index df42a89..63a3f43 100644
--- a/src/event/routes/test/message.rs
+++ b/src/event/routes/test/message.rs
@@ -7,7 +7,7 @@ use futures::{
use crate::{
event::routes::get,
- test::fixtures::{self, future::Immediately as _},
+ test::fixtures::{self, future::Expect as _},
};
#[tokio::test]
@@ -46,9 +46,8 @@ async fn sending() {
.filter_map(fixtures::event::message::sent)
.filter(|event| future::ready(event.message == message))
.next()
- .immediately()
- .await
- .expect("delivered message sent event");
+ .expect_some("delivered message sent event")
+ .await;
}
#[tokio::test]
@@ -87,9 +86,8 @@ async fn previously_sent() {
.filter_map(fixtures::event::message::sent)
.filter(|event| future::ready(event.message == message))
.next()
- .immediately()
- .await
- .expect("delivered message sent event");
+ .expect_some("delivered message sent event")
+ .await;
}
#[tokio::test]
@@ -128,7 +126,7 @@ async fn sent_in_multiple_channels() {
.filter_map(fixtures::event::message::sent)
.take(messages.len())
.collect::<Vec<_>>()
- .immediately()
+ .expect_ready("events ready")
.await;
for message in &messages {
@@ -167,9 +165,8 @@ async fn sent_sequentially() {
for message in &messages {
let event = events
.next()
- .immediately()
- .await
- .expect("undelivered messages remaining");
+ .expect_some("undelivered messages remaining")
+ .await;
assert_eq!(message, &event.message);
}
@@ -205,9 +202,8 @@ async fn expiring() {
.filter_map(fixtures::event::message::deleted)
.filter(|event| future::ready(event.id == message.id))
.next()
- .immediately()
- .await
- .expect("a deleted message event will be delivered");
+ .expect_some("a deleted message event will be delivered")
+ .await;
}
#[tokio::test]
@@ -240,9 +236,8 @@ async fn previously_expired() {
.filter_map(fixtures::event::message::deleted)
.filter(|event| future::ready(event.id == message.id))
.next()
- .immediately()
- .await
- .expect("a deleted message event will be delivered");
+ .expect_some("a deleted message event will be delivered")
+ .await;
}
#[tokio::test]
@@ -275,9 +270,8 @@ async fn deleting() {
.filter_map(fixtures::event::message::deleted)
.filter(|event| future::ready(event.id == message.id))
.next()
- .immediately()
- .await
- .expect("a deleted message event will be delivered");
+ .expect_some("a deleted message event will be delivered")
+ .await;
}
#[tokio::test]
@@ -310,7 +304,46 @@ async fn previously_deleted() {
.filter_map(fixtures::event::message::deleted)
.filter(|event| future::ready(event.id == message.id))
.next()
- .immediately()
+ .expect_some("a deleted message event will be delivered")
+ .await;
+}
+
+#[tokio::test]
+async fn previously_purged() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let channel = fixtures::channel::create(&app, &fixtures::ancient()).await;
+ let sender = fixtures::login::create(&app, &fixtures::ancient()).await;
+ let message = fixtures::message::send(&app, &channel, &sender, &fixtures::ancient()).await;
+
+ // Purge the message
+
+ app.messages()
+ .delete(&message.id, &fixtures::ancient())
+ .await
+ .expect("deleting a valid message succeeds");
+
+ app.messages()
+ .purge(&fixtures::now())
.await
- .expect("a deleted message event will be delivered");
+ .expect("purge always succeeds");
+
+ // Subscribe
+
+ let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
+ let get::Response(events) =
+ get::handler(State(app.clone()), subscriber, None, Query::default())
+ .await
+ .expect("subscribe never fails");
+
+ // Check for delete event
+
+ events
+ .filter_map(fixtures::event::message)
+ .filter_map(fixtures::event::message::deleted)
+ .filter(|event| future::ready(event.id == message.id))
+ .next()
+ .expect_wait("no deleted message will be delivered")
+ .await;
}
diff --git a/src/event/routes/test/resume.rs b/src/event/routes/test/resume.rs
index e4751bb..62b9bad 100644
--- a/src/event/routes/test/resume.rs
+++ b/src/event/routes/test/resume.rs
@@ -6,7 +6,7 @@ use futures::stream::{self, StreamExt as _};
use crate::{
event::{routes::get, Sequenced as _},
- test::fixtures::{self, future::Immediately as _},
+ test::fixtures::{self, future::Expect as _},
};
#[tokio::test]
@@ -44,9 +44,8 @@ async fn resume() {
.filter_map(fixtures::event::message::sent)
.filter(|event| future::ready(event.message == initial_message))
.next()
- .immediately()
- .await
- .expect("delivered events");
+ .expect_some("delivered event for initial message")
+ .await;
event.sequence()
};
@@ -68,7 +67,7 @@ async fn resume() {
.filter_map(fixtures::event::message::sent)
.zip(stream::iter(later_messages));
- while let Some((event, message)) = events.next().immediately().await {
+ while let Some((event, message)) = events.next().expect_ready("event ready").await {
assert_eq!(message, event.message);
}
}
@@ -128,7 +127,7 @@ async fn serial_resume() {
.filter_map(fixtures::event::message::sent)
.zip(stream::iter(initial_messages))
.collect::<Vec<_>>()
- .immediately()
+ .expect_ready("zipping a finite list of events is ready immediately")
.await;
assert!(events
@@ -169,7 +168,7 @@ async fn serial_resume() {
.filter_map(fixtures::event::message::sent)
.zip(stream::iter(resume_messages))
.collect::<Vec<_>>()
- .immediately()
+ .expect_ready("zipping a finite list of events is ready immediately")
.await;
assert!(events
@@ -210,7 +209,7 @@ async fn serial_resume() {
.filter_map(fixtures::event::message::sent)
.zip(stream::iter(final_messages))
.collect::<Vec<_>>()
- .immediately()
+ .expect_ready("zipping a finite list of events is ready immediately")
.await;
assert!(events
diff --git a/src/event/routes/test/setup.rs b/src/event/routes/test/setup.rs
index a54b65b..007b03d 100644
--- a/src/event/routes/test/setup.rs
+++ b/src/event/routes/test/setup.rs
@@ -4,7 +4,7 @@ use futures::{future, stream::StreamExt as _};
use crate::{
event::routes::get,
- test::fixtures::{self, future::Immediately as _},
+ test::fixtures::{self, future::Expect as _},
};
// There's no test for this in subscribe-then-setup order because creating an
@@ -40,7 +40,6 @@ async fn previously_completed() {
.filter_map(fixtures::event::login::created)
.filter(|event| future::ready(event.login == owner))
.next()
- .immediately()
- .await
- .expect("a login created event is sent");
+ .expect_some("a login created event is sent")
+ .await;
}
diff --git a/src/event/routes/test/token.rs b/src/event/routes/test/token.rs
index 577fabd..2039d9b 100644
--- a/src/event/routes/test/token.rs
+++ b/src/event/routes/test/token.rs
@@ -4,7 +4,7 @@ use futures::{future, stream::StreamExt as _};
use crate::{
event::routes::get,
- test::fixtures::{self, future::Immediately as _},
+ test::fixtures::{self, future::Expect as _},
};
#[tokio::test]
@@ -40,14 +40,13 @@ async fn terminates_on_token_expiry() {
fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
];
- assert!(events
+ events
.filter_map(fixtures::event::message)
.filter_map(fixtures::event::message::sent)
.filter(|event| future::ready(messages.iter().any(|message| &event.message == message)))
.next()
- .immediately()
- .await
- .is_none());
+ .expect_none("end of stream")
+ .await;
}
#[tokio::test]
@@ -86,12 +85,11 @@ async fn terminates_on_logout() {
fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
];
- assert!(events
+ events
.filter_map(fixtures::event::message)
.filter_map(fixtures::event::message::sent)
.filter(|event| future::ready(messages.iter().any(|message| &event.message == message)))
.next()
- .immediately()
- .await
- .is_none());
+ .expect_none("end of stream")
+ .await;
}
diff --git a/src/test/fixtures/future.rs b/src/test/fixtures/future.rs
index bbdc9f8..2f810a3 100644
--- a/src/test/fixtures/future.rs
+++ b/src/test/fixtures/future.rs
@@ -1,55 +1,221 @@
-use std::{future::IntoFuture, time::Duration};
+use std::{future::Future, pin::Pin, task};
-use futures::{stream, Stream};
-use tokio::time::timeout;
+use futures::stream;
-async fn immediately<F>(fut: F) -> F::Output
+// Combinators for futures that prevent waits, even when the underlying future
+// would block.
+//
+// These are only useful for futures with no bound on how long they may wait,
+// and this trait is only implemented on futures that are likely to have that
+// characteristic. Trying to apply this to futures that already have some
+// bounded wait time may make tests fail inappropriately and can hide other
+// logic errors.
+pub trait Expect: Sized {
+ // The returned future expects the underlying future to be ready immediately,
+ // and panics with the provided message if it is not.
+ //
+ // For stream operations, can be used to assert immediate completion.
+ fn expect_ready(self, message: &str) -> Ready<Self>
+ where
+ Self: Future;
+
+ // The returned future expects the underlying future _not_ to be ready, and
+ // panics if it is. This is usually a useful proxy for "I expect this to never
+ // arrive" or "to not be here yet." The future is transformed to return `()`,
+ // since the underlying future can never provide a value.
+ //
+ // For stream operations, can be used to assert that completion hasn't happened
+ // yet.
+ fn expect_wait(self, message: &str) -> Wait<Self>
+ where
+ Self: Future;
+
+ // The returned future expects the underlying future to resolve immediately, to
+ // a `Some` value. If it resolves to `None` or is not ready, it panics. The
+ // future is transformed to return the inner value from the `Some` case, like
+ // [`Option::expect`].
+ //
+ // For stream operations, can be used to assert that the stream has at least one
+ // message.
+ fn expect_some<T>(self, message: &str) -> Some<Self>
+ where
+ Self: Future<Output = Option<T>>;
+
+ // The returned future expects the underlying future to resolve immediately, to
+ // a `None` value. If it resolves to `Some(_)`, or is not ready, it panics. The
+ // future is transformed to return `()`, since the underlying future's value is
+ // fixed.
+ //
+ // For stream operations, can be used to assert that the stream has ended.
+ fn expect_none<T>(self, message: &str) -> None<Self>
+ where
+ Self: Future<Output = Option<T>>;
+}
+
+impl<'a, St> Expect for stream::Next<'a, St> {
+ fn expect_ready(self, message: &str) -> Ready<Self> {
+ Ready {
+ future: self,
+ message,
+ }
+ }
+
+ fn expect_wait(self, message: &str) -> Wait<Self> {
+ Wait {
+ future: self,
+ message,
+ }
+ }
+
+ fn expect_some<T>(self, message: &str) -> Some<Self>
+ where
+ Self: Future<Output = Option<T>>,
+ {
+ Some {
+ future: self,
+ message,
+ }
+ }
+
+ fn expect_none<T>(self, message: &str) -> None<Self>
+ where
+ Self: Future<Output = Option<T>>,
+ {
+ None {
+ future: self,
+ message,
+ }
+ }
+}
+
+impl<St, C> Expect for stream::Collect<St, C> {
+ fn expect_ready(self, message: &str) -> Ready<Self> {
+ Ready {
+ future: self,
+ message,
+ }
+ }
+
+ fn expect_wait(self, message: &str) -> Wait<Self> {
+ Wait {
+ future: self,
+ message,
+ }
+ }
+
+ fn expect_some<T>(self, message: &str) -> Some<Self>
+ where
+ Self: Future<Output = Option<T>>,
+ {
+ Some {
+ future: self,
+ message,
+ }
+ }
+
+ fn expect_none<T>(self, message: &str) -> None<Self>
+ where
+ Self: Future<Output = Option<T>>,
+ {
+ None {
+ future: self,
+ message,
+ }
+ }
+}
+
+#[pin_project::pin_project]
+pub struct Ready<'m, F> {
+ #[pin]
+ future: F,
+ message: &'m str,
+}
+
+impl<'m, F> Future for Ready<'m, F>
where
- F: IntoFuture,
+ F: Future + std::fmt::Debug,
{
- // I haven't been particularly rigorous here. Zero delay _seems to work_,
- // but this can be set higher; it makes tests that fail to meet the
- // "immediate" expectation take longer, but gives slow tests time to
- // succeed, as well.
- let duration = Duration::from_nanos(0);
- timeout(duration, fut)
- .await
- .expect("expected result immediately")
-}
-
-// This is only intended for streams, since their `next()`, `collect()`, and
-// so on can all block indefinitely on an empty stream. There's no need to
-// force immediacy on futures that "can't" block forever, and it can hide logic
-// errors if you do that.
-//
-// The impls below _could_ be replaced with a blanket impl for all future
-// types, otherwise. The choice to restrict impls to stream futures is
-// deliberate.
-pub trait Immediately {
- type Output;
+ type Output = F::Output;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
+ let this = self.project();
+
+ if let task::Poll::Ready(value) = this.future.poll(cx) {
+ task::Poll::Ready(value)
+ } else {
+ panic!("{}", this.message);
+ }
+ }
+}
+
+#[pin_project::pin_project]
+pub struct Wait<'m, F> {
+ #[pin]
+ future: F,
+ message: &'m str,
+}
- async fn immediately(self) -> Self::Output;
+impl<'m, F> Future for Wait<'m, F>
+where
+ F: Future + std::fmt::Debug,
+{
+ type Output = ();
+
+ fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
+ let this = self.project();
+
+ if this.future.poll(cx).is_pending() {
+ task::Poll::Ready(())
+ } else {
+ panic!("{}", this.message);
+ }
+ }
}
-impl<'a, St> Immediately for stream::Next<'a, St>
+#[pin_project::pin_project]
+pub struct Some<'m, F> {
+ #[pin]
+ future: F,
+ message: &'m str,
+}
+
+impl<'m, F, T> Future for Some<'m, F>
where
- St: Stream + Unpin + ?Sized,
+ F: Future<Output = Option<T>> + std::fmt::Debug,
{
- type Output = Option<<St as Stream>::Item>;
+ type Output = T;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
+ let this = self.project();
- async fn immediately(self) -> Self::Output {
- immediately(self).await
+ if let task::Poll::Ready(Option::Some(value)) = this.future.poll(cx) {
+ task::Poll::Ready(value)
+ } else {
+ panic!("{}", this.message)
+ }
}
}
-impl<St, C> Immediately for stream::Collect<St, C>
+#[pin_project::pin_project]
+pub struct None<'m, F> {
+ #[pin]
+ future: F,
+ message: &'m str,
+}
+
+impl<'m, F, T> Future for None<'m, F>
where
- St: Stream,
- C: Default + Extend<<St as Stream>::Item>,
+ F: Future<Output = Option<T>> + std::fmt::Debug,
{
- type Output = C;
+ type Output = ();
+
+ fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
+ let this = self.project();
- async fn immediately(self) -> Self::Output {
- immediately(self).await
+ if let task::Poll::Ready(Option::None) = this.future.poll(cx) {
+ task::Poll::Ready(())
+ } else {
+ panic!("{}", this.message)
+ }
}
}
diff --git a/src/ui/mime.rs b/src/ui/mime.rs
index 9c724f0..7818ac1 100644
--- a/src/ui/mime.rs
+++ b/src/ui/mime.rs
@@ -1,7 +1,10 @@
use mime::Mime;
use unix_path::Path;
-// Extremely manual; using `std::path` here would result in platform-dependent behaviour when it's not appropriate (the URLs passed here always use `/` and are parsed like URLs). Using `unix_path` might be an option, but it's not clearly
+// Extremely manual; using `std::path` here would result in platform-dependent
+// behaviour when it's not appropriate (the URLs passed here always use `/` and
+// are parsed like URLs). Using `unix_path` might be an option, but it's not
+// clearly
pub fn from_path<P>(path: P) -> Result<Mime, mime::FromStrError>
where
P: AsRef<Path>,
diff --git a/src/ui/routes/ch/channel.rs b/src/ui/routes/ch/channel.rs
index a338f1f..a854f14 100644
--- a/src/ui/routes/ch/channel.rs
+++ b/src/ui/routes/ch/channel.rs
@@ -6,7 +6,7 @@ pub mod get {
use crate::{
app::App,
- channel,
+ channel::{self, app},
error::Internal,
token::extract::Identity,
ui::{
@@ -21,18 +21,14 @@ pub mod get {
Path(channel): Path<channel::Id>,
) -> Result<Asset, Error> {
let _ = identity.ok_or(Error::NotLoggedIn)?;
- app.channels()
- .get(&channel)
- .await
- .map_err(Error::internal)?
- .ok_or(Error::NotFound)?;
+ app.channels().get(&channel).await.map_err(Error::from)?;
Assets::index().map_err(Error::Internal)
}
#[derive(Debug, thiserror::Error)]
pub enum Error {
- #[error("requested channel not found")]
+ #[error("channel not found")]
NotFound,
#[error("not logged in")]
NotLoggedIn,
@@ -40,9 +36,12 @@ pub mod get {
Internal(Internal),
}
- impl Error {
- fn internal(err: impl Into<Internal>) -> Self {
- Self::Internal(err.into())
+ impl From<app::Error> for Error {
+ fn from(error: app::Error) -> Self {
+ match error {
+ app::Error::NotFound(_) | app::Error::Deleted(_) => Self::NotFound,
+ other => Self::Internal(other.into()),
+ }
}
}