summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.sqlx/query-61ebab676719aed700404185eb71f113b18d581b50d2c4c7d0462438a71e5f1d.json12
-rw-r--r--.sqlx/query-aeafe536f36593bfd1080ee61c4b10c6f90b1221e963db69c8e6d23e99012ecf.json (renamed from .sqlx/query-bddcc014038cc8611bffcbb3ef57ab9a8bc90dd1104e9900062dccc708e9bd33.json)6
-rw-r--r--.sqlx/query-df3656771c3cb6851e0c54a2d368676f279af866d0840d6c2c093b87b1eadd8c.json38
-rw-r--r--.sqlx/query-f5d5b3ec3554a80230e29676cdd9450fd1e8b4f2425cfda141d72fd94d3c39f9.json20
-rw-r--r--src/channel/routes/test/on_create.rs2
-rw-r--r--src/channel/routes/test/on_send.rs4
-rw-r--r--src/cli.rs4
-rw-r--r--src/events/app.rs40
-rw-r--r--src/events/expire.rs18
-rw-r--r--src/events/mod.rs1
-rw-r--r--src/events/repo/message.rs70
-rw-r--r--src/events/routes.rs5
-rw-r--r--src/events/routes/test.rs91
-rw-r--r--src/events/types.rs12
-rw-r--r--src/repo/channel.rs8
-rw-r--r--src/test/fixtures/filter.rs9
-rw-r--r--src/test/fixtures/mod.rs1
17 files changed, 229 insertions, 112 deletions
diff --git a/.sqlx/query-61ebab676719aed700404185eb71f113b18d581b50d2c4c7d0462438a71e5f1d.json b/.sqlx/query-61ebab676719aed700404185eb71f113b18d581b50d2c4c7d0462438a71e5f1d.json
deleted file mode 100644
index 9edc1af..0000000
--- a/.sqlx/query-61ebab676719aed700404185eb71f113b18d581b50d2c4c7d0462438a71e5f1d.json
+++ /dev/null
@@ -1,12 +0,0 @@
-{
- "db_name": "SQLite",
- "query": "\n delete from message\n where sent_at < $1\n ",
- "describe": {
- "columns": [],
- "parameters": {
- "Right": 1
- },
- "nullable": []
- },
- "hash": "61ebab676719aed700404185eb71f113b18d581b50d2c4c7d0462438a71e5f1d"
-}
diff --git a/.sqlx/query-bddcc014038cc8611bffcbb3ef57ab9a8bc90dd1104e9900062dccc708e9bd33.json b/.sqlx/query-aeafe536f36593bfd1080ee61c4b10c6f90b1221e963db69c8e6d23e99012ecf.json
index 64d56dd..5c27826 100644
--- a/.sqlx/query-bddcc014038cc8611bffcbb3ef57ab9a8bc90dd1104e9900062dccc708e9bd33.json
+++ b/.sqlx/query-aeafe536f36593bfd1080ee61c4b10c6f90b1221e963db69c8e6d23e99012ecf.json
@@ -1,6 +1,6 @@
{
"db_name": "SQLite",
- "query": "\n insert\n into channel (id, name, created_at)\n values ($1, $2, $3)\n returning\n id as \"id: Id\",\n name,\n created_at as \"created_at: DateTime\"\n ",
+ "query": "\n insert\n into channel (id, name, created_at, last_sequence)\n values ($1, $2, $3, $4)\n returning\n id as \"id: Id\",\n name,\n created_at as \"created_at: DateTime\"\n ",
"describe": {
"columns": [
{
@@ -20,7 +20,7 @@
}
],
"parameters": {
- "Right": 3
+ "Right": 4
},
"nullable": [
false,
@@ -28,5 +28,5 @@
false
]
},
- "hash": "bddcc014038cc8611bffcbb3ef57ab9a8bc90dd1104e9900062dccc708e9bd33"
+ "hash": "aeafe536f36593bfd1080ee61c4b10c6f90b1221e963db69c8e6d23e99012ecf"
}
diff --git a/.sqlx/query-df3656771c3cb6851e0c54a2d368676f279af866d0840d6c2c093b87b1eadd8c.json b/.sqlx/query-df3656771c3cb6851e0c54a2d368676f279af866d0840d6c2c093b87b1eadd8c.json
new file mode 100644
index 0000000..87e478e
--- /dev/null
+++ b/.sqlx/query-df3656771c3cb6851e0c54a2d368676f279af866d0840d6c2c093b87b1eadd8c.json
@@ -0,0 +1,38 @@
+{
+ "db_name": "SQLite",
+ "query": "\n select\n channel.id as \"channel_id: channel::Id\",\n channel.name as \"channel_name\",\n channel.created_at as \"channel_created_at: DateTime\",\n message.id as \"message: message::Id\"\n from message\n join channel on message.channel = channel.id\n join login as sender on message.sender = sender.id\n where sent_at < $1\n ",
+ "describe": {
+ "columns": [
+ {
+ "name": "channel_id: channel::Id",
+ "ordinal": 0,
+ "type_info": "Text"
+ },
+ {
+ "name": "channel_name",
+ "ordinal": 1,
+ "type_info": "Text"
+ },
+ {
+ "name": "channel_created_at: DateTime",
+ "ordinal": 2,
+ "type_info": "Text"
+ },
+ {
+ "name": "message: message::Id",
+ "ordinal": 3,
+ "type_info": "Text"
+ }
+ ],
+ "parameters": {
+ "Right": 1
+ },
+ "nullable": [
+ false,
+ false,
+ false,
+ false
+ ]
+ },
+ "hash": "df3656771c3cb6851e0c54a2d368676f279af866d0840d6c2c093b87b1eadd8c"
+}
diff --git a/.sqlx/query-f5d5b3ec3554a80230e29676cdd9450fd1e8b4f2425cfda141d72fd94d3c39f9.json b/.sqlx/query-f5d5b3ec3554a80230e29676cdd9450fd1e8b4f2425cfda141d72fd94d3c39f9.json
new file mode 100644
index 0000000..7b1d2d8
--- /dev/null
+++ b/.sqlx/query-f5d5b3ec3554a80230e29676cdd9450fd1e8b4f2425cfda141d72fd94d3c39f9.json
@@ -0,0 +1,20 @@
+{
+ "db_name": "SQLite",
+ "query": "\n delete from message\n where id = $1\n returning 1 as \"row: i64\"\n ",
+ "describe": {
+ "columns": [
+ {
+ "name": "row: i64",
+ "ordinal": 0,
+ "type_info": "Null"
+ }
+ ],
+ "parameters": {
+ "Right": 1
+ },
+ "nullable": [
+ null
+ ]
+ },
+ "hash": "f5d5b3ec3554a80230e29676cdd9450fd1e8b4f2425cfda141d72fd94d3c39f9"
+}
diff --git a/src/channel/routes/test/on_create.rs b/src/channel/routes/test/on_create.rs
index bb6697f..5e62d7f 100644
--- a/src/channel/routes/test/on_create.rs
+++ b/src/channel/routes/test/on_create.rs
@@ -38,7 +38,7 @@ async fn new_channel() {
let mut events = app
.events()
- .subscribe(&fixtures::now(), types::ResumePoint::default())
+ .subscribe(types::ResumePoint::default())
.await
.expect("subscribing never fails")
.filter(|types::ResumableEvent(_, event)| future::ready(event.channel == response_channel));
diff --git a/src/channel/routes/test/on_send.rs b/src/channel/routes/test/on_send.rs
index 20ae016..233518b 100644
--- a/src/channel/routes/test/on_send.rs
+++ b/src/channel/routes/test/on_send.rs
@@ -41,12 +41,12 @@ async fn messages_in_order() {
// Verify the semantics
- let subscribed_at = fixtures::now();
let events = app
.events()
- .subscribe(&subscribed_at, types::ResumePoint::default())
+ .subscribe(types::ResumePoint::default())
.await
.expect("subscribing to a valid channel")
+ .filter(fixtures::filter::messages())
.take(requests.len());
let events = events.collect::<Vec<_>>().immediately().await;
diff --git a/src/cli.rs b/src/cli.rs
index a6d752c..472d68f 100644
--- a/src/cli.rs
+++ b/src/cli.rs
@@ -72,6 +72,10 @@ impl Args {
let app = App::from(pool);
let app = routers()
+ .route_layer(middleware::from_fn_with_state(
+ app.clone(),
+ events::expire::middleware,
+ ))
.route_layer(middleware::from_fn(clock::middleware))
.with_state(app);
diff --git a/src/events/app.rs b/src/events/app.rs
index 134e86a..03f3ee6 100644
--- a/src/events/app.rs
+++ b/src/events/app.rs
@@ -55,14 +55,35 @@ impl<'a> Events<'a> {
Ok(event)
}
+ pub async fn expire(&self, relative_to: &DateTime) -> Result<(), sqlx::Error> {
+ // Somewhat arbitrarily, expire after 90 days.
+ let expire_at = relative_to.to_owned() - TimeDelta::days(90);
+
+ let mut tx = self.db.begin().await?;
+ let expired = tx.message_events().expired(&expire_at).await?;
+
+ let mut events = Vec::with_capacity(expired.len());
+ for (channel, message) in expired {
+ let event = tx
+ .message_events()
+ .delete_expired(&channel, &message, relative_to)
+ .await?;
+ events.push(event);
+ }
+
+ tx.commit().await?;
+
+ for event in events {
+ self.broadcaster.broadcast(&event);
+ }
+
+ Ok(())
+ }
+
pub async fn subscribe(
&self,
- subscribed_at: &DateTime,
resume_at: ResumePoint,
) -> Result<impl Stream<Item = types::ResumableEvent> + std::fmt::Debug, sqlx::Error> {
- // Somewhat arbitrarily, expire after 90 days.
- let expire_at = subscribed_at.to_owned() - TimeDelta::days(90);
-
let mut tx = self.db.begin().await?;
let channels = tx.channels().all().await?;
@@ -81,8 +102,6 @@ impl<'a> Events<'a> {
// querying the DB. We'll prune out duplicates later.
let live_messages = self.broadcaster.subscribe();
- tx.message_events().expire(&expire_at).await?;
-
let mut replays = BTreeMap::new();
let mut resume_live_at = resume_at.clone();
for channel in channels {
@@ -107,9 +126,6 @@ impl<'a> Events<'a> {
// * resume is redundant with the resume_at argument to
// `tx.broadcasts().replay(…)`.
let live_messages = live_messages
- // Sure, it's temporally improbable that we'll ever skip a message
- // that's 90 days old, but there's no reason not to be thorough.
- .filter(Self::skip_expired(&expire_at))
// Filtering on the broadcast resume point filters out messages
// before resume_at, and filters out messages duplicated from
// stored_messages.
@@ -134,12 +150,6 @@ impl<'a> Events<'a> {
) -> impl for<'m> FnMut(&'m types::ChannelEvent) -> future::Ready<bool> {
move |event| future::ready(resume_at.not_after(event))
}
- fn skip_expired(
- expire_at: &DateTime,
- ) -> impl for<'m> FnMut(&'m types::ChannelEvent) -> future::Ready<bool> {
- let expire_at = expire_at.to_owned();
- move |event| future::ready(expire_at < event.at)
- }
}
#[derive(Debug, thiserror::Error)]
diff --git a/src/events/expire.rs b/src/events/expire.rs
new file mode 100644
index 0000000..d92142d
--- /dev/null
+++ b/src/events/expire.rs
@@ -0,0 +1,18 @@
+use axum::{
+ extract::{Request, State},
+ middleware::Next,
+ response::Response,
+};
+
+use crate::{app::App, clock::RequestedAt, error::Internal};
+
+// Expires messages and channels before each request.
+pub async fn middleware(
+ State(app): State<App>,
+ RequestedAt(expired_at): RequestedAt,
+ req: Request,
+ next: Next,
+) -> Result<Response, Internal> {
+ app.events().expire(&expired_at).await?;
+ Ok(next.run(req).await)
+}
diff --git a/src/events/mod.rs b/src/events/mod.rs
index 711ae64..86bc5e9 100644
--- a/src/events/mod.rs
+++ b/src/events/mod.rs
@@ -1,5 +1,6 @@
pub mod app;
pub mod broadcaster;
+pub mod expire;
mod extract;
pub mod repo;
mod routes;
diff --git a/src/events/repo/message.rs b/src/events/repo/message.rs
index f6fce0e..32419d5 100644
--- a/src/events/repo/message.rs
+++ b/src/events/repo/message.rs
@@ -6,7 +6,7 @@ use crate::{
repo::{
channel::{self, Channel},
login::{self, Login},
- message,
+ message::{self, Message},
},
};
@@ -30,7 +30,7 @@ impl<'c> Events<'c> {
body: &str,
sent_at: &DateTime,
) -> Result<types::ChannelEvent, sqlx::Error> {
- let sequence = self.assign_sequence(&channel.id).await?;
+ let sequence = self.assign_sequence(channel).await?;
let id = message::Id::generate();
@@ -59,7 +59,7 @@ impl<'c> Events<'c> {
channel: channel.clone(),
data: types::MessageEvent {
sender: sender.clone(),
- message: message::Message {
+ message: Message {
id: row.id,
body: row.body,
},
@@ -72,7 +72,7 @@ impl<'c> Events<'c> {
Ok(message)
}
- async fn assign_sequence(&mut self, channel: &channel::Id) -> Result<Sequence, sqlx::Error> {
+ async fn assign_sequence(&mut self, channel: &Channel) -> Result<Sequence, sqlx::Error> {
let next = sqlx::query_scalar!(
r#"
update channel
@@ -80,7 +80,7 @@ impl<'c> Events<'c> {
where id = $1
returning last_sequence as "next_sequence: Sequence"
"#,
- channel,
+ channel.id,
)
.fetch_one(&mut *self.0)
.await?;
@@ -88,18 +88,68 @@ impl<'c> Events<'c> {
Ok(next)
}
- pub async fn expire(&mut self, expire_at: &DateTime) -> Result<(), sqlx::Error> {
- sqlx::query!(
+ pub async fn delete_expired(
+ &mut self,
+ channel: &Channel,
+ message: &message::Id,
+ deleted_at: &DateTime,
+ ) -> Result<types::ChannelEvent, sqlx::Error> {
+ let sequence = self.assign_sequence(channel).await?;
+
+ sqlx::query_scalar!(
r#"
delete from message
+ where id = $1
+ returning 1 as "row: i64"
+ "#,
+ message,
+ )
+ .fetch_one(&mut *self.0)
+ .await?;
+
+ Ok(types::ChannelEvent {
+ sequence,
+ at: *deleted_at,
+ channel: channel.clone(),
+ data: types::MessageDeletedEvent {
+ message: message.clone(),
+ }
+ .into(),
+ })
+ }
+
+ pub async fn expired(
+ &mut self,
+ expire_at: &DateTime,
+ ) -> Result<Vec<(Channel, message::Id)>, sqlx::Error> {
+ let messages = sqlx::query!(
+ r#"
+ select
+ channel.id as "channel_id: channel::Id",
+ channel.name as "channel_name",
+ channel.created_at as "channel_created_at: DateTime",
+ message.id as "message: message::Id"
+ from message
+ join channel on message.channel = channel.id
+ join login as sender on message.sender = sender.id
where sent_at < $1
"#,
expire_at,
)
- .execute(&mut *self.0)
+ .map(|row| {
+ (
+ Channel {
+ id: row.channel_id,
+ name: row.channel_name,
+ created_at: row.channel_created_at,
+ },
+ row.message,
+ )
+ })
+ .fetch_all(&mut *self.0)
.await?;
- Ok(())
+ Ok(messages)
}
pub async fn replay(
@@ -134,7 +184,7 @@ impl<'c> Events<'c> {
id: row.sender_id,
name: row.sender_name,
},
- message: message::Message {
+ message: Message {
id: row.id,
body: row.body,
},
diff --git a/src/events/routes.rs b/src/events/routes.rs
index 3f70dcd..89c942c 100644
--- a/src/events/routes.rs
+++ b/src/events/routes.rs
@@ -13,7 +13,7 @@ use super::{
extract::LastEventId,
types::{self, ResumePoint},
};
-use crate::{app::App, clock::RequestedAt, error::Internal, repo::login::Login};
+use crate::{app::App, error::Internal, repo::login::Login};
#[cfg(test)]
mod test;
@@ -24,7 +24,6 @@ pub fn router() -> Router<App> {
async fn events(
State(app): State<App>,
- RequestedAt(subscribed_at): RequestedAt,
_: Login, // requires auth, but doesn't actually care who you are
last_event_id: Option<LastEventId<ResumePoint>>,
) -> Result<Events<impl Stream<Item = types::ResumableEvent> + std::fmt::Debug>, Internal> {
@@ -32,7 +31,7 @@ async fn events(
.map(LastEventId::into_inner)
.unwrap_or_default();
- let stream = app.events().subscribe(&subscribed_at, resume_at).await?;
+ let stream = app.events().subscribe(resume_at).await?;
Ok(Events(stream))
}
diff --git a/src/events/routes/test.rs b/src/events/routes/test.rs
index 55ada95..a6e2275 100644
--- a/src/events/routes/test.rs
+++ b/src/events/routes/test.rs
@@ -21,14 +21,14 @@ async fn includes_historical_message() {
// Call the endpoint
let subscriber = fixtures::login::create(&app).await;
- let subscribed_at = fixtures::now();
- let routes::Events(mut events) = routes::events(State(app), subscribed_at, subscriber, None)
+ let routes::Events(events) = routes::events(State(app), subscriber, None)
.await
.expect("subscribe never fails");
// Verify the structure of the response.
let types::ResumableEvent(_, event) = events
+ .filter(fixtures::filter::messages())
.next()
.immediately()
.await
@@ -47,11 +47,9 @@ async fn includes_live_message() {
// Call the endpoint
let subscriber = fixtures::login::create(&app).await;
- let subscribed_at = fixtures::now();
- let routes::Events(mut events) =
- routes::events(State(app.clone()), subscribed_at, subscriber, None)
- .await
- .expect("subscribe never fails");
+ let routes::Events(events) = routes::events(State(app.clone()), subscriber, None)
+ .await
+ .expect("subscribe never fails");
// Verify the semantics
@@ -59,6 +57,7 @@ async fn includes_live_message() {
let message = fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await;
let types::ResumableEvent(_, event) = events
+ .filter(fixtures::filter::messages())
.next()
.immediately()
.await
@@ -92,14 +91,14 @@ async fn includes_multiple_channels() {
// Call the endpoint
let subscriber = fixtures::login::create(&app).await;
- let subscribed_at = fixtures::now();
- let routes::Events(events) = routes::events(State(app), subscribed_at, subscriber, None)
+ let routes::Events(events) = routes::events(State(app), subscriber, None)
.await
.expect("subscribe never fails");
// Verify the structure of the response.
let events = events
+ .filter(fixtures::filter::messages())
.take(messages.len())
.collect::<Vec<_>>()
.immediately()
@@ -129,8 +128,7 @@ async fn sequential_messages() {
// Call the endpoint
let subscriber = fixtures::login::create(&app).await;
- let subscribed_at = fixtures::now();
- let routes::Events(events) = routes::events(State(app), subscribed_at, subscriber, None)
+ let routes::Events(events) = routes::events(State(app), subscriber, None)
.await
.expect("subscribe never fails");
@@ -169,17 +167,19 @@ async fn resumes_from() {
// Call the endpoint
let subscriber = fixtures::login::create(&app).await;
- let subscribed_at = fixtures::now();
let resume_at = {
// First subscription
- let routes::Events(mut events) =
- routes::events(State(app.clone()), subscribed_at, subscriber.clone(), None)
- .await
- .expect("subscribe never fails");
+ let routes::Events(events) = routes::events(State(app.clone()), subscriber.clone(), None)
+ .await
+ .expect("subscribe never fails");
- let types::ResumableEvent(last_event_id, event) =
- events.next().immediately().await.expect("delivered events");
+ let types::ResumableEvent(last_event_id, event) = events
+ .filter(fixtures::filter::messages())
+ .next()
+ .immediately()
+ .await
+ .expect("delivered events");
assert_eq!(initial_message, event);
@@ -187,11 +187,9 @@ async fn resumes_from() {
};
// Resume after disconnect
- let reconnect_at = fixtures::now();
- let routes::Events(resumed) =
- routes::events(State(app), reconnect_at, subscriber, Some(resume_at.into()))
- .await
- .expect("subscribe never fails");
+ let routes::Events(resumed) = routes::events(State(app), subscriber, Some(resume_at.into()))
+ .await
+ .expect("subscribe never fails");
// Verify the structure of the response.
@@ -243,13 +241,12 @@ async fn serial_resume() {
];
// First subscription
- let subscribed_at = fixtures::now();
- let routes::Events(events) =
- routes::events(State(app.clone()), subscribed_at, subscriber.clone(), None)
- .await
- .expect("subscribe never fails");
+ let routes::Events(events) = routes::events(State(app.clone()), subscriber.clone(), None)
+ .await
+ .expect("subscribe never fails");
let events = events
+ .filter(fixtures::filter::messages())
.take(initial_messages.len())
.collect::<Vec<_>>()
.immediately()
@@ -277,10 +274,8 @@ async fn serial_resume() {
];
// Second subscription
- let resubscribed_at = fixtures::now();
let routes::Events(events) = routes::events(
State(app.clone()),
- resubscribed_at,
subscriber.clone(),
Some(resume_at.into()),
)
@@ -288,6 +283,7 @@ async fn serial_resume() {
.expect("subscribe never fails");
let events = events
+ .filter(fixtures::filter::messages())
.take(resume_messages.len())
.collect::<Vec<_>>()
.immediately()
@@ -314,11 +310,9 @@ async fn serial_resume() {
fixtures::message::send(&app, &sender, &channel_b, &fixtures::now()).await,
];
- // Second subscription
- let resubscribed_at = fixtures::now();
+ // Third subscription
let routes::Events(events) = routes::events(
State(app.clone()),
- resubscribed_at,
subscriber.clone(),
Some(resume_at.into()),
)
@@ -326,6 +320,7 @@ async fn serial_resume() {
.expect("subscribe never fails");
let events = events
+ .filter(fixtures::filter::messages())
.take(final_messages.len())
.collect::<Vec<_>>()
.immediately()
@@ -340,33 +335,3 @@ async fn serial_resume() {
}
};
}
-
-#[tokio::test]
-async fn removes_expired_messages() {
- // Set up the environment
- let app = fixtures::scratch_app().await;
- let sender = fixtures::login::create(&app).await;
- let channel = fixtures::channel::create(&app, &fixtures::now()).await;
-
- fixtures::message::send(&app, &sender, &channel, &fixtures::ancient()).await;
- let message = fixtures::message::send(&app, &sender, &channel, &fixtures::now()).await;
-
- // Call the endpoint
-
- let subscriber = fixtures::login::create(&app).await;
- let subscribed_at = fixtures::now();
-
- let routes::Events(mut events) = routes::events(State(app), subscribed_at, subscriber, None)
- .await
- .expect("subscribe never fails");
-
- // Verify the semantics
-
- let types::ResumableEvent(_, event) = events
- .next()
- .immediately()
- .await
- .expect("delivered messages");
-
- assert_eq!(message, event);
-}
diff --git a/src/events/types.rs b/src/events/types.rs
index 944321a..9a65207 100644
--- a/src/events/types.rs
+++ b/src/events/types.rs
@@ -119,6 +119,7 @@ impl ResumeElement for ChannelEvent {
pub enum ChannelEventData {
Created,
Message(MessageEvent),
+ MessageDeleted(MessageDeletedEvent),
}
#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
@@ -132,3 +133,14 @@ impl From<MessageEvent> for ChannelEventData {
Self::Message(message)
}
}
+
+#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize)]
+pub struct MessageDeletedEvent {
+ pub message: message::Id,
+}
+
+impl From<MessageDeletedEvent> for ChannelEventData {
+ fn from(message: MessageDeletedEvent) -> Self {
+ Self::MessageDeleted(message)
+ }
+}
diff --git a/src/repo/channel.rs b/src/repo/channel.rs
index e85b898..6514426 100644
--- a/src/repo/channel.rs
+++ b/src/repo/channel.rs
@@ -2,7 +2,7 @@ use std::fmt;
use sqlx::{sqlite::Sqlite, SqliteConnection, Transaction};
-use crate::{clock::DateTime, id::Id as BaseId};
+use crate::{clock::DateTime, events::types::Sequence, id::Id as BaseId};
pub trait Provider {
fn channels(&mut self) -> Channels;
@@ -31,13 +31,14 @@ impl<'c> Channels<'c> {
created_at: &DateTime,
) -> Result<Channel, sqlx::Error> {
let id = Id::generate();
+ let sequence = Sequence::default();
let channel = sqlx::query_as!(
Channel,
r#"
insert
- into channel (id, name, created_at)
- values ($1, $2, $3)
+ into channel (id, name, created_at, last_sequence)
+ values ($1, $2, $3, $4)
returning
id as "id: Id",
name,
@@ -46,6 +47,7 @@ impl<'c> Channels<'c> {
id,
name,
created_at,
+ sequence,
)
.fetch_one(&mut *self.0)
.await?;
diff --git a/src/test/fixtures/filter.rs b/src/test/fixtures/filter.rs
new file mode 100644
index 0000000..8847e13
--- /dev/null
+++ b/src/test/fixtures/filter.rs
@@ -0,0 +1,9 @@
+use futures::future;
+
+use crate::events::types;
+
+pub fn messages() -> impl FnMut(&types::ResumableEvent) -> future::Ready<bool> {
+ |types::ResumableEvent(_, event)| {
+ future::ready(matches!(event.data, types::ChannelEventData::Message(_)))
+ }
+}
diff --git a/src/test/fixtures/mod.rs b/src/test/fixtures/mod.rs
index 450fbec..d1dd0c3 100644
--- a/src/test/fixtures/mod.rs
+++ b/src/test/fixtures/mod.rs
@@ -3,6 +3,7 @@ use chrono::{TimeDelta, Utc};
use crate::{app::App, clock::RequestedAt, repo::pool};
pub mod channel;
+pub mod filter;
pub mod future;
pub mod identity;
pub mod login;