Skip to content

Commit 05125fe

Browse files
committed
feat(postgres): Change the postgres adapter to be partition aware
Have the postgres adapter only fetch and do upkeep on activations that are part of the partition that the consumer is assigned. The broker can still update tasks outside its partitions, in case a worker is connected to a broker that is then rebalanced. Change the consumer to pass the partitions to the store whenever partitions are assigned. This was originally tested with PARTITION BY, but that requires manually keeping track of the partition tables which isn't a desired behaviour.
1 parent c1b9f8e commit 05125fe

7 files changed

Lines changed: 309 additions & 123 deletions

File tree

pg_migrations/0001_create_inflight_activations.sql

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,3 +18,5 @@ CREATE TABLE IF NOT EXISTS inflight_taskactivations (
1818
taskname TEXT NOT NULL,
1919
on_attempts_exceeded INTEGER NOT NULL DEFAULT 1
2020
);
21+
22+
CREATE INDEX idx_activation_partition ON inflight_taskactivations (partition);

src/kafka/consumer.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use crate::store::inflight_activation::InflightActivationStore;
12
use anyhow::{Error, anyhow};
23
use futures::{
34
Stream, StreamExt,
@@ -44,6 +45,7 @@ use tracing::{debug, error, info, instrument, warn};
4445
pub async fn start_consumer(
4546
topics: &[&str],
4647
kafka_client_config: &ClientConfig,
48+
activation_store: Arc<dyn InflightActivationStore>,
4749
spawn_actors: impl FnMut(
4850
Arc<StreamConsumer<KafkaContext>>,
4951
&BTreeSet<(String, i32)>,
@@ -68,6 +70,7 @@ pub async fn start_consumer(
6870
handle_events(
6971
consumer,
7072
event_receiver,
73+
activation_store,
7174
client_shutdown_sender,
7275
spawn_actors,
7376
)
@@ -340,6 +343,7 @@ enum ConsumerState {
340343
pub async fn handle_events(
341344
consumer: Arc<StreamConsumer<KafkaContext>>,
342345
events: UnboundedReceiver<(Event, SyncSender<()>)>,
346+
activation_store: Arc<dyn InflightActivationStore>,
343347
shutdown_client: oneshot::Sender<()>,
344348
mut spawn_actors: impl FnMut(
345349
Arc<StreamConsumer<KafkaContext>>,
@@ -372,6 +376,12 @@ pub async fn handle_events(
372376
state = match (state, event) {
373377
(ConsumerState::Ready, Event::Assign(tpl)) => {
374378
metrics::gauge!("arroyo.consumer.current_partitions").set(tpl.len() as f64);
379+
// Note: This assumes we only process one topic per consumer.
380+
let mut partitions = Vec::<i32>::new();
381+
for (_, partition) in tpl.iter() {
382+
partitions.push(*partition);
383+
}
384+
activation_store.assign_partitions(partitions).unwrap();
375385
ConsumerState::Consuming(spawn_actors(consumer.clone(), &tpl), tpl)
376386
}
377387
(ConsumerState::Ready, Event::Revoke(_)) => {

src/main.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,7 @@ async fn main() -> Result<(), Error> {
162162
start_consumer(
163163
&[&consumer_config.kafka_topic],
164164
&consumer_config.kafka_consumer_config(),
165+
consumer_store.clone(),
165166
processing_strategy!({
166167
err:
167168
OsStreamWriter::new(

src/store/inflight_activation.rs

Lines changed: 34 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -337,21 +337,13 @@ impl InflightActivationStoreConfig {
337337

338338
#[async_trait]
339339
pub trait InflightActivationStore: Send + Sync {
340-
/// Trigger incremental vacuum to reclaim free pages in the database
341-
async fn vacuum_db(&self) -> Result<(), Error>;
342-
343-
/// Perform a full vacuum on the database
344-
async fn full_vacuum_db(&self) -> Result<(), Error>;
345-
346-
/// Get the size of the database in bytes
347-
async fn db_size(&self) -> Result<u64, Error>;
348-
349-
/// Get an activation by id
350-
async fn get_by_id(&self, id: &str) -> Result<Option<InflightActivation>, Error>;
351-
340+
/// CONSUMER OPERATIONS
352341
/// Store a batch of activations
353342
async fn store(&self, batch: Vec<InflightActivation>) -> Result<QueryResult, Error>;
354343

344+
fn assign_partitions(&self, partitions: Vec<i32>) -> Result<(), Error>;
345+
346+
/// SERVER OPERATIONS
355347
/// Get a single pending activation, optionally filtered by namespace
356348
async fn get_pending_activation(
357349
&self,
@@ -385,6 +377,14 @@ pub trait InflightActivationStore: Send + Sync {
385377
limit: Option<i32>,
386378
) -> Result<Vec<InflightActivation>, Error>;
387379

380+
/// Update the status of a specific activation
381+
async fn set_status(
382+
&self,
383+
id: &str,
384+
status: InflightActivationStatus,
385+
) -> Result<Option<InflightActivation>, Error>;
386+
387+
/// COUNT OPERATIONS
388388
/// Get the age of the oldest pending activation in seconds
389389
async fn pending_activation_max_lag(&self, now: &DateTime<Utc>) -> f64;
390390

@@ -400,12 +400,9 @@ pub trait InflightActivationStore: Send + Sync {
400400
/// Count all activations
401401
async fn count(&self) -> Result<usize, Error>;
402402

403-
/// Update the status of a specific activation
404-
async fn set_status(
405-
&self,
406-
id: &str,
407-
status: InflightActivationStatus,
408-
) -> Result<Option<InflightActivation>, Error>;
403+
/// ACTIVATION OPERATIONS
404+
/// Get an activation by id
405+
async fn get_by_id(&self, id: &str) -> Result<Option<InflightActivation>, Error>;
409406

410407
/// Set the processing deadline for a specific activation
411408
async fn set_processing_deadline(
@@ -417,12 +414,20 @@ pub trait InflightActivationStore: Send + Sync {
417414
/// Delete an activation by id
418415
async fn delete_activation(&self, id: &str) -> Result<(), Error>;
419416

417+
/// DATABASE OPERATIONS
418+
/// Trigger incremental vacuum to reclaim free pages in the database
419+
async fn vacuum_db(&self) -> Result<(), Error>;
420+
421+
/// Perform a full vacuum on the database
422+
async fn full_vacuum_db(&self) -> Result<(), Error>;
423+
424+
/// Get the size of the database in bytes
425+
async fn db_size(&self) -> Result<u64, Error>;
426+
427+
/// UPKEEP OPERATIONS
420428
/// Get all activations with status Retry
421429
async fn get_retry_activations(&self) -> Result<Vec<InflightActivation>, Error>;
422430

423-
/// Clear all activations from the store
424-
async fn clear(&self) -> Result<(), Error>;
425-
426431
/// Update tasks that exceeded their processing deadline
427432
async fn handle_processing_deadline(&self) -> Result<u64, Error>;
428433

@@ -447,6 +452,10 @@ pub trait InflightActivationStore: Send + Sync {
447452
/// Remove killswitched tasks
448453
async fn remove_killswitched(&self, killswitched_tasks: Vec<String>) -> Result<u64, Error>;
449454

455+
/// TEST OPERATIONS
456+
/// Clear all activations from the store
457+
async fn clear(&self) -> Result<(), Error>;
458+
450459
/// Remove the database, used only in tests
451460
async fn remove_db(&self) -> Result<(), Error> {
452461
Ok(())
@@ -714,6 +723,10 @@ impl InflightActivationStore for SqliteActivationStore {
714723
Ok(Some(row.into()))
715724
}
716725

726+
fn assign_partitions(&self, partitions: Vec<i32>) -> Result<(), Error> {
727+
Ok(())
728+
}
729+
717730
#[instrument(skip_all)]
718731
async fn store(&self, batch: Vec<InflightActivation>) -> Result<QueryResult, Error> {
719732
if batch.is_empty() {

src/store/inflight_activation_tests.rs

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -585,6 +585,105 @@ async fn test_set_activation_status(#[case] adapter: &str) {
585585
store.remove_db().await.unwrap();
586586
}
587587

588+
#[tokio::test]
589+
#[rstest]
590+
#[case::postgres("postgres")]
591+
async fn test_set_activation_status_with_partitions(#[case] adapter: &str) {
592+
let store = create_test_store(adapter).await;
593+
594+
let mut batch = make_activations(2);
595+
batch[1].partition = 1;
596+
assert!(store.store(batch).await.is_ok());
597+
assert_counts(
598+
StatusCount {
599+
pending: 1,
600+
..StatusCount::default()
601+
},
602+
store.as_ref(),
603+
)
604+
.await;
605+
606+
assert!(
607+
store
608+
.set_status("id_0", InflightActivationStatus::Failure)
609+
.await
610+
.is_ok()
611+
);
612+
assert_counts(
613+
StatusCount {
614+
failure: 1,
615+
..StatusCount::default()
616+
},
617+
store.as_ref(),
618+
)
619+
.await;
620+
621+
assert!(
622+
store
623+
.set_status("id_0", InflightActivationStatus::Pending)
624+
.await
625+
.is_ok()
626+
);
627+
assert_counts(
628+
StatusCount {
629+
pending: 1,
630+
..StatusCount::default()
631+
},
632+
store.as_ref(),
633+
)
634+
.await;
635+
assert!(
636+
store
637+
.set_status("id_0", InflightActivationStatus::Failure)
638+
.await
639+
.is_ok()
640+
);
641+
assert!(
642+
store
643+
.set_status("id_1", InflightActivationStatus::Failure)
644+
.await
645+
.is_ok()
646+
);
647+
// The broker can update the status of an activation in a different partition, but
648+
// it still should not be counted in its upkeep.
649+
assert_counts(
650+
StatusCount {
651+
pending: 0,
652+
failure: 1,
653+
..StatusCount::default()
654+
},
655+
store.as_ref(),
656+
)
657+
.await;
658+
assert!(
659+
store
660+
.get_pending_activation(None, None)
661+
.await
662+
.unwrap()
663+
.is_none()
664+
);
665+
666+
let result = store
667+
.set_status("not_there", InflightActivationStatus::Complete)
668+
.await;
669+
assert!(result.is_ok(), "no query error");
670+
671+
let activation = result.unwrap();
672+
assert!(activation.is_none(), "no activation found");
673+
674+
let result = store
675+
.set_status("id_0", InflightActivationStatus::Complete)
676+
.await;
677+
assert!(result.is_ok(), "no query error");
678+
679+
let result_opt = result.unwrap();
680+
assert!(result_opt.is_some(), "activation should be returned");
681+
let inflight = result_opt.unwrap();
682+
assert_eq!(inflight.id, "id_0");
683+
assert_eq!(inflight.status, InflightActivationStatus::Complete);
684+
store.remove_db().await.unwrap();
685+
}
686+
588687
#[tokio::test]
589688
#[rstest]
590689
#[case::sqlite("sqlite")]

0 commit comments

Comments
 (0)