Skip to content

Commit 8f97b26

Browse files
authored
Skip per-row convertTypes() in FunctionOperand when types already match (#17730)
1 parent 2f4c086 commit 8f97b26

2 files changed

Lines changed: 201 additions & 2 deletions

File tree

Lines changed: 183 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,183 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pinot.perf;
20+
21+
import java.util.ArrayList;
22+
import java.util.Arrays;
23+
import java.util.List;
24+
import java.util.concurrent.TimeUnit;
25+
import org.apache.pinot.common.function.FunctionInfo;
26+
import org.apache.pinot.common.function.FunctionRegistry;
27+
import org.apache.pinot.common.function.QueryFunctionInvoker;
28+
import org.apache.pinot.common.utils.DataSchema;
29+
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
30+
import org.apache.pinot.query.planner.logical.RexExpression;
31+
import org.apache.pinot.query.runtime.operator.operands.FunctionOperand;
32+
import org.openjdk.jmh.annotations.Benchmark;
33+
import org.openjdk.jmh.annotations.BenchmarkMode;
34+
import org.openjdk.jmh.annotations.Fork;
35+
import org.openjdk.jmh.annotations.Level;
36+
import org.openjdk.jmh.annotations.Measurement;
37+
import org.openjdk.jmh.annotations.Mode;
38+
import org.openjdk.jmh.annotations.OutputTimeUnit;
39+
import org.openjdk.jmh.annotations.Scope;
40+
import org.openjdk.jmh.annotations.Setup;
41+
import org.openjdk.jmh.annotations.State;
42+
import org.openjdk.jmh.annotations.Warmup;
43+
import org.openjdk.jmh.infra.Blackhole;
44+
import org.openjdk.jmh.runner.Runner;
45+
import org.openjdk.jmh.runner.options.ChainedOptionsBuilder;
46+
import org.openjdk.jmh.runner.options.OptionsBuilder;
47+
48+
49+
/**
50+
* Benchmark for {@link FunctionOperand#apply} to measure the impact of skipping
51+
* {@code convertTypes()} when operand types already match function parameter types.
52+
*
53+
* <p>The benchmark has two parts:
54+
* <ul>
55+
* <li><b>End-to-end apply()</b>: Compares FunctionOperand.apply() when types match (convertTypes
56+
* skipped) vs when types mismatch (convertTypes called per row).</li>
57+
* <li><b>Isolated convertTypes()</b>: Directly measures the per-row cost of
58+
* convertTypes() on type-matching arguments — the exact overhead eliminated by the
59+
* optimization — against a baseline that invokes the function without calling convertTypes().</li>
60+
* </ul>
61+
*/
62+
@BenchmarkMode(Mode.Throughput)
63+
@OutputTimeUnit(TimeUnit.MICROSECONDS)
64+
@Fork(1)
65+
@Warmup(iterations = 3, time = 1)
66+
@Measurement(iterations = 5, time = 1)
67+
@State(Scope.Benchmark)
68+
public class BenchmarkFunctionOperand {
69+
70+
private static final int NUM_ROWS = 1000;
71+
72+
// End-to-end state
73+
private FunctionOperand _matchingTypesOperand;
74+
private FunctionOperand _mismatchedTypesOperand;
75+
private List<List<Object>> _longRows;
76+
private List<List<Object>> _intRows;
77+
78+
// Isolated convertTypes state
79+
private QueryFunctionInvoker _invoker;
80+
private Object[][] _matchingArgs;
81+
82+
@Setup(Level.Trial)
83+
public void setUp() {
84+
FunctionRegistry.init();
85+
86+
// --- End-to-end setup ---
87+
88+
// Scenario 1: plus(LONG, LONG) with LONG columns → types match, no conversion needed
89+
DataSchema longSchema = new DataSchema(
90+
new String[]{"col0", "col1"},
91+
new ColumnDataType[]{ColumnDataType.LONG, ColumnDataType.LONG}
92+
);
93+
RexExpression.FunctionCall plusLong = new RexExpression.FunctionCall(
94+
ColumnDataType.LONG, "plus",
95+
Arrays.asList(new RexExpression.InputRef(0), new RexExpression.InputRef(1))
96+
);
97+
_matchingTypesOperand = new FunctionOperand(plusLong, longSchema);
98+
99+
// Scenario 2: plus(INT, INT) → resolves to longPlus(long, long), but argument types are INT
100+
// → types mismatch, convertTypes() called per row
101+
DataSchema intSchema = new DataSchema(
102+
new String[]{"col0", "col1"},
103+
new ColumnDataType[]{ColumnDataType.INT, ColumnDataType.INT}
104+
);
105+
RexExpression.FunctionCall plusInt = new RexExpression.FunctionCall(
106+
ColumnDataType.LONG, "plus",
107+
Arrays.asList(new RexExpression.InputRef(0), new RexExpression.InputRef(1))
108+
);
109+
_mismatchedTypesOperand = new FunctionOperand(plusInt, intSchema);
110+
111+
_longRows = new ArrayList<>(NUM_ROWS);
112+
_intRows = new ArrayList<>(NUM_ROWS);
113+
for (int i = 0; i < NUM_ROWS; i++) {
114+
_longRows.add(Arrays.asList((long) i, (long) (i + 1)));
115+
_intRows.add(Arrays.asList(i, i + 1));
116+
}
117+
118+
// --- Isolated convertTypes setup ---
119+
120+
// Get the longPlus(long, long) function invoker
121+
ColumnDataType[] longArgTypes = {ColumnDataType.LONG, ColumnDataType.LONG};
122+
FunctionInfo functionInfo = FunctionRegistry.lookupFunctionInfo(
123+
FunctionRegistry.canonicalize("plus"), longArgTypes);
124+
_invoker = new QueryFunctionInvoker(functionInfo);
125+
126+
// Pre-allocate argument arrays with Long values (matching type for long parameters).
127+
// convertTypes() will still check isAssignableFrom + HashMap lookup per argument per row.
128+
_matchingArgs = new Object[NUM_ROWS][];
129+
for (int i = 0; i < NUM_ROWS; i++) {
130+
_matchingArgs[i] = new Object[]{(long) i, (long) (i + 1)};
131+
}
132+
}
133+
134+
// --- End-to-end benchmarks ---
135+
136+
@Benchmark
137+
public void applyTypesMatch(Blackhole bh) {
138+
for (List<Object> row : _longRows) {
139+
bh.consume(_matchingTypesOperand.apply(row));
140+
}
141+
}
142+
143+
@Benchmark
144+
public void applyTypesNeedConversion(Blackhole bh) {
145+
for (List<Object> row : _intRows) {
146+
bh.consume(_mismatchedTypesOperand.apply(row));
147+
}
148+
}
149+
150+
// --- Isolated convertTypes benchmarks ---
151+
// These directly measure the per-block overhead of convertTypes() on already-matching types,
152+
// which is the exact cost eliminated by the _needsConversion optimization.
153+
154+
/**
155+
* Baseline: invoke the function on each row without calling convertTypes().
156+
* This represents the optimized path (types match, convertTypes skipped).
157+
*/
158+
@Benchmark
159+
public void invokeWithoutConvertTypes(Blackhole bh) {
160+
for (Object[] args : _matchingArgs) {
161+
bh.consume(_invoker.invoke(args));
162+
}
163+
}
164+
165+
/**
166+
* Old path: call convertTypes() then invoke the function on each row.
167+
* This represents the pre-optimization path (convertTypes always called).
168+
*/
169+
@Benchmark
170+
public void invokeWithConvertTypes(Blackhole bh) {
171+
for (Object[] args : _matchingArgs) {
172+
_invoker.convertTypes(args);
173+
bh.consume(_invoker.invoke(args));
174+
}
175+
}
176+
177+
public static void main(String[] args)
178+
throws Exception {
179+
ChainedOptionsBuilder opt = new OptionsBuilder()
180+
.include(BenchmarkFunctionOperand.class.getSimpleName());
181+
new Runner(opt.build()).run();
182+
}
183+
}

pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/operands/FunctionOperand.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ public class FunctionOperand implements TransformOperand {
4141
private final ColumnDataType _resultType;
4242
private final QueryFunctionInvoker _functionInvoker;
4343
private final ColumnDataType _functionInvokerResultType;
44+
private final boolean _needsConversion;
4445
private final List<TransformOperand> _operands;
4546
private final Object[] _reusableOperandHolder;
4647

@@ -78,10 +79,24 @@ public FunctionOperand(RexExpression.FunctionCall functionCall, DataSchema dataS
7879
if (!_functionInvoker.getMethod().isVarArgs()) {
7980
Class<?>[] parameterClasses = _functionInvoker.getParameterClasses();
8081
PinotDataType[] parameterTypes = _functionInvoker.getParameterTypes();
82+
boolean needsConversion = false;
8183
for (int i = 0; i < numOperands; i++) {
8284
Preconditions.checkState(parameterTypes[i] != null, "Unsupported parameter class: %s for method: %s",
8385
parameterClasses[i], functionInfo.getMethod());
86+
if (!needsConversion) {
87+
// For array-typed parameters, always require conversion: the runtime Java class may
88+
// differ from the canonical stored type (e.g. Double[] vs double[] after DataBlock
89+
// deserialization in the multi-stage engine), and Method.invoke does not autobox arrays.
90+
ColumnDataType parameterColumnType = FunctionUtils.getColumnDataType(parameterClasses[i]);
91+
if (parameterColumnType == null || argumentTypes[i] != parameterColumnType
92+
|| parameterClasses[i].isArray()) {
93+
needsConversion = true;
94+
}
95+
}
8496
}
97+
_needsConversion = needsConversion;
98+
} else {
99+
_needsConversion = false;
85100
}
86101
ColumnDataType functionInvokerResultType = FunctionUtils.getColumnDataType(_functionInvoker.getResultClass());
87102
// Handle unrecognized result class with STRING
@@ -106,12 +121,13 @@ public Object apply(List<Object> row) {
106121
Object value = operand.apply(row);
107122
_reusableOperandHolder[i] = value != null ? operand.getResultType().toExternal(value) : null;
108123
}
109-
// TODO: Optimize per record conversion
110124
Object result;
111125
if (_functionInvoker.getMethod().isVarArgs()) {
112126
result = _functionInvoker.invoke(new Object[]{_reusableOperandHolder});
113127
} else {
114-
_functionInvoker.convertTypes(_reusableOperandHolder);
128+
if (_needsConversion) {
129+
_functionInvoker.convertTypes(_reusableOperandHolder);
130+
}
115131
result = _functionInvoker.invoke(_reusableOperandHolder);
116132
}
117133
return result != null ? TypeUtils.convert(_functionInvokerResultType.toInternal(result),

0 commit comments

Comments
 (0)