
Support AVG aggregation in MergeRollupTask and RealtimeToOfflineSegmentsTask#18822

Merged
xiangfu0 merged 3 commits into apache:master from xiangfu0:claude/agitated-feynman-b8a368
Jun 23, 2026
@xiangfu0 xiangfu0 commented Jun 22, 2026

Contributor

Summary

Adds AVG to the set of aggregation functions supported by MergeRollupTask and RealtimeToOfflineSegmentsTask (rollup merge type), and hardens config validation for all bytes-backed rollup aggregations.

Previously, configuring <column>.aggregationType=avg for a merge/rollup task failed config validation with ValueAggregator not enabled for type: AVG.

How AVG works

AVG is rolled up the same way as the other bytes-backed types (DISTINCTCOUNTHLL, PERCENTILETDIGEST, …): the metric column stores a serialized AvgPair (sum + count) in a BYTES column, produced by an AVG ingestion aggregation or star-tree. The rollup reducer merges two AvgPairs by adding their sums and counts, so the average stays correct across multiple rollup levels.

This is necessary because plain AVG is not re-aggregatable from a scalar — averaging already-averaged values is wrong whenever the merged groups have unequal counts. Carrying (sum, count) and dividing only at the end avoids the average-of-averages trap.
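The difference between merging (sum, count) pairs and averaging averages can be sketched in a few lines of Java. This is a minimal illustration, not the PR's code: `Pair` is a hypothetical stand-in for Pinot's AvgPair, and both method names are invented for the example.

```java
import java.util.List;

// Illustrative only: Pair is a hypothetical stand-in, not Pinot's AvgPair class.
final class AvgRollupSketch {
  static final class Pair {
    final double sum;
    final long count;

    Pair(double sum, long count) {
      this.sum = sum;
      this.count = count;
    }

    // Rollup merge: add sums and counts, never divide here.
    Pair merge(Pair other) {
      return new Pair(sum + other.sum, count + other.count);
    }

    // Division happens once, at read time.
    double average() {
      return sum / count;
    }
  }

  // Correct: merge all pairs first, divide at the end.
  static double mergedAverage(List<Pair> pairs) {
    Pair total = new Pair(0.0, 0L);
    for (Pair p : pairs) {
      total = total.merge(p);
    }
    return total.average();
  }

  // Incorrect for unequal counts: every group gets the same weight.
  static double averageOfAverages(List<Pair> pairs) {
    double sumOfAverages = 0.0;
    for (Pair p : pairs) {
      sumOfAverages += p.average();
    }
    return sumOfAverages / pairs.size();
  }
}
```

For the groups {10, 20} and {60}, i.e. the pairs (30, 2) and (60, 1), `mergedAverage` yields 90 / 3 = 30, while `averageOfAverages` yields (15 + 60) / 2 = 37.5.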

The query-time AvgAggregationFunction already reads this exact serialized AvgPair format from BYTES columns, so no query-path change is required — this PR only wires up the merge/rollup side.

AVG changes

  • New AvgValueAggregator in the pinot-core segment-processing aggregator package (merges serialized AvgPairs; empty byte[] treated as missing, mirroring the sibling sketch aggregators).
  • Register AVG in pinot-core ValueAggregatorFactory.
  • Add AVG to MinionConstants.MergeRollupTask.AVAILABLE_CORE_VALUE_AGGREGATORS (shared by both merge tasks).

Config-time column-type validation (second commit)

AVG and the other bytes-backed rollup aggregations (HLL/Theta/Tuple/CPC/ULL sketches, KLL, TDigest) operate on a column that already stores a serialized object, so their merge aggregators read the column value as byte[]. Previously the task generators validated only that the aggregation type was allow-listed — not that the column was declared BYTES. A bytes-backed aggregation on a raw numeric column passed config validation but failed with a ClassCastException at task runtime.

This PR adds ValueAggregatorFactory.isBytesBacked(AggregationFunctionType) (a predicate co-located with the aggregator switch as the single source of truth, mirroring the existing requiresTimeOrdering) and MergeTaskUtils.validateAggregationColumnType(...), called from both the MergeRollupTask and RealtimeToOfflineSegmentsTask generators. A bytes-backed aggregation configured on a non-BYTES column now fails at config time with a clear message, uniformly for all such types.

This rejects configs that previously passed validateTaskConfigs but always failed at runtime with ClassCastException; no previously-working config is affected (a working setup already uses BYTES columns). validateTaskConfigs runs on table create/update, not on a periodic cadence against live tables.
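The shape of the new check can be sketched as follows. This is an assumption-laden illustration rather than the merged code: the `AggType` and `ColumnType` enums are hypothetical stand-ins for Pinot's AggregationFunctionType and column data type, the set of bytes-backed types is abbreviated, and the exception type and message are guesses. Only the names `isBytesBacked` and `validateAggregationColumnType` come from the PR description.

```java
import java.util.EnumSet;
import java.util.Set;

// Sketch only: enums and exception type are hypothetical, not Pinot's real classes.
final class BytesBackedValidationSketch {
  enum AggType { SUM, MAX, AVG, DISTINCTCOUNTHLL, PERCENTILETDIGEST }

  enum ColumnType { INT, LONG, DOUBLE, BYTES }

  // Abbreviated list; the PR also names the Theta/Tuple/CPC/ULL sketches and KLL.
  private static final Set<AggType> BYTES_BACKED =
      EnumSet.of(AggType.AVG, AggType.DISTINCTCOUNTHLL, AggType.PERCENTILETDIGEST);

  // Single source of truth for "this aggregator reads the column value as byte[]".
  static boolean isBytesBacked(AggType type) {
    return BYTES_BACKED.contains(type);
  }

  // Called at config time, so a bad config fails before any task is scheduled.
  static void validateAggregationColumnType(String column, AggType type, ColumnType columnType) {
    if (isBytesBacked(type) && columnType != ColumnType.BYTES) {
      throw new IllegalStateException(
          "Aggregation " + type + " requires a BYTES column, but column '" + column
              + "' is " + columnType);
    }
  }
}
```

Under this sketch, `AVG` on a `BYTES` column and `SUM` on a `DOUBLE` column both pass, while `AVG` on a `DOUBLE` column is rejected at validation time instead of surfacing later as a ClassCastException.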

Backward compatibility / rolling upgrade

  • Purely additive to an allow-list and a factory switch; existing working configs are unaffected.
  • No new enum constant, SPI signature change, or wire-format change (AvgPair serialization is unchanged and already used at query time).
  • Rolling-upgrade note: if a new controller schedules an AVG merge task while a minion is still on an old version, the old minion's factory throws IllegalStateException("Unsupported aggregation type: AVG") at task runtime — it fails loudly with no data corruption. Upgrade minions before enabling AVG rollup.

Usage requirement

AVG merge/rollup operates on a column that already stores a serialized AvgPair, i.e. a BYTES metric column produced by an AVG ingestion aggregation (or star-tree). The new validation enforces this at config time.

Usage example

The AVG column stores a serialized AvgPair (sum + count) in a BYTES column, so it must be AvgPair end to end (a hybrid table shares one schema, so the column type cannot differ between realtime and offline).

  1. Schema. Declare the column as a BYTES metric:
"metricFieldSpecs": [ { "name": "latencyAvg", "dataType": "BYTES" } ]
  2. Ingestion aggregation (table config). Populate it as an AvgPair from a raw source field, and mark it no-dictionary:
"tableIndexConfig": { "noDictionaryColumns": ["latencyAvg"] },
"ingestionConfig": {
  "aggregationConfigs": [
    { "columnName": "latencyAvg", "aggregationFunction": "AVG(latency)" }
  ]
}
  3. Merge rollup (task config). Roll the AvgPairs up over time with the per-column aggregation type:
"MergeRollupTask": {
  "1day.mergeType": "rollup",
  "1day.bucketTimePeriod": "1d",
  "latencyAvg.aggregationType": "avg"
}
  4. Query. Use AVG(); it detects the BYTES column, deserializes each AvgPair, merges them, and returns Σsum / Σcount:
SELECT AVG(latencyAvg) FROM myTable;
SELECT country, AVG(latencyAvg) FROM myTable GROUP BY country;

AVG() is the only function that decodes the stored AvgPair — selecting the column directly (SELECT latencyAvg ...) returns the raw serialized bytes, not a number. There is no built-in function to read the underlying sum/count separately; store them as dedicated SUM/COUNT columns if you need them.

Testing

  • AvgValueAggregatorTest (pinot-core): merge adds sum/count, two-level rollup preserves totals, empty/malformed bytes handling, factory wiring.
  • MergeRollupAvgTaskExecutorTest (new): end-to-end rollup of a BYTES AvgPair column across segments, including unequal counts and a true two-level rollup that would fail under average-of-averages.
  • RealtimeToOfflineSegmentsTaskExecutorTest: added testRollupWithAvgAggregation covering the second consumer end-to-end.
  • MergeRollupTaskGeneratorTest / RealtimeToOfflineSegmentsTaskGeneratorTest: added testBytesBackedAggregationColumnTypeValidation covering the reject path (bytes-backed type on a non-BYTES column) and the accept path (bytes-backed type on a BYTES column), plus non-bytes-backed types (sum/max) remaining valid on numeric columns.
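
The two-level case exercised by these tests can be illustrated with a small standalone sketch. It is not taken from the test classes named above: `SumCount` and both methods are hypothetical, and the input values are made up for the example.

```java
// Sketch only: SumCount is a hypothetical (sum, count) holder, not Pinot's AvgPair.
final class TwoLevelRollupSketch {
  static final class SumCount {
    final double sum;
    final long count;

    SumCount(double sum, long count) {
      this.sum = sum;
      this.count = count;
    }

    // Build a pair from the raw values of one segment.
    static SumCount of(double... values) {
      double total = 0.0;
      for (double v : values) {
        total += v;
      }
      return new SumCount(total, values.length);
    }

    SumCount merge(SumCount other) {
      return new SumCount(sum + other.sum, count + other.count);
    }

    double average() {
      return sum / count;
    }
  }

  // Level 1 merges segments a and b; level 2 merges that result with c.
  static double twoLevelAverage(SumCount a, SumCount b, SumCount c) {
    return a.merge(b).merge(c).average();
  }

  // What a scalar-only rollup would compute at level 2: the level-1 average
  // and c's average are weighted equally, regardless of row counts.
  static double scalarTwoLevelAverage(SumCount a, SumCount b, SumCount c) {
    double levelOne = a.merge(b).average();
    return (levelOne + c.average()) / 2.0;
  }
}
```

With segments {10, 20}, {60}, and {10}, the pair-based two-level rollup gives 100 / 4 = 25, which matches the average over all four rows, whereas the scalar variant gives (30 + 10) / 2 = 20.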

Release note

MergeRollupTask and RealtimeToOfflineSegmentsTask now support AVG aggregation on a BYTES column storing a serialized AvgPair (sum + count). Bytes-backed rollup aggregations (AVG, sketches, TDigest) are now validated at config time to require a BYTES column.

…ntsTask

AVG is computed from a serialized AvgPair (sum + count) stored in a BYTES
column produced by an AVG ingestion aggregation or star-tree. Merging adds
the sums and counts so the average stays correct across multiple rollup
levels; averaging already-averaged scalars would be wrong for groups with
unequal counts. The query-time AvgAggregationFunction already reads the same
serialized AvgPair format, so no query-path change is needed.

- Add AvgValueAggregator to the segment-processing rollup reducer
- Register AVG in the pinot-core processing ValueAggregatorFactory
- Allow AVG in MinionConstants.MergeRollupTask.AVAILABLE_CORE_VALUE_AGGREGATORS
- Add unit + executor tests for both MergeRollup and RealtimeToOffline,
  covering unequal counts and two-level rollup
codecov-commenter commented Jun 22, 2026


Codecov Report

❌ Patch coverage is 82.14286% with 5 lines in your changes missing coverage. Please review.
✅ Project coverage is 64.77%. Comparing base (14bc147) to head (7fcfb8e).
⚠️ Report is 6 commits behind head on master.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| .../processing/aggregator/ValueAggregatorFactory.java | 25.00% | 3 Missing ⚠️ |
| .../org/apache/pinot/core/common/MinionConstants.java | 0.00% | 1 Missing ⚠️ |
| ...ache/pinot/plugin/minion/tasks/MergeTaskUtils.java | 87.50% | 0 Missing and 1 partial ⚠️ |

Additional details and impacted files
@@             Coverage Diff              @@
##             master   #18822      +/-   ##
============================================
+ Coverage     64.76%   64.77%   +0.01%     
- Complexity     1319     1322       +3     
============================================
  Files          3392     3393       +1     
  Lines        210949   211015      +66     
  Branches      33119    33133      +14     
============================================
+ Hits         136611   136676      +65     
+ Misses        63323    63316       -7     
- Partials      11015    11023       +8     

| Flag | Coverage Δ |
|---|---|
| custom-integration1 | 100.00% <ø> (ø) |
| integration | 100.00% <ø> (ø) |
| integration1 | 100.00% <ø> (ø) |
| integration2 | 0.00% <ø> (ø) |
| java-21 | 64.77% <82.14%> (+0.01%) ⬆️ |
| temurin | 64.77% <82.14%> (+0.01%) ⬆️ |
| unittests | 64.76% <82.14%> (+0.01%) ⬆️ |
| unittests1 | 56.96% <77.77%> (+<0.01%) ⬆️ |
| unittests2 | 37.19% <32.14%> (+0.01%) ⬆️ |


@xiangfu0 xiangfu0 left a comment

Contributor Author

Found one high-signal issue; see inline comment.

@xiangfu0 xiangfu0 requested review from Jackie-Jiang and Copilot June 23, 2026 03:55
@xiangfu0 xiangfu0 added labels on Jun 23, 2026: merge-rollup (Related to merge-rollup task processing), minion (Related to Pinot Minion task framework), aggregation (Related to aggregation functions and operations)

Copilot AI left a comment

Contributor

Pull request overview

This PR enables AVG as a supported rollup aggregation for minion merge tasks (MergeRollupTask and RealtimeToOfflineSegmentsTask in rollup mode) by introducing a segment-processing ValueAggregator that merges serialized AvgPair values (sum + count) stored in a BYTES metric column.

Changes:

  • Add AvgValueAggregator (pinot-core) and wire AggregationFunctionType.AVG into ValueAggregatorFactory.
  • Allow AVG through merge-task config validation via MinionConstants.MergeRollupTask.AVAILABLE_CORE_VALUE_AGGREGATORS.
  • Add unit + end-to-end tests covering correct rollup semantics (including unequal counts and multi-level rollups).

Reviewed changes

Copilot reviewed 7 out of 7 changed files in this pull request and generated 3 comments.

| File | Description |
|---|---|
| pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutorTest.java | Adds an end-to-end rollup test for AVG on a serialized AvgPair BYTES column. |
| pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGeneratorTest.java | Updates the “parseable-but-unsupported” negative validation case now that avg becomes supported. |
| pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupAvgTaskExecutorTest.java | Adds end-to-end MergeRollup executor coverage for AVG rollup using serialized AvgPair. |
| pinot-core/src/test/java/org/apache/pinot/core/segment/processing/aggregator/AvgValueAggregatorTest.java | Adds unit tests for the new AvgValueAggregator and factory wiring. |
| pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/ValueAggregatorFactory.java | Registers AVG in the segment-processing value aggregator factory. |
| pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/AvgValueAggregator.java | Introduces the BYTES-backed aggregator that merges serialized AvgPair values. |
| pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java | Adds AVG to the allow-list used by merge-task config validation. |

xiangfu0 added 2 commits June 22, 2026 21:20
The merge-rollup and realtime-to-offline task generators previously validated
the configured aggregationType only against AVAILABLE_CORE_VALUE_AGGREGATORS,
not the target column's dataType. A bytes-backed aggregation (AVG, the sketch
types, TDigest, KLL) configured on a non-BYTES column passed config validation
but failed with a ClassCastException at task runtime.

Add ValueAggregatorFactory.isBytesBacked() (co-located with the aggregator
switch as a single source of truth) and MergeTaskUtils.validateAggregationColumnType(),
called from both generators, to reject such configs at config time with a clear
message, uniformly for all bytes-backed types.
…values

When both inputs to the merge-rollup AvgValueAggregator are empty (the default
null value for a BYTES column), it previously returned an empty byte[], which
would be written into the merged segment and then fail AvgPair deserialization
(expects 16 bytes) in the query-time AVG function. Return a serialized empty
AvgPair (sum=0, count=0) instead, which the query path handles as a no-data row.
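
The empty-input handling described in this commit message can be sketched at the byte level. This is a rough illustration under stated assumptions, not the merged AvgValueAggregator: the 16-byte layout is assumed to be an 8-byte double sum followed by an 8-byte long count in ByteBuffer's default big-endian order, the class and method names are hypothetical, and returning the non-empty side when only one input is empty is inferred from "empty byte[] treated as missing".

```java
import java.nio.ByteBuffer;

// Sketch only: layout and one-side-empty behavior are assumptions, see lead-in.
final class AvgPairBytesMergeSketch {
  // Assumed serialized size: 8-byte double sum + 8-byte long count.
  static final int SERIALIZED_SIZE = Double.BYTES + Long.BYTES;

  static byte[] serialize(double sum, long count) {
    return ByteBuffer.allocate(SERIALIZED_SIZE).putDouble(sum).putLong(count).array();
  }

  static byte[] merge(byte[] left, byte[] right) {
    boolean leftEmpty = left.length == 0;
    boolean rightEmpty = right.length == 0;
    if (leftEmpty && rightEmpty) {
      // Never write an empty byte[]: emit a well-formed (sum=0, count=0) pair
      // so that readers expecting 16 bytes can still deserialize the value.
      return serialize(0.0, 0L);
    }
    if (leftEmpty) {
      return right;
    }
    if (rightEmpty) {
      return left;
    }
    ByteBuffer l = ByteBuffer.wrap(left);
    ByteBuffer r = ByteBuffer.wrap(right);
    // Read in the same order the values were written: sum first, then count.
    double sum = l.getDouble() + r.getDouble();
    long count = l.getLong() + r.getLong();
    return serialize(sum, count);
  }
}
```

Malformed (non-empty but short) inputs are not handled here; in this sketch they would surface as a `BufferUnderflowException` from `ByteBuffer`.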
@xiangfu0 xiangfu0 merged commit e44e35d into apache:master Jun 23, 2026
11 checks passed
@xiangfu0 xiangfu0 deleted the claude/agitated-feynman-b8a368 branch June 23, 2026 06:55

Labels

aggregation (Related to aggregation functions and operations), merge-rollup (Related to merge-rollup task processing), minion (Related to Pinot Minion task framework)


4 participants