summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorOwen Jacobson <owen@grimoire.ca>2024-10-03 20:17:07 -0400
committerOwen Jacobson <owen@grimoire.ca>2024-10-03 20:17:07 -0400
commit0a5599c60d20ccc2223779eeba5dc91a95ea0fe5 (patch)
treef7ce69ad18768ff53d8fa37d8eb9c6c575633f9e /src
parentec804134c33aedb001c426c5f42f43f53c47848f (diff)
Add endpoints for deleting channels and messages.
It is deliberate that the expire() functions do not use them. To avoid races, the transactions must be committed before events get sent, in both cases, which makes them structurally pretty different.
Diffstat (limited to 'src')
-rw-r--r--src/channel/app.rs72
-rw-r--r--src/channel/routes.rs61
-rw-r--r--src/channel/routes/test/on_send.rs6
-rw-r--r--src/cli.rs13
-rw-r--r--src/event/app.rs1
-rw-r--r--src/event/broadcaster.rs2
-rw-r--r--src/message/app.rs61
-rw-r--r--src/message/mod.rs3
-rw-r--r--src/message/repo.rs46
-rw-r--r--src/message/routes.rs46
10 files changed, 233 insertions, 78 deletions
diff --git a/src/channel/app.rs b/src/channel/app.rs
index 6ce826b..24be2ff 100644
--- a/src/channel/app.rs
+++ b/src/channel/app.rs
@@ -2,10 +2,12 @@ use chrono::TimeDelta;
use itertools::Itertools;
use sqlx::sqlite::SqlitePool;
+use super::{repo::Provider as _, Channel, Id};
use crate::{
- channel::{repo::Provider as _, Channel},
clock::DateTime,
- event::{broadcaster::Broadcaster, repo::Provider as _, Sequence},
+ db::NotFound,
+ event::{broadcaster::Broadcaster, repo::Provider as _, Event, Sequence, Sequenced},
+ message::repo::Provider as _,
};
pub struct Channels<'a> {
@@ -28,9 +30,8 @@ impl<'a> Channels<'a> {
.map_err(|err| CreateError::from_duplicate_name(err, name))?;
tx.commit().await?;
- for event in channel.events() {
- self.events.broadcast(event);
- }
+ self.events
+ .broadcast(channel.events().map(Event::from).collect::<Vec<_>>());
Ok(channel.snapshot())
}
@@ -53,6 +54,46 @@ impl<'a> Channels<'a> {
Ok(channels)
}
+ pub async fn delete(&self, channel: &Id, deleted_at: &DateTime) -> Result<(), DeleteError> {
+ let mut tx = self.db.begin().await?;
+
+ let channel = tx
+ .channels()
+ .by_id(channel)
+ .await
+ .not_found(|| DeleteError::NotFound(channel.clone()))?
+ .snapshot();
+
+ let mut events = Vec::new();
+
+ let messages = tx.messages().in_channel(&channel).await?;
+ for message in messages {
+ let deleted = tx.sequence().next(deleted_at).await?;
+ let message = tx.messages().delete(&message, &deleted).await?;
+ events.extend(
+ message
+ .events()
+ .filter(Sequence::start_from(deleted.sequence))
+ .map(Event::from),
+ );
+ }
+
+ let deleted = tx.sequence().next(deleted_at).await?;
+ let channel = tx.channels().delete(&channel.id, &deleted).await?;
+ events.extend(
+ channel
+ .events()
+ .filter(Sequence::start_from(deleted.sequence))
+ .map(Event::from),
+ );
+
+ tx.commit().await?;
+
+ self.events.broadcast(events);
+
+ Ok(())
+ }
+
pub async fn expire(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> {
// Somewhat arbitrarily, expire after 90 days.
let expire_at = relative_to.to_owned() - TimeDelta::days(90);
@@ -73,12 +114,13 @@ impl<'a> Channels<'a> {
tx.commit().await?;
- for event in events
- .into_iter()
- .kmerge_by(|a, b| a.instant.sequence < b.instant.sequence)
- {
- self.events.broadcast(event);
- }
+ self.events.broadcast(
+ events
+ .into_iter()
+ .kmerge_by(|a, b| a.sequence() < b.sequence())
+ .map(Event::from)
+ .collect::<Vec<_>>(),
+ );
Ok(())
}
@@ -92,6 +134,14 @@ pub enum CreateError {
DatabaseError(#[from] sqlx::Error),
}
+#[derive(Debug, thiserror::Error)]
+pub enum DeleteError {
+ #[error("channel {0} not found")]
+ NotFound(Id),
+ #[error(transparent)]
+ DatabaseError(#[from] sqlx::Error),
+}
+
impl CreateError {
fn from_duplicate_name(error: sqlx::Error, name: &str) -> Self {
if let Some(error) = error.as_database_error() {
diff --git a/src/channel/routes.rs b/src/channel/routes.rs
index 5bb1ee9..bce634e 100644
--- a/src/channel/routes.rs
+++ b/src/channel/routes.rs
@@ -2,20 +2,18 @@ use axum::{
extract::{Json, Path, State},
http::StatusCode,
response::{IntoResponse, Response},
- routing::{get, post},
+ routing::{delete, get, post},
Router,
};
use axum_extra::extract::Query;
-use super::app;
+use super::{
+ app::{self, DeleteError},
+ Channel, Id,
+};
use crate::{
- app::App,
- channel::{self, Channel},
- clock::RequestedAt,
- error::Internal,
- event::Sequence,
- login::Login,
- message::app::Error as MessageError,
+ app::App, clock::RequestedAt, error::Internal, event::Sequence, login::Login,
+ message::app::SendError,
};
#[cfg(test)]
@@ -26,6 +24,7 @@ pub fn router() -> Router<App> {
.route("/api/channels", get(list))
.route("/api/channels", post(on_create))
.route("/api/channels/:channel", post(on_send))
+ .route("/api/channels/:channel", delete(on_delete))
}
#[derive(Default, serde::Deserialize)]
@@ -95,28 +94,54 @@ struct SendRequest {
async fn on_send(
State(app): State<App>,
- Path(channel): Path<channel::Id>,
+ Path(channel): Path<Id>,
RequestedAt(sent_at): RequestedAt,
login: Login,
Json(request): Json<SendRequest>,
-) -> Result<StatusCode, ErrorResponse> {
+) -> Result<StatusCode, SendErrorResponse> {
app.messages()
.send(&channel, &login, &sent_at, &request.message)
- .await
- // Could impl `From` here, but it's more code and this is used once.
- .map_err(ErrorResponse)?;
+ .await?;
Ok(StatusCode::ACCEPTED)
}
-#[derive(Debug)]
-struct ErrorResponse(MessageError);
+#[derive(Debug, thiserror::Error)]
+#[error(transparent)]
+struct SendErrorResponse(#[from] SendError);
+
+impl IntoResponse for SendErrorResponse {
+ fn into_response(self) -> Response {
+ let Self(error) = self;
+ match error {
+ not_found @ SendError::ChannelNotFound(_) => {
+ (StatusCode::NOT_FOUND, not_found.to_string()).into_response()
+ }
+ other => Internal::from(other).into_response(),
+ }
+ }
+}
+
+async fn on_delete(
+ State(app): State<App>,
+ Path(channel): Path<Id>,
+ RequestedAt(deleted_at): RequestedAt,
+ _: Login,
+) -> Result<StatusCode, DeleteErrorResponse> {
+ app.channels().delete(&channel, &deleted_at).await?;
+
+ Ok(StatusCode::ACCEPTED)
+}
+
+#[derive(Debug, thiserror::Error)]
+#[error(transparent)]
+struct DeleteErrorResponse(#[from] DeleteError);
-impl IntoResponse for ErrorResponse {
+impl IntoResponse for DeleteErrorResponse {
fn into_response(self) -> Response {
let Self(error) = self;
match error {
- not_found @ MessageError::ChannelNotFound(_) => {
+ not_found @ DeleteError::NotFound(_) => {
(StatusCode::NOT_FOUND, not_found.to_string()).into_response()
}
other => Internal::from(other).into_response(),
diff --git a/src/channel/routes/test/on_send.rs b/src/channel/routes/test/on_send.rs
index 1027b29..3297093 100644
--- a/src/channel/routes/test/on_send.rs
+++ b/src/channel/routes/test/on_send.rs
@@ -5,7 +5,7 @@ use crate::{
channel,
channel::routes,
event,
- message::app,
+ message::app::SendError,
test::fixtures::{self, future::Immediately as _},
};
@@ -77,7 +77,7 @@ async fn nonexistent_channel() {
let request = routes::SendRequest {
message: fixtures::message::propose(),
};
- let routes::ErrorResponse(error) = routes::on_send(
+ let routes::SendErrorResponse(error) = routes::on_send(
State(app),
Path(channel.clone()),
sent_at,
@@ -91,6 +91,6 @@ async fn nonexistent_channel() {
assert!(matches!(
error,
- app::Error::ChannelNotFound(error_channel) if channel == error_channel
+ SendError::ChannelNotFound(error_channel) if channel == error_channel
));
}
diff --git a/src/cli.rs b/src/cli.rs
index 893fae2..2d9f512 100644
--- a/src/cli.rs
+++ b/src/cli.rs
@@ -10,7 +10,7 @@ use clap::Parser;
use sqlx::sqlite::SqlitePool;
use tokio::net;
-use crate::{app::App, channel, clock, db, event, expire, login};
+use crate::{app::App, channel, clock, db, event, expire, login, message};
/// Command-line entry point for running the `hi` server.
///
@@ -105,9 +105,14 @@ impl Args {
}
fn routers() -> Router<App> {
- [channel::router(), event::router(), login::router()]
- .into_iter()
- .fold(Router::default(), Router::merge)
+ [
+ channel::router(),
+ event::router(),
+ login::router(),
+ message::router(),
+ ]
+ .into_iter()
+ .fold(Router::default(), Router::merge)
}
fn started_msg(listener: &net::TcpListener) -> io::Result<String> {
diff --git a/src/event/app.rs b/src/event/app.rs
index e58bea9..32f0a97 100644
--- a/src/event/app.rs
+++ b/src/event/app.rs
@@ -61,6 +61,7 @@ impl<'a> Events<'a> {
// Filtering on the broadcast resume point filters out messages
// before resume_at, and filters out messages duplicated from
// `replay_events`.
+ .flat_map(stream::iter)
.filter(Self::resume(resume_live_at));
Ok(replay.chain(live_messages))
diff --git a/src/event/broadcaster.rs b/src/event/broadcaster.rs
index de2513a..3c4efac 100644
--- a/src/event/broadcaster.rs
+++ b/src/event/broadcaster.rs
@@ -1,3 +1,3 @@
use crate::broadcast;
-pub type Broadcaster = broadcast::Broadcaster<super::Event>;
+pub type Broadcaster = broadcast::Broadcaster<Vec<super::Event>>;
diff --git a/src/message/app.rs b/src/message/app.rs
index 51f772e..1d34c14 100644
--- a/src/message/app.rs
+++ b/src/message/app.rs
@@ -2,12 +2,12 @@ use chrono::TimeDelta;
use itertools::Itertools;
use sqlx::sqlite::SqlitePool;
-use super::{repo::Provider as _, Message};
+use super::{repo::Provider as _, Id, Message};
use crate::{
channel::{self, repo::Provider as _},
clock::DateTime,
db::NotFound as _,
- event::{broadcaster::Broadcaster, repo::Provider as _, Sequence},
+ event::{broadcaster::Broadcaster, repo::Provider as _, Event, Sequence},
login::Login,
};
@@ -27,13 +27,13 @@ impl<'a> Messages<'a> {
sender: &Login,
sent_at: &DateTime,
body: &str,
- ) -> Result<Message, Error> {
+ ) -> Result<Message, SendError> {
let mut tx = self.db.begin().await?;
let channel = tx
.channels()
.by_id(channel)
.await
- .not_found(|| Error::ChannelNotFound(channel.clone()))?;
+ .not_found(|| SendError::ChannelNotFound(channel.clone()))?;
let sent = tx.sequence().next(sent_at).await?;
let message = tx
.messages()
@@ -41,24 +41,40 @@ impl<'a> Messages<'a> {
.await?;
tx.commit().await?;
- for event in message.events() {
- self.events.broadcast(event);
- }
+ self.events
+ .broadcast(message.events().map(Event::from).collect::<Vec<_>>());
Ok(message.snapshot())
}
+ pub async fn delete(&self, message: &Id, deleted_at: &DateTime) -> Result<(), DeleteError> {
+ let mut tx = self.db.begin().await?;
+ let deleted = tx.sequence().next(deleted_at).await?;
+ let message = tx.messages().delete(message, &deleted).await?;
+ tx.commit().await?;
+
+ self.events.broadcast(
+ message
+ .events()
+ .filter(Sequence::start_from(deleted.sequence))
+ .map(Event::from)
+ .collect::<Vec<_>>(),
+ );
+
+ Ok(())
+ }
+
pub async fn expire(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> {
// Somewhat arbitrarily, expire after 90 days.
let expire_at = relative_to.to_owned() - TimeDelta::days(90);
let mut tx = self.db.begin().await?;
- let expired = tx.messages().expired(&expire_at).await?;
+ let expired = tx.messages().expired(&expire_at).await?;
let mut events = Vec::with_capacity(expired.len());
- for (channel, message) in expired {
+ for message in expired {
let deleted = tx.sequence().next(relative_to).await?;
- let message = tx.messages().delete(&channel, &message, &deleted).await?;
+ let message = tx.messages().delete(&message, &deleted).await?;
events.push(
message
.events()
@@ -68,21 +84,32 @@ impl<'a> Messages<'a> {
tx.commit().await?;
- for event in events
- .into_iter()
- .kmerge_by(|a, b| a.instant.sequence < b.instant.sequence)
- {
- self.events.broadcast(event);
- }
+ self.events.broadcast(
+ events
+ .into_iter()
+ .kmerge_by(|a, b| a.instant.sequence < b.instant.sequence)
+ .map(Event::from)
+ .collect::<Vec<_>>(),
+ );
Ok(())
}
}
#[derive(Debug, thiserror::Error)]
-pub enum Error {
+pub enum SendError {
+ #[error("channel {0} not found")]
+ ChannelNotFound(channel::Id),
+ #[error(transparent)]
+ DatabaseError(#[from] sqlx::Error),
+}
+
+#[derive(Debug, thiserror::Error)]
+pub enum DeleteError {
#[error("channel {0} not found")]
ChannelNotFound(channel::Id),
+ #[error("message {0} not found")]
+ NotFound(Id),
#[error(transparent)]
DatabaseError(#[from] sqlx::Error),
}
diff --git a/src/message/mod.rs b/src/message/mod.rs
index 52d56c1..a8f51ab 100644
--- a/src/message/mod.rs
+++ b/src/message/mod.rs
@@ -3,6 +3,7 @@ pub mod event;
mod history;
mod id;
pub mod repo;
+mod routes;
mod snapshot;
-pub use self::{event::Event, history::History, id::Id, snapshot::Message};
+pub use self::{event::Event, history::History, id::Id, routes::router, snapshot::Message};
diff --git a/src/message/repo.rs b/src/message/repo.rs
index 3b2b8f7..ae41736 100644
--- a/src/message/repo.rs
+++ b/src/message/repo.rs
@@ -62,7 +62,25 @@ impl<'c> Messages<'c> {
Ok(message)
}
- async fn by_id(&mut self, channel: &Channel, message: &Id) -> Result<History, sqlx::Error> {
+ pub async fn in_channel(&mut self, channel: &Channel) -> Result<Vec<Id>, sqlx::Error> {
+ let messages = sqlx::query_scalar!(
+ r#"
+ select
+ message.id as "id: Id"
+ from message
+ join channel on message.channel = channel.id
+ where channel.id = $1
+ order by message.sent_sequence
+ "#,
+ channel.id,
+ )
+ .fetch_all(&mut *self.0)
+ .await?;
+
+ Ok(messages)
+ }
+
+ async fn by_id(&mut self, message: &Id) -> Result<History, sqlx::Error> {
let message = sqlx::query!(
r#"
select
@@ -78,10 +96,8 @@ impl<'c> Messages<'c> {
join channel on message.channel = channel.id
join login as sender on message.sender = sender.id
where message.id = $1
- and message.channel = $2
"#,
message,
- channel.id,
)
.map(|row| History {
message: Message {
@@ -110,11 +126,10 @@ impl<'c> Messages<'c> {
pub async fn delete(
&mut self,
- channel: &Channel,
message: &Id,
deleted: &Instant,
) -> Result<History, sqlx::Error> {
- let history = self.by_id(channel, message).await?;
+ let history = self.by_id(message).await?;
sqlx::query_scalar!(
r#"
@@ -134,31 +149,16 @@ impl<'c> Messages<'c> {
})
}
- pub async fn expired(
- &mut self,
- expire_at: &DateTime,
- ) -> Result<Vec<(Channel, Id)>, sqlx::Error> {
- let messages = sqlx::query!(
+ pub async fn expired(&mut self, expire_at: &DateTime) -> Result<Vec<Id>, sqlx::Error> {
+ let messages = sqlx::query_scalar!(
r#"
select
- channel.id as "channel_id: channel::Id",
- channel.name as "channel_name",
- message.id as "message: Id"
+ id as "message: Id"
from message
- join channel on message.channel = channel.id
where sent_at < $1
"#,
expire_at,
)
- .map(|row| {
- (
- Channel {
- id: row.channel_id,
- name: row.channel_name,
- },
- row.message,
- )
- })
.fetch_all(&mut *self.0)
.await?;
diff --git a/src/message/routes.rs b/src/message/routes.rs
new file mode 100644
index 0000000..29fe3d7
--- /dev/null
+++ b/src/message/routes.rs
@@ -0,0 +1,46 @@
+use axum::{
+ extract::{Path, State},
+ http::StatusCode,
+ response::{IntoResponse, Response},
+ routing::delete,
+ Router,
+};
+
+use crate::{
+ app::App,
+ clock::RequestedAt,
+ error::Internal,
+ login::Login,
+ message::{self, app::DeleteError},
+};
+
+pub fn router() -> Router<App> {
+ Router::new().route("/api/messages/:message", delete(on_delete))
+}
+
+async fn on_delete(
+ State(app): State<App>,
+ Path(message): Path<message::Id>,
+ RequestedAt(deleted_at): RequestedAt,
+ _: Login,
+) -> Result<StatusCode, ErrorResponse> {
+ app.messages().delete(&message, &deleted_at).await?;
+
+ Ok(StatusCode::ACCEPTED)
+}
+
+#[derive(Debug, thiserror::Error)]
+#[error(transparent)]
+struct ErrorResponse(#[from] DeleteError);
+
+impl IntoResponse for ErrorResponse {
+ fn into_response(self) -> Response {
+ let Self(error) = self;
+ match error {
+ not_found @ (DeleteError::ChannelNotFound(_) | DeleteError::NotFound(_)) => {
+ (StatusCode::NOT_FOUND, not_found.to_string()).into_response()
+ }
+ other => Internal::from(other).into_response(),
+ }
+ }
+}