Skip to content

Conversation

@evanh
Copy link
Member

@evanh evanh commented Jan 16, 2026

Have the postgres adapter only fetch and do upkeep on activations that are part of the partition that the consumer is assigned. The broker can still update tasks outside its partitions, in case a worker is connected to a broker that is then rebalanced. Change the consumer to pass the partitions to the store whenever partitions are assigned.

@evanh evanh requested a review from a team as a code owner January 16, 2026 20:43
@evanh evanh requested review from a team and removed request for a team January 16, 2026 20:43
Have the postgres adapter only fetch and do upkeep on activations that are part of the partition
that the consumer is assigned. The broker can still update tasks outside its partitions, in case a
worker is connected to a broker that is then rebalanced. Change the consumer to pass the partitions
to the store whenever partitions are assigned.

This was originally tested with PARTITION BY, but that requires manually keeping track of the
partition tables which isn't a desired behaviour.
@evanh evanh force-pushed the evanh/fix/use-partitions-in-postgres branch from 8689312 to 05125fe Compare January 16, 2026 21:56
state = match (state, event) {
(ConsumerState::Ready, Event::Assign(tpl)) => {
metrics::gauge!("arroyo.consumer.current_partitions").set(tpl.len() as f64);
// Note: This assumes we only process one topic per consumer.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So far that has always been true.

read_pool: PgPool,
write_pool: PgPool,
config: PostgresActivationStoreConfig,
partitions: RwLock<Vec<i32>>,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should you use a tokio RwLock to avoid locking up one of the tokio threads?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants