Change segment names to add stable config IDs for multi topic ingestion#18830
Open
rseetham wants to merge 10 commits into
Add a stable, immutable config ID mechanism for multi-topic ingestion so that topic deletion no longer shifts partition group ID encoding.
- Add STREAM_CONFIG_ID constant to StreamConfigProperties
- Add nextStreamConfigId field to StreamIngestionConfig
- Add instance methods on StreamIngestionConfig for config ID lookup: getConfigId(), getStreamConfigMapByConfigId(), getConfigIdToStreamConfigMap()
- Keep encoding/decoding math as static methods on IngestionConfigUtils: getPinotPartitionIdFromConfigId(), getConfigIdFromPinotPartitionId()
- Add unit tests for all new methods and serialization round-trip
Add ensureStreamConfigIds() to TableConfigUtils, called during validateStreamConfigMaps() on both add and update table config paths.
Edge cases:
- Legacy tables with no IDs → auto-assign from 0
- Partial IDs → assign only missing entries, skip used IDs
- Gaps from deletion (e.g. IDs 0, 3) → new entry gets next available
- Stale nextStreamConfigId → fails validation (user must fix)
- Stale nextStreamConfigId with STREAM_CONFIG_ID skip → corrects with warning
- Duplicate config IDs → rejected
- Negative config IDs → rejected
- Idempotent: running twice produces the same result
Add STREAM_CONFIG_ID to ValidationType enum so users can skip the nextStreamConfigId validation via the validationTypesToSkip query param.
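A minimal sketch of the assignment rules listed above, not the PR's actual ensureStreamConfigIds() implementation. The class name `StreamConfigIdAssigner`, the method `assignMissingIds`, and its signature are hypothetical; only the `stream.config.id` key comes from this PR. The sketch assumes "next available" means the first unused ID at or after the supplied counter, and it does not model the stale-nextStreamConfigId check or the validation skip.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical helper; illustrates the edge cases, not Pinot's real code.
final class StreamConfigIdAssigner {
  static final String STREAM_CONFIG_ID = "stream.config.id";

  /**
   * Assigns IDs to entries that lack one and returns the updated counter.
   * Entries that already carry an ID are never modified.
   */
  static int assignMissingIds(List<Map<String, String>> streamConfigs, int nextId) {
    Set<Integer> used = new HashSet<>();
    // Pass 1: validate the IDs that are already present.
    for (Map<String, String> config : streamConfigs) {
      String raw = config.get(STREAM_CONFIG_ID);
      if (raw == null) {
        continue;
      }
      int id = Integer.parseInt(raw);
      if (id < 0) {
        throw new IllegalArgumentException("Negative config ID: " + id);
      }
      if (!used.add(id)) {
        throw new IllegalArgumentException("Duplicate config ID: " + id);
      }
    }
    // Pass 2: fill in missing IDs, skipping any ID that is already taken.
    for (Map<String, String> config : streamConfigs) {
      if (config.get(STREAM_CONFIG_ID) != null) {
        continue;
      }
      while (used.contains(nextId)) {
        nextId++;
      }
      config.put(STREAM_CONFIG_ID, Integer.toString(nextId));
      used.add(nextId);
      nextId++;
    }
    return nextId;
  }
}
```

Because the second pass only touches entries without an ID, calling the method again with the returned counter changes nothing, which matches the idempotency requirement.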
New segment name format for multi-topic tables:
tableName__configId__streamPartitionId__sequenceNumber__creationTime
This stores config ID and stream partition ID as separate fields rather than encoding them as configId * 10000 + partitionId, eliminating overflow and collision risks.
- MultiTopicLLCSegmentName: 5-part format with of()/isMultiTopicLLCSegment()
- getPartitionGroupId() computes encoded integer for backward compat
- toLLCSegmentName() converts to 4-part format for legacy callers
- LLCSegmentName.isLLCSegment() updated to recognize both formats
- SegmentUtils.getPartitionIdFromSegmentName() tries multi-topic format
- Disambiguates from UploadedRealtimeSegmentName (also 5-part) by checking that parts[1], parts[2], parts[3] are all integers
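To make the 5-part format and the disambiguation rule concrete, here is a rough sketch. It is not the MultiTopicLLCSegmentName class from this PR: the class `MultiTopicNameSketch` and its method names are hypothetical, the `__` separator is inferred from the format string above, and the 4-part layout is assumed to be tableName__partitionGroupId__sequenceNumber__creationTime.

```java
// Hypothetical sketch of parsing the 5-part multi-topic segment name.
final class MultiTopicNameSketch {
  static final String SEPARATOR = "__";
  static final int PARTITION_PADDING = 10000; // multiplier used by the legacy encoding

  /** True when the name has 5 parts and parts[1], parts[2], parts[3] are all integers. */
  static boolean isMultiTopicName(String segmentName) {
    String[] parts = segmentName.split(SEPARATOR);
    return parts.length == 5 && isInt(parts[1]) && isInt(parts[2]) && isInt(parts[3]);
  }

  static boolean isInt(String value) {
    try {
      Integer.parseInt(value);
      return true;
    } catch (NumberFormatException e) {
      return false;
    }
  }

  /** Encoded integer kept for backward compatibility: configId * 10000 + streamPartitionId. */
  static int partitionGroupId(String segmentName) {
    String[] parts = segmentName.split(SEPARATOR);
    return Integer.parseInt(parts[1]) * PARTITION_PADDING + Integer.parseInt(parts[2]);
  }

  /** Collapses the 5-part name into the assumed 4-part layout for legacy callers. */
  static String toFourPartName(String segmentName) {
    String[] parts = segmentName.split(SEPARATOR);
    String encoded = Integer.toString(partitionGroupId(segmentName));
    return String.join(SEPARATOR, parts[0], encoded, parts[3], parts[4]);
  }
}
```

With the illustrative name `myTable__2__7__15__20240101T0000Z`, `partitionGroupId` yields 20007 and `toFourPartName` yields `myTable__20007__15__20240101T0000Z`. A 5-part name whose second field is not an integer fails the `isMultiTopicName` check, which is the disambiguation described in the last bullet.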
Multi-topic tables now create segments with the 5-part format:
tableName__configId__streamPartitionId__sequenceNumber__creationTime
Changes:
- LLCSegmentName string constructor parses both 4-part and 5-part formats, computing encoded partitionGroupId for 5-part
- getNextLLCSegmentName() takes isMultiTopic flag to produce 5-part
- setupNewPartitionGroup() creates 5-part for multi-topic tables
- createNewSegmentMetadata() uses getNextLLCSegmentName() instead of direct LLCSegmentName construction
All existing code that takes LLCSegmentName continues to work because getPartitionGroupId() returns the encoded value for both formats, and getSegmentName() returns the original string.
Tests covering:
- stream.config.id assignment and uniqueness across stream configs
- 5-part multi-topic segment name format with correct config ID / partition encoding
- Config ID lookup methods (getConfigId, getStreamConfigMapByConfigId, getConfigIdToStreamConfigMap)
- Backward compatibility: LLCSegmentName.of() parses 5-part format and produces same partition group ID
Stores the stream config ID directly on LLCSegmentName at parse time instead of requiring callers to derive it via IngestionConfigUtils each time. For 4-part single-topic segments this is always 0; for 5-part multi-topic segments it is the configId field parsed from the name.
All sites that previously derived stream config by integer-dividing partitionGroupId / 10000 (treating the quotient as a positional list index) now look up the stream config by its stable stream.config.id instead. This makes the encoding position-independent so topics can be deleted and re-added without corrupting existing segment-to-topic mappings.
Changes:
- IngestionConfigUtils: add getConfigIdFromStreamConfig(), update getStreamConfigFromPinotPartitionId() to do stable-ID lookup instead of list.get(index), update getStreamConfigMap() likewise
- PinotLLCRealtimeSegmentManager: replace all StreamPartitionMsgOffset[] arrays keyed by index with Map<configId,...>; use LLCSegmentName.getStreamConfigId() where available; use getStreamConfigFromPinotPartitionId at all fetch/repair sites; encode new partitionGroupIds using configId not list index
- RealtimeSegmentDataManager: look up stream config by configId from LLCSegmentName instead of list index
- IngestionDelayTracker: key _streamConfigIndexToStreamMetadataProvider by stable configId; use getPinotPartitionIdFromConfigId for encoding
- PinotTableRestletResource: group watermarks by configId not index
- PartitionGroupMetadataFetcher: filter and encode using stable configId
…onfig scan
Now that LLCSegmentName.getStreamConfigId() provides the stable config ID directly from the segment name, callers no longer need to linear-scan stream configs or call through the table config. Add IngestionConfigUtils.buildConfigIdToStreamConfigMap() to build an O(1) lookup map once per call site, then use it with the config ID from the segment name. Eliminates all remaining calls to getStreamConfigFromPinotPartitionId() in hot paths.
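The lookup-by-stable-ID idea can be sketched as follows. This is an illustration under assumptions, not the PR's IngestionConfigUtils.buildConfigIdToStreamConfigMap(): the class `ConfigIdLookupSketch` and the method `buildById` are hypothetical, stream configs are modeled as plain string maps, and every entry is assumed to already carry a `stream.config.id`.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: index stream configs by their stable ID instead of list position.
final class ConfigIdLookupSketch {
  static final String STREAM_CONFIG_ID = "stream.config.id";

  /** Builds the map once so each later lookup is a constant-time get(). */
  static Map<Integer, Map<String, String>> buildById(List<Map<String, String>> streamConfigs) {
    Map<Integer, Map<String, String>> byId = new HashMap<>();
    for (Map<String, String> config : streamConfigs) {
      // Throws NumberFormatException when the ID is missing or malformed.
      int configId = Integer.parseInt(config.get(STREAM_CONFIG_ID));
      byId.put(configId, config);
    }
    return byId;
  }
}
```

A caller would build the map once, then resolve each segment's stream config with `byId.get(configId)`, where `configId` comes from the segment name. Since the key is the stored ID rather than the list position, removing an earlier entry from the list does not change which config a given ID resolves to.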
…rithmetic
getHostedPartitionsGroupIds() now returns Map<partitionGroupId, configId> populated from LLCSegmentName.getStreamConfigId(), eliminating the partitionId / 10000 arithmetic in createMetrics() and removeMetrics(). _partitionsHostedByThisServer changed from Map<Integer, Boolean> to Map<Integer, Integer> to store the config ID alongside the partition ID.
Codecov Report
✅ All modified and coverable lines are covered by tests.
Additional details and impacted files
@@ Coverage Diff @@
## master #18830 +/- ##
==============================================
+ Coverage 64.76% 100.00% +35.23%
+ Complexity 1319 6 -1313
==============================================
Files 3392 3 -3389
Lines 210971 6 -210965
Branches 33120 0 -33120
==============================================
- Hits 136630 6 -136624
+ Misses 63325 0 -63325
+ Partials 11016 0 -11016
…eferences
IngestionDelayTrackerTest: set distinct stream.config.id on each stream config so createOrUpdateStreamMetadataProvider creates separate providers.
TableConfigUtilsTest: use independent map copies for multi-stream-config tests to prevent ensureStreamConfigIds from poisoning subsequent test cases via shared mutable map references.
Resolves #18739.
Multi-topic realtime ingestion encodes partition group IDs as topicIndex * 10000 + streamPartitionId, where topicIndex is the positional index in the config list. Deleting any topic except the last one shifts subsequent indices, breaking existing segment decoding.
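A small worked example of the failure mode, using only the arithmetic stated above. The class `PositionalEncodingDemo` and its method names are hypothetical and exist purely for illustration; topics are modeled as plain strings.

```java
import java.util.List;

// Hypothetical demo of the legacy positional encoding and why deletion breaks it.
final class PositionalEncodingDemo {
  static final int PARTITION_PADDING = 10000;

  /** Legacy encoding: topicIndex * 10000 + streamPartitionId. */
  static int encode(int topicIndex, int streamPartitionId) {
    return topicIndex * PARTITION_PADDING + streamPartitionId;
  }

  static int topicIndexOf(int partitionGroupId) {
    return partitionGroupId / PARTITION_PADDING;
  }

  static int streamPartitionOf(int partitionGroupId) {
    return partitionGroupId % PARTITION_PADDING;
  }

  /** Decodes by list position, which is what makes the scheme fragile. */
  static String resolveTopic(List<String> topics, int partitionGroupId) {
    return topics.get(topicIndexOf(partitionGroupId));
  }
}
```

With topics `[A, B, C]`, partition 3 of topic B is encoded as 1 * 10000 + 3 = 10003. After topic A is deleted the list becomes `[B, C]`, so decoding 10003 still yields index 1, which now points at C instead of B.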
This PR introduces stable, immutable config IDs (stream.config.id) that replace positional indices, and a 5-part segment name format that stores the config ID and stream partition ID as separate fields.
Key changes: