summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.sqlx/query-17c3748391b152beb85ce3cf58b1689d916fd1c209645c74e17a8da2102eada3.json12
-rw-r--r--.sqlx/query-32f50277d687903773db053379a428eb461067b1769b8de60a73e2117a3cd11f.json (renamed from .sqlx/query-1f0f35655dd57532897aaba9bde38547e626387dfe5b859f02ae1dbe171d5741.json)6
-rw-r--r--.sqlx/query-85145c8b8264e7d01eef66e22353037f33fd3a3eaec0e36f06ffbbffb625aa24.json20
-rw-r--r--src/conversation/app.rs69
-rw-r--r--src/conversation/history.rs40
-rw-r--r--src/conversation/repo.rs138
6 files changed, 169 insertions, 116 deletions
diff --git a/.sqlx/query-17c3748391b152beb85ce3cf58b1689d916fd1c209645c74e17a8da2102eada3.json b/.sqlx/query-17c3748391b152beb85ce3cf58b1689d916fd1c209645c74e17a8da2102eada3.json
new file mode 100644
index 0000000..46bfea0
--- /dev/null
+++ b/.sqlx/query-17c3748391b152beb85ce3cf58b1689d916fd1c209645c74e17a8da2102eada3.json
@@ -0,0 +1,12 @@
+{
+ "db_name": "SQLite",
+ "query": "\n update conversation\n set last_sequence = max(last_sequence, $1)\n where id = $2\n ",
+ "describe": {
+ "columns": [],
+ "parameters": {
+ "Right": 2
+ },
+ "nullable": []
+ },
+ "hash": "17c3748391b152beb85ce3cf58b1689d916fd1c209645c74e17a8da2102eada3"
+}
diff --git a/.sqlx/query-1f0f35655dd57532897aaba9bde38547e626387dfe5b859f02ae1dbe171d5741.json b/.sqlx/query-32f50277d687903773db053379a428eb461067b1769b8de60a73e2117a3cd11f.json
index 9c62ca9..3d33188 100644
--- a/.sqlx/query-1f0f35655dd57532897aaba9bde38547e626387dfe5b859f02ae1dbe171d5741.json
+++ b/.sqlx/query-32f50277d687903773db053379a428eb461067b1769b8de60a73e2117a3cd11f.json
@@ -1,12 +1,12 @@
{
"db_name": "SQLite",
- "query": "\n insert into conversation (id, created_at, created_sequence, last_sequence)\n values ($1, $2, $3, $4)\n ",
+ "query": "\n insert into conversation (id, created_at, created_sequence, last_sequence)\n values ($1, $2, $3, $3)\n ",
"describe": {
"columns": [],
"parameters": {
- "Right": 4
+ "Right": 3
},
"nullable": []
},
- "hash": "1f0f35655dd57532897aaba9bde38547e626387dfe5b859f02ae1dbe171d5741"
+ "hash": "32f50277d687903773db053379a428eb461067b1769b8de60a73e2117a3cd11f"
}
diff --git a/.sqlx/query-85145c8b8264e7d01eef66e22353037f33fd3a3eaec0e36f06ffbbffb625aa24.json b/.sqlx/query-85145c8b8264e7d01eef66e22353037f33fd3a3eaec0e36f06ffbbffb625aa24.json
deleted file mode 100644
index 9d212fa..0000000
--- a/.sqlx/query-85145c8b8264e7d01eef66e22353037f33fd3a3eaec0e36f06ffbbffb625aa24.json
+++ /dev/null
@@ -1,20 +0,0 @@
-{
- "db_name": "SQLite",
- "query": "\n update conversation\n set last_sequence = max(last_sequence, $1)\n where id = $2\n returning id as \"id: Id\"\n ",
- "describe": {
- "columns": [
- {
- "name": "id: Id",
- "ordinal": 0,
- "type_info": "Text"
- }
- ],
- "parameters": {
- "Right": 2
- },
- "nullable": [
- false
- ]
- },
- "hash": "85145c8b8264e7d01eef66e22353037f33fd3a3eaec0e36f06ffbbffb625aa24"
-}
diff --git a/src/conversation/app.rs b/src/conversation/app.rs
index 81ccdcf..30baf77 100644
--- a/src/conversation/app.rs
+++ b/src/conversation/app.rs
@@ -3,7 +3,7 @@ use itertools::Itertools;
use sqlx::sqlite::SqlitePool;
use super::{
- Conversation, Id,
+ Conversation, History, Id, history,
repo::{LoadError, Provider as _},
validate,
};
@@ -11,7 +11,7 @@ use crate::{
clock::DateTime,
db::{Duplicate as _, NotFound as _},
event::{Broadcaster, Event, Sequence, repo::Provider as _},
- message::{self, repo::Provider as _},
+ message::repo::Provider as _,
name::{self, Name},
};
@@ -36,15 +36,21 @@ impl<'a> Conversations<'a> {
let mut tx = self.db.begin().await?;
let created = tx.sequence().next(created_at).await?;
- let conversation = tx
- .conversations()
- .create(name, &created)
+ let conversation = History::begin(name, created);
+
+ // This filter technically includes every event in the history, but it's easier to follow if
+ // the various event-manipulating app methods are consistent, and it's harmless to have an
+ // always-satisfied filter.
+ let events = conversation.events().filter(Sequence::start_from(created));
+ tx.conversations()
+ .record_events(events.clone())
.await
.duplicate(|| CreateError::DuplicateName(name.clone()))?;
+
tx.commit().await?;
self.events
- .broadcast(conversation.events().map(Event::from).collect::<Vec<_>>());
+ .broadcast(events.map(Event::from).collect::<Vec<_>>());
Ok(conversation.as_created())
}
@@ -78,11 +84,6 @@ impl<'a> Conversations<'a> {
.by_id(conversation)
.await
.not_found(|| DeleteError::NotFound(conversation.clone()))?;
- conversation
- .as_snapshot()
- .ok_or_else(|| DeleteError::Deleted(conversation.id().clone()))?;
-
- let mut events = Vec::new();
let messages = tx.messages().live(&conversation).await?;
let has_messages = messages
@@ -94,17 +95,15 @@ impl<'a> Conversations<'a> {
}
let deleted = tx.sequence().next(deleted_at).await?;
- let conversation = tx.conversations().delete(&conversation, &deleted).await?;
- events.extend(
- conversation
- .events()
- .filter(Sequence::start_from(deleted.sequence))
- .map(Event::from),
- );
+ let conversation = conversation.delete(deleted)?;
+
+ let events = conversation.events().filter(Sequence::start_from(deleted));
+ tx.conversations().record_events(events.clone()).await?;
tx.commit().await?;
- self.events.broadcast(events);
+ self.events
+ .broadcast(events.map(Event::from).collect::<Vec<_>>());
Ok(())
}
@@ -120,12 +119,14 @@ impl<'a> Conversations<'a> {
let mut events = Vec::with_capacity(expired.len());
for conversation in expired {
let deleted = tx.sequence().next(relative_to).await?;
- let conversation = tx.conversations().delete(&conversation, &deleted).await?;
- events.push(
- conversation
- .events()
- .filter(Sequence::start_from(deleted.sequence)),
- );
+ let conversation = conversation.delete(deleted)?;
+
+ let conversation_events = conversation.events().filter(Sequence::start_from(deleted));
+ tx.conversations()
+ .record_events(conversation_events.clone())
+ .await?;
+
+ events.push(conversation_events);
}
tx.commit().await?;
@@ -218,8 +219,18 @@ impl From<LoadError> for DeleteError {
}
}
+impl From<history::DeleteError> for DeleteError {
+ fn from(error: history::DeleteError) -> Self {
+ match error {
+ history::DeleteError::Deleted(conversation) => Self::Deleted(conversation.id().clone()),
+ }
+ }
+}
+
#[derive(Debug, thiserror::Error)]
pub enum ExpireError {
+ #[error("tried to expire already-deleted conversation: {0}")]
+ Deleted(Id),
#[error(transparent)]
Database(#[from] sqlx::Error),
#[error(transparent)]
@@ -234,3 +245,11 @@ impl From<LoadError> for ExpireError {
}
}
}
+
+impl From<history::DeleteError> for ExpireError {
+ fn from(error: history::DeleteError) -> Self {
+ match error {
+ history::DeleteError::Deleted(conversation) => Self::Deleted(conversation.id().clone()),
+ }
+ }
+}
diff --git a/src/conversation/history.rs b/src/conversation/history.rs
index 8821277..5cba9ca 100644
--- a/src/conversation/history.rs
+++ b/src/conversation/history.rs
@@ -4,13 +4,49 @@ use super::{
Conversation, Id,
event::{Created, Deleted, Event},
};
-use crate::event::Sequence;
+use crate::{
+ event::{Instant, Sequence},
+ name::Name,
+};
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct History {
pub conversation: Conversation,
}
+// Lifecycle interface
+impl History {
+ pub fn begin(name: &Name, created: Instant) -> Self {
+ Self {
+ conversation: Conversation {
+ id: Id::generate(),
+ name: name.clone(),
+ created,
+ deleted: None,
+ },
+ }
+ }
+
+ pub fn delete(self, deleted: Instant) -> Result<Self, DeleteError> {
+ if self.conversation.deleted.is_none() {
+ Ok(Self {
+ conversation: Conversation {
+ deleted: Some(deleted),
+ ..self.conversation
+ },
+ })
+ } else {
+ Err(DeleteError::Deleted(self))
+ }
+ }
+}
+
+#[derive(Debug, thiserror::Error)]
+pub enum DeleteError {
+ #[error("conversation {} already deleted", .0.conversation.id)]
+ Deleted(History),
+}
+
// State interface
impl History {
pub fn id(&self) -> &Id {
@@ -41,7 +77,7 @@ impl History {
// Event factories
impl History {
- pub fn events(&self) -> impl Iterator<Item = Event> + use<> {
+ pub fn events(&self) -> impl Iterator<Item = Event> + Clone + use<> {
[self.created()]
.into_iter()
.merge_by(self.deleted(), Sequence::merge)
diff --git a/src/conversation/repo.rs b/src/conversation/repo.rs
index 7e38b62..cb66bf8 100644
--- a/src/conversation/repo.rs
+++ b/src/conversation/repo.rs
@@ -1,9 +1,12 @@
use futures::stream::{StreamExt as _, TryStreamExt as _};
use sqlx::{SqliteConnection, Transaction, sqlite::Sqlite};
+use super::{
+ Conversation, Event, History, Id,
+ event::{Created, Deleted},
+};
use crate::{
clock::DateTime,
- conversation::{Conversation, History, Id},
db::NotFound,
event::{Instant, Sequence},
name::{self, Name},
@@ -22,22 +25,41 @@ impl Provider for Transaction<'_, Sqlite> {
pub struct Conversations<'t>(&'t mut SqliteConnection);
impl Conversations<'_> {
- pub async fn create(&mut self, name: &Name, created: &Instant) -> Result<History, sqlx::Error> {
- let id = Id::generate();
- let name = name.clone();
+ pub async fn record_events(
+ &mut self,
+ events: impl IntoIterator<Item = Event>,
+ ) -> Result<(), sqlx::Error> {
+ for event in events {
+ self.record_event(&event).await?;
+ }
+ Ok(())
+ }
+
+ pub async fn record_event(&mut self, event: &Event) -> Result<(), sqlx::Error> {
+ match event {
+ Event::Created(created) => self.record_created(created).await,
+ Event::Deleted(deleted) => self.record_deleted(deleted).await,
+ }
+ }
+
+ async fn record_created(&mut self, created: &Created) -> Result<(), sqlx::Error> {
+ let Conversation {
+ id,
+ created,
+ name,
+ deleted: _,
+ } = &created.conversation;
let display_name = name.display();
let canonical_name = name.canonical();
- let created = *created;
sqlx::query!(
r#"
insert into conversation (id, created_at, created_sequence, last_sequence)
- values ($1, $2, $3, $4)
+ values ($1, $2, $3, $3)
"#,
id,
created.at,
created.sequence,
- created.sequence,
)
.execute(&mut *self.0)
.await?;
@@ -54,16 +76,50 @@ impl Conversations<'_> {
.execute(&mut *self.0)
.await?;
- let conversation = History {
- conversation: Conversation {
- created,
- id,
- name: name.clone(),
- deleted: None,
- },
- };
+ Ok(())
+ }
- Ok(conversation)
+ async fn record_deleted(&mut self, deleted: &Deleted) -> Result<(), sqlx::Error> {
+ let Deleted { instant, id } = deleted;
+ sqlx::query!(
+ r#"
+ update conversation
+ set last_sequence = max(last_sequence, $1)
+ where id = $2
+ "#,
+ instant.sequence,
+ id,
+ )
+ .execute(&mut *self.0)
+ .await?;
+
+ sqlx::query!(
+ r#"
+ insert into conversation_deleted (id, deleted_at, deleted_sequence)
+ values ($1, $2, $3)
+ "#,
+ id,
+ instant.at,
+ instant.sequence,
+ )
+ .execute(&mut *self.0)
+ .await?;
+
+ // Small social responsibility hack here: when a conversation is deleted, its
+ // name is retconned to have been the empty string. Someone reading the event
+ // stream afterwards, or looking at conversations via the API, cannot retrieve
+ // the "deleted" conversation's information by ignoring the deletion event.
+ sqlx::query!(
+ r#"
+ delete from conversation_name
+ where id = $1
+ "#,
+ id,
+ )
+ .execute(&mut *self.0)
+ .await?;
+
+ Ok(())
}
pub async fn by_id(&mut self, conversation: &Id) -> Result<History, LoadError> {
@@ -179,56 +235,6 @@ impl Conversations<'_> {
Ok(conversations)
}
- pub async fn delete(
- &mut self,
- conversation: &History,
- deleted: &Instant,
- ) -> Result<History, LoadError> {
- let id = conversation.id();
- sqlx::query!(
- r#"
- update conversation
- set last_sequence = max(last_sequence, $1)
- where id = $2
- returning id as "id: Id"
- "#,
- deleted.sequence,
- id,
- )
- .fetch_one(&mut *self.0)
- .await?;
-
- sqlx::query!(
- r#"
- insert into conversation_deleted (id, deleted_at, deleted_sequence)
- values ($1, $2, $3)
- "#,
- id,
- deleted.at,
- deleted.sequence,
- )
- .execute(&mut *self.0)
- .await?;
-
- // Small social responsibility hack here: when a conversation is deleted, its
- // name is retconned to have been the empty string. Someone reading the event
- // stream afterwards, or looking at conversations via the API, cannot retrieve
- // the "deleted" conversation's information by ignoring the deletion event.
- sqlx::query!(
- r#"
- delete from conversation_name
- where id = $1
- "#,
- id,
- )
- .execute(&mut *self.0)
- .await?;
-
- let conversation = self.by_id(id).await?;
-
- Ok(conversation)
- }
-
pub async fn purge(&mut self, purge_at: &DateTime) -> Result<(), sqlx::Error> {
let conversations = sqlx::query_scalar!(
r#"