Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.iotdb.relational.it.db.it;

import org.apache.iotdb.it.env.EnvFactory;
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.TableClusterIT;
import org.apache.iotdb.itbase.category.TableLocalStandaloneIT;
Expand All @@ -32,14 +33,17 @@
import org.junit.runner.RunWith;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.Arrays;
import java.util.List;

import static org.apache.iotdb.commons.queryengine.plan.relational.planner.node.JoinNode.JoinType.FULL;
import static org.apache.iotdb.commons.queryengine.plan.relational.planner.node.JoinNode.JoinType.INNER;
import static org.apache.iotdb.commons.queryengine.plan.relational.planner.node.JoinNode.JoinType.LEFT;
import static org.apache.iotdb.commons.queryengine.plan.relational.planner.node.JoinNode.JoinType.RIGHT;
import static org.apache.iotdb.db.it.utils.TestUtils.tableAssertTestFail;
import static org.apache.iotdb.db.it.utils.TestUtils.tableResultSetEqual;
import static org.apache.iotdb.db.it.utils.TestUtils.tableResultSetEqualTest;
import static org.junit.Assert.fail;

Expand Down Expand Up @@ -2466,6 +2470,8 @@ public void outerJoinTest() {

@Test
public void lastCacheTest() {
prepareStaleLastRowCacheOnSingleDataNode();

expectedHeader =
new String[] {
"level", "attr1", "device", "attr2", "_col4", "_col5", "_col6", "_col7", "_col8", "_col9",
Expand Down Expand Up @@ -2925,6 +2931,59 @@ public static void repeatTest(
}
}

private static void prepareStaleLastRowCacheOnSingleDataNode() {
try {
final List<DataNodeWrapper> dataNodeWrappers = EnvFactory.getEnv().getDataNodeWrapperList();
for (final DataNodeWrapper dataNodeWrapper : dataNodeWrappers) {
executeTableStatementOnSingleDataNode(dataNodeWrapper, "clear query cache on local");
}

final DataNodeWrapper pollutedDataNode = dataNodeWrappers.get(0);

tableResultSetEqualOnSingleDataNode(
pollutedDataNode,
"select last_by(num,time),last_by(bignum,time),last_by(floatnum,time) "
+ "from table0 where device='d1' and level='l2' and time<1971-04-26T17:46:40.000",
new String[] {"_col0", "_col1", "_col2"},
new String[] {"10,3147483648,231.55,"});
// This only refreshes the cached row time on one DataNode, keeping the field caches stale.
tableResultSetEqualOnSingleDataNode(
pollutedDataNode,
"select last(time),last(device),last(level),last(attr1),last(attr2) "
+ "from table0 where device='d1' and level='l2'",
new String[] {"_col0", "_col1", "_col2", "_col3", "_col4"},
new String[] {"1971-04-26T17:46:40.000Z,d1,l2,yy,zz,"});
} catch (final Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}

private static void executeTableStatementOnSingleDataNode(
final DataNodeWrapper dataNodeWrapper, final String sql) throws Exception {
try (final Connection connection =
EnvFactory.getEnv().getConnection(dataNodeWrapper, "root", "root", "table");
final Statement statement = connection.createStatement()) {
statement.execute(sql);
}
}

private static void tableResultSetEqualOnSingleDataNode(
final DataNodeWrapper dataNodeWrapper,
final String sql,
final String[] expectedHeader,
final String[] expectedRetArray)
throws Exception {
try (final Connection connection =
EnvFactory.getEnv().getConnection(dataNodeWrapper, "root", "root", "table");
final Statement statement = connection.createStatement()) {
statement.execute("use " + DATABASE_NAME);
try (final ResultSet resultSet = statement.executeQuery(sql)) {
tableResultSetEqual(resultSet, expectedHeader, expectedRetArray);
}
}
}

public static String[] buildHeaders(int length) {
String[] expectedHeader = new String[length];
for (int i = 0; i < length; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1227,7 +1227,7 @@ public TSExecuteStatementResp executeFastLastDataQueryForOnePrefixPath(
for (final Map.Entry<String, Pair<TSDataType, TimeValuePair>> measurementLastEntry :
device2MeasurementLastEntry.getValue().entrySet()) {
final TimeValuePair tvPair = measurementLastEntry.getValue().getRight();
if (tvPair != TableDeviceLastCache.EMPTY_TIME_VALUE_PAIR) {
if (tvPair != TableDeviceLastCache.PLACEHOLDER_EMPTY_COLUMN) {
LastQueryUtil.appendLastValueRespectBlob(
builder,
tvPair.getTimestamp(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ protected void mayUpdateLastCache(
new TimeValuePair[] {
Objects.nonNull(value)
? new TimeValuePair(time, value)
: needUpdateNullEntry ? TableDeviceLastCache.EMPTY_TIME_VALUE_PAIR : null
: needUpdateNullEntry ? TableDeviceLastCache.PLACEHOLDER_EMPTY_COLUMN : null
},
fullPath.isUnderAlignedEntity(),
new IMeasurementSchema[] {fullPath.getMeasurementSchema()});
Expand All @@ -139,7 +139,7 @@ protected void mayUpdateLastCache(
seriesScanInfo.right = new TimeValuePair(time, value);
} else {
seriesScanInfo.right =
needUpdateNullEntry ? TableDeviceLastCache.EMPTY_TIME_VALUE_PAIR : null;
needUpdateNullEntry ? TableDeviceLastCache.PLACEHOLDER_EMPTY_COLUMN : null;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@
import static com.google.common.base.Preconditions.checkState;
import static org.apache.iotdb.calc.execution.operator.source.relational.aggregation.Utils.serializeTimeValueWithNull;
import static org.apache.iotdb.commons.queryengine.plan.relational.type.InternalTypeManager.getTSDataType;
import static org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.TableDeviceLastCache.EMPTY_PRIMITIVE_TYPE;
import static org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.TableDeviceLastCache.EMPTY_TIME_VALUE_PAIR;
import static org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.TableDeviceLastCache.PLACEHOLDER_EMPTY_COLUMN;
import static org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.TableDeviceLastCache.PLACEHOLDER_NO_VALUE;

/**
* This class is used to execute aggregation table scan when apply {@code canUseLastCacheOptimize()}
Expand Down Expand Up @@ -174,7 +174,7 @@
}
}

private void buildResultUseLastRowCache() {

Check warning on line 177 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/LastQueryAggTableScanOperator.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

A "Brain Method" was detected. Refactor it to reduce at least one of the following metrics: LOC from 136 to 64, Complexity from 18 to 14, Nesting Level from 4 to 2, Number of Variables from 14 to 6.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ4pT4UEKr1Tka8MwYnn&open=AZ4pT4UEKr1Tka8MwYnn&pullRequest=17677
appendGroupKeysToResult(cachedDeviceEntries, currentHitCacheIndex);
Pair<OptionalLong, TsPrimitiveType[]> currentHitResult =
lastRowCacheResults.get(currentHitCacheIndex);
Expand Down Expand Up @@ -279,7 +279,7 @@
TsPrimitiveType tsPrimitiveType =
lastRowCacheResults.get(currentHitCacheIndex).getRight()[measurementIdx];
long lastByTime = lastRowCacheResults.get(currentHitCacheIndex).getLeft().getAsLong();
if (tsPrimitiveType == EMPTY_PRIMITIVE_TYPE) {
if (tsPrimitiveType == PLACEHOLDER_NO_VALUE) {
// there is no data for this time series
if (aggregator.getStep().isOutputPartial()) {
columnBuilder.writeBinary(
Expand Down Expand Up @@ -322,7 +322,7 @@
TsPrimitiveType timeLastValue = currentHitResult[currentHitResult.length - 1].getValue();
// when there is no data, no need to append result if the query is GROUP BY or output of
// aggregator is partial (final operator will produce NULL result)
if (timeLastValue == EMPTY_PRIMITIVE_TYPE
if (timeLastValue == PLACEHOLDER_NO_VALUE
&& (groupingKeySize != 0 || tableAggregators.get(0).getStep().isOutputPartial())) {
outputDeviceIndex++;
currentHitCacheIndex++;
Expand All @@ -346,7 +346,7 @@
getNthIdColumnValue(
cachedDeviceEntries.get(currentHitCacheIndex), aggColumnsIndexArray[columnIdx]);
if (aggregator.getAccumulator() instanceof LastDescAccumulator) {
if (timeLastValue == EMPTY_PRIMITIVE_TYPE || id == null) {
if (timeLastValue == PLACEHOLDER_NO_VALUE || id == null) {
columnBuilder.appendNull();
} else {
if (aggregator.getStep().isOutputPartial()) {
Expand All @@ -368,7 +368,7 @@
long lastTime =
lastValuesCacheResults.get(currentHitCacheIndex)[measurementIdx].getTimestamp();

if (timeLastValue == EMPTY_PRIMITIVE_TYPE || id == null) {
if (timeLastValue == PLACEHOLDER_NO_VALUE || id == null) {
if (aggregator.getStep().isOutputPartial()) {
columnBuilder.writeBinary(
new Binary(
Expand Down Expand Up @@ -399,7 +399,7 @@
cachedDeviceEntries.get(currentHitCacheIndex)
.getAttributeColumnValues()[aggColumnsIndexArray[columnIdx]];
if (aggregator.getAccumulator() instanceof LastDescAccumulator) {
if (timeLastValue == EMPTY_PRIMITIVE_TYPE || attribute == null) {
if (timeLastValue == PLACEHOLDER_NO_VALUE || attribute == null) {
columnBuilder.appendNull();
} else {
if (aggregator.getStep().isOutputPartial()) {
Expand All @@ -420,7 +420,7 @@
lastValuesCacheResults.get(currentHitCacheIndex)[measurementIdx].getTimestamp();

// last_by
if (timeLastValue == EMPTY_PRIMITIVE_TYPE || attribute == null) {
if (timeLastValue == PLACEHOLDER_NO_VALUE || attribute == null) {
if (aggregator.getStep().isOutputPartial()) {
columnBuilder.writeBinary(
new Binary(
Expand Down Expand Up @@ -448,7 +448,7 @@
case TIME:
if (aggregator.getAccumulator() instanceof LastDescAccumulator) {
// for last(time) aggregation
if (timeLastValue == EMPTY_PRIMITIVE_TYPE) {
if (timeLastValue == PLACEHOLDER_NO_VALUE) {
columnBuilder.appendNull();
} else {
if (aggregator.getStep().isOutputPartial()) {
Expand All @@ -471,7 +471,7 @@
TsPrimitiveType tsPrimitiveType =
lastValuesCacheResults.get(currentHitCacheIndex)[measurementIdx].getValue();

if (tsPrimitiveType == EMPTY_PRIMITIVE_TYPE) {
if (tsPrimitiveType == PLACEHOLDER_NO_VALUE) {
// there is no data
if (aggregator.getStep().isOutputPartial()) {
columnBuilder.writeBinary(
Expand Down Expand Up @@ -509,7 +509,7 @@
TsPrimitiveType tsPrimitiveType =
lastValuesCacheResults.get(currentHitCacheIndex)[measurementIdx].getValue();

if (tsPrimitiveType == EMPTY_PRIMITIVE_TYPE) {
if (tsPrimitiveType == PLACEHOLDER_NO_VALUE) {
// there is no data for this time series
columnBuilder.appendNull();
} else {
Expand Down Expand Up @@ -559,7 +559,7 @@
new TsPrimitiveType.TsLong(lastAccumulator.getMaxTime())));
} else {
currentDeviceEntry = deviceEntries.get(currentDeviceIndex);
updateTimeValuePairList.add(EMPTY_TIME_VALUE_PAIR);
updateTimeValuePairList.add(PLACEHOLDER_EMPTY_COLUMN);
}
} else {
LastByDescAccumulator lastByAccumulator =
Expand All @@ -576,8 +576,7 @@
case FIELD:
LastByDescAccumulator lastByAccumulator =
(LastByDescAccumulator) tableAggregator.getAccumulator();
updateMeasurementList.add(schema.getName());
if (lastByAccumulator.hasInitResult() && !lastByAccumulator.isXNull()) {
if (lastByAccumulator.hasInitResult()) {
long lastByTime = lastByAccumulator.getLastTimeOfY();

if (!hasSetLastTime) {
Expand All @@ -587,11 +586,15 @@
new TimeValuePair(lastByTime, new TsPrimitiveType.TsLong(lastByTime)));
}

updateMeasurementList.add(schema.getName());
updateTimeValuePairList.add(
new TimeValuePair(
lastByTime, cloneTsPrimitiveType(lastByAccumulator.getXResult())));
lastByAccumulator.isXNull()
? new TimeValuePair(lastByTime, PLACEHOLDER_NO_VALUE)
: new TimeValuePair(
lastByTime, cloneTsPrimitiveType(lastByAccumulator.getXResult())));
} else {
updateTimeValuePairList.add(EMPTY_TIME_VALUE_PAIR);
updateMeasurementList.add(schema.getName());
updateTimeValuePairList.add(PLACEHOLDER_EMPTY_COLUMN);
}
break;
default:
Expand Down Expand Up @@ -628,7 +631,7 @@
new TsPrimitiveType.TsLong(lastAccumulator.getMaxTime())));
} else {
currentDeviceEntry = deviceEntries.get(currentDeviceIndex);
updateTimeValuePairList.add(EMPTY_TIME_VALUE_PAIR);
updateTimeValuePairList.add(PLACEHOLDER_EMPTY_COLUMN);
}
}
break;
Expand All @@ -645,7 +648,7 @@
lastAccumulator.getMaxTime(),
cloneTsPrimitiveType(lastAccumulator.getLastValue())));
} else {
updateTimeValuePairList.add(EMPTY_TIME_VALUE_PAIR);
updateTimeValuePairList.add(PLACEHOLDER_EMPTY_COLUMN);
}
break;
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,8 @@
import static org.apache.iotdb.db.queryengine.plan.analyze.PredicateUtils.convertPredicateToFilter;
import static org.apache.iotdb.db.queryengine.plan.planner.OperatorTreeGenerator.isFilterGtOrGe;
import static org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.SeriesScanOptions.updateFilterUsingTTL;
import static org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.TableDeviceLastCache.EMPTY_PRIMITIVE_TYPE;
import static org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.TableDeviceLastCache.PLACEHOLDER_NO_VALUE;
import static org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.TableDeviceLastCache.PLACEHOLDER_STALE_VALUE;
import static org.apache.tsfile.read.common.type.TimestampType.TIMESTAMP;

public class DataNodeTableOperatorGenerator
Expand Down Expand Up @@ -516,7 +517,7 @@
}
}

private void calculateSeriesScanOptionsList() {

Check warning on line 520 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/DataNodeTableOperatorGenerator.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

A "Brain Method" was detected. Refactor it to reduce at least one of the following metrics: LOC from 88 to 64, Complexity from 25 to 14, Nesting Level from 3 to 2, Number of Variables from 18 to 6.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ4nUYBVy7NuH8hrjkKK&open=AZ4nUYBVy7NuH8hrjkKK&pullRequest=17677
if (seriesScanOptionsList != null) {
return;
}
Expand Down Expand Up @@ -1353,7 +1354,7 @@
}

private AbstractAggTableScanOperator.AbstractAggTableScanOperatorParameter
constructAbstractAggTableScanOperatorParameter(

Check warning on line 1357 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/DataNodeTableOperatorGenerator.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

A "Brain Method" was detected. Refactor it to reduce at least one of the following metrics: LOC from 177 to 64, Complexity from 21 to 14, Nesting Level from 4 to 2, Number of Variables from 40 to 6.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ4nUYBVy7NuH8hrjkKL&open=AZ4nUYBVy7NuH8hrjkKL&pullRequest=17677
AggregationTableScanNode node,
LocalExecutionPlanContext context,
String className,
Expand Down Expand Up @@ -1624,6 +1625,9 @@
for (int j = 0; j < lastByResult.get().getRight().length; j++) {
TsPrimitiveType tsPrimitiveType = lastByResult.get().getRight()[j];
if (tsPrimitiveType == null
// Known-null at the aligned row time can still hit cache. Only miss or stale target
// values need to fall back to scan for correctness.
|| tsPrimitiveType == PLACEHOLDER_STALE_VALUE
|| (updateTimeFilter != null
&& !LastQueryUtil.satisfyFilter(
updateTimeFilter,
Expand Down Expand Up @@ -1723,7 +1727,7 @@
parameter.getSeriesScanOptions().getGlobalTimeFilter(), timeValuePair)) {
if (isFilterGtOrGe(updateTimeFilter)) {
// it means there is no data meets Filter
timeValuePair.setValue(EMPTY_PRIMITIVE_TYPE);
timeValuePair.setValue(PLACEHOLDER_NO_VALUE);
} else {
allHitCache = false;
break;
Expand Down Expand Up @@ -1964,7 +1968,7 @@
timeColumnOfTargetTable);
}

private boolean[] checkStatisticAndScanOrder(

Check warning on line 1971 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/DataNodeTableOperatorGenerator.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

A "Brain Method" was detected. Refactor it to reduce at least one of the following metrics: LOC from 71 to 64, Complexity from 33 to 14, Nesting Level from 4 to 2, Number of Variables from 12 to 6.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ4nUYBVy7NuH8hrjkKM&open=AZ4nUYBVy7NuH8hrjkKM&pullRequest=17677
AggregationTableScanNode node, String timeColumnName) {
boolean canUseStatistic = true;
int ascendingCount = 0, descendingCount = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2894,7 +2894,7 @@
}

@Override
public Operator visitLastQueryScan(LastQueryScanNode node, LocalExecutionPlanContext context) {

Check warning on line 2897 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

A "Brain Method" was detected. Refactor it to reduce at least one of the following metrics: LOC from 83 to 64, Complexity from 17 to 14, Nesting Level from 4 to 2, Number of Variables from 20 to 6.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ4p-YmWW4Q6haNClFtW&open=AZ4p-YmWW4Q6haNClFtW&pullRequest=17677
final PartialPath devicePath = node.getDevicePath();
List<Integer> idxOfMeasurementSchemas = node.getIdxOfMeasurementSchemas();
List<Integer> unCachedMeasurementIndexes = new ArrayList<>();
Expand Down Expand Up @@ -2923,7 +2923,7 @@

if (timeValuePair == null) { // last value is not cached
unCachedMeasurementIndexes.add(i);
} else if (timeValuePair.getValue() == TableDeviceLastCache.EMPTY_PRIMITIVE_TYPE) {
} else if (timeValuePair.getValue() == TableDeviceLastCache.PLACEHOLDER_NO_VALUE) {
// there is no data for this time series, just ignore
} else if (!LastQueryUtil.satisfyFilter(filter, timeValuePair)) {
// cached last value is not satisfied
Expand Down Expand Up @@ -3248,7 +3248,7 @@
node.getScope()));
}

public List<Operator> dealWithConsumeAllChildrenPipelineBreaker(

Check warning on line 3251 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

A "Brain Method" was detected. Refactor it to reduce at least one of the following metrics: LOC from 103 to 64, Complexity from 19 to 14, Nesting Level from 5 to 2, Number of Variables from 29 to 6.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ4p-YmWW4Q6haNClFtV&open=AZ4p-YmWW4Q6haNClFtV&pullRequest=17677
PlanNode node, LocalExecutionPlanContext context) {
// children after pipelining
List<Operator> parentPipelineChildren = new ArrayList<>();
Expand Down
Loading
Loading