diff options
| author | Kit La Touche <kit@transneptune.net> | 2024-09-28 21:55:50 -0400 |
|---|---|---|
| committer | Kit La Touche <kit@transneptune.net> | 2024-09-28 21:55:50 -0400 |
| commit | 897eef0306917baf3662e691b29f182d35805296 (patch) | |
| tree | 024e2a3fa13ac96e0b4339a6d62ae533efe7db07 /src/channel | |
| parent | c524b333befc8cc97aa49f73b3ed28bc3b82420c (diff) | |
| parent | 4d0bb0709b168a24ab6a8dbc86da45d7503596ee (diff) | |
Merge branch 'main' into feature-frontend
Diffstat (limited to 'src/channel')
| -rw-r--r-- | src/channel/app.rs | 38 | ||||
| -rw-r--r-- | src/channel/routes.rs | 3 | ||||
| -rw-r--r-- | src/channel/routes/test/list.rs | 6 | ||||
| -rw-r--r-- | src/channel/routes/test/on_create.rs | 51 | ||||
| -rw-r--r-- | src/channel/routes/test/on_send.rs | 93 |
5 files changed, 99 insertions, 92 deletions
diff --git a/src/channel/app.rs b/src/channel/app.rs index 793fa35..d7312e4 100644 --- a/src/channel/app.rs +++ b/src/channel/app.rs @@ -1,7 +1,9 @@ +use chrono::TimeDelta; use sqlx::sqlite::SqlitePool; use crate::{ - events::broadcaster::Broadcaster, + clock::DateTime, + events::{broadcaster::Broadcaster, repo::message::Provider as _, types::ChannelEvent}, repo::channel::{Channel, Provider as _}, }; @@ -15,16 +17,18 @@ impl<'a> Channels<'a> { Self { db, broadcaster } } - pub async fn create(&self, name: &str) -> Result<Channel, CreateError> { + pub async fn create(&self, name: &str, created_at: &DateTime) -> Result<Channel, CreateError> { let mut tx = self.db.begin().await?; let channel = tx .channels() - .create(name) + .create(name, created_at) .await .map_err(|err| CreateError::from_duplicate_name(err, name))?; - self.broadcaster.register_channel(&channel.id); tx.commit().await?; + self.broadcaster + .broadcast(&ChannelEvent::created(channel.clone())); + Ok(channel) } @@ -35,6 +39,32 @@ impl<'a> Channels<'a> { Ok(channels) } + + pub async fn expire(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> { + // Somewhat arbitrarily, expire after 90 days. + let expire_at = relative_to.to_owned() - TimeDelta::days(90); + + let mut tx = self.db.begin().await?; + let expired = tx.channels().expired(&expire_at).await?; + + let mut events = Vec::with_capacity(expired.len()); + for channel in expired { + let sequence = tx.message_events().assign_sequence(&channel).await?; + let event = tx + .channels() + .delete_expired(&channel, sequence, relative_to) + .await?; + events.push(event); + } + + tx.commit().await?; + + for event in events { + self.broadcaster.broadcast(&event); + } + + Ok(()) + } } #[derive(Debug, thiserror::Error)] diff --git a/src/channel/routes.rs b/src/channel/routes.rs index f524e62..1f8db5a 100644 --- a/src/channel/routes.rs +++ b/src/channel/routes.rs @@ -52,11 +52,12 @@ struct CreateRequest { async fn on_create( State(app): State<App>, _: Login, // requires auth, but doesn't actually care who you are + RequestedAt(created_at): RequestedAt, Json(form): Json<CreateRequest>, ) -> Result<Json<Channel>, CreateError> { let channel = app .channels() - .create(&form.name) + .create(&form.name, &created_at) .await .map_err(CreateError)?; diff --git a/src/channel/routes/test/list.rs b/src/channel/routes/test/list.rs index f7f7b44..bc94024 100644 --- a/src/channel/routes/test/list.rs +++ b/src/channel/routes/test/list.rs @@ -26,7 +26,7 @@ async fn one_channel() { let app = fixtures::scratch_app().await; let viewer = fixtures::login::create(&app).await; - let channel = fixtures::channel::create(&app).await; + let channel = fixtures::channel::create(&app, &fixtures::now()).await; // Call the endpoint @@ -46,8 +46,8 @@ async fn multiple_channels() { let app = fixtures::scratch_app().await; let viewer = fixtures::login::create(&app).await; let channels = vec![ - fixtures::channel::create(&app).await, - fixtures::channel::create(&app).await, + fixtures::channel::create(&app, &fixtures::now()).await, + fixtures::channel::create(&app, &fixtures::now()).await, ]; // Call the endpoint diff --git a/src/channel/routes/test/on_create.rs b/src/channel/routes/test/on_create.rs index 23885c0..e2610a5 100644 --- a/src/channel/routes/test/on_create.rs +++ b/src/channel/routes/test/on_create.rs @@ -1,8 +1,10 @@ use axum::extract::{Json, State}; +use futures::stream::StreamExt as _; use crate::{ channel::{app, routes}, - test::fixtures, + events::types, + test::fixtures::{self, future::Immediately as _}, }; #[tokio::test] @@ -16,10 +18,14 @@ async fn new_channel() { let name = fixtures::channel::propose(); let request = routes::CreateRequest { name }; - let Json(response_channel) = - routes::on_create(State(app.clone()), creator, Json(request.clone())) - .await - .expect("new channel in an empty app"); + let Json(response_channel) = routes::on_create( + State(app.clone()), + creator, + fixtures::now(), + Json(request.clone()), + ) + .await + .expect("new channel in an empty app"); // Verify the structure of the response @@ -28,8 +34,27 @@ async fn new_channel() { // Verify the semantics let channels = app.channels().all().await.expect("always succeeds"); - assert!(channels.contains(&response_channel)); + + let mut events = app + .events() + .subscribe(types::ResumePoint::default()) + .await + .expect("subscribing never fails") + .filter(fixtures::filter::created()); + + let types::ResumableEvent(_, event) = events + .next() + .immediately() + .await + .expect("creation event published"); + + assert_eq!(types::Sequence::default(), event.sequence); + assert!(matches!( + event.data, + types::ChannelEventData::Created(event) + if event.channel == response_channel + )); } #[tokio::test] @@ -38,15 +63,19 @@ async fn duplicate_name() { let app = fixtures::scratch_app().await; let creator = fixtures::login::create(&app).await; - let channel = fixtures::channel::create(&app).await; + let channel = fixtures::channel::create(&app, &fixtures::now()).await; // Call the endpoint let request = routes::CreateRequest { name: channel.name }; - let routes::CreateError(error) = - routes::on_create(State(app.clone()), creator, Json(request.clone())) - .await - .expect_err("duplicate channel name"); + let routes::CreateError(error) = routes::on_create( + State(app.clone()), + creator, + fixtures::now(), + Json(request.clone()), + ) + .await + .expect_err("duplicate channel name"); // Verify the structure of the response diff --git a/src/channel/routes/test/on_send.rs b/src/channel/routes/test/on_send.rs index 93a5480..233518b 100644 --- a/src/channel/routes/test/on_send.rs +++ b/src/channel/routes/test/on_send.rs @@ -1,90 +1,33 @@ -use axum::{ - extract::{Json, Path, State}, - http::StatusCode, -}; +use axum::extract::{Json, Path, State}; use futures::stream::StreamExt; use crate::{ channel::routes, - events::app, + events::{app, types}, repo::channel, test::fixtures::{self, future::Immediately as _}, }; #[tokio::test] -async fn channel_exists() { - // Set up the environment - - let app = fixtures::scratch_app().await; - let sender = fixtures::login::create(&app).await; - let channel = fixtures::channel::create(&app).await; - - // Call the endpoint - - let sent_at = fixtures::now(); - let request = routes::SendRequest { - message: fixtures::message::propose(), - }; - let status = routes::on_send( - State(app.clone()), - Path(channel.id.clone()), - sent_at.clone(), - sender.clone(), - Json(request.clone()), - ) - .await - .expect("sending to a valid channel"); - - // Verify the structure of the response - - assert_eq!(StatusCode::ACCEPTED, status); - - // Verify the semantics - - let subscribed_at = fixtures::now(); - let mut events = app - .events() - .subscribe(&channel.id, &subscribed_at, None) - .await - .expect("subscribing to a valid channel"); - - let event = events - .next() - .immediately() - .await - .expect("event received by subscribers"); - - assert_eq!(request.message, event.body); - assert_eq!(sender, event.sender); - assert_eq!(*sent_at, event.sent_at); -} - -#[tokio::test] async fn messages_in_order() { // Set up the environment let app = fixtures::scratch_app().await; let sender = fixtures::login::create(&app).await; - let channel = fixtures::channel::create(&app).await; + let channel = fixtures::channel::create(&app, &fixtures::now()).await; // Call the endpoint (twice) let requests = vec![ - ( - fixtures::now(), - routes::SendRequest { - message: fixtures::message::propose(), - }, - ), - ( - fixtures::now(), - routes::SendRequest { - message: fixtures::message::propose(), - }, - ), + (fixtures::now(), fixtures::message::propose()), + (fixtures::now(), fixtures::message::propose()), ]; - for (sent_at, request) in &requests { + for (sent_at, message) in &requests { + let request = routes::SendRequest { + message: message.clone(), + }; + routes::on_send( State(app.clone()), Path(channel.id.clone()), @@ -98,20 +41,24 @@ async fn messages_in_order() { // Verify the semantics - let subscribed_at = fixtures::now(); let events = app .events() - .subscribe(&channel.id, &subscribed_at, None) + .subscribe(types::ResumePoint::default()) .await .expect("subscribing to a valid channel") + .filter(fixtures::filter::messages()) .take(requests.len()); let events = events.collect::<Vec<_>>().immediately().await; - for ((sent_at, request), event) in requests.into_iter().zip(events) { - assert_eq!(request.message, event.body); - assert_eq!(sender, event.sender); - assert_eq!(*sent_at, event.sent_at); + for ((sent_at, message), types::ResumableEvent(_, event)) in requests.into_iter().zip(events) { + assert_eq!(*sent_at, event.at); + assert!(matches!( + event.data, + types::ChannelEventData::Message(event_message) + if event_message.sender == sender + && event_message.message.body == message + )); } } |
