Skip to content

Change segment names to add stable config IDs for multi topic ingestion#18830

Open
rseetham wants to merge 10 commits into
apache:masterfrom
rseetham:stable-topic-ids-pr
Open

Change segment names to add stable config IDs for multi topic ingestion#18830
rseetham wants to merge 10 commits into
apache:masterfrom
rseetham:stable-topic-ids-pr

Conversation

@rseetham

Copy link
Copy Markdown
Contributor

Resolves #18739.

Multi-topic realtime ingestion encodes partition group IDs as topicIndex * 10000 + streamPartitionId, where topicIndex is the positional index in the config list. Deleting any topic except the last one shifts subsequent indices, breaking existing segment decoding.
This PR introduces stable, immutable config IDs (stream.config.id) that replace positional indices, and a 5-part segment name format that stores the config ID and stream partition ID as separate fields.

Key changes:

  • stream.config.id auto-assigned on table create/update, with validation (no duplicates, no negatives, stale-nextId detection)
  • MultiTopicLLCSegmentName: tableName__configId__streamPartitionId__sequenceNumber__creationTime
  • LLCSegmentName parses both 4-part and 5-part formats transparently
  • New segments created in 5-part format for multi-topic tables
  • All getStreamConfigIndexFromPinotPartitionId() call sites migrated to segment-name-based config ID lookup

rseetham added 9 commits June 22, 2026 08:08
Add a stable, immutable config ID mechanism for multi-topic ingestion
so that topic deletion no longer shifts partition group ID encoding.

- Add STREAM_CONFIG_ID constant to StreamConfigProperties
- Add nextStreamConfigId field to StreamIngestionConfig
- Add instance methods on StreamIngestionConfig for config ID lookup:
  getConfigId(), getStreamConfigMapByConfigId(), getConfigIdToStreamConfigMap()
- Keep encoding/decoding math as static methods on IngestionConfigUtils:
  getPinotPartitionIdFromConfigId(), getConfigIdFromPinotPartitionId()
- Add unit tests for all new methods and serialization round-trip
Add ensureStreamConfigIds() to TableConfigUtils, called during
validateStreamConfigMaps() on both add and update table config paths.

Edge cases:
- Legacy tables with no IDs → auto-assign from 0
- Partial IDs → assign only missing entries, skip used IDs
- Gaps from deletion (e.g. IDs 0, 3) → new entry gets next available
- Stale nextStreamConfigId → fails validation (user must fix)
- Stale nextStreamConfigId with STREAM_CONFIG_ID skip → corrects with warning
- Duplicate config IDs → rejected
- Negative config IDs → rejected
- Idempotent — running twice produces the same result

Add STREAM_CONFIG_ID to ValidationType enum so users can skip the
nextStreamConfigId validation via validationTypesToSkip query param.
New segment name format for multi-topic tables:
  tableName__configId__streamPartitionId__sequenceNumber__creationTime

This stores config ID and stream partition ID as separate fields
rather than encoding them as configId * 10000 + partitionId, eliminating
overflow and collision risks.

- MultiTopicLLCSegmentName: 5-part format with of()/isMultiTopicLLCSegment()
- getPartitionGroupId() computes encoded integer for backward compat
- toLLCSegmentName() converts to 4-part format for legacy callers
- LLCSegmentName.isLLCSegment() updated to recognize both formats
- SegmentUtils.getPartitionIdFromSegmentName() tries multi-topic format
- Disambiguates from UploadedRealtimeSegmentName (also 5-part) by
  checking that parts[1], parts[2], parts[3] are all integers
Multi-topic tables now create segments with the 5-part format:
  tableName__configId__streamPartitionId__sequenceNumber__creationTime

Changes:
- LLCSegmentName string constructor parses both 4-part and 5-part
  formats, computing encoded partitionGroupId for 5-part
- getNextLLCSegmentName() takes isMultiTopic flag to produce 5-part
- setupNewPartitionGroup() creates 5-part for multi-topic tables
- createNewSegmentMetadata() uses getNextLLCSegmentName() instead
  of direct LLCSegmentName construction

