1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
|
use axum::extract::State;
use axum_extra::extract::Query;
use futures::{future, stream::StreamExt as _};
use crate::{
event::routes::get,
test::fixtures::{self, future::Immediately as _},
};
#[tokio::test]
async fn terminates_on_token_expiry() {
// Set up the environment
let app = fixtures::scratch_app().await;
let channel = fixtures::channel::create(&app, &fixtures::now()).await;
let sender = fixtures::login::create(&app, &fixtures::now()).await;
// Subscribe via the endpoint
let subscriber_creds = fixtures::login::create_with_password(&app, &fixtures::now()).await;
let subscriber =
fixtures::identity::logged_in(&app, &subscriber_creds, &fixtures::ancient()).await;
let get::Response(events) =
get::handler(State(app.clone()), subscriber, None, Query::default())
.await
.expect("subscribe never fails");
// Verify the resulting stream's behaviour
app.tokens()
.expire(&fixtures::now())
.await
.expect("expiring tokens succeeds");
// These should not be delivered.
let messages = [
fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
];
assert!(events
.filter_map(fixtures::event::message)
.filter_map(fixtures::event::message::sent)
.filter(|event| future::ready(messages.iter().any(|message| &event.message == message)))
.next()
.immediately()
.await
.is_none());
}
#[tokio::test]
async fn terminates_on_logout() {
// Set up the environment
let app = fixtures::scratch_app().await;
let channel = fixtures::channel::create(&app, &fixtures::now()).await;
let sender = fixtures::login::create(&app, &fixtures::now()).await;
// Subscribe via the endpoint
let subscriber = fixtures::identity::create(&app, &fixtures::now()).await;
let get::Response(events) = get::handler(
State(app.clone()),
subscriber.clone(),
None,
Query::default(),
)
.await
.expect("subscribe never fails");
// Verify the resulting stream's behaviour
app.tokens()
.logout(&subscriber.token)
.await
.expect("expiring tokens succeeds");
// These should not be delivered.
let messages = [
fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
fixtures::message::send(&app, &channel, &sender, &fixtures::now()).await,
];
assert!(events
.filter_map(fixtures::event::message)
.filter_map(fixtures::event::message::sent)
.filter(|event| future::ready(messages.iter().any(|message| &event.message == message)))
.next()
.immediately()
.await
.is_none());
}
|