summaryrefslogtreecommitdiff
path: root/src/event/routes
diff options
context:
space:
mode:
Diffstat (limited to 'src/event/routes')
-rw-r--r--src/event/routes/test/channel.rs69
-rw-r--r--src/event/routes/test/invite.rs12
-rw-r--r--src/event/routes/test/message.rs77
-rw-r--r--src/event/routes/test/resume.rs15
-rw-r--r--src/event/routes/test/setup.rs7
-rw-r--r--src/event/routes/test/token.rs16
6 files changed, 127 insertions, 69 deletions
diff --git a/src/event/routes/test/channel.rs b/src/event/routes/test/channel.rs
index 0ab28c4..6a0a803 100644
--- a/src/event/routes/test/channel.rs
+++ b/src/event/routes/test/channel.rs
@@ -4,7 +4,7 @@ use futures::{future, stream::StreamExt as _};
use crate::{
event::routes::get,
- test::fixtures::{self, future::Immediately as _},
+ test::fixtures::{self, future::Expect as _},
};
#[tokio::test]
@@ -32,14 +32,13 @@ async fn creating() {
// Verify channel created event
- let _ = events
+ events
.filter_map(fixtures::event::channel)
.filter_map(fixtures::event::channel::created)
.filter(|event| future::ready(event.channel == channel))
.next()
- .immediately()
- .await
- .expect("channel created event is delivered");
+ .expect_some("channel created event is delivered")
+ .await;
}
#[tokio::test]
@@ -72,9 +71,8 @@ async fn previously_created() {
.filter_map(fixtures::event::channel::created)
.filter(|event| future::ready(event.channel == channel))
.next()
- .immediately()
- .await
- .expect("channel created event is delivered");
+ .expect_some("channel created event is delivered")
+ .await;
}
#[tokio::test]
@@ -105,9 +103,8 @@ async fn expiring() {
.filter_map(fixtures::event::channel::deleted)
.filter(|event| future::ready(event.id == channel.id))
.next()
- .immediately()
- .await
- .expect("a deleted channel event will be delivered");
+ .expect_some("a deleted channel event will be delivered")
+ .await;
}
#[tokio::test]
@@ -138,9 +135,8 @@ async fn previously_expired() {
.filter_map(fixtures::event::channel::deleted)
.filter(|event| future::ready(event.id == channel.id))
.next()
- .immediately()
- .await
- .expect("a deleted channel event will be delivered");
+ .expect_some("a deleted channel event will be delivered")
+ .await;
}
#[tokio::test]
@@ -171,9 +167,8 @@ async fn deleting() {
.filter_map(fixtures::event::channel::deleted)
.filter(|event| future::ready(event.id == channel.id))
.next()
- .immediately()
- .await
- .expect("a deleted channel event will be delivered");
+ .expect_some("a deleted channel event will be delivered")
+ .await;
}
#[tokio::test]
@@ -204,7 +199,43 @@ async fn previously_deleted() {
.filter_map(fixtures::event::channel::deleted)
.filter(|event| future::ready(event.id == channel.id))
.next()
- .immediately()
+ .expect_some("a deleted channel event will be delivered")
+ .await;
+}
+
+#[tokio::test]
+async fn previously_purged() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let channel = fixtures::channel::create(&app, &fixtures::ancient()).await;
+
+ // Delete and purge the channel
+
+ app.channels()
+ .delete(&channel.id, &fixtures::ancient())
+ .await
+ .expect("deleting a valid channel succeeds");
+
+ app.channels()
+ .purge(&fixtures::now())
.await
- .expect("a deleted channel event will be delivered");
+ .expect("purging channels always succeeds");
+
+ // Subscribe
+
+ let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
+ let get::Response(events) =
+ get::handler(State(app.clone()), subscriber, None, Query::default())
+ .await
+ .expect("subscribe never fails");
+
+ // Check for expiry event
+ events
+ .filter_map(fixtures::event::channel)
+ .filter_map(fixtures::event::channel::deleted)
+ .filter(|event| future::ready(event.id == channel.id))
+ .next()
+ .expect_wait("deleted channel events not delivered")
+ .await;
}
diff --git a/src/event/routes/test/invite.rs b/src/event/routes/test/invite.rs
index afd3aeb..d24f474 100644
--- a/src/event/routes/test/invite.rs
+++ b/src/event/routes/test/invite.rs
@@ -4,7 +4,7 @@ use futures::{future, stream::StreamExt as _};
use crate::{
event::routes::get,
- test::fixtures::{self, future::Immediately as _},
+ test::fixtures::{self, future::Expect as _},
};
#[tokio::test]
@@ -39,9 +39,8 @@ async fn accepting_invite() {
.filter_map(fixtures::event::login::created)
.filter(|event| future::ready(event.login == joiner))
.next()
- .immediately()
- .await
- .expect("a login created event is sent");
+ .expect_some("a login created event is sent")
+ .await;
}
#[tokio::test]
@@ -76,7 +75,6 @@ async fn previously_accepted_invite() {
.filter_map(fixtures::event::login::created)
.filter(|event| future::ready(event.login == joiner))
.next()
- .immediately()
- .await
- .expect("a login created event is sent");
+ .expect_some("a login created event is sent")
+ .await;
}
diff --git a/src/event/routes/test/message.rs b/src/event/routes/test/message.rs
index df42a89..63a3f43 100644
--- a/src/event/routes/test/message.rs
+++ b/src/event/routes/test/message.rs
@@ -7,7 +7,7 @@ use futures::{
use crate::{
event::routes::get,
- test::fixtures::{self, future::Immediately as _},
+ test::fixtures::{self, future::Expect as _},
};
#[tokio::test]
@@ -46,9 +46,8 @@ async fn sending() {
.filter_map(fixtures::event::message::sent)
.filter(|event| future::ready(event.message == message))
.next()
- .immediately()
- .await
- .expect("delivered message sent event");
+ .expect_some("delivered message sent event")
+ .await;
}
#[tokio::test]
@@ -87,9 +86,8 @@ async fn previously_sent() {
.filter_map(fixtures::event::message::sent)
.filter(|event| future::ready(event.message == message))
.next()
- .immediately()
- .await
- .expect("delivered message sent event");
+ .expect_some("delivered message sent event")
+ .await;
}
#[tokio::test]
@@ -128,7 +126,7 @@ async fn sent_in_multiple_channels() {
.filter_map(fixtures::event::message::sent)
.take(messages.len())
.collect::<Vec<_>>()
- .immediately()
+ .expect_ready("events ready")
.await;
for message in &messages {
@@ -167,9 +165,8 @@ async fn sent_sequentially() {
for message in &messages {
let event = events
.next()
- .immediately()
- .await
- .expect("undelivered messages remaining");
+ .expect_some("undelivered messages remaining")
+ .await;
assert_eq!(message, &event.message);
}
@@ -205,9 +202,8 @@ async fn expiring() {
.filter_map(fixtures::event::message::deleted)
.filter(|event| future::ready(event.id == message.id))
.next()
- .immediately()
- .await
- .expect("a deleted message event will be delivered");
+ .expect_some("a deleted message event will be delivered")
+ .await;
}
#[tokio::test]
@@ -240,9 +236,8 @@ async fn previously_expired() {
.filter_map(fixtures::event::message::deleted)
.filter(|event| future::ready(event.id == message.id))
.next()
- .immediately()
- .await
- .expect("a deleted message event will be delivered");
+ .expect_some("a deleted message event will be delivered")
+ .await;
}
#[tokio::test]
@@ -275,9 +270,8 @@ async fn deleting() {
.filter_map(fixtures::event::message::deleted)
.filter(|event| future::ready(event.id == message.id))
.next()
- .immediately()
- .await
- .expect("a deleted message event will be delivered");
+ .expect_some("a deleted message event will be delivered")
+ .await;
}
#[tokio::test]
@@ -310,7 +304,46 @@ async fn previously_deleted() {
.filter_map(fixtures::event::message::deleted)
.filter(|event| future::ready(event.id == message.id))
.next()
- .immediately()
+ .expect_some("a deleted message event will be delivered")
+ .await;
+}
+
+#[tokio::test]
+async fn previously_purged() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let channel = fixtures::channel::create(&app, &fixtures::ancient()).await;
+ let sender = fixtures::login::create(&app, &fixtures::ancient()).await;
+ let message = fixtures::message::send(&app, &channel, &sender, &fixtures::ancient()).await;
+
+ // Purge the message
+
+ app.messages()
+ .delete(&message.id, &fixtures::ancient())
+ .await
+ .expect("deleting a valid message succeeds");
+
+ app.messages()
+ .purge(&fixtures::now())
.await
- .expect("a deleted message event will be delivered");
+ .expect("purge always succeeds");
+
+ // Subscribe
+
+ let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
+ let get::Response(events) =
+ get::handler(State(app.clone()), subscriber, None, Query::default())
+ .await
+ .expect("subscribe never fails");
+
+ // Check for delete event
+
+ events
+ .filter_map(fixtures::event::message)
+ .filter_map(fixtures::event::message::deleted)
+ .filter(|event| future::ready(event.id == message.id))
+ .next()
+ .expect_wait("no deleted message will be delivered")
+ .await;
}
diff --git a/src/event/routes/test/resume.rs b/src/event/routes/test/resume.rs
index e4751bb..62b9bad 100644
--- a/src/event/routes/test/resume.rs
+++ b/src/event/routes/test/resume.rs
@@ -6,7 +6,7 @@ use futures::stream::{self, StreamExt as _};
use crate::{
event::{routes::get, Sequenced as _},
- test::fixtures::{self, future::Immediately as _},
+ test::fixtures::{self, future::Expect as _},
};
#[tokio::test]
@@ -44,9 +44,8 @@ async fn resume() {
.filter_map(fixtures::event::message::sent)
.filter(|event| future::ready(event.message == initial_message))
.next()
- .immediately()
- .await
- .expect("delivered events");
+ .expect_some("delivered event for initial message")
+ .await;
event.sequence()
};
@@ -68,7 +67,7 @@ async fn resume() {
.filter_map(fixtures::event::message::sent)
.zip(stream::iter(later_messages));
- while let Some((event, message)) = events.next().immediately().await {
+ while let Some((event, message)) = events.next().expect_ready("event ready").await {
assert_eq!(message, event.message);
}
}
@@ -128,7 +127,7 @@ async fn serial_resume() {
.filter_map(fixtures::event::message::sent)
.zip(stream::iter(initial_messages))
.collect::<Vec<_>>()
- .immediately()
+ .expect_ready("zipping a finite list of events is ready immediately")
.await;
assert!(events
@@ -169,7 +168,7 @@ async fn serial_resume() {
.filter_map(fixtures::event::message::sent)
.zip(stream::iter(resume_messages))
.collect::<Vec<_>>()
- .immediately()
+ .expect_ready("zipping a finite list of events is ready immediately")
.await;
assert!(events
@@ -210,7 +209,7 @@ async fn serial_resume() {
.filter_map(fixtures::event::message::sent)
.zip(stream::iter(final_messages))
.collect::<Vec<_>>()
- .immediately()
+ .expect_ready("zipping a finite list of events is ready immediately")
.await;
assert!(events
diff --git a/src/event/routes/test/setup.rs b/src/event/routes/test/setup.rs
index a54b65b..007b03d 100644
--- a/src/event/routes/test/setup.rs
+++ b/src/event/routes/test/setup.rs
@@ -4,7 +4,7 @@ use futures::{future, stream::StreamExt as _};
use crate::{
event::routes::get,
- test::fixtures::{self, future::Immediately as _},
+ test::fixtures::{self, future::Expect as _},
};
// There's no test for this in subscribe-then-setup order because creating an
@@ -40,7 +40,6 @@ async fn previously_completed() {
.filter_map(fixtures::event::login::created)
.filter(|event| future::ready(event.login == owner))
.next()
- .immediately()
- .await
- .expect("a login created event is sent");
+ .expect_some("a login created event is sent")
+ .await;
}
diff --git a/src/event/routes/test/token.rs b/src/event/routes/test/token.rs
index 577fabd..2039d9b 100644
--- a/src/event/routes/test/token.rs
+++ b/src/event/routes/test/token.rs
@@ -4,7 +4,7 @@ use futures::{future, stream::StreamExt as _};
use crate::{
event::routes::get,
- test::fixtures::{self, future::Immediately as _},
+ test::fixtures::{self, future::Expect as _},
};
#[tokio::test]
@@ -40,14 +40,13 @@ async fn terminates_on_token_expiry() {
fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
];
- assert!(events
+ events
.filter_map(fixtures::event::message)
.filter_map(fixtures::event::message::sent)
.filter(|event| future::ready(messages.iter().any(|message| &event.message == message)))
.next()
- .immediately()
- .await
- .is_none());
+ .expect_none("end of stream")
+ .await;
}
#[tokio::test]
@@ -86,12 +85,11 @@ async fn terminates_on_logout() {
fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
];
- assert!(events
+ events
.filter_map(fixtures::event::message)
.filter_map(fixtures::event::message::sent)
.filter(|event| future::ready(messages.iter().any(|message| &event.message == message)))
.next()
- .immediately()
- .await
- .is_none());
+ .expect_none("end of stream")
+ .await;
}