diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java index 5de040eeba28..2e9a2dd48713 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java @@ -75,7 +75,6 @@ public TSStatus visitRelationalInsertRows(RelationalInsertRowsNode node, DataReg public TSStatus visitInsertRow(InsertRowNode node, DataRegion dataRegion) { try { dataRegion.insert(node); - dataRegion.insertSeparatorToWAL(); return StatusUtils.OK; } catch (OutOfTTLException e) { LOGGER.warn("Error in executing plan node: {}, caused by {}", node, e.getMessage()); @@ -99,7 +98,6 @@ public TSStatus visitRelationalInsertTablet( public TSStatus visitInsertTablet(final InsertTabletNode node, final DataRegion dataRegion) { try { dataRegion.insertTablet(node); - dataRegion.insertSeparatorToWAL(); return StatusUtils.OK; } catch (final OutOfTTLException e) { LOGGER.debug("Error in executing plan node: {}, caused by {}", node, e.getMessage()); @@ -136,7 +134,6 @@ public TSStatus visitInsertTablet(final InsertTabletNode node, final DataRegion public TSStatus visitInsertRows(InsertRowsNode node, DataRegion dataRegion) { try { dataRegion.insert(node); - dataRegion.insertSeparatorToWAL(); return StatusUtils.OK; } catch (WriteProcessRejectException e) { LOGGER.warn("Reject in executing plan node: {}, caused by {}", node, e.getMessage()); @@ -173,7 +170,6 @@ public TSStatus visitInsertRows(InsertRowsNode node, DataRegion dataRegion) { public TSStatus visitInsertMultiTablets(InsertMultiTabletsNode node, DataRegion dataRegion) { try { dataRegion.insertTablets(node); - dataRegion.insertSeparatorToWAL(); return StatusUtils.OK; } catch (WriteProcessRejectException e) { LOGGER.warn("Reject in executing plan node: {}, caused by {}", node, e.getMessage()); @@ -208,7 +204,6 @@ public TSStatus visitInsertRowsOfOneDevice( InsertRowsOfOneDeviceNode node, DataRegion dataRegion) { try { dataRegion.insert(node); - dataRegion.insertSeparatorToWAL(); return StatusUtils.OK; } catch (WriteProcessRejectException e) { LOGGER.warn("Reject in executing plan node: {}, caused by {}", node, e.getMessage()); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java index 290ec8a4a71f..6bc91acd0084 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java @@ -619,7 +619,7 @@ private int serializeMeasurementsAndValuesSize() { @Override public void serializeToWAL(IWALByteBufferView buffer) { buffer.putShort(getType().getNodeType()); - buffer.putLong(searchIndex); + buffer.putLong(getEncodedSearchIndex()); subSerialize(buffer); } @@ -700,7 +700,7 @@ private void putDataTypesAndValues(IWALByteBufferView buffer) { public static InsertRowNode deserializeFromWAL(DataInputStream stream) throws IOException { long searchIndex = stream.readLong(); InsertRowNode insertNode = subDeserializeFromWAL(stream); - insertNode.setSearchIndex(searchIndex); + insertNode.setSearchIndexFromWAL(searchIndex); return insertNode; } @@ -791,7 +791,7 @@ public void fillDataTypesAndValuesFromWAL(DataInputStream stream) throws IOExcep public static InsertRowNode deserializeFromWAL(ByteBuffer buffer) { long searchIndex = buffer.getLong(); InsertRowNode insertNode = subDeserializeFromWAL(buffer); - insertNode.setSearchIndex(searchIndex); + insertNode.setSearchIndexFromWAL(searchIndex); return insertNode; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java index 28f914e16065..2be395017815 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java @@ -349,7 +349,7 @@ private int subSerializeSize() { @Override public void serializeToWAL(IWALByteBufferView buffer) { buffer.putShort(getType().getNodeType()); - buffer.putLong(searchIndex); + buffer.putLong(getEncodedSearchIndex()); subSerialize(buffer); } @@ -377,7 +377,7 @@ public static InsertRowsNode deserializeFromWAL(DataInputStream stream) throws I InsertRowNode insertRowNode = InsertRowNode.subDeserializeFromWAL(stream); insertRowsNode.addOneInsertRowNode(insertRowNode, i); } - insertRowsNode.setSearchIndex(searchIndex); + insertRowsNode.setSearchIndexFromWAL(searchIndex); return insertRowsNode; } @@ -397,7 +397,7 @@ public static InsertRowsNode deserializeFromWAL(ByteBuffer buffer) { InsertRowNode insertRowNode = InsertRowNode.subDeserializeFromWAL(buffer); insertRowsNode.addOneInsertRowNode(insertRowNode, i); } - insertRowsNode.setSearchIndex(searchIndex); + insertRowsNode.setSearchIndexFromWAL(searchIndex); return insertRowsNode; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java index 995e8a95e3fd..25cf3c4fc7e0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java @@ -880,7 +880,7 @@ public void serializeToWAL(IWALByteBufferView buffer, List rangeList) { } void subSerialize(IWALByteBufferView buffer, List rangeList) { - buffer.putLong(searchIndex); + buffer.putLong(getEncodedSearchIndex()); WALWriteUtils.write(targetPath.getFullPath(), buffer); // data types are serialized in measurement schemas writeMeasurementSchemas(buffer); @@ -1012,7 +1012,7 @@ public static InsertTabletNode deserializeFromWAL(DataInputStream stream) throws } protected void subDeserializeFromWAL(DataInputStream stream) throws IOException { - searchIndex = stream.readLong(); + setSearchIndexFromWAL(stream.readLong()); try { targetPath = readTargetPath(stream); } catch (IllegalPathException e) { @@ -1047,7 +1047,7 @@ public static InsertTabletNode deserializeFromWAL(ByteBuffer buffer) { } protected void subDeserializeFromWAL(ByteBuffer buffer) { - searchIndex = buffer.getLong(); + setSearchIndexFromWAL(buffer.getLong()); try { targetPath = readTargetPath(buffer); } catch (IllegalPathException e) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowNode.java index e622dba30b9b..8ef6802f047f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowNode.java @@ -128,14 +128,14 @@ public static RelationalInsertRowNode deserializeFromWAL(DataInputStream stream) throws IOException { long searchIndex = stream.readLong(); RelationalInsertRowNode insertNode = subDeserializeFromWAL(stream); - insertNode.setSearchIndex(searchIndex); + insertNode.setSearchIndexFromWAL(searchIndex); return insertNode; } public static RelationalInsertRowNode deserializeFromWAL(ByteBuffer buffer) { long searchIndex = buffer.getLong(); RelationalInsertRowNode insertNode = subDeserializeFromWAL(buffer); - insertNode.setSearchIndex(searchIndex); + insertNode.setSearchIndexFromWAL(searchIndex); return insertNode; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowsNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowsNode.java index 83498ceefc9b..741c45f32564 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowsNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowsNode.java @@ -124,7 +124,7 @@ public static RelationalInsertRowsNode deserializeFromWAL(DataInputStream stream RelationalInsertRowNode insertRowNode = RelationalInsertRowNode.subDeserializeFromWAL(stream); insertRowsNode.addOneInsertRowNode(insertRowNode, i); } - insertRowsNode.setSearchIndex(searchIndex); + insertRowsNode.setSearchIndexFromWAL(searchIndex); return insertRowsNode; } @@ -144,7 +144,7 @@ public static RelationalInsertRowsNode deserializeFromWAL(ByteBuffer buffer) { RelationalInsertRowNode insertRowNode = RelationalInsertRowNode.subDeserializeFromWAL(buffer); insertRowsNode.addOneInsertRowNode(insertRowNode, i); } - insertRowsNode.setSearchIndex(searchIndex); + insertRowsNode.setSearchIndexFromWAL(searchIndex); return insertRowsNode; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/SearchNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/SearchNode.java index 5a7e5d9cf78e..7f7e5bc94582 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/SearchNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/SearchNode.java @@ -28,6 +28,8 @@ public abstract class SearchNode extends WritePlanNode implements ComparableConsensusRequest { + private static final long LAST_FRAGMENT_MASK = Long.MIN_VALUE; + /** this insert node doesn't need to participate in iot consensus */ public static final long NO_CONSENSUS_INDEX = ConsensusReqReader.DEFAULT_SEARCH_INDEX; @@ -37,6 +39,8 @@ public abstract class SearchNode extends WritePlanNode implements ComparableCons */ protected long searchIndex = NO_CONSENSUS_INDEX; + protected boolean isLastFragment = false; + protected SearchNode(PlanNodeId id) { super(id); } @@ -51,5 +55,38 @@ public SearchNode setSearchIndex(long searchIndex) { return this; } + public boolean isLastFragment() { + return isLastFragment; + } + + public SearchNode setLastFragment(boolean lastFragment) { + isLastFragment = lastFragment; + return this; + } + + protected long getEncodedSearchIndex() { + if (searchIndex == NO_CONSENSUS_INDEX || !isLastFragment) { + return searchIndex; + } + return searchIndex | LAST_FRAGMENT_MASK; + } + + public static long extractSearchIndex(long encodedSearchIndex) { + if (encodedSearchIndex == NO_CONSENSUS_INDEX) { + return encodedSearchIndex; + } + return encodedSearchIndex & ~LAST_FRAGMENT_MASK; + } + + public static boolean isLastFragment(long encodedSearchIndex) { + return encodedSearchIndex != NO_CONSENSUS_INDEX + && (encodedSearchIndex & LAST_FRAGMENT_MASK) != 0; + } + + protected void setSearchIndexFromWAL(long encodedSearchIndex) { + this.searchIndex = extractSearchIndex(encodedSearchIndex); + this.isLastFragment = isLastFragment(encodedSearchIndex); + } + public abstract SearchNode merge(List searchNodes); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java index a6808b8d972d..554149f34f90 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java @@ -1194,6 +1194,7 @@ public void insert(InsertRowNode insertRowNode) throws WriteProcessException { // init map long timePartitionId = TimePartitionUtils.getTimePartitionId(insertRowNode.getTime()); initFlushTimeMap(timePartitionId); + insertRowNode.setLastFragment(true); boolean isSequence = config.isEnableSeparateData() @@ -1322,14 +1323,29 @@ private boolean doInsert( InsertTabletNode insertTabletNode, Map[]> splitMap, TSStatus[] results, - long[] infoForMetrics) + long[] infoForMetrics, + boolean markLastFragmentOnFinalWrite) throws DataTypeInconsistentException { boolean noFailure = true; + int remainingFragmentCount = 0; + if (markLastFragmentOnFinalWrite) { + for (Entry[]> entry : splitMap.entrySet()) { + List[] rangeLists = entry.getValue(); + if (rangeLists[1] != null) { + remainingFragmentCount++; + } + if (rangeLists[0] != null) { + remainingFragmentCount++; + } + } + } for (Entry[]> entry : splitMap.entrySet()) { long timePartitionId = entry.getKey(); List[] rangeLists = entry.getValue(); List sequenceRangeList = rangeLists[1]; if (sequenceRangeList != null) { + insertTabletNode.setLastFragment( + markLastFragmentOnFinalWrite && remainingFragmentCount == 1); noFailure = insertTabletToTsFileProcessor( insertTabletNode, @@ -1340,9 +1356,12 @@ private boolean doInsert( noFailure, infoForMetrics) && noFailure; + remainingFragmentCount--; } List unSequenceRangeList = rangeLists[0]; if (unSequenceRangeList != null) { + insertTabletNode.setLastFragment( + markLastFragmentOnFinalWrite && remainingFragmentCount == 1); noFailure = insertTabletToTsFileProcessor( insertTabletNode, @@ -1353,6 +1372,7 @@ private boolean doInsert( noFailure, infoForMetrics) && noFailure; + remainingFragmentCount--; } } return noFailure; @@ -1391,7 +1411,7 @@ public void insertTablet(InsertTabletNode insertTabletNode) // infoForMetrics[2]: ScheduleWalTimeCost // infoForMetrics[3]: ScheduleMemTableTimeCost // infoForMetrics[4]: InsertedPointsNumber - boolean noFailure = executeInsertTablet(insertTabletNode, results, infoForMetrics); + boolean noFailure = executeInsertTablet(insertTabletNode, results, infoForMetrics, true); updateTsFileProcessorMetric(insertTabletNode, infoForMetrics); if (!noFailure) { @@ -1407,7 +1427,8 @@ private boolean splitAndInsert( InsertTabletNode insertTabletNode, TSStatus[] results, long[] infoForMetrics, - List> deviceEndOffsetPairs) { + List> deviceEndOffsetPairs, + boolean markLastFragmentOnFinalWrite) { final int initialStart = start; try { Map[]> splitInfo = new HashMap<>(); @@ -1416,7 +1437,8 @@ private boolean splitAndInsert( split(insertTabletNode, start, end, splitInfo); start = end; } - return doInsert(insertTabletNode, splitInfo, results, infoForMetrics); + return doInsert( + insertTabletNode, splitInfo, results, infoForMetrics, markLastFragmentOnFinalWrite); } catch (DataTypeInconsistentException e) { // the exception will trigger a flush, which requires the flush time to be recalculated start = initialStart; @@ -1427,7 +1449,8 @@ private boolean splitAndInsert( start = end; } try { - return doInsert(insertTabletNode, splitInfo, results, infoForMetrics); + return doInsert( + insertTabletNode, splitInfo, results, infoForMetrics, markLastFragmentOnFinalWrite); } catch (DataTypeInconsistentException ex) { logger.error("Data inconsistent exception is not supposed to be triggered twice", ex); return false; @@ -1436,7 +1459,10 @@ private boolean splitAndInsert( } private boolean executeInsertTablet( - InsertTabletNode insertTabletNode, TSStatus[] results, long[] infoForMetrics) + InsertTabletNode insertTabletNode, + TSStatus[] results, + long[] infoForMetrics, + boolean markLastFragmentOnFinalWrite) throws OutOfTTLException { boolean noFailure; int loc = @@ -1447,7 +1473,13 @@ private boolean executeInsertTablet( List> deviceEndOffsetPairs = insertTabletNode.splitByDevice(loc, insertTabletNode.getRowCount()); noFailure = - splitAndInsert(loc, insertTabletNode, results, infoForMetrics, deviceEndOffsetPairs) + splitAndInsert( + loc, + insertTabletNode, + results, + infoForMetrics, + deviceEndOffsetPairs, + markLastFragmentOnFinalWrite) && noFailure; if (CommonDescriptor.getInstance().getConfig().isLastCacheEnable() @@ -1460,6 +1492,25 @@ private boolean executeInsertTablet( return noFailure; } + private int findLastInsertTabletIndexToMark(final InsertMultiTabletsNode insertMultiTabletsNode) { + for (int i = insertMultiTabletsNode.getInsertTabletNodeList().size() - 1; i >= 0; i--) { + final InsertTabletNode insertTabletNode = + insertMultiTabletsNode.getInsertTabletNodeList().get(i); + if (insertTabletNode.getRowCount() <= 0 || insertTabletNode.allMeasurementFailed()) { + continue; + } + if (!insertTabletNode.shouldCheckTTL()) { + return i; + } + final long[] times = insertTabletNode.getTimes(); + if (times.length > 0 + && CommonUtils.isAlive(times[times.length - 1], getTTL(insertTabletNode))) { + return i; + } + } + return -1; + } + private void initFlushTimeMap(long timePartitionId) { if (config.isEnableSeparateData() && !lastFlushTimeMap.checkAndCreateFlushedTimePartition(timePartitionId, true)) { @@ -1741,8 +1792,10 @@ private List insertToTsFileProcessors( } List executedInsertRowNodeList = new ArrayList<>(); + int remainingFragments = tsFileProcessorMap.size(); for (Map.Entry entry : tsFileProcessorMap.entrySet()) { InsertRowsNode subInsertRowsNode = entry.getValue(); + subInsertRowsNode.setLastFragment(--remainingFragments == 0); try { List insertedProcessors = insertRowsWithTypeConsistencyCheck(entry.getKey(), subInsertRowsNode, infoForMetrics); @@ -1854,10 +1907,14 @@ private List retryInsertRowsAfterFlush( } final List insertedProcessors = new ArrayList<>(retriedProcessorMap.size()); + int remainingRetriedFragments = retriedProcessorMap.size(); for (Entry retriedEntry : retriedProcessorMap.entrySet()) { final TsFileProcessor retriedProcessor = retriedEntry.getKey(); - registerToTsFile(retriedEntry.getValue(), retriedProcessor); - retriedProcessor.insertRows(retriedEntry.getValue(), infoForMetrics); + final InsertRowsNode retriedInsertRowsNode = retriedEntry.getValue(); + retriedInsertRowsNode.setLastFragment( + subInsertRowsNode.isLastFragment() && --remainingRetriedFragments == 0); + registerToTsFile(retriedInsertRowsNode, retriedProcessor); + retriedProcessor.insertRows(retriedInsertRowsNode, infoForMetrics); insertedProcessors.add(retriedProcessor); } return insertedProcessors; @@ -4587,8 +4644,10 @@ public void insert(InsertRowsOfOneDeviceNode insertRowsOfOneDeviceNode) // infoForMetrics[2]: ScheduleWalTimeCost // infoForMetrics[3]: ScheduleMemTableTimeCost // infoForMetrics[4]: InsertedPointsNumber + int remainingFragments = tsFileProcessorMap.size(); for (Map.Entry entry : tsFileProcessorMap.entrySet()) { InsertRowsNode subInsertRowsNode = entry.getValue(); + subInsertRowsNode.setLastFragment(--remainingFragments == 0); try { List insertedProcessors = insertRowsWithTypeConsistencyCheck(entry.getKey(), subInsertRowsNode, infoForMetrics); @@ -4728,6 +4787,7 @@ public void insertTablets(InsertMultiTabletsNode insertMultiTabletsNode) // infoForMetrics[2]: ScheduleWalTimeCost // infoForMetrics[3]: ScheduleMemTableTimeCost // infoForMetrics[4]: InsertedPointsNumbe + final int lastTabletIndexToMark = findLastInsertTabletIndexToMark(insertMultiTabletsNode); for (int i = 0; i < insertMultiTabletsNode.getInsertTabletNodeList().size(); i++) { InsertTabletNode insertTabletNode = insertMultiTabletsNode.getInsertTabletNodeList().get(i); long[] times = insertTabletNode.getTimes(); @@ -4741,7 +4801,9 @@ public void insertTablets(InsertMultiTabletsNode insertMultiTabletsNode) Arrays.fill(results, RpcUtils.SUCCESS_STATUS); boolean noFailure = false; try { - noFailure = executeInsertTablet(insertTabletNode, results, infoForMetrics); + noFailure = + executeInsertTablet( + insertTabletNode, results, infoForMetrics, i == lastTabletIndexToMark); } catch (WriteProcessException e) { insertMultiTabletsNode .getResults() diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java index 98993c563f9c..3a19909a6d66 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java @@ -35,6 +35,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.ObjectNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalDeleteDataNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.SearchNode; import org.apache.iotdb.db.service.metrics.WritingMetrics; import org.apache.iotdb.db.storageengine.StorageEngine; import org.apache.iotdb.db.storageengine.dataregion.DataRegion; @@ -687,7 +688,9 @@ public boolean hasNext() { if (type.needSearch()) { // see WALInfoEntry#serialize, entry type + memtable id + plan node type buffer.position(WALInfoEntry.FIXED_SERIALIZED_SIZE + PlanNodeType.BYTES); - final long currentWalEntryIndex = buffer.getLong(); + final long encodedSearchIndex = buffer.getLong(); + final long currentWalEntryIndex = SearchNode.extractSearchIndex(encodedSearchIndex); + final boolean isLastFragment = SearchNode.isLastFragment(encodedSearchIndex); buffer.clear(); if (currentWalEntryIndex == -1) { // WAL entry of targetIndex has been fully collected, so put them into insertNodes @@ -712,6 +715,9 @@ public boolean hasNext() { tmpNodes.get().add(new IoTConsensusRequest(buffer)); memorySize += buffer.remaining(); } + if (isLastFragment) { + tryToCollectInsertNodeAndBumpIndex.run(); + } } else { // currentWalEntryIndex > targetIndex // WAL entry of targetIndex has been fully collected, put them into insertNodes @@ -740,6 +746,9 @@ public boolean hasNext() { tmpNodes.get().add(new IoTConsensusRequest(buffer)); memorySize += buffer.remaining(); } + if (isLastFragment) { + tryToCollectInsertNodeAndBumpIndex.run(); + } } } else { tryToCollectInsertNodeAndBumpIndex.run(); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/InsertRowNodeSerdeTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/InsertRowNodeSerdeTest.java index dbe69da24622..21bc9e0d1209 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/InsertRowNodeSerdeTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/InsertRowNodeSerdeTest.java @@ -23,10 +23,13 @@ import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeType; +import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertRowNode; import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALByteBufferForTest; import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.utils.Binary; import org.apache.tsfile.write.schema.MeasurementSchema; import org.junit.Assert; import org.junit.Test; @@ -35,6 +38,7 @@ import java.io.DataInputStream; import java.io.IOException; import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; public class InsertRowNodeSerdeTest { @@ -72,6 +76,7 @@ public void testSerializeAndDeserialize() throws IllegalPathException { @Test public void TestSerializeAndDeserializeForWAL() throws IllegalPathException, IOException { InsertRowNode insertRowNode = getInsertRowNodeWithMeasurementSchemas(); + insertRowNode.setLastFragment(true); int serializedSize = insertRowNode.serializedSize(); @@ -96,6 +101,67 @@ public void TestSerializeAndDeserializeForWAL() throws IllegalPathException, IOE new MeasurementSchema("s5", TSDataType.BOOLEAN) }); Assert.assertEquals(insertRowNode, tmpNode); + Assert.assertTrue(tmpNode.isLastFragment()); + } + + @Test + public void testDeserializeLegacyWAL() throws IllegalPathException, IOException { + InsertRowNode insertRowNode = getInsertRowNodeWithMeasurementSchemas(); + insertRowNode.setSearchIndex(123L); + + byte[] bytes = new byte[insertRowNode.serializedSize()]; + WALByteBufferForTest walBuffer = new WALByteBufferForTest(ByteBuffer.wrap(bytes)); + insertRowNode.serializeToWAL(walBuffer); + + ByteBuffer byteBuffer = ByteBuffer.wrap(bytes); + Assert.assertEquals(PlanNodeType.INSERT_ROW.getNodeType(), byteBuffer.getShort()); + Assert.assertEquals(123L, byteBuffer.getLong()); + + DataInputStream dataInputStream = new DataInputStream(new ByteArrayInputStream(bytes)); + dataInputStream.readShort(); + + InsertRowNode tmpNode = InsertRowNode.deserializeFromWAL(dataInputStream); + tmpNode.setPlanNodeId(insertRowNode.getPlanNodeId()); + tmpNode.setMeasurementSchemas( + new MeasurementSchema[] { + new MeasurementSchema("s1", TSDataType.DOUBLE), + new MeasurementSchema("s2", TSDataType.FLOAT), + new MeasurementSchema("s3", TSDataType.INT64), + new MeasurementSchema("s4", TSDataType.INT32), + new MeasurementSchema("s5", TSDataType.BOOLEAN) + }); + Assert.assertEquals(insertRowNode, tmpNode); + Assert.assertEquals(123L, tmpNode.getSearchIndex()); + Assert.assertFalse(tmpNode.isLastFragment()); + } + + @Test + public void testDeserializeLegacyWALRelational() throws IOException { + RelationalInsertRowNode insertRowNode = getRelationalInsertRowNodeWithMeasurementSchemas(); + insertRowNode.setSearchIndex(123L); + + byte[] bytes = new byte[insertRowNode.serializedSize()]; + WALByteBufferForTest walBuffer = new WALByteBufferForTest(ByteBuffer.wrap(bytes)); + insertRowNode.serializeToWAL(walBuffer); + + ByteBuffer byteBuffer = ByteBuffer.wrap(bytes); + Assert.assertEquals(PlanNodeType.RELATIONAL_INSERT_ROW.getNodeType(), byteBuffer.getShort()); + Assert.assertEquals(123L, byteBuffer.getLong()); + + DataInputStream dataInputStream = new DataInputStream(new ByteArrayInputStream(bytes)); + dataInputStream.readShort(); + + RelationalInsertRowNode tmpNode = RelationalInsertRowNode.deserializeFromWAL(dataInputStream); + tmpNode.setPlanNodeId(insertRowNode.getPlanNodeId()); + tmpNode.setMeasurementSchemas( + new MeasurementSchema[] { + new MeasurementSchema("id", TSDataType.STRING), + new MeasurementSchema("attr", TSDataType.TEXT), + new MeasurementSchema("value", TSDataType.INT64) + }); + Assert.assertEquals(insertRowNode, tmpNode); + Assert.assertEquals(123L, tmpNode.getSearchIndex()); + Assert.assertFalse(tmpNode.isLastFragment()); } private InsertRowNode getInsertRowNode() throws IllegalPathException { @@ -199,4 +265,28 @@ private InsertRowNode getInsertRowNodeWithStringValue() throws IllegalPathExcept insertRowNode.setNeedInferType(true); return insertRowNode; } + + private RelationalInsertRowNode getRelationalInsertRowNodeWithMeasurementSchemas() { + return new RelationalInsertRowNode( + new PlanNodeId("plannode 3"), + new PartialPath("table1", false), + false, + new String[] {"id", "attr", "value"}, + new TSDataType[] {TSDataType.STRING, TSDataType.TEXT, TSDataType.INT64}, + new MeasurementSchema[] { + new MeasurementSchema("id", TSDataType.STRING), + new MeasurementSchema("attr", TSDataType.TEXT), + new MeasurementSchema("value", TSDataType.INT64) + }, + 90L, + new Object[] { + new Binary("d1".getBytes(StandardCharsets.UTF_8)), + new Binary("v1".getBytes(StandardCharsets.UTF_8)), + 1L + }, + false, + new TsTableColumnCategory[] { + TsTableColumnCategory.TAG, TsTableColumnCategory.ATTRIBUTE, TsTableColumnCategory.FIELD + }); + } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/InsertRowsNodeSerdeTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/InsertRowsNodeSerdeTest.java index 08819830557e..034d4cb57de4 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/InsertRowsNodeSerdeTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/InsertRowsNodeSerdeTest.java @@ -129,6 +129,7 @@ public void testSerializeAndDeserializeForWAL() throws IllegalPathException, IOE new Object[] {2.0, false}, false), 1); + insertRowsNode.setLastFragment(true); int serializedSize = insertRowsNode.serializedSize(); @@ -145,6 +146,70 @@ public void testSerializeAndDeserializeForWAL() throws IllegalPathException, IOE InsertRowsNode tmpNode = InsertRowsNode.deserializeFromWAL(dataInputStream); tmpNode.setPlanNodeId(insertRowsNode.getPlanNodeId()); Assert.assertEquals(insertRowsNode, tmpNode); + Assert.assertTrue(tmpNode.isLastFragment()); + } + + @Test + public void testDeserializeLegacyWAL() throws IllegalPathException, IOException { + InsertRowsNode insertRowsNode = new InsertRowsNode(new PlanNodeId("plan node 1")); + insertRowsNode.addOneInsertRowNode( + new InsertRowNode( + new PlanNodeId(""), + new PartialPath("root.sg.d1"), + false, + new String[] {"s1", "s2", "s3", "s4", "s5"}, + new TSDataType[] { + TSDataType.DOUBLE, + TSDataType.FLOAT, + TSDataType.INT64, + TSDataType.TEXT, + TSDataType.STRING + }, + new MeasurementSchema[] { + new MeasurementSchema("s1", TSDataType.DOUBLE), + new MeasurementSchema("s2", TSDataType.FLOAT), + new MeasurementSchema("s3", TSDataType.INT64), + new MeasurementSchema("s4", TSDataType.TEXT), + new MeasurementSchema("s5", TSDataType.STRING) + }, + 1000L, + new Object[] {1.0, 2f, 300L, new Binary("444".getBytes(StandardCharsets.UTF_8)), null}, + false), + 0); + + insertRowsNode.addOneInsertRowNode( + new InsertRowNode( + new PlanNodeId(""), + new PartialPath("root.sg.d2"), + false, + new String[] {"s1", "s4"}, + new TSDataType[] {TSDataType.DOUBLE, TSDataType.BOOLEAN}, + new MeasurementSchema[] { + new MeasurementSchema("s1", TSDataType.DOUBLE), + new MeasurementSchema("s4", TSDataType.BOOLEAN), + }, + 2000L, + new Object[] {2.0, false}, + false), + 1); + insertRowsNode.setSearchIndex(123L); + + byte[] bytes = new byte[insertRowsNode.serializedSize()]; + WALByteBufferForTest walBuffer = new WALByteBufferForTest(ByteBuffer.wrap(bytes)); + insertRowsNode.serializeToWAL(walBuffer); + + ByteBuffer byteBuffer = ByteBuffer.wrap(bytes); + Assert.assertEquals(PlanNodeType.INSERT_ROWS.getNodeType(), byteBuffer.getShort()); + Assert.assertEquals(123L, byteBuffer.getLong()); + + DataInputStream dataInputStream = new DataInputStream(new ByteArrayInputStream(bytes)); + dataInputStream.readShort(); + + InsertRowsNode tmpNode = InsertRowsNode.deserializeFromWAL(dataInputStream); + tmpNode.setPlanNodeId(insertRowsNode.getPlanNodeId()); + Assert.assertEquals(insertRowsNode, tmpNode); + Assert.assertEquals(123L, tmpNode.getSearchIndex()); + Assert.assertFalse(tmpNode.isLastFragment()); } @Test @@ -262,6 +327,7 @@ public void testSerializeAndDeserializeForWALRelational() throws IOException { TsTableColumnCategory.TAG, TsTableColumnCategory.ATTRIBUTE }), 1); + insertRowsNode.setLastFragment(true); int serializedSize = insertRowsNode.serializedSize(); @@ -280,6 +346,81 @@ public void testSerializeAndDeserializeForWALRelational() throws IOException { RelationalInsertRowsNode.deserializeFromWAL(dataInputStream); tmpNode.setPlanNodeId(insertRowsNode.getPlanNodeId()); Assert.assertEquals(insertRowsNode, tmpNode); + Assert.assertTrue(tmpNode.isLastFragment()); } } + + @Test + public void testDeserializeLegacyWALRelational() throws IOException { + RelationalInsertRowsNode insertRowsNode = + new RelationalInsertRowsNode(new PlanNodeId("plan node 1")); + insertRowsNode.addOneInsertRowNode( + new RelationalInsertRowNode( + new PlanNodeId(""), + new PartialPath("table1", false), + false, + new String[] {"s1", "s2", "s3", "s4", "s5"}, + new TSDataType[] { + TSDataType.DOUBLE, + TSDataType.FLOAT, + TSDataType.INT64, + TSDataType.TEXT, + TSDataType.STRING + }, + new MeasurementSchema[] { + new MeasurementSchema("s1", TSDataType.DOUBLE), + new MeasurementSchema("s2", TSDataType.FLOAT), + new MeasurementSchema("s3", TSDataType.INT64), + new MeasurementSchema("s4", TSDataType.TEXT), + new MeasurementSchema("s5", TSDataType.STRING) + }, + 1000L, + new Object[] {1.0, 2f, 300L, new Binary("444".getBytes(StandardCharsets.UTF_8)), null}, + false, + new TsTableColumnCategory[] { + TsTableColumnCategory.TAG, + TsTableColumnCategory.ATTRIBUTE, + TsTableColumnCategory.FIELD, + TsTableColumnCategory.FIELD, + TsTableColumnCategory.FIELD + }), + 0); + + insertRowsNode.addOneInsertRowNode( + new RelationalInsertRowNode( + new PlanNodeId(""), + new PartialPath("table1", false), + false, + new String[] {"s1", "s4"}, + new TSDataType[] {TSDataType.DOUBLE, TSDataType.BOOLEAN}, + new MeasurementSchema[] { + new MeasurementSchema("s1", TSDataType.DOUBLE), + new MeasurementSchema("s4", TSDataType.BOOLEAN), + }, + 2000L, + new Object[] {2.0, false}, + false, + new TsTableColumnCategory[] { + TsTableColumnCategory.TAG, TsTableColumnCategory.ATTRIBUTE + }), + 1); + insertRowsNode.setSearchIndex(123L); + + byte[] bytes = new byte[insertRowsNode.serializedSize()]; + WALByteBufferForTest walBuffer = new WALByteBufferForTest(ByteBuffer.wrap(bytes)); + insertRowsNode.serializeToWAL(walBuffer); + + ByteBuffer byteBuffer = ByteBuffer.wrap(bytes); + Assert.assertEquals(PlanNodeType.RELATIONAL_INSERT_ROWS.getNodeType(), byteBuffer.getShort()); + Assert.assertEquals(123L, byteBuffer.getLong()); + + DataInputStream dataInputStream = new DataInputStream(new ByteArrayInputStream(bytes)); + dataInputStream.readShort(); + + RelationalInsertRowsNode tmpNode = RelationalInsertRowsNode.deserializeFromWAL(dataInputStream); + tmpNode.setPlanNodeId(insertRowsNode.getPlanNodeId()); + Assert.assertEquals(insertRowsNode, tmpNode); + Assert.assertEquals(123L, tmpNode.getSearchIndex()); + Assert.assertFalse(tmpNode.isLastFragment()); + } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/InsertTabletNodeSerdeTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/InsertTabletNodeSerdeTest.java index 4d8f5f6e4a80..ddc35e1eda92 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/InsertTabletNodeSerdeTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/InsertTabletNodeSerdeTest.java @@ -68,6 +68,7 @@ public void testSerializeAndDeserialize() throws IllegalPathException { @Test public void testSerializeAndDeserializeForWAL() throws IllegalPathException, IOException { InsertTabletNode insertTabletNode = getInsertTabletNodeWithSchema(); + insertTabletNode.setLastFragment(true); int serializedSize = insertTabletNode.serializedSize(); @@ -93,6 +94,38 @@ public void testSerializeAndDeserializeForWAL() throws IllegalPathException, IOE new MeasurementSchema("s5", TSDataType.BOOLEAN) }); Assert.assertEquals(insertTabletNode, tmpNode); + Assert.assertTrue(tmpNode.isLastFragment()); + } + + @Test + public void testDeserializeLegacyWAL() throws IllegalPathException, IOException { + InsertTabletNode insertTabletNode = getInsertTabletNodeWithSchema(); + insertTabletNode.setSearchIndex(123L); + + byte[] bytes = new byte[insertTabletNode.serializedSize()]; + WALByteBufferForTest walBuffer = new WALByteBufferForTest(ByteBuffer.wrap(bytes)); + insertTabletNode.serializeToWAL(walBuffer); + + ByteBuffer byteBuffer = ByteBuffer.wrap(bytes); + Assert.assertEquals(PlanNodeType.INSERT_TABLET.getNodeType(), byteBuffer.getShort()); + Assert.assertEquals(123L, byteBuffer.getLong()); + + DataInputStream dataInputStream = new DataInputStream(new ByteArrayInputStream(bytes)); + dataInputStream.readShort(); + + InsertTabletNode tmpNode = InsertTabletNode.deserializeFromWAL(dataInputStream); + tmpNode.setPlanNodeId(insertTabletNode.getPlanNodeId()); + tmpNode.setMeasurementSchemas( + new MeasurementSchema[] { + new MeasurementSchema("s1", TSDataType.DOUBLE), + new MeasurementSchema("s2", TSDataType.FLOAT), + new MeasurementSchema("s3", TSDataType.INT64), + new MeasurementSchema("s4", TSDataType.INT32), + new MeasurementSchema("s5", TSDataType.BOOLEAN) + }); + Assert.assertEquals(insertTabletNode, tmpNode); + Assert.assertEquals(123L, tmpNode.getSearchIndex()); + Assert.assertFalse(tmpNode.isLastFragment()); } @Test @@ -126,6 +159,7 @@ public void testSerializeAndDeserializeForWALRelational() throws IOException { for (String tableName : new String[] {"table1", "ta`ble1", "root.table1"}) { RelationalInsertTabletNode insertTabletNode = getRelationalInsertTabletNodeWithSchema(tableName); + insertTabletNode.setLastFragment(true); int serializedSize = insertTabletNode.serializedSize(); @@ -153,9 +187,42 @@ public void testSerializeAndDeserializeForWALRelational() throws IOException { new MeasurementSchema("s5", TSDataType.BOOLEAN) }); Assert.assertEquals(insertTabletNode, tmpNode); + Assert.assertTrue(tmpNode.isLastFragment()); } } + @Test + public void testDeserializeLegacyWALRelational() throws IOException { + RelationalInsertTabletNode insertTabletNode = getRelationalInsertTabletNodeWithSchema("table1"); + insertTabletNode.setSearchIndex(123L); + + byte[] bytes = new byte[insertTabletNode.serializedSize()]; + WALByteBufferForTest walBuffer = new WALByteBufferForTest(ByteBuffer.wrap(bytes)); + insertTabletNode.serializeToWAL(walBuffer); + + ByteBuffer byteBuffer = ByteBuffer.wrap(bytes); + Assert.assertEquals(PlanNodeType.RELATIONAL_INSERT_TABLET.getNodeType(), byteBuffer.getShort()); + Assert.assertEquals(123L, byteBuffer.getLong()); + + DataInputStream dataInputStream = new DataInputStream(new ByteArrayInputStream(bytes)); + dataInputStream.readShort(); + + RelationalInsertTabletNode tmpNode = + RelationalInsertTabletNode.deserializeFromWAL(dataInputStream); + tmpNode.setPlanNodeId(insertTabletNode.getPlanNodeId()); + tmpNode.setMeasurementSchemas( + new MeasurementSchema[] { + new MeasurementSchema("s1", TSDataType.DOUBLE), + new MeasurementSchema("s2", TSDataType.FLOAT), + new MeasurementSchema("s3", TSDataType.INT64), + new MeasurementSchema("s4", TSDataType.INT32), + new MeasurementSchema("s5", TSDataType.BOOLEAN) + }); + Assert.assertEquals(insertTabletNode, tmpNode); + Assert.assertEquals(123L, tmpNode.getSearchIndex()); + Assert.assertFalse(tmpNode.isLastFragment()); + } + @Test public void testInitTabletValuesWithAllTypes() throws NoSuchMethodException, InvocationTargetException, IllegalAccessException { diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNodeWaitForRollFileTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNodeWaitForRollFileTest.java index 2977841d5e6b..b24a8cd29cf5 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNodeWaitForRollFileTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNodeWaitForRollFileTest.java @@ -178,6 +178,29 @@ public void testWaitForNextReadySucceedsAfterRollFile() throws Exception { assertNotNull(iterator.next()); } + @Test + public void testLegacySeparatorStillWorksAfterRollFile() throws Exception { + IMemTable memTable = new PrimitiveMemTable(databasePath, dataRegionId); + walNode.onMemTableCreated(memTable, logDirectory + File.separator + "test.tsfile"); + + InsertTabletNode insertTabletNode = getInsertTabletNode(devicePath, new long[] {1}); + insertTabletNode.setSearchIndex(1); + walNode.log( + memTable.getMemTableId(), + insertTabletNode, + Collections.singletonList(new int[] {0, insertTabletNode.getRowCount()})); + walNode.log(memTable.getMemTableId(), new ContinuousSameSearchIndexSeparatorNode()); + + Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> walNode.isAllWALEntriesConsumed()); + + walNode.rollWALFile(); + Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> walNode.isAllWALEntriesConsumed()); + + ConsensusReqReader.ReqIterator iterator = walNode.getReqIterator(1); + assertTrue(iterator.hasNext()); + assertNotNull(iterator.next()); + } + /** * Verifies that waitForNextReady wakes up when a WAL file roll is triggered concurrently. A * background thread rolls the WAL file while the main thread waits on the iterator. @@ -190,12 +213,11 @@ public void testWaitForNextReadyWakesUpOnConcurrentRoll() throws Exception { // write data with search index InsertTabletNode insertTabletNode = getInsertTabletNode(devicePath, new long[] {1}); insertTabletNode.setSearchIndex(1); + insertTabletNode.setLastFragment(true); walNode.log( memTable.getMemTableId(), insertTabletNode, Collections.singletonList(new int[] {0, insertTabletNode.getRowCount()})); - walNode.log( - memTable.getMemTableId(), new ContinuousSameSearchIndexSeparatorNode(new PlanNodeId(""))); Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> walNode.isAllWALEntriesConsumed()); @@ -318,12 +340,11 @@ public void testWaitForNextReadyAutoTriggersRollOnTimeout() throws Exception { // write data with search index — stays in the current (active) WAL file InsertTabletNode insertTabletNode = getInsertTabletNode(devicePath, new long[] {1}); insertTabletNode.setSearchIndex(1); + insertTabletNode.setLastFragment(true); walNode.log( memTable.getMemTableId(), insertTabletNode, Collections.singletonList(new int[] {0, insertTabletNode.getRowCount()})); - walNode.log( - memTable.getMemTableId(), new ContinuousSameSearchIndexSeparatorNode(new PlanNodeId(""))); Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> walNode.isAllWALEntriesConsumed());