Support AVG aggregation in MergeRollupTask and RealtimeToOfflineSegmentsTask#18822
Conversation
…ntsTask AVG is computed from a serialized AvgPair (sum + count) stored in a BYTES column produced by an AVG ingestion aggregation or star-tree. Merging adds the sums and counts so the average stays correct across multiple rollup levels; averaging already-averaged scalars would be wrong for groups with unequal counts. The query-time AvgAggregationFunction already reads the same serialized AvgPair format, so no query-path change is needed. - Add AvgValueAggregator to the segment-processing rollup reducer - Register AVG in the pinot-core processing ValueAggregatorFactory - Allow AVG in MinionConstants.MergeRollupTask.AVAILABLE_CORE_VALUE_AGGREGATORS - Add unit + executor tests for both MergeRollup and RealtimeToOffline, covering unequal counts and two-level rollup
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## master #18822 +/- ##
============================================
+ Coverage 64.76% 64.77% +0.01%
- Complexity 1319 1322 +3
============================================
Files 3392 3393 +1
Lines 210949 211015 +66
Branches 33119 33133 +14
============================================
+ Hits 136611 136676 +65
+ Misses 63323 63316 -7
- Partials 11015 11023 +8
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Harness. 🚀 New features to boost your workflow:
|
xiangfu0
left a comment
There was a problem hiding this comment.
Found one high-signal issue; see inline comment.
There was a problem hiding this comment.
Pull request overview
This PR enables AVG as a supported rollup aggregation for minion merge tasks (MergeRollupTask and RealtimeToOfflineSegmentsTask in rollup mode) by introducing a segment-processing ValueAggregator that merges serialized AvgPair values (sum + count) stored in a BYTES metric column.
Changes:
- Add
AvgValueAggregator(pinot-core) and wireAggregationFunctionType.AVGintoValueAggregatorFactory. - Allow
AVGthrough merge-task config validation viaMinionConstants.MergeRollupTask.AVAILABLE_CORE_VALUE_AGGREGATORS. - Add unit + end-to-end tests covering correct rollup semantics (including unequal counts and multi-level rollups).
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutorTest.java |
Adds an end-to-end rollup test for AVG on a serialized AvgPair BYTES column. |
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGeneratorTest.java |
Updates the “parseable-but-unsupported” negative validation case now that avg becomes supported. |
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupAvgTaskExecutorTest.java |
Adds end-to-end MergeRollup executor coverage for AVG rollup using serialized AvgPair. |
pinot-core/src/test/java/org/apache/pinot/core/segment/processing/aggregator/AvgValueAggregatorTest.java |
Adds unit tests for the new AvgValueAggregator and factory wiring. |
pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/ValueAggregatorFactory.java |
Registers AVG in the segment-processing value aggregator factory. |
pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/AvgValueAggregator.java |
Introduces the BYTES-backed aggregator that merges serialized AvgPair values. |
pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java |
Adds AVG to the allow-list used by merge-task config validation. |
The merge-rollup and realtime-to-offline task generators previously validated the configured aggregationType only against AVAILABLE_CORE_VALUE_AGGREGATORS, not the target column's dataType. A bytes-backed aggregation (AVG, the sketch types, TDigest, KLL) configured on a non-BYTES column passed config validation but failed with a ClassCastException at task runtime. Add ValueAggregatorFactory.isBytesBacked() (co-located with the aggregator switch as a single source of truth) and MergeTaskUtils.validateAggregationColumnType(), called from both generators, to reject such configs at config time with a clear message, uniformly for all bytes-backed types.
…values When both inputs to the merge-rollup AvgValueAggregator are empty (the default null value for a BYTES column), it previously returned an empty byte[], which would be written into the merged segment and then fail AvgPair deserialization (expects 16 bytes) in the query-time AVG function. Return a serialized empty AvgPair (sum=0, count=0) instead, which the query path handles as a no-data row.
Summary
Adds
AVGto the set of aggregation functions supported byMergeRollupTaskandRealtimeToOfflineSegmentsTask(rollup merge type), and hardens config validation for all bytes-backed rollup aggregations.Previously, configuring
<column>.aggregationType=avgfor a merge/rollup task failed config validation withValueAggregator not enabled for type: AVG.How AVG works
AVGis rolled up the same way as the other bytes-backed types (DISTINCTCOUNTHLL,PERCENTILETDIGEST, …): the metric column stores a serializedAvgPair(sum + count) in aBYTEScolumn, produced by anAVGingestion aggregation or star-tree. The rollup reducer merges twoAvgPairs by adding their sums and counts, so the average stays correct across multiple rollup levels.This is necessary because plain
AVGis not re-aggregatable from a scalar — averaging already-averaged values is wrong whenever the merged groups have unequal counts. Carrying(sum, count)and dividing only at the end avoids the average-of-averages trap.The query-time
AvgAggregationFunctionalready reads this exact serializedAvgPairformat fromBYTEScolumns, so no query-path change is required — this PR only wires up the merge/rollup side.AVG changes
AvgValueAggregatorin the pinot-core segment-processing aggregator package (merges serializedAvgPairs; emptybyte[]treated as missing, mirroring the sibling sketch aggregators).AVGinpinot-coreValueAggregatorFactory.AVGtoMinionConstants.MergeRollupTask.AVAILABLE_CORE_VALUE_AGGREGATORS(shared by both merge tasks).Config-time column-type validation (second commit)
AVGand the other bytes-backed rollup aggregations (HLL/Theta/Tuple/CPC/ULL sketches, KLL, TDigest) operate on a column that already stores a serialized object, so their merge aggregators read the column value asbyte[]. Previously the task generators validated only that the aggregation type was allow-listed — not that the column was declaredBYTES. A bytes-backed aggregation on a raw numeric column passed config validation but failed with aClassCastExceptionat task runtime.This PR adds
ValueAggregatorFactory.isBytesBacked(AggregationFunctionType)(a predicate co-located with the aggregator switch as the single source of truth, mirroring the existingrequiresTimeOrdering) andMergeTaskUtils.validateAggregationColumnType(...), called from both theMergeRollupTaskandRealtimeToOfflineSegmentsTaskgenerators. A bytes-backed aggregation configured on a non-BYTEScolumn now fails at config time with a clear message, uniformly for all such types.This rejects configs that previously passed
validateTaskConfigsbut always failed at runtime withClassCastException; no previously-working config is affected (a working setup already usesBYTEScolumns).validateTaskConfigsruns on table create/update, not on a periodic cadence against live tables.Backward compatibility / rolling upgrade
AvgPairserialization is unchanged and already used at query time).AVGmerge task while a minion is still on an old version, the old minion's factory throwsIllegalStateException("Unsupported aggregation type: AVG")at task runtime — it fails loudly with no data corruption. Upgrade minions before enablingAVGrollup.Usage requirement
AVGmerge/rollup operates on a column that already stores a serializedAvgPair, i.e. aBYTESmetric column produced by anAVGingestion aggregation (or star-tree). The new validation enforces this at config time.Usage example
The
AVGcolumn stores a serializedAvgPair(sum + count) in aBYTEScolumn, so it must beAvgPairend to end (a hybrid table shares one schema, so the column type cannot differ between realtime and offline).BYTESmetric:AvgPairfrom a raw source field, and mark it no-dictionary:AvgPairs up over time with the per-column aggregation type:AVG(); it detects theBYTEScolumn, deserializes eachAvgPair, merges them, and returnsΣsum / Σcount:AVG()is the only function that decodes the storedAvgPair— selecting the column directly (SELECT latencyAvg ...) returns the raw serialized bytes, not a number. There is no built-in function to read the underlying sum/count separately; store them as dedicatedSUM/COUNTcolumns if you need them.Testing
AvgValueAggregatorTest(pinot-core): merge adds sum/count, two-level rollup preserves totals, empty/malformed bytes handling, factory wiring.MergeRollupAvgTaskExecutorTest(new): end-to-end rollup of aBYTESAvgPaircolumn across segments, including unequal counts and a true two-level rollup that would fail under average-of-averages.RealtimeToOfflineSegmentsTaskExecutorTest: addedtestRollupWithAvgAggregationcovering the second consumer end-to-end.MergeRollupTaskGeneratorTest/RealtimeToOfflineSegmentsTaskGeneratorTest: addedtestBytesBackedAggregationColumnTypeValidationcovering the reject path (bytes-backed type on a non-BYTES column) and the accept path (bytes-backed type on a BYTES column), plus non-bytes-backed types (sum/max) remaining valid on numeric columns.Release note
MergeRollupTaskandRealtimeToOfflineSegmentsTasknow supportAVGaggregation on aBYTEScolumn storing a serializedAvgPair(sum + count). Bytes-backed rollup aggregations (AVG, sketches, TDigest) are now validated at config time to require aBYTEScolumn.