summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/channel/handlers/create/test.rs4
-rw-r--r--src/channel/handlers/send/test.rs4
-rw-r--r--src/event/handlers/stream/test/channel.rs28
-rw-r--r--src/event/handlers/stream/test/invite.rs8
-rw-r--r--src/event/handlers/stream/test/message.rs36
-rw-r--r--src/event/handlers/stream/test/resume.rs20
-rw-r--r--src/event/handlers/stream/test/setup.rs4
-rw-r--r--src/event/handlers/stream/test/token.rs12
-rw-r--r--src/test/fixtures/event.rs79
-rw-r--r--src/test/fixtures/event/mod.rs74
-rw-r--r--src/test/fixtures/event/stream.rs62
11 files changed, 194 insertions, 137 deletions
diff --git a/src/channel/handlers/create/test.rs b/src/channel/handlers/create/test.rs
index 3c770cf..595a879 100644
--- a/src/channel/handlers/create/test.rs
+++ b/src/channel/handlers/create/test.rs
@@ -47,8 +47,8 @@ async fn new_channel() {
.subscribe(resume_point)
.await
.expect("subscribing never fails")
- .filter_map(fixtures::event::channel)
- .filter_map(fixtures::event::channel::created)
+ .filter_map(fixtures::event::stream::channel)
+ .filter_map(fixtures::event::stream::channel::created)
.filter(|event| future::ready(event.channel == response));
let event = events.next().expect_some("creation event published").await;
diff --git a/src/channel/handlers/send/test.rs b/src/channel/handlers/send/test.rs
index f43f901..7204ca4 100644
--- a/src/channel/handlers/send/test.rs
+++ b/src/channel/handlers/send/test.rs
@@ -45,8 +45,8 @@ async fn messages_in_order() {
.subscribe(resume_point)
.await
.expect("subscribing to a valid channel succeeds")
- .filter_map(fixtures::event::message)
- .filter_map(fixtures::event::message::sent)
+ .filter_map(fixtures::event::stream::message)
+ .filter_map(fixtures::event::stream::message::sent)
.zip(stream::iter(requests));
while let Some((event, (sent_at, body))) = events
diff --git a/src/event/handlers/stream/test/channel.rs b/src/event/handlers/stream/test/channel.rs
index 187c3c3..2b87ce2 100644
--- a/src/event/handlers/stream/test/channel.rs
+++ b/src/event/handlers/stream/test/channel.rs
@@ -35,8 +35,8 @@ async fn creating() {
// Verify channel created event
events
- .filter_map(fixtures::event::channel)
- .filter_map(fixtures::event::channel::created)
+ .filter_map(fixtures::event::stream::channel)
+ .filter_map(fixtures::event::stream::channel::created)
.filter(|event| future::ready(event.channel == channel))
.next()
.expect_some("channel created event is delivered")
@@ -74,8 +74,8 @@ async fn previously_created() {
// Verify channel created event
let _ = events
- .filter_map(fixtures::event::channel)
- .filter_map(fixtures::event::channel::created)
+ .filter_map(fixtures::event::stream::channel)
+ .filter_map(fixtures::event::stream::channel::created)
.filter(|event| future::ready(event.channel == channel))
.next()
.expect_some("channel created event is delivered")
@@ -111,8 +111,8 @@ async fn expiring() {
// Check for expiry event
let _ = events
- .filter_map(fixtures::event::channel)
- .filter_map(fixtures::event::channel::deleted)
+ .filter_map(fixtures::event::stream::channel)
+ .filter_map(fixtures::event::stream::channel::deleted)
.filter(|event| future::ready(event.id == channel.id))
.next()
.expect_some("a deleted channel event will be delivered")
@@ -148,8 +148,8 @@ async fn previously_expired() {
// Check for expiry event
let _ = events
- .filter_map(fixtures::event::channel)
- .filter_map(fixtures::event::channel::deleted)
+ .filter_map(fixtures::event::stream::channel)
+ .filter_map(fixtures::event::stream::channel::deleted)
.filter(|event| future::ready(event.id == channel.id))
.next()
.expect_some("a deleted channel event will be delivered")
@@ -185,8 +185,8 @@ async fn deleting() {
// Check for delete event
let _ = events
- .filter_map(fixtures::event::channel)
- .filter_map(fixtures::event::channel::deleted)
+ .filter_map(fixtures::event::stream::channel)
+ .filter_map(fixtures::event::stream::channel::deleted)
.filter(|event| future::ready(event.id == channel.id))
.next()
.expect_some("a deleted channel event will be delivered")
@@ -222,8 +222,8 @@ async fn previously_deleted() {
// Check for expiry event
let _ = events
- .filter_map(fixtures::event::channel)
- .filter_map(fixtures::event::channel::deleted)
+ .filter_map(fixtures::event::stream::channel)
+ .filter_map(fixtures::event::stream::channel::deleted)
.filter(|event| future::ready(event.id == channel.id))
.next()
.expect_some("a deleted channel event will be delivered")
@@ -264,8 +264,8 @@ async fn previously_purged() {
// Check for expiry event
events
- .filter_map(fixtures::event::channel)
- .filter_map(fixtures::event::channel::deleted)
+ .filter_map(fixtures::event::stream::channel)
+ .filter_map(fixtures::event::stream::channel::deleted)
.filter(|event| future::ready(event.id == channel.id))
.next()
.expect_wait("deleted channel events not delivered")
diff --git a/src/event/handlers/stream/test/invite.rs b/src/event/handlers/stream/test/invite.rs
index c8e12fb..01372ce 100644
--- a/src/event/handlers/stream/test/invite.rs
+++ b/src/event/handlers/stream/test/invite.rs
@@ -37,8 +37,8 @@ async fn accepting_invite() {
// Expect a login created event
let _ = events
- .filter_map(fixtures::event::user)
- .filter_map(fixtures::event::user::created)
+ .filter_map(fixtures::event::stream::user)
+ .filter_map(fixtures::event::stream::user::created)
.filter(|event| future::ready(event.user == joiner))
.next()
.expect_some("a login created event is sent")
@@ -78,8 +78,8 @@ async fn previously_accepted_invite() {
// Expect a login created event
let _ = events
- .filter_map(fixtures::event::user)
- .filter_map(fixtures::event::user::created)
+ .filter_map(fixtures::event::stream::user)
+ .filter_map(fixtures::event::stream::user::created)
.filter(|event| future::ready(event.user == joiner))
.next()
.expect_some("a login created event is sent")
diff --git a/src/event/handlers/stream/test/message.rs b/src/event/handlers/stream/test/message.rs
index a80c896..4369996 100644
--- a/src/event/handlers/stream/test/message.rs
+++ b/src/event/handlers/stream/test/message.rs
@@ -44,8 +44,8 @@ async fn sending() {
// Verify that an event is delivered
let _ = events
- .filter_map(fixtures::event::message)
- .filter_map(fixtures::event::message::sent)
+ .filter_map(fixtures::event::stream::message)
+ .filter_map(fixtures::event::stream::message::sent)
.filter(|event| future::ready(event.message == message))
.next()
.expect_some("delivered message sent event")
@@ -89,8 +89,8 @@ async fn previously_sent() {
// Verify that an event is delivered
let _ = events
- .filter_map(fixtures::event::message)
- .filter_map(fixtures::event::message::sent)
+ .filter_map(fixtures::event::stream::message)
+ .filter_map(fixtures::event::stream::message::sent)
.filter(|event| future::ready(event.message == message))
.next()
.expect_some("delivered message sent event")
@@ -135,8 +135,8 @@ async fn sent_in_multiple_channels() {
// Verify the structure of the response.
let events = events
- .filter_map(fixtures::event::message)
- .filter_map(fixtures::event::message::sent)
+ .filter_map(fixtures::event::stream::message)
+ .filter_map(fixtures::event::stream::message::sent)
.take(messages.len())
.collect::<Vec<_>>()
.expect_ready("events ready")
@@ -177,8 +177,8 @@ async fn sent_sequentially() {
// Verify the expected events in the expected order
let mut events = events
- .filter_map(fixtures::event::message)
- .filter_map(fixtures::event::message::sent)
+ .filter_map(fixtures::event::stream::message)
+ .filter_map(fixtures::event::stream::message::sent)
.filter(|event| future::ready(messages.iter().any(|message| &event.message == message)));
for message in &messages {
@@ -222,8 +222,8 @@ async fn expiring() {
// Check for expiry event
let _ = events
- .filter_map(fixtures::event::message)
- .filter_map(fixtures::event::message::deleted)
+ .filter_map(fixtures::event::stream::message)
+ .filter_map(fixtures::event::stream::message::deleted)
.filter(|event| future::ready(event.id == message.id))
.next()
.expect_some("a deleted message event will be delivered")
@@ -261,8 +261,8 @@ async fn previously_expired() {
// Check for expiry event
let _ = events
- .filter_map(fixtures::event::message)
- .filter_map(fixtures::event::message::deleted)
+ .filter_map(fixtures::event::stream::message)
+ .filter_map(fixtures::event::stream::message::deleted)
.filter(|event| future::ready(event.id == message.id))
.next()
.expect_some("a deleted message event will be delivered")
@@ -300,8 +300,8 @@ async fn deleting() {
// Check for delete event
let _ = events
- .filter_map(fixtures::event::message)
- .filter_map(fixtures::event::message::deleted)
+ .filter_map(fixtures::event::stream::message)
+ .filter_map(fixtures::event::stream::message::deleted)
.filter(|event| future::ready(event.id == message.id))
.next()
.expect_some("a deleted message event will be delivered")
@@ -339,8 +339,8 @@ async fn previously_deleted() {
// Check for delete event
let _ = events
- .filter_map(fixtures::event::message)
- .filter_map(fixtures::event::message::deleted)
+ .filter_map(fixtures::event::stream::message)
+ .filter_map(fixtures::event::stream::message::deleted)
.filter(|event| future::ready(event.id == message.id))
.next()
.expect_some("a deleted message event will be delivered")
@@ -384,8 +384,8 @@ async fn previously_purged() {
// Check for delete event
events
- .filter_map(fixtures::event::message)
- .filter_map(fixtures::event::message::deleted)
+ .filter_map(fixtures::event::stream::message)
+ .filter_map(fixtures::event::stream::message::deleted)
.filter(|event| future::ready(event.id == message.id))
.next()
.expect_wait("no deleted message will be delivered")
diff --git a/src/event/handlers/stream/test/resume.rs b/src/event/handlers/stream/test/resume.rs
index 34fee4d..835d350 100644
--- a/src/event/handlers/stream/test/resume.rs
+++ b/src/event/handlers/stream/test/resume.rs
@@ -41,8 +41,8 @@ async fn resume() {
.expect("subscribe never fails");
let event = events
- .filter_map(fixtures::event::message)
- .filter_map(fixtures::event::message::sent)
+ .filter_map(fixtures::event::stream::message)
+ .filter_map(fixtures::event::stream::message::sent)
.filter(|event| future::ready(event.message == initial_message))
.next()
.expect_some("delivered event for initial message")
@@ -64,8 +64,8 @@ async fn resume() {
// Verify final events
let mut events = resumed
- .filter_map(fixtures::event::message)
- .filter_map(fixtures::event::message::sent)
+ .filter_map(fixtures::event::stream::message)
+ .filter_map(fixtures::event::stream::message::sent)
.zip(stream::iter(later_messages));
while let Some((event, message)) = events.next().expect_ready("event ready").await {
@@ -125,8 +125,8 @@ async fn serial_resume() {
// Check for expected events
let events = events
- .filter_map(fixtures::event::message)
- .filter_map(fixtures::event::message::sent)
+ .filter_map(fixtures::event::stream::message)
+ .filter_map(fixtures::event::stream::message::sent)
.zip(stream::iter(initial_messages))
.collect::<Vec<_>>()
.expect_ready("zipping a finite list of events is ready immediately")
@@ -168,8 +168,8 @@ async fn serial_resume() {
// Check for expected events
let events = events
- .filter_map(fixtures::event::message)
- .filter_map(fixtures::event::message::sent)
+ .filter_map(fixtures::event::stream::message)
+ .filter_map(fixtures::event::stream::message::sent)
.zip(stream::iter(resume_messages))
.collect::<Vec<_>>()
.expect_ready("zipping a finite list of events is ready immediately")
@@ -211,8 +211,8 @@ async fn serial_resume() {
// Check for expected events
let events = events
- .filter_map(fixtures::event::message)
- .filter_map(fixtures::event::message::sent)
+ .filter_map(fixtures::event::stream::message)
+ .filter_map(fixtures::event::stream::message::sent)
.zip(stream::iter(final_messages))
.collect::<Vec<_>>()
.expect_ready("zipping a finite list of events is ready immediately")
diff --git a/src/event/handlers/stream/test/setup.rs b/src/event/handlers/stream/test/setup.rs
index 5335055..992b962 100644
--- a/src/event/handlers/stream/test/setup.rs
+++ b/src/event/handlers/stream/test/setup.rs
@@ -38,8 +38,8 @@ async fn previously_completed() {
// Expect a login created event
let _ = events
- .filter_map(fixtures::event::user)
- .filter_map(fixtures::event::user::created)
+ .filter_map(fixtures::event::stream::user)
+ .filter_map(fixtures::event::stream::user::created)
.filter(|event| future::ready(event.user == owner))
.next()
.expect_some("a login created event is sent")
diff --git a/src/event/handlers/stream/test/token.rs b/src/event/handlers/stream/test/token.rs
index 2008323..e32b489 100644
--- a/src/event/handlers/stream/test/token.rs
+++ b/src/event/handlers/stream/test/token.rs
@@ -43,8 +43,8 @@ async fn terminates_on_token_expiry() {
];
events
- .filter_map(fixtures::event::message)
- .filter_map(fixtures::event::message::sent)
+ .filter_map(fixtures::event::stream::message)
+ .filter_map(fixtures::event::stream::message::sent)
.filter(|event| future::ready(messages.iter().any(|message| &event.message == message)))
.next()
.expect_none("end of stream")
@@ -89,8 +89,8 @@ async fn terminates_on_logout() {
];
events
- .filter_map(fixtures::event::message)
- .filter_map(fixtures::event::message::sent)
+ .filter_map(fixtures::event::stream::message)
+ .filter_map(fixtures::event::stream::message::sent)
.filter(|event| future::ready(messages.iter().any(|message| &event.message == message)))
.next()
.expect_none("end of stream")
@@ -139,8 +139,8 @@ async fn terminates_on_password_change() {
];
events
- .filter_map(fixtures::event::message)
- .filter_map(fixtures::event::message::sent)
+ .filter_map(fixtures::event::stream::message)
+ .filter_map(fixtures::event::stream::message::sent)
.filter(|event| future::ready(messages.iter().any(|message| &event.message == message)))
.next()
.expect_none("end of stream")
diff --git a/src/test/fixtures/event.rs b/src/test/fixtures/event.rs
deleted file mode 100644
index a30bb4b..0000000
--- a/src/test/fixtures/event.rs
+++ /dev/null
@@ -1,79 +0,0 @@
-use std::future::{self, Ready};
-
-use crate::event::Event;
-
-pub fn channel(event: Event) -> Ready<Option<channel::Event>> {
- future::ready(match event {
- Event::Channel(channel) => Some(channel),
- _ => None,
- })
-}
-
-pub fn message(event: Event) -> Ready<Option<message::Event>> {
- future::ready(match event {
- Event::Message(event) => Some(event),
- _ => None,
- })
-}
-
-pub fn user(event: Event) -> Ready<Option<user::Event>> {
- future::ready(match event {
- Event::User(event) => Some(event),
- _ => None,
- })
-}
-
-pub mod channel {
- use std::future::{self, Ready};
-
- pub use crate::channel::Event;
- use crate::channel::event;
-
- pub fn created(event: Event) -> Ready<Option<event::Created>> {
- future::ready(match event {
- Event::Created(event) => Some(event),
- Event::Deleted(_) => None,
- })
- }
-
- pub fn deleted(event: Event) -> Ready<Option<event::Deleted>> {
- future::ready(match event {
- Event::Deleted(event) => Some(event),
- Event::Created(_) => None,
- })
- }
-}
-
-pub mod message {
- use std::future::{self, Ready};
-
- pub use crate::message::Event;
- use crate::message::event;
-
- pub fn sent(event: Event) -> Ready<Option<event::Sent>> {
- future::ready(match event {
- Event::Sent(event) => Some(event),
- Event::Deleted(_) => None,
- })
- }
-
- pub fn deleted(event: Event) -> future::Ready<Option<event::Deleted>> {
- future::ready(match event {
- Event::Deleted(event) => Some(event),
- Event::Sent(_) => None,
- })
- }
-}
-
-pub mod user {
- use std::future::{self, Ready};
-
- pub use crate::user::Event;
- use crate::user::event;
-
- pub fn created(event: Event) -> Ready<Option<event::Created>> {
- future::ready(match event {
- Event::Created(event) => Some(event),
- })
- }
-}
diff --git a/src/test/fixtures/event/mod.rs b/src/test/fixtures/event/mod.rs
new file mode 100644
index 0000000..691cdeb
--- /dev/null
+++ b/src/test/fixtures/event/mod.rs
@@ -0,0 +1,74 @@
+use crate::event::Event;
+
+pub mod stream;
+
+pub fn channel(event: Event) -> Option<crate::channel::Event> {
+ match event {
+ Event::Channel(channel) => Some(channel),
+ _ => None,
+ }
+}
+
+pub fn message(event: Event) -> Option<crate::message::Event> {
+ match event {
+ Event::Message(event) => Some(event),
+ _ => None,
+ }
+}
+
+pub fn user(event: Event) -> Option<crate::user::Event> {
+ match event {
+ Event::User(event) => Some(event),
+ _ => None,
+ }
+}
+
+pub mod channel {
+ use crate::channel::{Event, event};
+
+ pub fn created(event: Event) -> Option<event::Created> {
+ match event {
+ Event::Created(event) => Some(event),
+ Event::Deleted(_) => None,
+ }
+ }
+
+ pub fn deleted(event: Event) -> Option<event::Deleted> {
+ match event {
+ Event::Deleted(event) => Some(event),
+ Event::Created(_) => None,
+ }
+ }
+}
+
+pub mod message {
+ use crate::message::{Event, event};
+
+ pub fn sent(event: Event) -> Option<event::Sent> {
+ match event {
+ Event::Sent(event) => Some(event),
+ Event::Deleted(_) => None,
+ }
+ }
+
+ pub fn deleted(event: Event) -> Option<event::Deleted> {
+ match event {
+ Event::Deleted(event) => Some(event),
+ Event::Sent(_) => None,
+ }
+ }
+}
+
+pub mod user {
+ use crate::user::{Event, event};
+
+ // This could be defined as `-> event::Created`. However, I want the interface to be consistent
+ // with the event stream transformers for other types, and we'd have to refactor the return type
+ // to `-> Option<event::Created>` the instant users sprout a second event, anyways.
+ #[allow(clippy::unnecessary_wraps)]
+ pub fn created(event: Event) -> Option<event::Created> {
+ match event {
+ Event::Created(event) => Some(event),
+ }
+ }
+}
diff --git a/src/test/fixtures/event/stream.rs b/src/test/fixtures/event/stream.rs
new file mode 100644
index 0000000..6c2a1bf
--- /dev/null
+++ b/src/test/fixtures/event/stream.rs
@@ -0,0 +1,62 @@
+use std::future::{self, Ready};
+
+use crate::{event::Event, test::fixtures::event};
+
+pub fn channel(event: Event) -> Ready<Option<crate::channel::Event>> {
+ future::ready(event::channel(event))
+}
+
+pub fn message(event: Event) -> Ready<Option<crate::message::Event>> {
+ future::ready(event::message(event))
+}
+
+pub fn user(event: Event) -> Ready<Option<crate::user::Event>> {
+ future::ready(event::user(event))
+}
+
+pub mod channel {
+ use std::future::{self, Ready};
+
+ use crate::{
+ channel::{Event, event},
+ test::fixtures::event::channel,
+ };
+
+ pub fn created(event: Event) -> Ready<Option<event::Created>> {
+ future::ready(channel::created(event))
+ }
+
+ pub fn deleted(event: Event) -> Ready<Option<event::Deleted>> {
+ future::ready(channel::deleted(event))
+ }
+}
+
+pub mod message {
+ use std::future::{self, Ready};
+
+ use crate::{
+ message::{Event, event},
+ test::fixtures::event::message,
+ };
+
+ pub fn sent(event: Event) -> Ready<Option<event::Sent>> {
+ future::ready(message::sent(event))
+ }
+
+ pub fn deleted(event: Event) -> future::Ready<Option<event::Deleted>> {
+ future::ready(message::deleted(event))
+ }
+}
+
+pub mod user {
+ use std::future::{self, Ready};
+
+ use crate::{
+ test::fixtures::event::user,
+ user::{Event, event},
+ };
+
+ pub fn created(event: Event) -> Ready<Option<event::Created>> {
+ future::ready(user::created(event))
+ }
+}