diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiTAGsWithAttributesTableIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiTAGsWithAttributesTableIT.java index 5e18a8468c75e..289ae7789a4aa 100644 --- a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiTAGsWithAttributesTableIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiTAGsWithAttributesTableIT.java @@ -20,6 +20,7 @@ package org.apache.iotdb.relational.it.db.it; import org.apache.iotdb.it.env.EnvFactory; +import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper; import org.apache.iotdb.it.framework.IoTDBTestRunner; import org.apache.iotdb.itbase.category.TableClusterIT; import org.apache.iotdb.itbase.category.TableLocalStandaloneIT; @@ -32,14 +33,17 @@ import org.junit.runner.RunWith; import java.sql.Connection; +import java.sql.ResultSet; import java.sql.Statement; import java.util.Arrays; +import java.util.List; import static org.apache.iotdb.commons.queryengine.plan.relational.planner.node.JoinNode.JoinType.FULL; import static org.apache.iotdb.commons.queryengine.plan.relational.planner.node.JoinNode.JoinType.INNER; import static org.apache.iotdb.commons.queryengine.plan.relational.planner.node.JoinNode.JoinType.LEFT; import static org.apache.iotdb.commons.queryengine.plan.relational.planner.node.JoinNode.JoinType.RIGHT; import static org.apache.iotdb.db.it.utils.TestUtils.tableAssertTestFail; +import static org.apache.iotdb.db.it.utils.TestUtils.tableResultSetEqual; import static org.apache.iotdb.db.it.utils.TestUtils.tableResultSetEqualTest; import static org.junit.Assert.fail; @@ -2466,6 +2470,8 @@ public void outerJoinTest() { @Test public void lastCacheTest() { + prepareStaleLastRowCacheOnSingleDataNode(); + expectedHeader = new String[] { "level", "attr1", "device", "attr2", "_col4", "_col5", "_col6", "_col7", "_col8", "_col9", @@ -2925,6 +2931,59 @@ public static void repeatTest( } } + private static void prepareStaleLastRowCacheOnSingleDataNode() { + try { + final List dataNodeWrappers = EnvFactory.getEnv().getDataNodeWrapperList(); + for (final DataNodeWrapper dataNodeWrapper : dataNodeWrappers) { + executeTableStatementOnSingleDataNode(dataNodeWrapper, "clear query cache on local"); + } + + final DataNodeWrapper pollutedDataNode = dataNodeWrappers.get(0); + + tableResultSetEqualOnSingleDataNode( + pollutedDataNode, + "select last_by(num,time),last_by(bignum,time),last_by(floatnum,time) " + + "from table0 where device='d1' and level='l2' and time<1971-04-26T17:46:40.000", + new String[] {"_col0", "_col1", "_col2"}, + new String[] {"10,3147483648,231.55,"}); + // This only refreshes the cached row time on one DataNode, keeping the field caches stale. + tableResultSetEqualOnSingleDataNode( + pollutedDataNode, + "select last(time),last(device),last(level),last(attr1),last(attr2) " + + "from table0 where device='d1' and level='l2'", + new String[] {"_col0", "_col1", "_col2", "_col3", "_col4"}, + new String[] {"1971-04-26T17:46:40.000Z,d1,l2,yy,zz,"}); + } catch (final Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + private static void executeTableStatementOnSingleDataNode( + final DataNodeWrapper dataNodeWrapper, final String sql) throws Exception { + try (final Connection connection = + EnvFactory.getEnv().getConnection(dataNodeWrapper, "root", "root", "table"); + final Statement statement = connection.createStatement()) { + statement.execute(sql); + } + } + + private static void tableResultSetEqualOnSingleDataNode( + final DataNodeWrapper dataNodeWrapper, + final String sql, + final String[] expectedHeader, + final String[] expectedRetArray) + throws Exception { + try (final Connection connection = + EnvFactory.getEnv().getConnection(dataNodeWrapper, "root", "root", "table"); + final Statement statement = connection.createStatement()) { + statement.execute("use " + DATABASE_NAME); + try (final ResultSet resultSet = statement.executeQuery(sql)) { + tableResultSetEqual(resultSet, expectedHeader, expectedRetArray); + } + } + } + public static String[] buildHeaders(int length) { String[] expectedHeader = new String[length]; for (int i = 0; i < length; i++) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java index 59fa91dc0a2b6..f8fa0293f29bb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java @@ -1227,7 +1227,7 @@ public TSExecuteStatementResp executeFastLastDataQueryForOnePrefixPath( for (final Map.Entry> measurementLastEntry : device2MeasurementLastEntry.getValue().entrySet()) { final TimeValuePair tvPair = measurementLastEntry.getValue().getRight(); - if (tvPair != TableDeviceLastCache.EMPTY_TIME_VALUE_PAIR) { + if (tvPair != TableDeviceLastCache.PLACEHOLDER_EMPTY_COLUMN) { LastQueryUtil.appendLastValueRespectBlob( builder, tvPair.getTimestamp(), diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/AbstractUpdateLastCacheOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/AbstractUpdateLastCacheOperator.java index 77ab84b6a4805..e1c3cb335b095 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/AbstractUpdateLastCacheOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/AbstractUpdateLastCacheOperator.java @@ -127,7 +127,7 @@ protected void mayUpdateLastCache( new TimeValuePair[] { Objects.nonNull(value) ? new TimeValuePair(time, value) - : needUpdateNullEntry ? TableDeviceLastCache.EMPTY_TIME_VALUE_PAIR : null + : needUpdateNullEntry ? TableDeviceLastCache.PLACEHOLDER_EMPTY_COLUMN : null }, fullPath.isUnderAlignedEntity(), new IMeasurementSchema[] {fullPath.getMeasurementSchema()}); @@ -139,7 +139,7 @@ protected void mayUpdateLastCache( seriesScanInfo.right = new TimeValuePair(time, value); } else { seriesScanInfo.right = - needUpdateNullEntry ? TableDeviceLastCache.EMPTY_TIME_VALUE_PAIR : null; + needUpdateNullEntry ? TableDeviceLastCache.PLACEHOLDER_EMPTY_COLUMN : null; } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/LastQueryAggTableScanOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/LastQueryAggTableScanOperator.java index 72b7c862b2870..92b7a19c6f34f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/LastQueryAggTableScanOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/LastQueryAggTableScanOperator.java @@ -53,8 +53,8 @@ import static com.google.common.base.Preconditions.checkState; import static org.apache.iotdb.calc.execution.operator.source.relational.aggregation.Utils.serializeTimeValueWithNull; import static org.apache.iotdb.commons.queryengine.plan.relational.type.InternalTypeManager.getTSDataType; -import static org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.TableDeviceLastCache.EMPTY_PRIMITIVE_TYPE; -import static org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.TableDeviceLastCache.EMPTY_TIME_VALUE_PAIR; +import static org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.TableDeviceLastCache.PLACEHOLDER_EMPTY_COLUMN; +import static org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.TableDeviceLastCache.PLACEHOLDER_NO_VALUE; /** * This class is used to execute aggregation table scan when apply {@code canUseLastCacheOptimize()} @@ -279,7 +279,7 @@ private void buildResultUseLastRowCache() { TsPrimitiveType tsPrimitiveType = lastRowCacheResults.get(currentHitCacheIndex).getRight()[measurementIdx]; long lastByTime = lastRowCacheResults.get(currentHitCacheIndex).getLeft().getAsLong(); - if (tsPrimitiveType == EMPTY_PRIMITIVE_TYPE) { + if (tsPrimitiveType == PLACEHOLDER_NO_VALUE) { // there is no data for this time series if (aggregator.getStep().isOutputPartial()) { columnBuilder.writeBinary( @@ -322,7 +322,7 @@ private void buildResultUseLastValuesCache() { TsPrimitiveType timeLastValue = currentHitResult[currentHitResult.length - 1].getValue(); // when there is no data, no need to append result if the query is GROUP BY or output of // aggregator is partial (final operator will produce NULL result) - if (timeLastValue == EMPTY_PRIMITIVE_TYPE + if (timeLastValue == PLACEHOLDER_NO_VALUE && (groupingKeySize != 0 || tableAggregators.get(0).getStep().isOutputPartial())) { outputDeviceIndex++; currentHitCacheIndex++; @@ -346,7 +346,7 @@ private void buildResultUseLastValuesCache() { getNthIdColumnValue( cachedDeviceEntries.get(currentHitCacheIndex), aggColumnsIndexArray[columnIdx]); if (aggregator.getAccumulator() instanceof LastDescAccumulator) { - if (timeLastValue == EMPTY_PRIMITIVE_TYPE || id == null) { + if (timeLastValue == PLACEHOLDER_NO_VALUE || id == null) { columnBuilder.appendNull(); } else { if (aggregator.getStep().isOutputPartial()) { @@ -368,7 +368,7 @@ private void buildResultUseLastValuesCache() { long lastTime = lastValuesCacheResults.get(currentHitCacheIndex)[measurementIdx].getTimestamp(); - if (timeLastValue == EMPTY_PRIMITIVE_TYPE || id == null) { + if (timeLastValue == PLACEHOLDER_NO_VALUE || id == null) { if (aggregator.getStep().isOutputPartial()) { columnBuilder.writeBinary( new Binary( @@ -399,7 +399,7 @@ private void buildResultUseLastValuesCache() { cachedDeviceEntries.get(currentHitCacheIndex) .getAttributeColumnValues()[aggColumnsIndexArray[columnIdx]]; if (aggregator.getAccumulator() instanceof LastDescAccumulator) { - if (timeLastValue == EMPTY_PRIMITIVE_TYPE || attribute == null) { + if (timeLastValue == PLACEHOLDER_NO_VALUE || attribute == null) { columnBuilder.appendNull(); } else { if (aggregator.getStep().isOutputPartial()) { @@ -420,7 +420,7 @@ private void buildResultUseLastValuesCache() { lastValuesCacheResults.get(currentHitCacheIndex)[measurementIdx].getTimestamp(); // last_by - if (timeLastValue == EMPTY_PRIMITIVE_TYPE || attribute == null) { + if (timeLastValue == PLACEHOLDER_NO_VALUE || attribute == null) { if (aggregator.getStep().isOutputPartial()) { columnBuilder.writeBinary( new Binary( @@ -448,7 +448,7 @@ private void buildResultUseLastValuesCache() { case TIME: if (aggregator.getAccumulator() instanceof LastDescAccumulator) { // for last(time) aggregation - if (timeLastValue == EMPTY_PRIMITIVE_TYPE) { + if (timeLastValue == PLACEHOLDER_NO_VALUE) { columnBuilder.appendNull(); } else { if (aggregator.getStep().isOutputPartial()) { @@ -471,7 +471,7 @@ private void buildResultUseLastValuesCache() { TsPrimitiveType tsPrimitiveType = lastValuesCacheResults.get(currentHitCacheIndex)[measurementIdx].getValue(); - if (tsPrimitiveType == EMPTY_PRIMITIVE_TYPE) { + if (tsPrimitiveType == PLACEHOLDER_NO_VALUE) { // there is no data if (aggregator.getStep().isOutputPartial()) { columnBuilder.writeBinary( @@ -509,7 +509,7 @@ private void buildResultUseLastValuesCache() { TsPrimitiveType tsPrimitiveType = lastValuesCacheResults.get(currentHitCacheIndex)[measurementIdx].getValue(); - if (tsPrimitiveType == EMPTY_PRIMITIVE_TYPE) { + if (tsPrimitiveType == PLACEHOLDER_NO_VALUE) { // there is no data for this time series columnBuilder.appendNull(); } else { @@ -559,7 +559,7 @@ private void updateLastCacheUseLastRowIfPossible() { new TsPrimitiveType.TsLong(lastAccumulator.getMaxTime()))); } else { currentDeviceEntry = deviceEntries.get(currentDeviceIndex); - updateTimeValuePairList.add(EMPTY_TIME_VALUE_PAIR); + updateTimeValuePairList.add(PLACEHOLDER_EMPTY_COLUMN); } } else { LastByDescAccumulator lastByAccumulator = @@ -576,8 +576,7 @@ private void updateLastCacheUseLastRowIfPossible() { case FIELD: LastByDescAccumulator lastByAccumulator = (LastByDescAccumulator) tableAggregator.getAccumulator(); - updateMeasurementList.add(schema.getName()); - if (lastByAccumulator.hasInitResult() && !lastByAccumulator.isXNull()) { + if (lastByAccumulator.hasInitResult()) { long lastByTime = lastByAccumulator.getLastTimeOfY(); if (!hasSetLastTime) { @@ -587,11 +586,15 @@ private void updateLastCacheUseLastRowIfPossible() { new TimeValuePair(lastByTime, new TsPrimitiveType.TsLong(lastByTime))); } + updateMeasurementList.add(schema.getName()); updateTimeValuePairList.add( - new TimeValuePair( - lastByTime, cloneTsPrimitiveType(lastByAccumulator.getXResult()))); + lastByAccumulator.isXNull() + ? new TimeValuePair(lastByTime, PLACEHOLDER_NO_VALUE) + : new TimeValuePair( + lastByTime, cloneTsPrimitiveType(lastByAccumulator.getXResult()))); } else { - updateTimeValuePairList.add(EMPTY_TIME_VALUE_PAIR); + updateMeasurementList.add(schema.getName()); + updateTimeValuePairList.add(PLACEHOLDER_EMPTY_COLUMN); } break; default: @@ -628,7 +631,7 @@ private void updateLastCacheUseLastValuesIfPossible() { new TsPrimitiveType.TsLong(lastAccumulator.getMaxTime()))); } else { currentDeviceEntry = deviceEntries.get(currentDeviceIndex); - updateTimeValuePairList.add(EMPTY_TIME_VALUE_PAIR); + updateTimeValuePairList.add(PLACEHOLDER_EMPTY_COLUMN); } } break; @@ -645,7 +648,7 @@ private void updateLastCacheUseLastValuesIfPossible() { lastAccumulator.getMaxTime(), cloneTsPrimitiveType(lastAccumulator.getLastValue()))); } else { - updateTimeValuePairList.add(EMPTY_TIME_VALUE_PAIR); + updateTimeValuePairList.add(PLACEHOLDER_EMPTY_COLUMN); } break; default: diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/DataNodeTableOperatorGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/DataNodeTableOperatorGenerator.java index 06de6a11b82d5..903a361f3bd99 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/DataNodeTableOperatorGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/DataNodeTableOperatorGenerator.java @@ -199,7 +199,8 @@ import static org.apache.iotdb.db.queryengine.plan.analyze.PredicateUtils.convertPredicateToFilter; import static org.apache.iotdb.db.queryengine.plan.planner.OperatorTreeGenerator.isFilterGtOrGe; import static org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.SeriesScanOptions.updateFilterUsingTTL; -import static org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.TableDeviceLastCache.EMPTY_PRIMITIVE_TYPE; +import static org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.TableDeviceLastCache.PLACEHOLDER_NO_VALUE; +import static org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.TableDeviceLastCache.PLACEHOLDER_STALE_VALUE; import static org.apache.tsfile.read.common.type.TimestampType.TIMESTAMP; public class DataNodeTableOperatorGenerator @@ -1624,6 +1625,9 @@ private LastQueryAggTableScanOperator constructLastQueryAggTableScanOperator( for (int j = 0; j < lastByResult.get().getRight().length; j++) { TsPrimitiveType tsPrimitiveType = lastByResult.get().getRight()[j]; if (tsPrimitiveType == null + // Known-null at the aligned row time can still hit cache. Only miss or stale target + // values need to fall back to scan for correctness. + || tsPrimitiveType == PLACEHOLDER_STALE_VALUE || (updateTimeFilter != null && !LastQueryUtil.satisfyFilter( updateTimeFilter, @@ -1723,7 +1727,7 @@ private LastQueryAggTableScanOperator constructLastQueryAggTableScanOperator( parameter.getSeriesScanOptions().getGlobalTimeFilter(), timeValuePair)) { if (isFilterGtOrGe(updateTimeFilter)) { // it means there is no data meets Filter - timeValuePair.setValue(EMPTY_PRIMITIVE_TYPE); + timeValuePair.setValue(PLACEHOLDER_NO_VALUE); } else { allHitCache = false; break; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java index 2a5af16e5b36c..5101881d92e14 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java @@ -2923,7 +2923,7 @@ public Operator visitLastQueryScan(LastQueryScanNode node, LocalExecutionPlanCon if (timeValuePair == null) { // last value is not cached unCachedMeasurementIndexes.add(i); - } else if (timeValuePair.getValue() == TableDeviceLastCache.EMPTY_PRIMITIVE_TYPE) { + } else if (timeValuePair.getValue() == TableDeviceLastCache.PLACEHOLDER_NO_VALUE) { // there is no data for this time series, just ignore } else if (!LastQueryUtil.satisfyFilter(filter, timeValuePair)) { // cached last value is not satisfied diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceLastCache.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceLastCache.java index 5a2ac3d1e5329..271051b72b831 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceLastCache.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceLastCache.java @@ -21,7 +21,6 @@ import org.apache.iotdb.db.schemaengine.table.DataNodeTableCache; -import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.read.TimeValuePair; import org.apache.tsfile.utils.Pair; import org.apache.tsfile.utils.RamUsageEstimator; @@ -46,53 +45,36 @@ public class TableDeviceLastCache { static final int INSTANCE_SIZE = (int) RamUsageEstimator.shallowSizeOfInstance(TableDeviceLastCache.class) + (int) RamUsageEstimator.shallowSizeOfInstance(ConcurrentHashMap.class); + private static final int LONG_INSTANCE_SIZE = + (int) RamUsageEstimator.shallowSizeOfInstance(Long.class); - public static final TsPrimitiveType EMPTY_PRIMITIVE_TYPE = - new TsPrimitiveType() { - @Override - public void setObject(Object o) { - // Do nothing - } - - @Override - public void reset() { - // Do nothing - } - - @Override - public int getSize() { - return 0; - } - - @Override - public Object getValue() { - return null; - } - - @Override - public String getStringValue() { - return null; - } + /** + * Cache hit and the measurement is known to be null at the aligned last-row time. For stored + * entries, it is only used as the value part of the time column's cached {@link TimeValuePair}. + */ + public static final TsPrimitiveType PLACEHOLDER_NO_VALUE = new TsPrimitiveType.TsInt(); - @Override - public TSDataType getDataType() { - return null; - } - }; + /** + * Cache hit but the target measurement is stale under a newer aligned last-row time. This + * sentinel is only returned by {@link #getLastRow(String, List)} and is never stored in cache. + */ + public static final TsPrimitiveType PLACEHOLDER_STALE_VALUE = new TsPrimitiveType.TsInt(); private static final Optional> HIT_AND_ALL_NULL = Optional.of(new Pair<>(OptionalLong.empty(), null)); - /** This means that the tv pair has been put, and the value is null */ - public static final TimeValuePair EMPTY_TIME_VALUE_PAIR = - new TimeValuePair(Long.MIN_VALUE, EMPTY_PRIMITIVE_TYPE); + /** This means the measurement has been cached and is known to have no values at all. */ + public static final TimeValuePair PLACEHOLDER_EMPTY_COLUMN = + new TimeValuePair(Long.MIN_VALUE, PLACEHOLDER_NO_VALUE); /** This means that the tv pair has been declared, and is ready for the next put. */ - private static final TimeValuePair PLACEHOLDER_TIME_VALUE_PAIR = - new TimeValuePair(Long.MIN_VALUE, EMPTY_PRIMITIVE_TYPE); + private static final TimeValuePair PLACEHOLDER_NO_CACHE = + new TimeValuePair(Long.MIN_VALUE, PLACEHOLDER_NO_VALUE); // Time is seen as "" as a measurement private final Map measurement2CachedLastMap = new ConcurrentHashMap<>(); + private final Map measurement2CachedLastKnownNullTimeMap = + new ConcurrentHashMap<>(); private final boolean isTableModel; TableDeviceLastCache(final boolean isTableModel) { @@ -117,7 +99,10 @@ int initOrInvalidate( if (Objects.isNull(finalMeasurement)) { continue; } - final TimeValuePair newPair = isInvalidate ? null : PLACEHOLDER_TIME_VALUE_PAIR; + if (isInvalidate && measurement2CachedLastKnownNullTimeMap.remove(finalMeasurement) != null) { + diff.addAndGet(-getKnownNullTimeEntrySize()); + } + final TimeValuePair newPair = isInvalidate ? null : PLACEHOLDER_NO_CACHE; measurement2CachedLastMap.compute( finalMeasurement, @@ -155,6 +140,7 @@ int tryUpdate( for (int i = 0; i < measurements.length; ++i) { if (Objects.isNull(timeValuePairs[i])) { if (invalidateNull) { + diff.addAndGet(removeKnownNullTime(measurements[i])); diff.addAndGet( -((int) RamUsageEstimator.sizeOf(measurements[i]) + getTvPairEntrySize(measurement2CachedLastMap.remove(measurements[i])))); @@ -162,6 +148,14 @@ int tryUpdate( continue; } + if (isKnownNullAtAlignedTime(measurements[i], timeValuePairs[i])) { + if (lastTime < timeValuePairs[i].getTimestamp()) { + lastTime = timeValuePairs[i].getTimestamp(); + } + diff.addAndGet(tryUpdateKnownNullTime(measurements[i], timeValuePairs[i].getTimestamp())); + continue; + } + final int finalI = i; if (lastTime < timeValuePairs[i].getTimestamp()) { lastTime = timeValuePairs[i].getTimestamp(); @@ -170,7 +164,10 @@ int tryUpdate( measurements[i], (measurement, tvPair) -> { if (tvPair.getTimestamp() <= timeValuePairs[finalI].getTimestamp()) { - diff.addAndGet(getDiffSize(tvPair, timeValuePairs[finalI])); + diff.addAndGet( + getDiffSize(tvPair, timeValuePairs[finalI]) + + clearKnownNullTimeIfCovered( + measurement, timeValuePairs[finalI].getTimestamp())); return timeValuePairs[finalI]; } return tvPair; @@ -181,7 +178,7 @@ int tryUpdate( "", (time, tvPair) -> tvPair.getTimestamp() < finalLastTime - ? new TimeValuePair(finalLastTime, EMPTY_PRIMITIVE_TYPE) + ? new TimeValuePair(finalLastTime, PLACEHOLDER_NO_VALUE) : tvPair); return diff.get(); } @@ -190,6 +187,7 @@ int tryUpdate( int invalidate(final String measurement) { final AtomicInteger diff = new AtomicInteger(); final AtomicLong time = new AtomicLong(); + final AtomicLong knownNullTime = new AtomicLong(Long.MIN_VALUE); measurement2CachedLastMap.computeIfPresent( measurement, (s, timeValuePair) -> { @@ -199,13 +197,18 @@ int invalidate(final String measurement) { time.set(timeValuePair.getTimestamp()); return null; }); + final Long removedKnownNullTime = measurement2CachedLastKnownNullTimeMap.remove(measurement); + if (removedKnownNullTime != null) { + diff.addAndGet(-getKnownNullTimeEntrySize()); + knownNullTime.set(removedKnownNullTime); + } if (diff.get() == 0) { return 0; } measurement2CachedLastMap.computeIfPresent( "", (s, timeValuePair) -> { - if (timeValuePair.getTimestamp() <= time.get()) { + if (timeValuePair.getTimestamp() <= Math.max(time.get(), knownNullTime.get())) { diff.addAndGet((int) RamUsageEstimator.sizeOf(s) + getTvPairEntrySize(timeValuePair)); return null; } @@ -225,25 +228,32 @@ private static int getTvPairSize(final TimeValuePair tvPair) { private static boolean isEmptyTvPair(final TimeValuePair tvPair) { return Objects.isNull(tvPair) - || tvPair == PLACEHOLDER_TIME_VALUE_PAIR - || tvPair == EMPTY_TIME_VALUE_PAIR; + || tvPair == PLACEHOLDER_NO_CACHE + || tvPair == PLACEHOLDER_EMPTY_COLUMN; + } + + private static boolean isKnownNullAtAlignedTime( + final @Nonnull String measurement, final @Nonnull TimeValuePair timeValuePair) { + return !measurement.isEmpty() + && timeValuePair != PLACEHOLDER_EMPTY_COLUMN + && timeValuePair.getValue() == PLACEHOLDER_NO_VALUE; } @Nullable TimeValuePair getTimeValuePair(final @Nonnull String measurement) { final TimeValuePair result = measurement2CachedLastMap.get(measurement); - return result != PLACEHOLDER_TIME_VALUE_PAIR ? result : null; + return result != PLACEHOLDER_NO_CACHE ? result : null; } // Shall pass in "" if last by time Optional> getLastRow( final @Nonnull String sourceMeasurement, final List targetMeasurements) { final TimeValuePair pair = measurement2CachedLastMap.get(sourceMeasurement); - if (Objects.isNull(pair) || pair == PLACEHOLDER_TIME_VALUE_PAIR) { + if (Objects.isNull(pair) || pair == PLACEHOLDER_NO_CACHE) { return Optional.empty(); } - if (pair == EMPTY_TIME_VALUE_PAIR) { + if (pair == PLACEHOLDER_EMPTY_COLUMN) { return HIT_AND_ALL_NULL; } final long alignTime = pair.getTimestamp(); @@ -255,14 +265,10 @@ Optional> getLastRow( .map( targetMeasurement -> { if (!targetMeasurement.isEmpty()) { - final TimeValuePair tvPair = - measurement2CachedLastMap.get(targetMeasurement); - if (Objects.isNull(tvPair)) { - return null; - } - return tvPair.getTimestamp() == alignTime - ? tvPair.getValue() - : EMPTY_PRIMITIVE_TYPE; + return getLastRowTargetValue( + alignTime, + measurement2CachedLastMap.get(targetMeasurement), + measurement2CachedLastKnownNullTimeMap.get(targetMeasurement)); } else { return new TsPrimitiveType.TsLong(alignTime); } @@ -270,15 +276,77 @@ Optional> getLastRow( .toArray(TsPrimitiveType[]::new))); } + @Nullable + private static TsPrimitiveType getLastRowTargetValue( + final long alignTime, + final @Nullable TimeValuePair tvPair, + final @Nullable Long knownNullTime) { + if (knownNullTime != null && knownNullTime == alignTime) { + return PLACEHOLDER_NO_VALUE; + } + if (Objects.isNull(tvPair) || tvPair == PLACEHOLDER_NO_CACHE) { + return null; + } + if (tvPair == PLACEHOLDER_EMPTY_COLUMN) { + return PLACEHOLDER_NO_VALUE; + } + return tvPair.getTimestamp() == alignTime ? tvPair.getValue() : PLACEHOLDER_STALE_VALUE; + } + int estimateSize() { return INSTANCE_SIZE + (int) RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY * measurement2CachedLastMap.size() + + (int) RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY + * measurement2CachedLastKnownNullTimeMap.size() + measurement2CachedLastMap.entrySet().stream() .mapToInt( entry -> (isTableModel ? 0 : (int) RamUsageEstimator.sizeOf(entry.getKey())) + TableDeviceLastCache.getTvPairSize(entry.getValue())) - .reduce(0, Integer::sum); + .reduce(0, Integer::sum) + + measurement2CachedLastKnownNullTimeMap.size() * LONG_INSTANCE_SIZE; + } + + private int tryUpdateKnownNullTime(final @Nonnull String measurement, final long knownNullTime) { + final AtomicInteger diff = new AtomicInteger(0); + measurement2CachedLastMap.computeIfPresent( + measurement, + (measurementName, tvPair) -> { + measurement2CachedLastKnownNullTimeMap.compute( + measurementName, + (ignored, oldTime) -> { + if (oldTime == null) { + diff.addAndGet(getKnownNullTimeEntrySize()); + return knownNullTime; + } + return oldTime < knownNullTime ? knownNullTime : oldTime; + }); + return tvPair; + }); + return diff.get(); + } + + private int clearKnownNullTimeIfCovered( + final @Nonnull String measurement, final long coveredTime) { + if (measurement.isEmpty()) { + return 0; + } + final Long knownNullTime = measurement2CachedLastKnownNullTimeMap.get(measurement); + if (knownNullTime != null && knownNullTime <= coveredTime) { + measurement2CachedLastKnownNullTimeMap.remove(measurement); + return -getKnownNullTimeEntrySize(); + } + return 0; + } + + private int removeKnownNullTime(final @Nonnull String measurement) { + return measurement2CachedLastKnownNullTimeMap.remove(measurement) == null + ? 0 + : -getKnownNullTimeEntrySize(); + } + + private static int getKnownNullTimeEntrySize() { + return (int) RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY + LONG_INSTANCE_SIZE; } private static int getDiffSize( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceSchemaCache.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceSchemaCache.java index eed26dadc2957..f49d67073774c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceSchemaCache.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceSchemaCache.java @@ -211,10 +211,12 @@ public void invalidateAttributes(final String database, final IDeviceID deviceId * *

- Second time put the calculated {@link TimeValuePair}s, and use {@link * #updateLastCacheIfExists(String, IDeviceID, String[], TimeValuePair[])}. The input {@link - * TimeValuePair}s shall never be or contain {@code null}, if a measurement is with all {@code - * null}s, its {@link TimeValuePair} shall be {@link TableDeviceLastCache#EMPTY_TIME_VALUE_PAIR}. - * For time column, the input measurement shall be "", and the value shall be {@link - * TableDeviceLastCache#EMPTY_PRIMITIVE_TYPE}. If the time column is not explicitly specified, the + * TimeValuePair}s shall never be or contain {@code null}. If a measurement is with all {@code + * null}s, its {@link TimeValuePair} shall be {@link + * TableDeviceLastCache#PLACEHOLDER_EMPTY_COLUMN}; if it is known to be {@code null} at a concrete + * last-row time, preserve that time and use {@link TableDeviceLastCache#PLACEHOLDER_NO_VALUE} as + * the value. For time column, the input measurement shall be "", and the value shall be {@link + * TableDeviceLastCache#PLACEHOLDER_NO_VALUE}. If the time column is not explicitly specified, the * device's last time won't be updated because we cannot guarantee the completeness of the * existing measurements in cache. * @@ -304,8 +306,8 @@ public void updateLastCacheIfExists( * @param database the device's database, without "root", {@code null} for tree model * @param deviceId {@link IDeviceID} * @param measurement the measurement to get - * @return {@code null} iff cache miss, {@link TableDeviceLastCache#EMPTY_TIME_VALUE_PAIR} iff - * cache hit but result is {@code null}, and the result value otherwise. + * @return {@code null} iff cache miss, {@link TableDeviceLastCache#PLACEHOLDER_EMPTY_COLUMN} iff + * cache hit but the measurement has no values at all, and the result value otherwise. */ public TimeValuePair getLastEntry( final @Nullable String database, final IDeviceID deviceId, final String measurement) { @@ -321,8 +323,8 @@ public TimeValuePair getLastEntry( * @param database the device's database, without "root", {@code null} for tree model * @param deviceId {@link IDeviceID} * @param measurements the measurements to get - * @return {@code null} iff cache miss, {@link TableDeviceLastCache#EMPTY_TIME_VALUE_PAIR} iff - * cache hit but result is {@code null}, and the result value otherwise. + * @return {@code null} iff cache miss, {@link TableDeviceLastCache#PLACEHOLDER_EMPTY_COLUMN} iff + * cache hit but the measurement has no values at all, and the result value otherwise. */ public TimeValuePair[] getLastEntries( final @Nullable String database, final IDeviceID deviceId, final String[] measurements) { @@ -345,8 +347,10 @@ public TimeValuePair[] getLastEntries( * the {@link Pair#left} will be the source measurement's last time, (OptionalLong.empty() iff * the source measurement is all {@code null}); {@link Pair#right} will be an {@link * TsPrimitiveType} array, whose element will be {@code null} if cache miss, {@link - * TableDeviceLastCache#EMPTY_PRIMITIVE_TYPE} iff cache hit and the measurement is without any - * values when last by the source measurement's time, and the result value otherwise. + * TableDeviceLastCache#PLACEHOLDER_NO_VALUE} iff cache hit and the measurement is known to be + * {@code null} when last by the source measurement's time, {@link + * TableDeviceLastCache#PLACEHOLDER_STALE_VALUE} iff cache hit but the target measurement is + * stale under a newer source time, and the result value otherwise. */ public Optional> getLastRow( final String database, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TreeDeviceSchemaCacheManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TreeDeviceSchemaCacheManager.java index cdb66fdc04744..93d75aeff2d8c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TreeDeviceSchemaCacheManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TreeDeviceSchemaCacheManager.java @@ -355,7 +355,8 @@ public void updateLastCacheIfExists( * #updateLastCacheIfExists(String, IDeviceID, String[], TimeValuePair[], boolean, * IMeasurementSchema[])}. The input {@link TimeValuePair} shall never be or contain {@code null}, * if the measurement is with all {@code null}s, its {@link TimeValuePair} shall be {@link - * TableDeviceLastCache#EMPTY_TIME_VALUE_PAIR}. This method is not supposed to update time column. + * TableDeviceLastCache#PLACEHOLDER_EMPTY_COLUMN}. This method is not supposed to update time + * column. * * @param database the device's database, WITH "root" * @param measurementPath the fetched {@link MeasurementPath} diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceLastCacheTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceLastCacheTest.java new file mode 100644 index 0000000000000..eedbbfbb4ec27 --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceLastCacheTest.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache; + +import org.apache.tsfile.read.TimeValuePair; +import org.apache.tsfile.utils.Pair; +import org.apache.tsfile.utils.TsPrimitiveType; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Collections; +import java.util.Optional; +import java.util.OptionalLong; + +public class TableDeviceLastCacheTest { + + @Test + public void testKnownNullTimePreservesHistoricalValueAndClearsOnNewerValue() { + final TableDeviceLastCache cache = new TableDeviceLastCache(false); + + cache.initOrInvalidate(null, null, new String[] {"", "s1"}, false); + + final TimeValuePair historicalValue = new TimeValuePair(1L, new TsPrimitiveType.TsInt(1)); + cache.tryUpdate( + new String[] {"", "s1"}, + new TimeValuePair[] { + new TimeValuePair(1L, TableDeviceLastCache.PLACEHOLDER_NO_VALUE), historicalValue + }); + cache.tryUpdate( + new String[] {"s1"}, + new TimeValuePair[] {new TimeValuePair(2L, TableDeviceLastCache.PLACEHOLDER_NO_VALUE)}); + + Assert.assertEquals(historicalValue, cache.getTimeValuePair("s1")); + Optional> result = + cache.getLastRow("", Collections.singletonList("s1")); + Assert.assertTrue(result.isPresent()); + Assert.assertEquals(OptionalLong.of(2L), result.get().getLeft()); + Assert.assertArrayEquals( + new TsPrimitiveType[] {TableDeviceLastCache.PLACEHOLDER_NO_VALUE}, result.get().getRight()); + + final TimeValuePair newerValue = new TimeValuePair(3L, new TsPrimitiveType.TsInt(3)); + cache.tryUpdate(new String[] {"s1"}, new TimeValuePair[] {newerValue}); + + Assert.assertEquals(newerValue, cache.getTimeValuePair("s1")); + result = cache.getLastRow("", Collections.singletonList("s1")); + Assert.assertTrue(result.isPresent()); + Assert.assertEquals(OptionalLong.of(3L), result.get().getLeft()); + Assert.assertArrayEquals( + new TsPrimitiveType[] {new TsPrimitiveType.TsInt(3)}, result.get().getRight()); + } + + @Test + public void testInvalidateMeasurementClearsKnownNullTimeAndAlignedTime() { + final TableDeviceLastCache cache = new TableDeviceLastCache(false); + + cache.initOrInvalidate(null, null, new String[] {"", "s1"}, false); + cache.tryUpdate( + new String[] {"", "s1"}, + new TimeValuePair[] { + new TimeValuePair(2L, TableDeviceLastCache.PLACEHOLDER_NO_VALUE), + new TimeValuePair(2L, TableDeviceLastCache.PLACEHOLDER_NO_VALUE) + }); + + Assert.assertTrue(cache.getLastRow("", Collections.singletonList("s1")).isPresent()); + Assert.assertNotNull(cache.getTimeValuePair("")); + + cache.invalidate("s1"); + + Assert.assertFalse(cache.getLastRow("", Collections.singletonList("s1")).isPresent()); + Assert.assertNull(cache.getTimeValuePair("")); + Assert.assertNull(cache.getTimeValuePair("s1")); + } +} diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceSchemaCacheTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceSchemaCacheTest.java index cde0ffbb13fd0..e808652a42a9c 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceSchemaCacheTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceSchemaCacheTest.java @@ -334,10 +334,10 @@ public void testLastCache() { database1, convertTagValuesToDeviceID(table1, device0), new String[] {"s4"}, - new TimeValuePair[] {TableDeviceLastCache.EMPTY_TIME_VALUE_PAIR}); + new TimeValuePair[] {TableDeviceLastCache.PLACEHOLDER_EMPTY_COLUMN}); Assert.assertSame( - TableDeviceLastCache.EMPTY_TIME_VALUE_PAIR, + TableDeviceLastCache.PLACEHOLDER_EMPTY_COLUMN, cache.getLastEntry(database1, convertTagValuesToDeviceID(table1, device0), "s4")); // Test null miss measurements @@ -358,19 +358,52 @@ public void testLastCache() { database1, convertTagValuesToDeviceID(table1, device0), new String[] {""}, - new TimeValuePair[] {new TimeValuePair(2L, TableDeviceLastCache.EMPTY_PRIMITIVE_TYPE)}); + new TimeValuePair[] {new TimeValuePair(2L, TableDeviceLastCache.PLACEHOLDER_NO_VALUE)}); + + updateLastCache4Query( + cache, + database1, + convertTagValuesToDeviceID(table1, device0), + new String[] {"s1"}, + new TimeValuePair[] { + new TimeValuePair(2L, TableDeviceLastCache.PLACEHOLDER_NO_VALUE), + }); + + Assert.assertEquals( + tv3, cache.getLastEntry(database1, convertTagValuesToDeviceID(table1, device0), "s1")); + + result = + cache.getLastRow( + database1, convertTagValuesToDeviceID(table1, device0), "", Arrays.asList("s2", "s1")); + Assert.assertTrue(result.isPresent()); + Assert.assertTrue(result.get().getLeft().isPresent()); + Assert.assertEquals(OptionalLong.of(2L), result.get().getLeft()); + Assert.assertArrayEquals( + new TsPrimitiveType[] { + new TsPrimitiveType.TsInt(2), TableDeviceLastCache.PLACEHOLDER_NO_VALUE, + }, + result.get().getRight()); + + cache.initOrInvalidateLastCache( + database1, convertTagValuesToDeviceID(table1, device0), new String[] {"s5"}, false); result = cache.getLastRow( database1, convertTagValuesToDeviceID(table1, device0), - "", - Collections.singletonList("s2")); + "s2", + Arrays.asList("s0", "s1", "s4", "s5")); Assert.assertTrue(result.isPresent()); Assert.assertTrue(result.get().getLeft().isPresent()); Assert.assertEquals(OptionalLong.of(2L), result.get().getLeft()); Assert.assertArrayEquals( - new TsPrimitiveType[] {new TsPrimitiveType.TsInt(2)}, result.get().getRight()); + new TsPrimitiveType[] { + TableDeviceLastCache.PLACEHOLDER_STALE_VALUE, + TableDeviceLastCache.PLACEHOLDER_NO_VALUE, + TableDeviceLastCache.PLACEHOLDER_NO_VALUE, + null + }, + result.get().getRight()); result = cache.getLastRow( @@ -386,7 +419,7 @@ public void testLastCache() { new TsPrimitiveType.TsInt(3), new TsPrimitiveType.TsLong(1), new TsPrimitiveType.TsInt(3), - TableDeviceLastCache.EMPTY_PRIMITIVE_TYPE, + TableDeviceLastCache.PLACEHOLDER_NO_VALUE, null }, result.get().getRight()); @@ -460,8 +493,8 @@ public void testLastCache() { convertTagValuesToDeviceID(table2, device0), new String[] {"", "s2"}, new TimeValuePair[] { - new TimeValuePair(Long.MIN_VALUE, TableDeviceLastCache.EMPTY_PRIMITIVE_TYPE), - TableDeviceLastCache.EMPTY_TIME_VALUE_PAIR + new TimeValuePair(Long.MIN_VALUE, TableDeviceLastCache.PLACEHOLDER_NO_VALUE), + TableDeviceLastCache.PLACEHOLDER_EMPTY_COLUMN }); result = @@ -471,7 +504,7 @@ public void testLastCache() { Assert.assertTrue(result.get().getLeft().isPresent()); Assert.assertEquals(OptionalLong.of(Long.MIN_VALUE), result.get().getLeft()); Assert.assertArrayEquals( - new TsPrimitiveType[] {TableDeviceLastCache.EMPTY_PRIMITIVE_TYPE, null}, + new TsPrimitiveType[] {TableDeviceLastCache.PLACEHOLDER_NO_VALUE, null}, result.get().getRight()); updateLastCache4Query( @@ -492,7 +525,7 @@ public void testLastCache() { Assert.assertEquals(OptionalLong.of(Long.MIN_VALUE), result.get().getLeft()); Assert.assertArrayEquals( new TsPrimitiveType[] { - TableDeviceLastCache.EMPTY_PRIMITIVE_TYPE, new TsPrimitiveType.TsInt(3), + TableDeviceLastCache.PLACEHOLDER_NO_VALUE, new TsPrimitiveType.TsInt(3), }, result.get().getRight()); @@ -504,7 +537,7 @@ public void testLastCache() { Assert.assertEquals(OptionalLong.of(Long.MIN_VALUE), result.get().getLeft()); Assert.assertArrayEquals( new TsPrimitiveType[] { - TableDeviceLastCache.EMPTY_PRIMITIVE_TYPE, new TsPrimitiveType.TsInt(3), + TableDeviceLastCache.PLACEHOLDER_NO_VALUE, new TsPrimitiveType.TsInt(3), }, result.get().getRight());