summaryrefslogtreecommitdiff
path: root/src/channel
diff options
context:
space:
mode:
authorKit La Touche <kit@transneptune.net>2024-09-28 21:55:50 -0400
committerKit La Touche <kit@transneptune.net>2024-09-28 21:55:50 -0400
commit897eef0306917baf3662e691b29f182d35805296 (patch)
tree024e2a3fa13ac96e0b4339a6d62ae533efe7db07 /src/channel
parentc524b333befc8cc97aa49f73b3ed28bc3b82420c (diff)
parent4d0bb0709b168a24ab6a8dbc86da45d7503596ee (diff)
Merge branch 'main' into feature-frontend
Diffstat (limited to 'src/channel')
-rw-r--r--src/channel/app.rs38
-rw-r--r--src/channel/routes.rs3
-rw-r--r--src/channel/routes/test/list.rs6
-rw-r--r--src/channel/routes/test/on_create.rs51
-rw-r--r--src/channel/routes/test/on_send.rs93
5 files changed, 99 insertions, 92 deletions
diff --git a/src/channel/app.rs b/src/channel/app.rs
index 793fa35..d7312e4 100644
--- a/src/channel/app.rs
+++ b/src/channel/app.rs
@@ -1,7 +1,9 @@
+use chrono::TimeDelta;
use sqlx::sqlite::SqlitePool;
use crate::{
- events::broadcaster::Broadcaster,
+ clock::DateTime,
+ events::{broadcaster::Broadcaster, repo::message::Provider as _, types::ChannelEvent},
repo::channel::{Channel, Provider as _},
};
@@ -15,16 +17,18 @@ impl<'a> Channels<'a> {
Self { db, broadcaster }
}
- pub async fn create(&self, name: &str) -> Result<Channel, CreateError> {
+ pub async fn create(&self, name: &str, created_at: &DateTime) -> Result<Channel, CreateError> {
let mut tx = self.db.begin().await?;
let channel = tx
.channels()
- .create(name)
+ .create(name, created_at)
.await
.map_err(|err| CreateError::from_duplicate_name(err, name))?;
- self.broadcaster.register_channel(&channel.id);
tx.commit().await?;
+ self.broadcaster
+ .broadcast(&ChannelEvent::created(channel.clone()));
+
Ok(channel)
}
@@ -35,6 +39,32 @@ impl<'a> Channels<'a> {
Ok(channels)
}
+
+ pub async fn expire(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> {
+ // Somewhat arbitrarily, expire after 90 days.
+ let expire_at = relative_to.to_owned() - TimeDelta::days(90);
+
+ let mut tx = self.db.begin().await?;
+ let expired = tx.channels().expired(&expire_at).await?;
+
+ let mut events = Vec::with_capacity(expired.len());
+ for channel in expired {
+ let sequence = tx.message_events().assign_sequence(&channel).await?;
+ let event = tx
+ .channels()
+ .delete_expired(&channel, sequence, relative_to)
+ .await?;
+ events.push(event);
+ }
+
+ tx.commit().await?;
+
+ for event in events {
+ self.broadcaster.broadcast(&event);
+ }
+
+ Ok(())
+ }
}
#[derive(Debug, thiserror::Error)]
diff --git a/src/channel/routes.rs b/src/channel/routes.rs
index f524e62..1f8db5a 100644
--- a/src/channel/routes.rs
+++ b/src/channel/routes.rs
@@ -52,11 +52,12 @@ struct CreateRequest {
async fn on_create(
State(app): State<App>,
_: Login, // requires auth, but doesn't actually care who you are
+ RequestedAt(created_at): RequestedAt,
Json(form): Json<CreateRequest>,
) -> Result<Json<Channel>, CreateError> {
let channel = app
.channels()
- .create(&form.name)
+ .create(&form.name, &created_at)
.await
.map_err(CreateError)?;
diff --git a/src/channel/routes/test/list.rs b/src/channel/routes/test/list.rs
index f7f7b44..bc94024 100644
--- a/src/channel/routes/test/list.rs
+++ b/src/channel/routes/test/list.rs
@@ -26,7 +26,7 @@ async fn one_channel() {
let app = fixtures::scratch_app().await;
let viewer = fixtures::login::create(&app).await;
- let channel = fixtures::channel::create(&app).await;
+ let channel = fixtures::channel::create(&app, &fixtures::now()).await;
// Call the endpoint
@@ -46,8 +46,8 @@ async fn multiple_channels() {
let app = fixtures::scratch_app().await;
let viewer = fixtures::login::create(&app).await;
let channels = vec![
- fixtures::channel::create(&app).await,
- fixtures::channel::create(&app).await,
+ fixtures::channel::create(&app, &fixtures::now()).await,
+ fixtures::channel::create(&app, &fixtures::now()).await,
];
// Call the endpoint
diff --git a/src/channel/routes/test/on_create.rs b/src/channel/routes/test/on_create.rs
index 23885c0..e2610a5 100644
--- a/src/channel/routes/test/on_create.rs
+++ b/src/channel/routes/test/on_create.rs
@@ -1,8 +1,10 @@
use axum::extract::{Json, State};
+use futures::stream::StreamExt as _;
use crate::{
channel::{app, routes},
- test::fixtures,
+ events::types,
+ test::fixtures::{self, future::Immediately as _},
};
#[tokio::test]
@@ -16,10 +18,14 @@ async fn new_channel() {
let name = fixtures::channel::propose();
let request = routes::CreateRequest { name };
- let Json(response_channel) =
- routes::on_create(State(app.clone()), creator, Json(request.clone()))
- .await
- .expect("new channel in an empty app");
+ let Json(response_channel) = routes::on_create(
+ State(app.clone()),
+ creator,
+ fixtures::now(),
+ Json(request.clone()),
+ )
+ .await
+ .expect("new channel in an empty app");
// Verify the structure of the response
@@ -28,8 +34,27 @@ async fn new_channel() {
// Verify the semantics
let channels = app.channels().all().await.expect("always succeeds");
-
assert!(channels.contains(&response_channel));
+
+ let mut events = app
+ .events()
+ .subscribe(types::ResumePoint::default())
+ .await
+ .expect("subscribing never fails")
+ .filter(fixtures::filter::created());
+
+ let types::ResumableEvent(_, event) = events
+ .next()
+ .immediately()
+ .await
+ .expect("creation event published");
+
+ assert_eq!(types::Sequence::default(), event.sequence);
+ assert!(matches!(
+ event.data,
+ types::ChannelEventData::Created(event)
+ if event.channel == response_channel
+ ));
}
#[tokio::test]
@@ -38,15 +63,19 @@ async fn duplicate_name() {
let app = fixtures::scratch_app().await;
let creator = fixtures::login::create(&app).await;
- let channel = fixtures::channel::create(&app).await;
+ let channel = fixtures::channel::create(&app, &fixtures::now()).await;
// Call the endpoint
let request = routes::CreateRequest { name: channel.name };
- let routes::CreateError(error) =
- routes::on_create(State(app.clone()), creator, Json(request.clone()))
- .await
- .expect_err("duplicate channel name");
+ let routes::CreateError(error) = routes::on_create(
+ State(app.clone()),
+ creator,
+ fixtures::now(),
+ Json(request.clone()),
+ )
+ .await
+ .expect_err("duplicate channel name");
// Verify the structure of the response
diff --git a/src/channel/routes/test/on_send.rs b/src/channel/routes/test/on_send.rs
index 93a5480..233518b 100644
--- a/src/channel/routes/test/on_send.rs
+++ b/src/channel/routes/test/on_send.rs
@@ -1,90 +1,33 @@
-use axum::{
- extract::{Json, Path, State},
- http::StatusCode,
-};
+use axum::extract::{Json, Path, State};
use futures::stream::StreamExt;
use crate::{
channel::routes,
- events::app,
+ events::{app, types},
repo::channel,
test::fixtures::{self, future::Immediately as _},
};
#[tokio::test]
-async fn channel_exists() {
- // Set up the environment
-
- let app = fixtures::scratch_app().await;
- let sender = fixtures::login::create(&app).await;
- let channel = fixtures::channel::create(&app).await;
-
- // Call the endpoint
-
- let sent_at = fixtures::now();
- let request = routes::SendRequest {
- message: fixtures::message::propose(),
- };
- let status = routes::on_send(
- State(app.clone()),
- Path(channel.id.clone()),
- sent_at.clone(),
- sender.clone(),
- Json(request.clone()),
- )
- .await
- .expect("sending to a valid channel");
-
- // Verify the structure of the response
-
- assert_eq!(StatusCode::ACCEPTED, status);
-
- // Verify the semantics
-
- let subscribed_at = fixtures::now();
- let mut events = app
- .events()
- .subscribe(&channel.id, &subscribed_at, None)
- .await
- .expect("subscribing to a valid channel");
-
- let event = events
- .next()
- .immediately()
- .await
- .expect("event received by subscribers");
-
- assert_eq!(request.message, event.body);
- assert_eq!(sender, event.sender);
- assert_eq!(*sent_at, event.sent_at);
-}
-
-#[tokio::test]
async fn messages_in_order() {
// Set up the environment
let app = fixtures::scratch_app().await;
let sender = fixtures::login::create(&app).await;
- let channel = fixtures::channel::create(&app).await;
+ let channel = fixtures::channel::create(&app, &fixtures::now()).await;
// Call the endpoint (twice)
let requests = vec![
- (
- fixtures::now(),
- routes::SendRequest {
- message: fixtures::message::propose(),
- },
- ),
- (
- fixtures::now(),
- routes::SendRequest {
- message: fixtures::message::propose(),
- },
- ),
+ (fixtures::now(), fixtures::message::propose()),
+ (fixtures::now(), fixtures::message::propose()),
];
- for (sent_at, request) in &requests {
+ for (sent_at, message) in &requests {
+ let request = routes::SendRequest {
+ message: message.clone(),
+ };
+
routes::on_send(
State(app.clone()),
Path(channel.id.clone()),
@@ -98,20 +41,24 @@ async fn messages_in_order() {
// Verify the semantics
- let subscribed_at = fixtures::now();
let events = app
.events()
- .subscribe(&channel.id, &subscribed_at, None)
+ .subscribe(types::ResumePoint::default())
.await
.expect("subscribing to a valid channel")
+ .filter(fixtures::filter::messages())
.take(requests.len());
let events = events.collect::<Vec<_>>().immediately().await;
- for ((sent_at, request), event) in requests.into_iter().zip(events) {
- assert_eq!(request.message, event.body);
- assert_eq!(sender, event.sender);
- assert_eq!(*sent_at, event.sent_at);
+ for ((sent_at, message), types::ResumableEvent(_, event)) in requests.into_iter().zip(events) {
+ assert_eq!(*sent_at, event.at);
+ assert!(matches!(
+ event.data,
+ types::ChannelEventData::Message(event_message)
+ if event_message.sender == sender
+ && event_message.message.body == message
+ ));
}
}