Skip to content

feat: Prunable shard specs for streaming published segments#19571

Open
abhishekrb19 wants to merge 4 commits into
apache:masterfrom
abhishekrb19:streaming_partitioned_dims
Open

feat: Prunable shard specs for streaming published segments#19571
abhishekrb19 wants to merge 4 commits into
apache:masterfrom
abhishekrb19:streaming_partitioned_dims

Conversation

@abhishekrb19

@abhishekrb19 abhishekrb19 commented Jun 9, 2026

Copy link
Copy Markdown
Contributor

Relates to #12929

Streaming-published segments currently have numbered shard specs, which aren't prunable by design. Compaction must reindex the data with range or hashed partition strategy once the data is handed off — even if the topic is partitioned, which is easy to do with multiple
supervisors. For multi-tenant datasources this means every tenant-filtered query hits every recent segment regardless of the partitioning strategy for numbered shard specs.

This PR lets streaming tasks record, per published segment, the distinct values observed for a configured set of dimensions, and declare them on a new shard spec so the broker can prune near-realtime data without waiting for compaction to reindex handed off segments. Concurrent compaction cannot always keep up with the incoming data and additionally the compaction process itself takes time to reindex; so the benefits of range or hashed shard specs may not be fully realized for however long it takes to reindex (30-45 minutes in our case), and doesn't help with high concurrent query workloads that are only querying more recent data.

So this PR adds a way to publish prunable shards right off the bat when they’re handed off by streaming tasks, if configured. This functionality is opt-in, Kafka-only, and disabled by default.

Design

  • StreamRangeShardSpec (type: "stream_range") — extends NumberedShardSpec (behaves as a normal append segment) plus a partitionFilters map (dimension → observed values). possibleInDomain() prunes a segment when the query constrains a declared
    dimension and none of its values intersect the domain; a dimension not in partitionFilters is never pruned on. Set-based (not min/max), so it prunes precisely for sparse values and tolerates overlapping value sets across tasks/restarts.
  • Because partition values are observed at ingestion rather than hardcoded, incorrect or abruptly-changing partitioning never breaks correctness - at worst it yields non-prunable shard specs (similar to the default numbered shard) or bloated shard specs (this caveat can be addressed with a guard rail noted below).
  • Ingestion (SeekableStreamIndexTaskRunner) — when partitionFilterDimensions is set, the task accumulates observed values per segment and stamps each at publish.

Configuration

New optional IO config field partitionFilterDimensions on the Kafka supervisor/task (default null). When unset, behavior is unchanged. Documented in docs/ingestion/kafka-ingestion.md.

Compatibility

Backward-compatible and opt-in. But stream_range is a new core ShardSpec type with no defaultImpl fallback, so it is not forward-compatible: upgrade all services before enabling partitionFilterDimensions, and note that once stream_range
segments are published, downgrade isn't supported until they're compacted away.

Results

Tested in a cluster, where I saw up to ~40% reduction in segment scans on the historicals for a few low to medium cardinal partition dimensions. In a follow-up, I want to extend this to also prune tasks, for reduced peon buffers and better query performance at the task layer.

Caveats

There's currently no limit on the number of observed values stamped into a segment's partitionFilters. It may make sense, in a follow-up, to add a configurable guardrail that falls back to NumberedShardSpec when the count exceeds a threshold, so shard specs don't get bloated.

This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

…gestion

Adds a new core ShardSpec (stream_range) that lets Kafka streaming tasks
declare, per published segment, the distinct values observed for configured
partitionFilterDimensions. The broker uses these to prune segments whose
declared values cannot match a query filter — enabling near-realtime pruning
without waiting for compaction.

Highlights:
- StreamRangeShardSpec extends NumberedShardSpec; possibleInDomain prunes by
  per-value range intersection. Null is declared as a first-class value
  (encoded as Range.lessThan("")) so IS NULL queries are never wrongly pruned,
  and is kept distinct from the empty string.
- Opt-in via partitionFilterDimensions on the Kafka supervisor/IOConfig
  (null by default; segments otherwise get a plain NumberedShardSpec). Kafka
  only for now; backward-compatible config (old specs/constructors unchanged).
- Per-segment value accumulation at ingest time; each segment is stamped with
  only its own observed values at publish.
