summaryrefslogtreecommitdiff
path: root/src/event/routes/test/message.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/event/routes/test/message.rs')
-rw-r--r--src/event/routes/test/message.rs316
1 files changed, 316 insertions, 0 deletions
diff --git a/src/event/routes/test/message.rs b/src/event/routes/test/message.rs
new file mode 100644
index 0000000..9bbbc7d
--- /dev/null
+++ b/src/event/routes/test/message.rs
@@ -0,0 +1,316 @@
+use axum::extract::State;
+use axum_extra::extract::Query;
+use futures::{
+ future,
+ stream::{self, StreamExt as _},
+};
+
+use crate::{
+ event::routes::get,
+ test::fixtures::{self, future::Immediately as _},
+};
+
+#[tokio::test]
+async fn sending() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let channel = fixtures::channel::create(&app, &fixtures::now()).await;
+
+ // Call the endpoint
+
+ let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
+ let get::Response(events) =
+ get::handler(State(app.clone()), subscriber, None, Query::default())
+ .await
+ .expect("subscribe never fails");
+
+ // Send a message
+
+ let sender = fixtures::login::create(&app, &fixtures::now()).await;
+ let message = app
+ .messages()
+ .send(
+ &channel.id,
+ &sender,
+ &fixtures::now(),
+ &fixtures::message::propose(),
+ )
+ .await
+ .expect("sending a message succeeds");
+
+ // Verify that an event is delivered
+
+ let _ = events
+ .filter_map(fixtures::message::events)
+ .filter_map(fixtures::message::sent)
+ .filter(|event| future::ready(event.message == message))
+ .next()
+ .immediately()
+ .await
+ .expect("delivered message sent event");
+}
+
+#[tokio::test]
+async fn previously_sent() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let channel = fixtures::channel::create(&app, &fixtures::now()).await;
+
+ // Send a message
+
+ let sender = fixtures::login::create(&app, &fixtures::now()).await;
+ let message = app
+ .messages()
+ .send(
+ &channel.id,
+ &sender,
+ &fixtures::now(),
+ &fixtures::message::propose(),
+ )
+ .await
+ .expect("sending a message succeeds");
+
+ // Call the endpoint
+
+ let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
+ let get::Response(events) =
+ get::handler(State(app.clone()), subscriber, None, Query::default())
+ .await
+ .expect("subscribe never fails");
+
+ // Verify that an event is delivered
+
+ let _ = events
+ .filter_map(fixtures::message::events)
+ .filter_map(fixtures::message::sent)
+ .filter(|event| future::ready(event.message == message))
+ .next()
+ .immediately()
+ .await
+ .expect("delivered message sent event");
+}
+
+#[tokio::test]
+async fn sent_in_multiple_channels() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let sender = fixtures::login::create(&app, &fixtures::now()).await;
+
+ let channels = [
+ fixtures::channel::create(&app, &fixtures::now()).await,
+ fixtures::channel::create(&app, &fixtures::now()).await,
+ ];
+
+ let messages = stream::iter(channels)
+ .then(|channel| {
+ let app = app.clone();
+ let sender = sender.clone();
+ let channel = channel.clone();
+ async move { fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await }
+ })
+ .collect::<Vec<_>>()
+ .await;
+
+ // Call the endpoint
+
+ let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
+ let get::Response(events) = get::handler(State(app), subscriber, None, Query::default())
+ .await
+ .expect("subscribe never fails");
+
+ // Verify the structure of the response.
+
+ let events = events
+ .filter_map(fixtures::message::events)
+ .filter_map(fixtures::message::sent)
+ .take(messages.len())
+ .collect::<Vec<_>>()
+ .immediately()
+ .await;
+
+ for message in &messages {
+ assert!(events.iter().any(|event| &event.message == message));
+ }
+}
+
+#[tokio::test]
+async fn sent_sequentially() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let channel = fixtures::channel::create(&app, &fixtures::now()).await;
+ let sender = fixtures::login::create(&app, &fixtures::now()).await;
+
+ let messages = vec![
+ fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
+ fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
+ fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
+ ];
+
+ // Subscribe
+
+ let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
+ let get::Response(events) = get::handler(State(app), subscriber, None, Query::default())
+ .await
+ .expect("subscribe never fails");
+
+ // Verify the expected events in the expected order
+
+ let mut events = events
+ .filter_map(fixtures::message::events)
+ .filter_map(fixtures::message::sent)
+ .filter(|event| future::ready(messages.iter().any(|message| &event.message == message)));
+
+ for message in &messages {
+ let event = events
+ .next()
+ .immediately()
+ .await
+ .expect("undelivered messages remaining");
+
+ assert_eq!(message, &event.message);
+ }
+}
+
+#[tokio::test]
+async fn expiring() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let channel = fixtures::channel::create(&app, &fixtures::ancient()).await;
+ let sender = fixtures::login::create(&app, &fixtures::ancient()).await;
+ let message = fixtures::message::send(&app, &channel, &sender, &fixtures::ancient()).await;
+
+ // Subscribe
+
+ let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
+ let get::Response(events) =
+ get::handler(State(app.clone()), subscriber, None, Query::default())
+ .await
+ .expect("subscribe never fails");
+
+ // Expire messages
+
+ app.messages()
+ .expire(&fixtures::now())
+ .await
+ .expect("expiring messages always succeeds");
+
+ // Check for expiry event
+ let _ = events
+ .filter_map(fixtures::message::events)
+ .filter_map(fixtures::message::deleted)
+ .filter(|event| future::ready(event.id == message.id))
+ .next()
+ .immediately()
+ .await
+ .expect("a deleted message event will be delivered");
+}
+
+#[tokio::test]
+async fn previously_expired() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let channel = fixtures::channel::create(&app, &fixtures::ancient()).await;
+ let sender = fixtures::login::create(&app, &fixtures::ancient()).await;
+ let message = fixtures::message::send(&app, &channel, &sender, &fixtures::ancient()).await;
+
+ // Expire messages
+
+ app.messages()
+ .expire(&fixtures::now())
+ .await
+ .expect("expiring messages always succeeds");
+
+ // Subscribe
+
+ let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
+ let get::Response(events) =
+ get::handler(State(app.clone()), subscriber, None, Query::default())
+ .await
+ .expect("subscribe never fails");
+
+ // Check for expiry event
+ let _ = events
+ .filter_map(fixtures::message::events)
+ .filter_map(fixtures::message::deleted)
+ .filter(|event| future::ready(event.id == message.id))
+ .next()
+ .immediately()
+ .await
+ .expect("a deleted message event will be delivered");
+}
+
+#[tokio::test]
+async fn deleting() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let channel = fixtures::channel::create(&app, &fixtures::now()).await;
+ let sender = fixtures::login::create(&app, &fixtures::now()).await;
+ let message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await;
+
+ // Subscribe
+
+ let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
+ let get::Response(events) =
+ get::handler(State(app.clone()), subscriber, None, Query::default())
+ .await
+ .expect("subscribe never fails");
+
+ // Delete the message
+
+ app.messages()
+ .delete(&message.id, &fixtures::now())
+ .await
+ .expect("deleting a valid message succeeds");
+
+ // Check for delete event
+ let _ = events
+ .filter_map(fixtures::message::events)
+ .filter_map(fixtures::message::deleted)
+ .filter(|event| future::ready(event.id == message.id))
+ .next()
+ .immediately()
+ .await
+ .expect("a deleted message event will be delivered");
+}
+
+#[tokio::test]
+async fn previously_deleted() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let channel = fixtures::channel::create(&app, &fixtures::now()).await;
+ let sender = fixtures::login::create(&app, &fixtures::now()).await;
+ let message = fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await;
+
+ // Delete the message
+
+ app.messages()
+ .delete(&message.id, &fixtures::now())
+ .await
+ .expect("deleting a valid message succeeds");
+
+ // Subscribe
+
+ let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
+ let get::Response(events) =
+ get::handler(State(app.clone()), subscriber, None, Query::default())
+ .await
+ .expect("subscribe never fails");
+
+ // Check for delete event
+ let _ = events
+ .filter_map(fixtures::message::events)
+ .filter_map(fixtures::message::deleted)
+ .filter(|event| future::ready(event.id == message.id))
+ .next()
+ .immediately()
+ .await
+ .expect("a deleted message event will be delivered");
+}