summaryrefslogtreecommitdiff
path: root/src/events/routes/test.rs
diff options
context:
space:
mode:
authorKit La Touche <kit@transneptune.net>2024-09-28 21:55:50 -0400
committerKit La Touche <kit@transneptune.net>2024-09-28 21:55:50 -0400
commit897eef0306917baf3662e691b29f182d35805296 (patch)
tree024e2a3fa13ac96e0b4339a6d62ae533efe7db07 /src/events/routes/test.rs
parentc524b333befc8cc97aa49f73b3ed28bc3b82420c (diff)
parent4d0bb0709b168a24ab6a8dbc86da45d7503596ee (diff)
Merge branch 'main' into feature-frontend
Diffstat (limited to 'src/events/routes/test.rs')
-rw-r--r--src/events/routes/test.rs332
1 files changed, 76 insertions, 256 deletions
diff --git a/src/events/routes/test.rs b/src/events/routes/test.rs
index 0b08fd6..a6e2275 100644
--- a/src/events/routes/test.rs
+++ b/src/events/routes/test.rs
@@ -1,70 +1,40 @@
use axum::extract::State;
-use axum_extra::extract::Query;
use futures::{
future,
stream::{self, StreamExt as _},
};
use crate::{
- events::{app, routes},
- repo::channel::{self},
+ events::{routes, types},
test::fixtures::{self, future::Immediately as _},
};
#[tokio::test]
-async fn no_subscriptions() {
- // Set up the environment
-
- let app = fixtures::scratch_app().await;
- let subscriber = fixtures::login::create(&app).await;
-
- // Call the endpoint
-
- let subscribed_at = fixtures::now();
- let query = routes::EventsQuery {
- channels: [].into(),
- };
- let routes::Events(mut events) =
- routes::events(State(app), subscribed_at, subscriber, None, Query(query))
- .await
- .expect("empty subscription");
-
- // Verify the structure of the response.
-
- assert!(events.next().immediately().await.is_none());
-}
-
-#[tokio::test]
async fn includes_historical_message() {
// Set up the environment
let app = fixtures::scratch_app().await;
let sender = fixtures::login::create(&app).await;
- let channel = fixtures::channel::create(&app).await;
+ let channel = fixtures::channel::create(&app, &fixtures::now()).await;
let message = fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await;
// Call the endpoint
let subscriber = fixtures::login::create(&app).await;
- let subscribed_at = fixtures::now();
- let query = routes::EventsQuery {
- channels: [channel.id.clone()].into(),
- };
- let routes::Events(mut events) =
- routes::events(State(app), subscribed_at, subscriber, None, Query(query))
- .await
- .expect("subscribed to valid channel");
+ let routes::Events(events) = routes::events(State(app), subscriber, None)
+ .await
+ .expect("subscribe never fails");
// Verify the structure of the response.
- let routes::ReplayableEvent(_, event) = events
+ let types::ResumableEvent(_, event) = events
+ .filter(fixtures::filter::messages())
.next()
.immediately()
.await
.expect("delivered stored message");
- assert_eq!(channel.id, event.channel);
- assert_eq!(message, event.message);
+ assert_eq!(message, event);
}
#[tokio::test]
@@ -72,74 +42,28 @@ async fn includes_live_message() {
// Set up the environment
let app = fixtures::scratch_app().await;
- let channel = fixtures::channel::create(&app).await;
+ let channel = fixtures::channel::create(&app, &fixtures::now()).await;
// Call the endpoint
let subscriber = fixtures::login::create(&app).await;
- let subscribed_at = fixtures::now();
- let query = routes::EventsQuery {
- channels: [channel.id.clone()].into(),
- };
- let routes::Events(mut events) = routes::events(
- State(app.clone()),
- subscribed_at,
- subscriber,
- None,
- Query(query),
- )
- .await
- .expect("subscribed to a valid channel");
+ let routes::Events(events) = routes::events(State(app.clone()), subscriber, None)
+ .await
+ .expect("subscribe never fails");
// Verify the semantics
let sender = fixtures::login::create(&app).await;
let message = fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await;
- let routes::ReplayableEvent(_, event) = events
+ let types::ResumableEvent(_, event) = events
+ .filter(fixtures::filter::messages())
.next()
.immediately()
.await
.expect("delivered live message");
- assert_eq!(channel.id, event.channel);
- assert_eq!(message, event.message);
-}
-
-#[tokio::test]
-async fn excludes_other_channels() {
- // Set up the environment
-
- let app = fixtures::scratch_app().await;
- let subscribed_channel = fixtures::channel::create(&app).await;
- let unsubscribed_channel = fixtures::channel::create(&app).await;
- let sender = fixtures::login::create(&app).await;
- let message =
- fixtures::message::send(&app, &sender, &subscribed_channel, &fixtures::now()).await;
- fixtures::message::send(&app, &sender, &unsubscribed_channel, &fixtures::now()).await;
-
- // Call the endpoint
-
- let subscriber = fixtures::login::create(&app).await;
- let subscribed_at = fixtures::now();
- let query = routes::EventsQuery {
- channels: [subscribed_channel.id.clone()].into(),
- };
- let routes::Events(mut events) =
- routes::events(State(app), subscribed_at, subscriber, None, Query(query))
- .await
- .expect("subscribed to a valid channel");
-
- // Verify the semantics
-
- let routes::ReplayableEvent(_, event) = events
- .next()
- .immediately()
- .await
- .expect("delivered at least one message");
-
- assert_eq!(subscribed_channel.id, event.channel);
- assert_eq!(message, event.message);
+ assert_eq!(message, event);
}
#[tokio::test]
@@ -150,15 +74,16 @@ async fn includes_multiple_channels() {
let sender = fixtures::login::create(&app).await;
let channels = [
- fixtures::channel::create(&app).await,
- fixtures::channel::create(&app).await,
+ fixtures::channel::create(&app, &fixtures::now()).await,
+ fixtures::channel::create(&app, &fixtures::now()).await,
];
let messages = stream::iter(channels)
- .then(|channel| async {
- let message = fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await;
-
- (channel, message)
+ .then(|channel| {
+ let app = app.clone();
+ let sender = sender.clone();
+ let channel = channel.clone();
+ async move { fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await }
})
.collect::<Vec<_>>()
.await;
@@ -166,68 +91,32 @@ async fn includes_multiple_channels() {
// Call the endpoint
let subscriber = fixtures::login::create(&app).await;
- let subscribed_at = fixtures::now();
- let query = routes::EventsQuery {
- channels: messages
- .iter()
- .map(|(channel, _)| &channel.id)
- .cloned()
- .collect(),
- };
- let routes::Events(events) =
- routes::events(State(app), subscribed_at, subscriber, None, Query(query))
- .await
- .expect("subscribed to valid channels");
+ let routes::Events(events) = routes::events(State(app), subscriber, None)
+ .await
+ .expect("subscribe never fails");
// Verify the structure of the response.
let events = events
+ .filter(fixtures::filter::messages())
.take(messages.len())
.collect::<Vec<_>>()
.immediately()
.await;
- for (channel, message) in messages {
- assert!(events.iter().any(|routes::ReplayableEvent(_, event)| {
- event.channel == channel.id && event.message == message
- }));
+ for message in &messages {
+ assert!(events
+ .iter()
+ .any(|types::ResumableEvent(_, event)| { event == message }));
}
}
#[tokio::test]
-async fn nonexistent_channel() {
- // Set up the environment
-
- let app = fixtures::scratch_app().await;
- let channel = channel::Id::generate();
-
- // Call the endpoint
-
- let subscriber = fixtures::login::create(&app).await;
- let subscribed_at = fixtures::now();
- let query = routes::EventsQuery {
- channels: [channel.clone()].into(),
- };
- let routes::ErrorResponse(error) =
- routes::events(State(app), subscribed_at, subscriber, None, Query(query))
- .await
- .expect_err("subscribed to nonexistent channel");
-
- // Verify the structure of the response.
-
- fixtures::error::expected!(
- error,
- app::EventsError::ChannelNotFound(error_channel),
- assert_eq!(channel, error_channel)
- );
-}
-
-#[tokio::test]
async fn sequential_messages() {
// Set up the environment
let app = fixtures::scratch_app().await;
- let channel = fixtures::channel::create(&app).await;
+ let channel = fixtures::channel::create(&app, &fixtures::now()).await;
let sender = fixtures::login::create(&app).await;
let messages = vec![
@@ -239,31 +128,24 @@ async fn sequential_messages() {
// Call the endpoint
let subscriber = fixtures::login::create(&app).await;
- let subscribed_at = fixtures::now();
- let query = routes::EventsQuery {
- channels: [channel.id.clone()].into(),
- };
- let routes::Events(events) =
- routes::events(State(app), subscribed_at, subscriber, None, Query(query))
- .await
- .expect("subscribed to a valid channel");
+ let routes::Events(events) = routes::events(State(app), subscriber, None)
+ .await
+ .expect("subscribe never fails");
// Verify the structure of the response.
- let mut events = events.filter(|routes::ReplayableEvent(_, event)| {
- future::ready(messages.contains(&event.message))
- });
+ let mut events =
+ events.filter(|types::ResumableEvent(_, event)| future::ready(messages.contains(event)));
// Verify delivery in order
for message in &messages {
- let routes::ReplayableEvent(_, event) = events
+ let types::ResumableEvent(_, event) = events
.next()
.immediately()
.await
.expect("undelivered messages remaining");
- assert_eq!(channel.id, event.channel);
- assert_eq!(message, &event.message);
+ assert_eq!(message, &event);
}
}
@@ -272,7 +154,7 @@ async fn resumes_from() {
// Set up the environment
let app = fixtures::scratch_app().await;
- let channel = fixtures::channel::create(&app).await;
+ let channel = fixtures::channel::create(&app, &fixtures::now()).await;
let sender = fixtures::login::create(&app).await;
let initial_message = fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await;
@@ -285,43 +167,29 @@ async fn resumes_from() {
// Call the endpoint
let subscriber = fixtures::login::create(&app).await;
- let subscribed_at = fixtures::now();
- let query = routes::EventsQuery {
- channels: [channel.id.clone()].into(),
- };
let resume_at = {
// First subscription
- let routes::Events(mut events) = routes::events(
- State(app.clone()),
- subscribed_at,
- subscriber.clone(),
- None,
- Query(query.clone()),
- )
- .await
- .expect("subscribed to a valid channel");
+ let routes::Events(events) = routes::events(State(app.clone()), subscriber.clone(), None)
+ .await
+ .expect("subscribe never fails");
- let routes::ReplayableEvent(id, event) =
- events.next().immediately().await.expect("delivered events");
+ let types::ResumableEvent(last_event_id, event) = events
+ .filter(fixtures::filter::messages())
+ .next()
+ .immediately()
+ .await
+ .expect("delivered events");
- assert_eq!(channel.id, event.channel);
- assert_eq!(initial_message, event.message);
+ assert_eq!(initial_message, event);
- id
+ last_event_id
};
// Resume after disconnect
- let reconnect_at = fixtures::now();
- let routes::Events(resumed) = routes::events(
- State(app),
- reconnect_at,
- subscriber,
- Some(resume_at.into()),
- Query(query),
- )
- .await
- .expect("subscribed to a valid channel");
+ let routes::Events(resumed) = routes::events(State(app), subscriber, Some(resume_at.into()))
+ .await
+ .expect("subscribe never fails");
// Verify the structure of the response.
@@ -331,11 +199,10 @@ async fn resumes_from() {
.immediately()
.await;
- for message in later_messages {
- assert!(events.iter().any(
- |routes::ReplayableEvent(_, event)| event.channel == channel.id
- && event.message == message
- ));
+ for message in &later_messages {
+ assert!(events
+ .iter()
+ .any(|types::ResumableEvent(_, event)| event == message));
}
}
@@ -360,15 +227,12 @@ async fn serial_resume() {
let app = fixtures::scratch_app().await;
let sender = fixtures::login::create(&app).await;
- let channel_a = fixtures::channel::create(&app).await;
- let channel_b = fixtures::channel::create(&app).await;
+ let channel_a = fixtures::channel::create(&app, &fixtures::now()).await;
+ let channel_b = fixtures::channel::create(&app, &fixtures::now()).await;
// Call the endpoint
let subscriber = fixtures::login::create(&app).await;
- let query = routes::EventsQuery {
- channels: [channel_a.id.clone(), channel_b.id.clone()].into(),
- };
let resume_at = {
let initial_messages = [
@@ -377,30 +241,24 @@ async fn serial_resume() {
];
// First subscription
- let subscribed_at = fixtures::now();
- let routes::Events(events) = routes::events(
- State(app.clone()),
- subscribed_at,
- subscriber.clone(),
- None,
- Query(query.clone()),
- )
- .await
- .expect("subscribed to a valid channel");
+ let routes::Events(events) = routes::events(State(app.clone()), subscriber.clone(), None)
+ .await
+ .expect("subscribe never fails");
let events = events
+ .filter(fixtures::filter::messages())
.take(initial_messages.len())
.collect::<Vec<_>>()
.immediately()
.await;
- for message in initial_messages {
+ for message in &initial_messages {
assert!(events
.iter()
- .any(|routes::ReplayableEvent(_, event)| event.message == message));
+ .any(|types::ResumableEvent(_, event)| event == message));
}
- let routes::ReplayableEvent(id, _) = events.last().expect("this vec is non-empty");
+ let types::ResumableEvent(id, _) = events.last().expect("this vec is non-empty");
id.to_owned()
};
@@ -416,30 +274,28 @@ async fn serial_resume() {
];
// Second subscription
- let resubscribed_at = fixtures::now();
let routes::Events(events) = routes::events(
State(app.clone()),
- resubscribed_at,
subscriber.clone(),
Some(resume_at.into()),
- Query(query.clone()),
)
.await
- .expect("subscribed to a valid channel");
+ .expect("subscribe never fails");
let events = events
+ .filter(fixtures::filter::messages())
.take(resume_messages.len())
.collect::<Vec<_>>()
.immediately()
.await;
- for message in resume_messages {
+ for message in &resume_messages {
assert!(events
.iter()
- .any(|routes::ReplayableEvent(_, event)| event.message == message));
+ .any(|types::ResumableEvent(_, event)| event == message));
}
- let routes::ReplayableEvent(id, _) = events.last().expect("this vec is non-empty");
+ let types::ResumableEvent(id, _) = events.last().expect("this vec is non-empty");
id.to_owned()
};
@@ -454,19 +310,17 @@ async fn serial_resume() {
fixtures::message::send(&app, &sender, &channel_b, &fixtures::now()).await,
];
- // Second subscription
- let resubscribed_at = fixtures::now();
+ // Third subscription
let routes::Events(events) = routes::events(
State(app.clone()),
- resubscribed_at,
subscriber.clone(),
Some(resume_at.into()),
- Query(query.clone()),
)
.await
- .expect("subscribed to a valid channel");
+ .expect("subscribe never fails");
let events = events
+ .filter(fixtures::filter::messages())
.take(final_messages.len())
.collect::<Vec<_>>()
.immediately()
@@ -474,44 +328,10 @@ async fn serial_resume() {
// This set of messages, in particular, _should not_ include any prior
// messages from `initial_messages` or `resume_messages`.
- for message in final_messages {
+ for message in &final_messages {
assert!(events
.iter()
- .any(|routes::ReplayableEvent(_, event)| event.message == message));
+ .any(|types::ResumableEvent(_, event)| event == message));
}
};
}
-
-#[tokio::test]
-async fn removes_expired_messages() {
- // Set up the environment
- let app = fixtures::scratch_app().await;
- let sender = fixtures::login::create(&app).await;
- let channel = fixtures::channel::create(&app).await;
-
- fixtures::message::send(&app, &sender, &channel, &fixtures::ancient()).await;
- let message = fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await;
-
- // Call the endpoint
-
- let subscriber = fixtures::login::create(&app).await;
- let subscribed_at = fixtures::now();
- let query = routes::EventsQuery {
- channels: [channel.id.clone()].into(),
- };
- let routes::Events(mut events) =
- routes::events(State(app), subscribed_at, subscriber, None, Query(query))
- .await
- .expect("subscribed to valid channel");
-
- // Verify the semantics
-
- let routes::ReplayableEvent(_, event) = events
- .next()
- .immediately()
- .await
- .expect("delivered messages");
-
- assert_eq!(channel.id, event.channel);
- assert_eq!(message, event.message);
-}