All existing code that takes LLCSegmentName continues to work
because getPartitionGroupId() returns the encoded value for both
formats, and getSegmentName() returns the original string.
Tests covering:
- stream.config.id assignment and uniqueness across stream configs
- 5-part multi-topic segment name format with correct config ID / partition encoding
- Config ID lookup methods (getConfigId, getStreamConfigMapByConfigId, getConfigIdToStreamConfigMap)
- Backward compatibility: LLCSegmentName.of() parses 5-part format and produces same partition group ID
Stores the stream config ID directly on LLCSegmentName at parse time
instead of requiring callers to derive it via IngestionConfigUtils each
time. For 4-part single-topic segments this is always 0; for 5-part
multi-topic segments it is the configId field parsed from the name.
All sites that previously derived stream config by integer-dividing
partitionGroupId / 10000 (treating the quotient as a positional list
index) now look up the stream config by its stable stream.config.id
instead. This makes the encoding position-independent so topics can be
deleted and re-added without corrupting existing segment-to-topic
mappings.

Changes:
- IngestionConfigUtils: add getConfigIdFromStreamConfig(), update
  getStreamConfigFromPinotPartitionId() to do stable-ID lookup instead
  of list.get(index), update getStreamConfigMap() likewise
- PinotLLCRealtimeSegmentManager: replace all StreamPartitionMsgOffset[]
  arrays keyed by index with Map<configId,...>; use LLCSegmentName.
  getStreamConfigId() where available; use getStreamConfigFromPinotPartitionId
  at all fetch/repair sites; encode new partitionGroupIds using configId
  not list index
- RealtimeSegmentDataManager: look up stream config by configId from
  LLCSegmentName instead of list index
- IngestionDelayTracker: key _streamConfigIndexToStreamMetadataProvider
  by stable configId; use getPinotPartitionIdFromConfigId for encoding
- PinotTableRestletResource: group watermarks by configId not index
- PartitionGroupMetadataFetcher: filter and encode using stable configId
…onfig scan

Now that LLCSegmentName.getStreamConfigId() provides the stable config ID
directly from the segment name, callers no longer need to linear-scan
stream configs or call through the table config. Add
IngestionConfigUtils.buildConfigIdToStreamConfigMap() to build a O(1)
lookup map once per call site, then use it with the config ID from the
segment name. Eliminates all remaining calls to
getStreamConfigFromPinotPartitionId() in hot paths.
…rithmetic

getHostedPartitionsGroupIds() now returns Map<partitionGroupId, configId>
populated from LLCSegmentName.getStreamConfigId(), eliminating the
partitionId / 10000 arithmetic in createMetrics() and removeMetrics().
_partitionsHostedByThisServer changed from Map<Integer, Boolean> to
Map<Integer, Integer> to store the config ID alongside the partition ID.
@rseetham rseetham changed the title Stable topic ids pr Change segment names to add stable config IDs for multi topic ingestion Jun 22, 2026
@codecov-commenter

codecov-commenter commented Jun 22, 2026

Copy link
Copy Markdown

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 100.00%. Comparing base (54f29a9) to head (d602e26).
⚠️ Report is 4 commits behind head on master.

Additional details and impacted files
@@              Coverage Diff               @@
##             master    #18830       +/-   ##
==============================================
+ Coverage     64.76%   100.00%   +35.23%     
+ Complexity     1319         6     -1313     
==============================================
  Files          3392         3     -3389     
  Lines        210971         6   -210965     
  Branches      33120         0    -33120     
==============================================
- Hits         136630         6   -136624     
+ Misses        63325         0    -63325     
+ Partials      11016         0    -11016     
Flag Coverage Δ
custom-integration1 100.00% <ø> (ø)
integration 100.00% <ø> (ø)
integration1 100.00% <ø> (ø)
integration2 0.00% <ø> (ø)
java-21 100.00% <ø> (+35.23%) ⬆️
temurin 100.00% <ø> (+35.23%) ⬆️
unittests ?
unittests1 ?
unittests2 ?

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Harness.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

…eferences

IngestionDelayTrackerTest: set distinct stream.config.id on each stream
config so createOrUpdateStreamMetadataProvider creates separate providers.

TableConfigUtilsTest: use independent map copies for multi-stream-config
tests to prevent ensureStreamConfigIds from poisoning subsequent test
cases via shared mutable map references.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

PEP: Redesign multi topic realtime ingestion support

2 participants