summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/channel/app.rs2
-rw-r--r--src/channel/routes.rs19
-rw-r--r--src/channel/routes/test/list.rs64
-rw-r--r--src/channel/routes/test/mod.rs3
-rw-r--r--src/channel/routes/test/on_create.rs58
-rw-r--r--src/channel/routes/test/on_send.rs148
-rw-r--r--src/clock.rs18
-rw-r--r--src/error.rs2
-rw-r--r--src/events/app.rs5
-rw-r--r--src/events/repo/broadcast.rs2
-rw-r--r--src/events/routes.rs11
-rw-r--r--src/events/routes/test.rs368
-rw-r--r--src/id.rs2
-rw-r--r--src/lib.rs2
-rw-r--r--src/login/app.rs19
-rw-r--r--src/login/extract.rs9
-rw-r--r--src/login/routes.rs5
-rw-r--r--src/login/routes/test/boot.rs9
-rw-r--r--src/login/routes/test/login.rs137
-rw-r--r--src/login/routes/test/logout.rs86
-rw-r--r--src/login/routes/test/mod.rs3
-rw-r--r--src/repo/channel.rs2
-rw-r--r--src/repo/login/store.rs2
-rw-r--r--src/test/fixtures/channel.rs24
-rw-r--r--src/test/fixtures/error.rs14
-rw-r--r--src/test/fixtures/future.rs55
-rw-r--r--src/test/fixtures/identity.rs27
-rw-r--r--src/test/fixtures/login.rs44
-rw-r--r--src/test/fixtures/message.rs26
-rw-r--r--src/test/fixtures/mod.rs28
-rw-r--r--src/test/mod.rs1
31 files changed, 1177 insertions, 18 deletions
diff --git a/src/channel/app.rs b/src/channel/app.rs
index 48e3e3c..3c92d76 100644
--- a/src/channel/app.rs
+++ b/src/channel/app.rs
@@ -78,7 +78,7 @@ impl<'a> Channels<'a> {
channel: &channel::Id,
subscribed_at: &DateTime,
resume_at: Option<&str>,
- ) -> Result<impl Stream<Item = broadcast::Message>, EventsError> {
+ ) -> Result<impl Stream<Item = broadcast::Message> + std::fmt::Debug, EventsError> {
// Somewhat arbitrarily, expire after 90 days.
let expire_at = subscribed_at.to_owned() - TimeDelta::days(90);
diff --git a/src/channel/routes.rs b/src/channel/routes.rs
index 383ec58..674c876 100644
--- a/src/channel/routes.rs
+++ b/src/channel/routes.rs
@@ -17,14 +17,17 @@ use crate::{
},
};
+#[cfg(test)]
+mod test;
+
pub fn router() -> Router<App> {
Router::new()
- .route("/api/channels", get(list_channels))
+ .route("/api/channels", get(list))
.route("/api/channels", post(on_create))
.route("/api/channels/:channel", post(on_send))
}
-async fn list_channels(State(app): State<App>, _: Login) -> Result<Channels, InternalError> {
+async fn list(State(app): State<App>, _: Login) -> Result<Channels, InternalError> {
let channels = app.channels().all().await?;
let response = Channels(channels);
@@ -40,7 +43,7 @@ impl IntoResponse for Channels {
}
}
-#[derive(serde::Deserialize)]
+#[derive(Clone, serde::Deserialize)]
struct CreateRequest {
name: String,
}
@@ -59,6 +62,7 @@ async fn on_create(
Ok(Json(channel))
}
+#[derive(Debug)]
struct CreateError(app::CreateError);
impl IntoResponse for CreateError {
@@ -73,20 +77,20 @@ impl IntoResponse for CreateError {
}
}
-#[derive(serde::Deserialize)]
+#[derive(Clone, serde::Deserialize)]
struct SendRequest {
message: String,
}
async fn on_send(
+ State(app): State<App>,
Path(channel): Path<channel::Id>,
RequestedAt(sent_at): RequestedAt,
- State(app): State<App>,
login: Login,
- Json(form): Json<SendRequest>,
+ Json(request): Json<SendRequest>,
) -> Result<StatusCode, ErrorResponse> {
app.channels()
- .send(&login, &channel, &form.message, &sent_at)
+ .send(&login, &channel, &request.message, &sent_at)
.await
// Could impl `From` here, but it's more code and this is used once.
.map_err(ErrorResponse)?;
@@ -94,6 +98,7 @@ async fn on_send(
Ok(StatusCode::ACCEPTED)
}
+#[derive(Debug)]
struct ErrorResponse(EventsError);
impl IntoResponse for ErrorResponse {
diff --git a/src/channel/routes/test/list.rs b/src/channel/routes/test/list.rs
new file mode 100644
index 0000000..f7f7b44
--- /dev/null
+++ b/src/channel/routes/test/list.rs
@@ -0,0 +1,64 @@
+use axum::extract::State;
+
+use crate::{channel::routes, test::fixtures};
+
+#[tokio::test]
+async fn empty_list() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let viewer = fixtures::login::create(&app).await;
+
+ // Call the endpoint
+
+ let routes::Channels(channels) = routes::list(State(app), viewer)
+ .await
+ .expect("always succeeds");
+
+ // Verify the semantics
+
+ assert!(channels.is_empty());
+}
+
+#[tokio::test]
+async fn one_channel() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let viewer = fixtures::login::create(&app).await;
+ let channel = fixtures::channel::create(&app).await;
+
+ // Call the endpoint
+
+ let routes::Channels(channels) = routes::list(State(app), viewer)
+ .await
+ .expect("always succeeds");
+
+ // Verify the semantics
+
+ assert!(channels.contains(&channel));
+}
+
+#[tokio::test]
+async fn multiple_channels() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let viewer = fixtures::login::create(&app).await;
+ let channels = vec![
+ fixtures::channel::create(&app).await,
+ fixtures::channel::create(&app).await,
+ ];
+
+ // Call the endpoint
+
+ let routes::Channels(response_channels) = routes::list(State(app), viewer)
+ .await
+ .expect("always succeeds");
+
+ // Verify the semantics
+
+ assert!(channels
+ .into_iter()
+ .all(|channel| response_channels.contains(&channel)));
+}
diff --git a/src/channel/routes/test/mod.rs b/src/channel/routes/test/mod.rs
new file mode 100644
index 0000000..ab663eb
--- /dev/null
+++ b/src/channel/routes/test/mod.rs
@@ -0,0 +1,3 @@
+mod list;
+mod on_create;
+mod on_send;
diff --git a/src/channel/routes/test/on_create.rs b/src/channel/routes/test/on_create.rs
new file mode 100644
index 0000000..df23deb
--- /dev/null
+++ b/src/channel/routes/test/on_create.rs
@@ -0,0 +1,58 @@
+use axum::extract::{Json, State};
+
+use crate::{
+ channel::{app, routes},
+ test::fixtures,
+};
+
+#[tokio::test]
+async fn new_channel() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let creator = fixtures::login::create(&app).await;
+
+ // Call the endpoint
+
+ let name = fixtures::channel::propose();
+ let request = routes::CreateRequest { name };
+ let Json(response_channel) =
+ routes::on_create(State(app.clone()), creator, Json(request.clone()))
+ .await
+ .expect("new channel in an empty app");
+
+ // Verify the structure of the response
+
+ assert_eq!(request.name, response_channel.name);
+
+ // Verify the semantics
+
+ let channels = app.channels().all().await.expect("always succeeds");
+
+ assert!(channels.contains(&response_channel));
+}
+
+#[tokio::test]
+async fn duplicate_name() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let creator = fixtures::login::create(&app).await;
+ let channel = fixtures::channel::create(&app).await;
+
+ // Call the endpoint
+
+ let request = routes::CreateRequest { name: channel.name };
+ let routes::CreateError(error) =
+ routes::on_create(State(app.clone()), creator, Json(request.clone()))
+ .await
+ .expect_err("duplicate channel name");
+
+ // Verify the structure of the response
+
+ fixtures::error::expected!(
+ error,
+ app::CreateError::DuplicateName(name),
+ assert_eq!(request.name, name),
+ );
+}
diff --git a/src/channel/routes/test/on_send.rs b/src/channel/routes/test/on_send.rs
new file mode 100644
index 0000000..eab7c32
--- /dev/null
+++ b/src/channel/routes/test/on_send.rs
@@ -0,0 +1,148 @@
+use axum::{
+ extract::{Json, Path, State},
+ http::StatusCode,
+};
+use futures::stream::StreamExt;
+
+use crate::{
+ channel::{app, routes},
+ repo::channel,
+ test::fixtures::{self, future::Immediately as _},
+};
+
+#[tokio::test]
+async fn channel_exists() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let sender = fixtures::login::create(&app).await;
+ let channel = fixtures::channel::create(&app).await;
+
+ // Call the endpoint
+
+ let sent_at = fixtures::now();
+ let request = routes::SendRequest {
+ message: fixtures::message::propose(),
+ };
+ let status = routes::on_send(
+ State(app.clone()),
+ Path(channel.id.clone()),
+ sent_at.clone(),
+ sender.clone(),
+ Json(request.clone()),
+ )
+ .await
+ .expect("sending to a valid channel");
+
+ // Verify the structure of the response
+
+ assert_eq!(StatusCode::ACCEPTED, status);
+
+ // Verify the semantics
+
+ let subscribed_at = fixtures::now();
+ let mut events = app
+ .channels()
+ .events(&channel.id, &subscribed_at, None)
+ .await
+ .expect("subscribing to a valid channel");
+
+ let event = events
+ .next()
+ .immediately()
+ .await
+ .expect("event received by subscribers");
+
+ assert_eq!(request.message, event.body);
+ assert_eq!(sender, event.sender);
+ assert_eq!(*sent_at, event.sent_at);
+}
+
+#[tokio::test]
+async fn messages_in_order() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let sender = fixtures::login::create(&app).await;
+ let channel = fixtures::channel::create(&app).await;
+
+ // Call the endpoint (twice)
+
+ let requests = vec![
+ (
+ fixtures::now(),
+ routes::SendRequest {
+ message: fixtures::message::propose(),
+ },
+ ),
+ (
+ fixtures::now(),
+ routes::SendRequest {
+ message: fixtures::message::propose(),
+ },
+ ),
+ ];
+
+ for (sent_at, request) in &requests {
+ routes::on_send(
+ State(app.clone()),
+ Path(channel.id.clone()),
+ sent_at.clone(),
+ sender.clone(),
+ Json(request.clone()),
+ )
+ .await
+ .expect("sending to a valid channel");
+ }
+
+ // Verify the semantics
+
+ let subscribed_at = fixtures::now();
+ let events = app
+ .channels()
+ .events(&channel.id, &subscribed_at, None)
+ .await
+ .expect("subscribing to a valid channel")
+ .take(requests.len());
+
+ let events = events.collect::<Vec<_>>().immediately().await;
+
+ for ((sent_at, request), event) in requests.into_iter().zip(events) {
+ assert_eq!(request.message, event.body);
+ assert_eq!(sender, event.sender);
+ assert_eq!(*sent_at, event.sent_at);
+ }
+}
+
+#[tokio::test]
+async fn nonexistent_channel() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let login = fixtures::login::create(&app).await;
+
+ // Call the endpoint
+
+ let sent_at = fixtures::now();
+ let channel = channel::Id::generate();
+ let request = routes::SendRequest {
+ message: fixtures::message::propose(),
+ };
+ let routes::ErrorResponse(error) = routes::on_send(
+ State(app),
+ Path(channel.clone()),
+ sent_at,
+ login,
+ Json(request),
+ )
+ .await
+ .expect_err("sending to a nonexistent channel");
+
+ // Verify the structure of the response
+
+ fixtures::error::expected!(
+ error,
+ app::EventsError::ChannelNotFound(error_channel),
+ assert_eq!(channel, error_channel)
+ );
+}
diff --git a/src/clock.rs b/src/clock.rs
index f7e728f..d162fc0 100644
--- a/src/clock.rs
+++ b/src/clock.rs
@@ -32,13 +32,27 @@ where
// This is purely for ergonomics: it allows `RequestedAt` to be extracted
// without having to wrap it in `Extension<>`. Callers _can_ still do that,
// but they aren't forced to.
- let Extension(requested_at) =
- Extension::<Self>::from_request_parts(parts, state).await?;
+ let Extension(requested_at) = Extension::<Self>::from_request_parts(parts, state).await?;
Ok(requested_at)
}
}
+impl From<DateTime> for RequestedAt {
+ fn from(timestamp: DateTime) -> Self {
+ Self(timestamp)
+ }
+}
+
+impl std::ops::Deref for RequestedAt {
+ type Target = DateTime;
+
+ fn deref(&self) -> &Self::Target {
+ let Self(timestamp) = self;
+ timestamp
+ }
+}
+
/// Computes a canonical "requested at" time for each request it wraps. This
/// time can be recovered using the [RequestedAt] extractor.
pub async fn middleware(mut req: Request, next: Next) -> Result<Response, StatusCode> {
diff --git a/src/error.rs b/src/error.rs
index 2a6555f..e2128d3 100644
--- a/src/error.rs
+++ b/src/error.rs
@@ -15,6 +15,7 @@ type BoxedError = Box<dyn error::Error + Send + Sync>;
// Returns a 500 Internal Server Error to the client. Meant to be used via the
// `?` operator; _does not_ return the originating error to the client.
+#[derive(Debug)]
pub struct InternalError(Id, BoxedError);
impl<E> From<E> for InternalError
@@ -40,6 +41,7 @@ impl IntoResponse for InternalError {
}
/// Transient identifier for an InternalError. Prefixed with `E`.
+#[derive(Debug)]
pub struct Id(BaseId);
impl From<BaseId> for Id {
diff --git a/src/events/app.rs b/src/events/app.rs
index c3a027d..99e849e 100644
--- a/src/events/app.rs
+++ b/src/events/app.rs
@@ -69,7 +69,10 @@ impl Broadcaster {
// panic: if ``channel`` has not been previously registered, and was not
// part of the initial set of channels.
- pub fn listen(&self, channel: &channel::Id) -> impl Stream<Item = broadcast::Message> {
+ pub fn listen(
+ &self,
+ channel: &channel::Id,
+ ) -> impl Stream<Item = broadcast::Message> + std::fmt::Debug {
let rx = self.sender(channel).subscribe();
BroadcastStream::from(rx)
diff --git a/src/events/repo/broadcast.rs b/src/events/repo/broadcast.rs
index 182203a..bffe991 100644
--- a/src/events/repo/broadcast.rs
+++ b/src/events/repo/broadcast.rs
@@ -21,7 +21,7 @@ impl<'c> Provider for Transaction<'c, Sqlite> {
pub struct Broadcast<'t>(&'t mut SqliteConnection);
-#[derive(Clone, Debug, serde::Serialize)]
+#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
pub struct Message {
pub id: message::Id,
pub sender: Login,
diff --git a/src/events/routes.rs b/src/events/routes.rs
index ce5b778..a6bf5d9 100644
--- a/src/events/routes.rs
+++ b/src/events/routes.rs
@@ -22,11 +22,14 @@ use crate::{
repo::{channel, login::Login},
};
+#[cfg(test)]
+mod test;
+
pub fn router() -> Router<App> {
Router::new().route("/api/events", get(events))
}
-#[derive(serde::Deserialize)]
+#[derive(Clone, serde::Deserialize)]
struct EventsQuery {
#[serde(default, rename = "channel")]
channels: Vec<channel::Id>,
@@ -38,7 +41,7 @@ async fn events(
_: Login, // requires auth, but doesn't actually care who you are
last_event_id: Option<LastEventId>,
Query(query): Query<EventsQuery>,
-) -> Result<Events<impl Stream<Item = ChannelEvent>>, ErrorResponse> {
+) -> Result<Events<impl Stream<Item = ChannelEvent> + std::fmt::Debug>, ErrorResponse> {
let resume_at = last_event_id.as_deref();
let streams = stream::iter(query.channels)
@@ -64,6 +67,7 @@ async fn events(
Ok(Events(stream))
}
+#[derive(Debug)]
struct Events<S>(S);
impl<S> IntoResponse for Events<S>
@@ -79,6 +83,7 @@ where
}
}
+#[derive(Debug)]
struct ErrorResponse(EventsError);
impl IntoResponse for ErrorResponse {
@@ -96,7 +101,7 @@ impl IntoResponse for ErrorResponse {
}
}
-#[derive(serde::Serialize)]
+#[derive(Debug, serde::Serialize)]
struct ChannelEvent {
channel: channel::Id,
#[serde(flatten)]
diff --git a/src/events/routes/test.rs b/src/events/routes/test.rs
new file mode 100644
index 0000000..df2d5f6
--- /dev/null
+++ b/src/events/routes/test.rs
@@ -0,0 +1,368 @@
+use axum::extract::State;
+use axum_extra::extract::Query;
+use futures::{
+ future,
+ stream::{self, StreamExt as _},
+};
+
+use crate::{
+ channel::app,
+ events::routes,
+ repo::channel::{self},
+ test::fixtures::{self, future::Immediately as _},
+};
+
+#[tokio::test]
+async fn no_subscriptions() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let subscriber = fixtures::login::create(&app).await;
+
+ // Call the endpoint
+
+ let subscribed_at = fixtures::now();
+ let query = routes::EventsQuery { channels: vec![] };
+ let routes::Events(mut events) =
+ routes::events(State(app), subscribed_at, subscriber, None, Query(query))
+ .await
+ .expect("empty subscription");
+
+ // Verify the structure of the response.
+
+ assert!(events.next().immediately().await.is_none());
+}
+
+#[tokio::test]
+async fn includes_historical_message() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let sender = fixtures::login::create(&app).await;
+ let channel = fixtures::channel::create(&app).await;
+ let message = fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await;
+
+ // Call the endpoint
+
+ let subscriber = fixtures::login::create(&app).await;
+ let subscribed_at = fixtures::now();
+ let query = routes::EventsQuery {
+ channels: vec![channel.id.clone()],
+ };
+ let routes::Events(mut events) =
+ routes::events(State(app), subscribed_at, subscriber, None, Query(query))
+ .await
+ .expect("subscribed to valid channel");
+
+ // Verify the structure of the response.
+
+ let event = events
+ .next()
+ .immediately()
+ .await
+ .expect("delivered stored message");
+
+ assert_eq!(channel.id, event.channel);
+ assert_eq!(message, event.message);
+}
+
+#[tokio::test]
+async fn includes_live_message() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let channel = fixtures::channel::create(&app).await;
+
+ // Call the endpoint
+
+ let subscriber = fixtures::login::create(&app).await;
+ let subscribed_at = fixtures::now();
+ let query = routes::EventsQuery {
+ channels: vec![channel.id.clone()],
+ };
+ let routes::Events(mut events) = routes::events(
+ State(app.clone()),
+ subscribed_at,
+ subscriber,
+ None,
+ Query(query),
+ )
+ .await
+ .expect("subscribed to a valid channel");
+
+ // Verify the semantics
+
+ let sender = fixtures::login::create(&app).await;
+ let message = fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await;
+
+ let event = events
+ .next()
+ .immediately()
+ .await
+ .expect("delivered live message");
+
+ assert_eq!(channel.id, event.channel);
+ assert_eq!(message, event.message);
+}
+
+#[tokio::test]
+async fn excludes_other_channels() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let subscribed = fixtures::channel::create(&app).await;
+ let unsubscribed = fixtures::channel::create(&app).await;
+ let sender = fixtures::login::create(&app).await;
+ let message = fixtures::message::send(&app, &sender, &subscribed, &fixtures::now()).await;
+ fixtures::message::send(&app, &sender, &unsubscribed, &fixtures::now()).await;
+
+ // Call the endpoint
+
+ let subscriber = fixtures::login::create(&app).await;
+ let subscribed_at = fixtures::now();
+ let query = routes::EventsQuery {
+ channels: vec![subscribed.id.clone()],
+ };
+ let routes::Events(mut events) =
+ routes::events(State(app), subscribed_at, subscriber, None, Query(query))
+ .await
+ .expect("subscribed to a valid channel");
+
+ // Verify the semantics
+
+ let event = events
+ .next()
+ .immediately()
+ .await
+ .expect("delivered at least one message");
+
+ assert_eq!(subscribed.id, event.channel);
+ assert_eq!(message, event.message);
+}
+
+#[tokio::test]
+async fn includes_multiple_channels() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let sender = fixtures::login::create(&app).await;
+
+ let channels = [
+ fixtures::channel::create(&app).await,
+ fixtures::channel::create(&app).await,
+ ];
+
+ let messages = stream::iter(channels)
+ .then(|channel| async {
+ let message = fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await;
+
+ (channel, message)
+ })
+ .collect::<Vec<_>>()
+ .await;
+
+ // Call the endpoint
+
+ let subscriber = fixtures::login::create(&app).await;
+ let subscribed_at = fixtures::now();
+ let query = routes::EventsQuery {
+ channels: messages
+ .iter()
+ .map(|(channel, _)| &channel.id)
+ .cloned()
+ .collect(),
+ };
+ let routes::Events(events) =
+ routes::events(State(app), subscribed_at, subscriber, None, Query(query))
+ .await
+ .expect("subscribed to valid channels");
+
+ // Verify the structure of the response.
+
+ let events = events
+ .take(messages.len())
+ .collect::<Vec<_>>()
+ .immediately()
+ .await;
+
+ for (channel, message) in messages {
+ assert!(events
+ .iter()
+ .any(|event| { event.channel == channel.id && event.message == message }));
+ }
+}
+
+#[tokio::test]
+async fn nonexitent_channel() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let channel = channel::Id::generate();
+
+ // Call the endpoint
+
+ let subscriber = fixtures::login::create(&app).await;
+ let subscribed_at = fixtures::now();
+ let query = routes::EventsQuery {
+ channels: vec![channel.clone()],
+ };
+ let routes::ErrorResponse(error) =
+ routes::events(State(app), subscribed_at, subscriber, None, Query(query))
+ .await
+ .expect_err("subscribed to nonexistent channel");
+
+ // Verify the structure of the response.
+
+ fixtures::error::expected!(
+ error,
+ app::EventsError::ChannelNotFound(error_channel),
+ assert_eq!(channel, error_channel)
+ );
+}
+
+#[tokio::test]
+async fn sequential_messages() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let channel = fixtures::channel::create(&app).await;
+ let sender = fixtures::login::create(&app).await;
+
+ let messages = vec![
+ fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await,
+ fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await,
+ fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await,
+ ];
+
+ // Call the endpoint
+
+ let subscriber = fixtures::login::create(&app).await;
+ let subscribed_at = fixtures::now();
+ let query = routes::EventsQuery {
+ channels: vec![channel.id.clone()],
+ };
+ let routes::Events(events) =
+ routes::events(State(app), subscribed_at, subscriber, None, Query(query))
+ .await
+ .expect("subscribed to a valid channel");
+
+ // Verify the structure of the response.
+
+ let mut events = events.filter(|event| future::ready(messages.contains(&event.message)));
+
+ // Verify delivery in order
+ for message in &messages {
+ let event = events
+ .next()
+ .immediately()
+ .await
+ .expect("undelivered messages remaining");
+
+ assert_eq!(channel.id, event.channel);
+ assert_eq!(message, &event.message);
+ }
+}
+
+#[tokio::test]
+async fn resumes_from() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let channel = fixtures::channel::create(&app).await;
+ let sender = fixtures::login::create(&app).await;
+
+ let initial_message = fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await;
+
+ let later_messages = vec![
+ fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await,
+ fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await,
+ ];
+
+ // Call the endpoint
+
+ let subscriber = fixtures::login::create(&app).await;
+ let subscribed_at = fixtures::now();
+ let query = routes::EventsQuery {
+ channels: vec![channel.id.clone()],
+ };
+
+ let resume_at = {
+ // First subscription
+ let routes::Events(mut events) = routes::events(
+ State(app.clone()),
+ subscribed_at,
+ subscriber.clone(),
+ None,
+ Query(query.clone()),
+ )
+ .await
+ .expect("subscribed to a valid channel");
+
+ let event = events.next().immediately().await.expect("delivered events");
+
+ assert_eq!(channel.id, event.channel);
+ assert_eq!(initial_message, event.message);
+
+ event.event_id()
+ };
+
+ // Resume after disconnect
+ let resumed_at = fixtures::now();
+ let routes::Events(resumed) = routes::events(
+ State(app),
+ resumed_at,
+ subscriber,
+ Some(resume_at.into()),
+ Query(query),
+ )
+ .await
+ .expect("subscribed to a valid channel");
+
+ // Verify the structure of the response.
+
+ let events = resumed
+ .take(later_messages.len())
+ .collect::<Vec<_>>()
+ .immediately()
+ .await;
+
+ for message in later_messages {
+ assert!(events
+ .iter()
+ .any(|event| event.channel == channel.id && event.message == message));
+ }
+}
+
+#[tokio::test]
+async fn removes_expired_messages() {
+ // Set up the environment
+ let app = fixtures::scratch_app().await;
+ let sender = fixtures::login::create(&app).await;
+ let channel = fixtures::channel::create(&app).await;
+
+ fixtures::message::send(&app, &sender, &channel, &fixtures::ancient()).await;
+ let message = fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await;
+
+ // Call the endpoint
+
+ let subscriber = fixtures::login::create(&app).await;
+ let subscribed_at = fixtures::now();
+ let query = routes::EventsQuery {
+ channels: vec![channel.id.clone()],
+ };
+ let routes::Events(mut events) =
+ routes::events(State(app), subscribed_at, subscriber, None, Query(query))
+ .await
+ .expect("subscribed to valid channel");
+
+ // Verify the semantics
+
+ let event = events
+ .next()
+ .immediately()
+ .await
+ .expect("delivered messages");
+
+ assert_eq!(channel.id, event.channel);
+ assert_eq!(message, event.message);
+}
diff --git a/src/id.rs b/src/id.rs
index ce7f13b..22add08 100644
--- a/src/id.rs
+++ b/src/id.rs
@@ -27,7 +27,7 @@ pub const ID_SIZE: usize = 15;
//
// By convention, the prefix should be UPPERCASE - note that the alphabet for this
// is entirely lowercase.
-#[derive(Clone, Debug, Hash, PartialEq, Eq, sqlx::Type, serde::Deserialize, serde::Serialize)]
+#[derive(Clone, Debug, Hash, Eq, PartialEq, sqlx::Type, serde::Deserialize, serde::Serialize)]
#[sqlx(transparent)]
#[serde(transparent)]
pub struct Id(String);
diff --git a/src/lib.rs b/src/lib.rs
index 609142a..09bfac4 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -9,3 +9,5 @@ mod id;
mod login;
mod password;
mod repo;
+#[cfg(test)]
+mod test;
diff --git a/src/login/app.rs b/src/login/app.rs
index 292a564..10609c6 100644
--- a/src/login/app.rs
+++ b/src/login/app.rs
@@ -48,6 +48,17 @@ impl<'a> Logins<'a> {
Ok(token)
}
+ #[cfg(test)]
+ pub async fn create(&self, name: &str, password: &str) -> Result<Login, CreateError> {
+ let password_hash = StoredHash::new(password)?;
+
+ let mut tx = self.db.begin().await?;
+ let login = tx.logins().create(name, &password_hash).await?;
+ tx.commit().await?;
+
+ Ok(login)
+ }
+
pub async fn validate(&self, secret: &str, used_at: &DateTime) -> Result<Login, ValidateError> {
// Somewhat arbitrarily, expire after 7 days.
let expire_at = used_at.to_owned() - TimeDelta::days(7);
@@ -87,6 +98,14 @@ pub enum LoginError {
PasswordHashError(#[from] password_hash::Error),
}
+#[cfg(test)]
+#[derive(Debug, thiserror::Error)]
+#[error(transparent)]
+pub enum CreateError {
+ DatabaseError(#[from] sqlx::Error),
+ PasswordHashError(#[from] password_hash::Error),
+}
+
#[derive(Debug, thiserror::Error)]
pub enum ValidateError {
#[error("invalid token")]
diff --git a/src/login/extract.rs b/src/login/extract.rs
index 735bc22..bda55cd 100644
--- a/src/login/extract.rs
+++ b/src/login/extract.rs
@@ -7,11 +7,20 @@ use axum_extra::extract::cookie::{Cookie, CookieJar};
// The usage pattern here - receive the extractor as an argument, return it in
// the response - is heavily modelled after CookieJar's own intended usage.
+#[derive(Clone, Debug)]
pub struct IdentityToken {
cookies: CookieJar,
}
impl IdentityToken {
+ /// Creates a new, unpopulated identity token store.
+ #[cfg(test)]
+ pub fn new() -> Self {
+ Self {
+ cookies: CookieJar::new(),
+ }
+ }
+
/// Get the identity secret sent in the request, if any. If the identity
/// was not sent, or if it has previously been [clear]ed, then this will
/// return [None]. If the identity has previously been [set], then this
diff --git a/src/login/routes.rs b/src/login/routes.rs
index 41554dd..06e5853 100644
--- a/src/login/routes.rs
+++ b/src/login/routes.rs
@@ -10,6 +10,9 @@ use crate::{app::App, clock::RequestedAt, error::InternalError, repo::login::Log
use super::{app, extract::IdentityToken};
+#[cfg(test)]
+mod test;
+
pub fn router() -> Router<App> {
Router::new()
.route("/api/boot", get(boot))
@@ -53,6 +56,7 @@ async fn on_login(
Ok((identity, StatusCode::NO_CONTENT))
}
+#[derive(Debug)]
struct LoginError(app::LoginError);
impl IntoResponse for LoginError {
@@ -85,6 +89,7 @@ async fn on_logout(
Ok((identity, StatusCode::NO_CONTENT))
}
+#[derive(Debug)]
struct LogoutError(app::ValidateError);
impl IntoResponse for LogoutError {
diff --git a/src/login/routes/test/boot.rs b/src/login/routes/test/boot.rs
new file mode 100644
index 0000000..dee554f
--- /dev/null
+++ b/src/login/routes/test/boot.rs
@@ -0,0 +1,9 @@
+use crate::{login::routes, test::fixtures};
+
+#[tokio::test]
+async fn returns_identity() {
+ let login = fixtures::login::fictitious();
+ let response = routes::boot(login.clone()).await;
+
+ assert_eq!(login, response.login);
+}
diff --git a/src/login/routes/test/login.rs b/src/login/routes/test/login.rs
new file mode 100644
index 0000000..4fa491a
--- /dev/null
+++ b/src/login/routes/test/login.rs
@@ -0,0 +1,137 @@
+use axum::{
+ extract::{Json, State},
+ http::StatusCode,
+};
+
+use crate::{
+ login::{app, routes},
+ test::fixtures,
+};
+
+#[tokio::test]
+async fn new_identity() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+
+ // Call the endpoint
+
+ let identity = fixtures::identity::not_logged_in();
+ let logged_in_at = fixtures::now();
+ let (name, password) = fixtures::login::propose();
+ let request = routes::LoginRequest {
+ name: name.clone(),
+ password,
+ };
+ let (identity, status) =
+ routes::on_login(State(app.clone()), logged_in_at, identity, Json(request))
+ .await
+ .expect("logged in with valid credentials");
+
+ // Verify the return value's basic structure
+
+ assert_eq!(StatusCode::NO_CONTENT, status);
+ let secret = identity.secret().expect("logged in with valid credentials");
+
+ // Verify the semantics
+
+ let validated_at = fixtures::now();
+ let validated = app
+ .logins()
+ .validate(secret, &validated_at)
+ .await
+ .expect("identity secret is valid");
+
+ assert_eq!(name, validated.name);
+}
+
+#[tokio::test]
+async fn existing_identity() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let (name, password) = fixtures::login::create_for_login(&app).await;
+
+ // Call the endpoint
+
+ let identity = fixtures::identity::not_logged_in();
+ let logged_in_at = fixtures::now();
+ let request = routes::LoginRequest {
+ name: name.clone(),
+ password,
+ };
+ let (identity, status) =
+ routes::on_login(State(app.clone()), logged_in_at, identity, Json(request))
+ .await
+ .expect("logged in with valid credentials");
+
+ // Verify the return value's basic structure
+
+ assert_eq!(StatusCode::NO_CONTENT, status);
+ let secret = identity.secret().expect("logged in with valid credentials");
+
+ // Verify the semantics
+
+ let validated_at = fixtures::now();
+ let validated_login = app
+ .logins()
+ .validate(secret, &validated_at)
+ .await
+ .expect("identity secret is valid");
+
+ assert_eq!(name, validated_login.name);
+}
+
+#[tokio::test]
+async fn authentication_failed() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let login = fixtures::login::create(&app).await;
+
+ // Call the endpoint
+
+ let logged_in_at = fixtures::now();
+ let identity = fixtures::identity::not_logged_in();
+ let request = routes::LoginRequest {
+ name: login.name,
+ password: fixtures::login::propose_password(),
+ };
+ let routes::LoginError(error) =
+ routes::on_login(State(app.clone()), logged_in_at, identity, Json(request))
+ .await
+ .expect_err("logged in with an incorrect password");
+
+ // Verify the return value's basic structure
+
+ fixtures::error::expected!(error, app::LoginError::Rejected);
+}
+
+#[tokio::test]
+async fn token_expires() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let (name, password) = fixtures::login::create_for_login(&app).await;
+
+ // Call the endpoint
+
+ let logged_in_at = fixtures::ancient();
+ let identity = fixtures::identity::not_logged_in();
+ let request = routes::LoginRequest { name, password };
+ let (identity, _) = routes::on_login(State(app.clone()), logged_in_at, identity, Json(request))
+ .await
+ .expect("logged in with valid credentials");
+ let token = identity.secret().expect("logged in with valid credentials");
+
+ // Verify the semantics
+
+ let verified_at = fixtures::now();
+ let error = app
+ .logins()
+ .validate(token, &verified_at)
+ .await
+ .expect_err("validating an expired token");
+
+ fixtures::error::expected!(error, app::ValidateError::InvalidToken);
+}
diff --git a/src/login/routes/test/logout.rs b/src/login/routes/test/logout.rs
new file mode 100644
index 0000000..003bc8e
--- /dev/null
+++ b/src/login/routes/test/logout.rs
@@ -0,0 +1,86 @@
+use axum::{
+ extract::{Json, State},
+ http::StatusCode,
+};
+
+use crate::{
+ login::{app, routes},
+ test::fixtures,
+};
+
+#[tokio::test]
+async fn successful() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+ let now = fixtures::now();
+ let login = fixtures::login::create_for_login(&app).await;
+ let identity = fixtures::identity::logged_in(&app, &login, &now).await;
+ let secret = fixtures::identity::secret(&identity);
+
+ // Call the endpoint
+
+ let (response_identity, response_status) = routes::on_logout(
+ State(app.clone()),
+ identity.clone(),
+ Json(routes::LogoutRequest {}),
+ )
+ .await
+ .expect("logged out with a valid token");
+
+ // Verify the return value's basic structure
+
+ assert!(response_identity.secret().is_none());
+ assert_eq!(StatusCode::NO_CONTENT, response_status);
+
+ // Verify the semantics
+
+ let error = app
+ .logins()
+ .validate(secret, &now)
+ .await
+ .expect_err("secret is invalid");
+ match error {
+ app::ValidateError::InvalidToken => (), // should be invalid
+ other => panic!("expected ValidateError::InvalidToken, got {other:#}"),
+ }
+}
+
+#[tokio::test]
+async fn no_identity() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+
+ // Call the endpoint
+
+ let identity = fixtures::identity::not_logged_in();
+ let (identity, status) =
+ routes::on_logout(State(app), identity, Json(routes::LogoutRequest {}))
+ .await
+ .expect("logged out with no token");
+
+ // Verify the return value's basic structure
+
+ assert!(identity.secret().is_none());
+ assert_eq!(StatusCode::NO_CONTENT, status);
+}
+
+#[tokio::test]
+async fn invalid_token() {
+ // Set up the environment
+
+ let app = fixtures::scratch_app().await;
+
+ // Call the endpoint
+
+ let identity = fixtures::identity::fictitious();
+ let routes::LogoutError(error) =
+ routes::on_logout(State(app), identity, Json(routes::LogoutRequest {}))
+ .await
+ .expect_err("logged out with an invalid token");
+
+ // Verify the return value's basic structure
+
+ fixtures::error::expected!(error, app::ValidateError::InvalidToken);
+}
diff --git a/src/login/routes/test/mod.rs b/src/login/routes/test/mod.rs
new file mode 100644
index 0000000..7693755
--- /dev/null
+++ b/src/login/routes/test/mod.rs
@@ -0,0 +1,3 @@
+mod boot;
+mod login;
+mod logout;
diff --git a/src/repo/channel.rs b/src/repo/channel.rs
index 95516d2..8f089e8 100644
--- a/src/repo/channel.rs
+++ b/src/repo/channel.rs
@@ -16,7 +16,7 @@ impl<'c> Provider for Transaction<'c, Sqlite> {
pub struct Channels<'t>(&'t mut SqliteConnection);
-#[derive(Debug, serde::Serialize)]
+#[derive(Debug, Eq, PartialEq, serde::Serialize)]
pub struct Channel {
pub id: Id,
pub name: String,
diff --git a/src/repo/login/store.rs b/src/repo/login/store.rs
index d979579..2f922d7 100644
--- a/src/repo/login/store.rs
+++ b/src/repo/login/store.rs
@@ -18,7 +18,7 @@ pub struct Logins<'t>(&'t mut SqliteConnection);
// can be used as an extractor for endpoints that want to require login, or for
// endpoints that need to behave differently depending on whether the client is
// or is not logged in.
-#[derive(Clone, Debug, serde::Serialize)]
+#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
pub struct Login {
pub id: Id,
pub name: String,
diff --git a/src/test/fixtures/channel.rs b/src/test/fixtures/channel.rs
new file mode 100644
index 0000000..0558395
--- /dev/null
+++ b/src/test/fixtures/channel.rs
@@ -0,0 +1,24 @@
+use faker_rand::{
+ en_us::{addresses::CityName, names::FullName},
+ faker_impl_from_templates,
+};
+use rand;
+
+use crate::{app::App, repo::channel::Channel};
+
+pub async fn create(app: &App) -> Channel {
+ let name = propose();
+ app.channels()
+ .create(&name)
+ .await
+ .expect("should always succeed if the channel is actually new")
+}
+
+pub fn propose() -> String {
+ rand::random::<Name>().to_string()
+}
+
+struct Name(String);
+faker_impl_from_templates! {
+ Name; "{} {}", CityName, FullName;
+}
diff --git a/src/test/fixtures/error.rs b/src/test/fixtures/error.rs
new file mode 100644
index 0000000..559afee
--- /dev/null
+++ b/src/test/fixtures/error.rs
@@ -0,0 +1,14 @@
+macro_rules! expected {
+ ($expr:expr, $expect:pat $(,)?) => {
+ $crate::test::fixtures::error::expected!($expr, $expect, ())
+ };
+
+ ($expr:expr, $expect:pat, $body:expr $(,)?) => {
+ match $expr {
+ $expect => $body,
+ other => panic!("expected {}, found {other:#?}", stringify!($expect)),
+ }
+ };
+}
+
+pub(crate) use expected;
diff --git a/src/test/fixtures/future.rs b/src/test/fixtures/future.rs
new file mode 100644
index 0000000..bbdc9f8
--- /dev/null
+++ b/src/test/fixtures/future.rs
@@ -0,0 +1,55 @@
+use std::{future::IntoFuture, time::Duration};
+
+use futures::{stream, Stream};
+use tokio::time::timeout;
+
+async fn immediately<F>(fut: F) -> F::Output
+where
+ F: IntoFuture,
+{
+ // I haven't been particularly rigorous here. Zero delay _seems to work_,
+ // but this can be set higher; it makes tests that fail to meet the
+ // "immediate" expectation take longer, but gives slow tests time to
+ // succeed, as well.
+ let duration = Duration::from_nanos(0);
+ timeout(duration, fut)
+ .await
+ .expect("expected result immediately")
+}
+
+// This is only intended for streams, since their `next()`, `collect()`, and
+// so on can all block indefinitely on an empty stream. There's no need to
+// force immediacy on futures that "can't" block forever, and it can hide logic
+// errors if you do that.
+//
+// The impls below _could_ be replaced with a blanket impl for all future
+// types, otherwise. The choice to restrict impls to stream futures is
+// deliberate.
+pub trait Immediately {
+ type Output;
+
+ async fn immediately(self) -> Self::Output;
+}
+
+impl<'a, St> Immediately for stream::Next<'a, St>
+where
+ St: Stream + Unpin + ?Sized,
+{
+ type Output = Option<<St as Stream>::Item>;
+
+ async fn immediately(self) -> Self::Output {
+ immediately(self).await
+ }
+}
+
+impl<St, C> Immediately for stream::Collect<St, C>
+where
+ St: Stream,
+ C: Default + Extend<<St as Stream>::Item>,
+{
+ type Output = C;
+
+ async fn immediately(self) -> Self::Output {
+ immediately(self).await
+ }
+}
diff --git a/src/test/fixtures/identity.rs b/src/test/fixtures/identity.rs
new file mode 100644
index 0000000..16463aa
--- /dev/null
+++ b/src/test/fixtures/identity.rs
@@ -0,0 +1,27 @@
+use uuid::Uuid;
+
+use crate::{app::App, clock::RequestedAt, login::extract::IdentityToken};
+
+pub fn not_logged_in() -> IdentityToken {
+ IdentityToken::new()
+}
+
+pub async fn logged_in(app: &App, login: &(String, String), now: &RequestedAt) -> IdentityToken {
+ let (name, password) = login;
+ let token = app
+ .logins()
+ .login(name, password, now)
+ .await
+ .expect("should succeed given known-valid credentials");
+
+ IdentityToken::new().set(&token)
+}
+
+pub fn secret(identity: &IdentityToken) -> &str {
+ identity.secret().expect("identity contained a secret")
+}
+
+pub fn fictitious() -> IdentityToken {
+ let token = Uuid::new_v4().to_string();
+ IdentityToken::new().set(&token)
+}
diff --git a/src/test/fixtures/login.rs b/src/test/fixtures/login.rs
new file mode 100644
index 0000000..b2a4292
--- /dev/null
+++ b/src/test/fixtures/login.rs
@@ -0,0 +1,44 @@
+use faker_rand::en_us::internet;
+use uuid::Uuid;
+
+use crate::{
+ app::App,
+ repo::login::{self, Login},
+};
+
+pub async fn create_for_login(app: &App) -> (String, String) {
+ let (name, password) = propose();
+ app.logins()
+ .create(&name, &password)
+ .await
+ .expect("should always succeed if the login is actually new");
+
+ (name, password)
+}
+
+pub async fn create(app: &App) -> Login {
+ let (name, password) = propose();
+ app.logins()
+ .create(&name, &password)
+ .await
+ .expect("should always succeed if the login is actually new")
+}
+
+pub fn fictitious() -> Login {
+ Login {
+ id: login::Id::generate(),
+ name: name(),
+ }
+}
+
+pub fn propose() -> (String, String) {
+ (name(), propose_password())
+}
+
+fn name() -> String {
+ rand::random::<internet::Username>().to_string()
+}
+
+pub fn propose_password() -> String {
+ Uuid::new_v4().to_string()
+}
diff --git a/src/test/fixtures/message.rs b/src/test/fixtures/message.rs
new file mode 100644
index 0000000..7fe3cb9
--- /dev/null
+++ b/src/test/fixtures/message.rs
@@ -0,0 +1,26 @@
+use faker_rand::lorem::Paragraphs;
+
+use crate::{
+ app::App,
+ clock::RequestedAt,
+ events::repo::broadcast,
+ repo::{channel::Channel, login::Login},
+};
+
+pub async fn send(
+ app: &App,
+ login: &Login,
+ channel: &Channel,
+ sent_at: &RequestedAt,
+) -> broadcast::Message {
+ let body = propose();
+
+ app.channels()
+ .send(login, &channel.id, &body, sent_at)
+ .await
+ .expect("should succeed if the channel exists")
+}
+
+pub fn propose() -> String {
+ rand::random::<Paragraphs>().to_string()
+}
diff --git a/src/test/fixtures/mod.rs b/src/test/fixtures/mod.rs
new file mode 100644
index 0000000..05e3f3f
--- /dev/null
+++ b/src/test/fixtures/mod.rs
@@ -0,0 +1,28 @@
+use chrono::{TimeDelta, Utc};
+
+use crate::{app::App, clock::RequestedAt, repo::pool};
+
+pub mod channel;
+pub mod error;
+pub mod future;
+pub mod identity;
+pub mod login;
+pub mod message;
+
+pub async fn scratch_app() -> App {
+ let pool = pool::prepare("sqlite::memory:")
+ .await
+ .expect("setting up in-memory sqlite database");
+ App::from(pool)
+ .await
+ .expect("creating an app from a fresh, in-memory database")
+}
+
+pub fn now() -> RequestedAt {
+ Utc::now().into()
+}
+
+pub fn ancient() -> RequestedAt {
+ let timestamp = Utc::now() - TimeDelta::days(365);
+ timestamp.into()
+}
diff --git a/src/test/mod.rs b/src/test/mod.rs
new file mode 100644
index 0000000..d066349
--- /dev/null
+++ b/src/test/mod.rs
@@ -0,0 +1 @@
+pub mod fixtures;