- Correctness guards: restart-spanning segments fall back to NumberedShardSpec
  (pre-restart rows are not re-read, so their values can't be fully observed);
  dimensions that observed a null/missing value declare null so IS NULL is not
  pruned.
- BaseAppenderatorDriver reconciles the returned SegmentsAndCommitMetadata to
  the published shard specs so handoff/publish logs report the real spec.

Tests:
- StreamRangeShardSpecTest: possibleInDomain matrix incl. null vs "" and serde.
- SeekableStreamIndexTaskRunnerTest: annotator unit tests (restart fallback,
  null handling).
- EmbeddedStreamRangeShardSpecTest: end-to-end pruning verified via the
  query/segment/time scan metric across a predicate matrix (=, !=, IN, NOT IN,
  IS NULL, IS NOT NULL, multi-value, untracked dimension, non-existent value),
  plus a no-partitioning control twin and in-memory/graceful-widening cases.
- StreamAppenderatorDriverTest: returned metadata carries the published spec.

// annotateSegmentWithPartitionFilters is a no-op (returns the segment unchanged) when partition filters are not
// configured, so it is always safe to apply here.
final java.util.function.Function<Set<DataSegment>, Set<DataSegment>> shardSpecAnnotator =

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can add Function to imports


for (DataSegment segment : publishedSegmentsAndCommitMetadata.getSegments()) {
observedDimensionValuesBySegment.remove(
SegmentIdWithShardSpec.fromDataSegment(segment).toString()

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we also clean up restartSpannedSegments.remove here?

return s;
}
final Map<String, List<String>> snapshotFilters = new HashMap<>();
for (String dim : filterDims) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for (String dim : filterDims) {
    segObserved.computeIfPresent(dim, (k, vals) -> {
      synchronized (vals) {
        if (!vals.isEmpty()) {
          snapshotFilters.put(dim, new ArrayList<>(vals));
        }
      }
      return vals;  // Return unchanged - we're just reading
    });
  }

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Claude recommendation for race condition

Comment thread docs/ingestion/kafka-ingestion.md Outdated
|`pollTimeout`|Long|The length of time to wait for the Kafka consumer to poll records, in milliseconds.|No|100|
|`useEarliestOffset`|Boolean|If a supervisor is managing a datasource for the first time, it obtains a set of starting offsets from Kafka. This flag determines whether the supervisor retrieves the earliest or latest offsets in Kafka. Under normal circumstances, subsequent tasks start from where the previous segments ended so this flag is only used on the first run.|No|`false`|
|`idleConfig`|Object|Defines how and when the Kafka supervisor can become idle. See [Idle configuration](#idle-configuration) for more details.|No|null|
|`partitionFilterDimensions`|List of String|Dimensions to track for query-time segment pruning. See [Partition filter dimensions](#partition-filter-dimensions) for details.|No|null|

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think about naming this partitionDimensions to align with the compaction config? That may make it more clear that those values should be in sync

Comment thread docs/ingestion/kafka-ingestion.md Outdated
- Use only low-to-medium cardinality dimensions (for example, `tenant_id`, `region`, `environment`). High-cardinality dimensions bloat segment metadata with no pruning benefit.
- Most effective when Kafka partitions are keyed by the tracked dimension (for example, using tenant ID as the message key). Each task naturally sees a subset of values, and segments get tight filter annotations.
- Also works with multiple supervisors reading from separate topics into one datasource.
- After compaction, the `StreamRangeShardSpec` annotations are replaced by the compaction output's shard spec (hash or range partitioning), which provides its own pruning.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe worth mentioning that when using partitionFilterDimensions, dynamic compaction strategy should not be used

@FrankChen021 FrankChen021 left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Severity Findings
P0 0
P1 1
P2 0
P3 1
Total 2

Reviewed 16 of 16 changed files.

I found two issues: restart-spanning segments can mix shard spec classes within one publish interval and fail publishing, and Kafka backfill specs drop partitionFilterDimensions.


This is an automated review by Codex GPT-5.5

return s;
}
final String lookupKey = SegmentIdWithShardSpec.fromDataSegment(s).toString();
if (restartSpannedSegments.contains(lookupKey)) {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Mixed shard specs can fail publish after restart

Restart-spanned segments return unchanged as NumberedShardSpec, while new same-interval segments in the same publish batch can be annotated as StreamRangeShardSpec. TransactionalSegmentPublisher then runs SegmentPublisherHelper.annotateShardSpec, which rejects mixed shard-spec classes per interval, so a restarted task can fail publish/handoff. Make the fallback interval-wide, or stamp restored segments with a non-pruning stream_range spec.

emitTimeLagMetrics,
serverPriorityToReplicas,
boundedStreamConfig,
null

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P3 Backfill specs drop partitionFilterDimensions

This compatibility constructor always forwards null for partitionFilterDimensions. KafkaSupervisorSpec.createBackfillSpec still uses this overload when deriving bounded backfill specs, so a supervisor configured with partitionFilterDimensions silently creates backfill tasks without the pruning annotations. Pass the existing dimension list through for backfill specs.

@FrankChen021 FrankChen021 left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Severity Findings
P0 0
P1 0
P2 0
P3 1
Total 1

Reviewed 19 of 19 changed files.


This is an automated review by Codex GPT-5.5

if (vals.isEmpty()) {
continue;
}
snapshot = new ArrayList<>(vals);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P3] Sort observed partition values before publishing

The list stored in partitionDimensionValues is created directly from a HashSet, so its order is unspecified. The embedded test already asserts a concrete order for these values, and equivalent published segment metadata can vary by JVM or run even when the value set is identical. Sort the snapshot deterministically, with explicit null handling, before putting it into the shard spec.

Reviewed 19 of 19 changed files.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants