Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ public TSStatus visitRelationalInsertRows(RelationalInsertRowsNode node, DataReg
public TSStatus visitInsertRow(InsertRowNode node, DataRegion dataRegion) {
try {
dataRegion.insert(node);
dataRegion.insertSeparatorToWAL();
return StatusUtils.OK;
} catch (OutOfTTLException e) {
LOGGER.warn("Error in executing plan node: {}, caused by {}", node, e.getMessage());
Expand All @@ -99,7 +98,6 @@ public TSStatus visitRelationalInsertTablet(
public TSStatus visitInsertTablet(final InsertTabletNode node, final DataRegion dataRegion) {
try {
dataRegion.insertTablet(node);
dataRegion.insertSeparatorToWAL();
return StatusUtils.OK;
} catch (final OutOfTTLException e) {
LOGGER.debug("Error in executing plan node: {}, caused by {}", node, e.getMessage());
Expand Down Expand Up @@ -136,7 +134,6 @@ public TSStatus visitInsertTablet(final InsertTabletNode node, final DataRegion
public TSStatus visitInsertRows(InsertRowsNode node, DataRegion dataRegion) {
try {
dataRegion.insert(node);
dataRegion.insertSeparatorToWAL();
return StatusUtils.OK;
} catch (WriteProcessRejectException e) {
LOGGER.warn("Reject in executing plan node: {}, caused by {}", node, e.getMessage());
Expand Down Expand Up @@ -173,7 +170,6 @@ public TSStatus visitInsertRows(InsertRowsNode node, DataRegion dataRegion) {
public TSStatus visitInsertMultiTablets(InsertMultiTabletsNode node, DataRegion dataRegion) {
try {
dataRegion.insertTablets(node);
dataRegion.insertSeparatorToWAL();
return StatusUtils.OK;
} catch (WriteProcessRejectException e) {
LOGGER.warn("Reject in executing plan node: {}, caused by {}", node, e.getMessage());
Expand Down Expand Up @@ -208,7 +204,6 @@ public TSStatus visitInsertRowsOfOneDevice(
InsertRowsOfOneDeviceNode node, DataRegion dataRegion) {
try {
dataRegion.insert(node);
dataRegion.insertSeparatorToWAL();
return StatusUtils.OK;
} catch (WriteProcessRejectException e) {
LOGGER.warn("Reject in executing plan node: {}, caused by {}", node, e.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -619,7 +619,7 @@ private int serializeMeasurementsAndValuesSize() {
@Override
public void serializeToWAL(IWALByteBufferView buffer) {
buffer.putShort(getType().getNodeType());
buffer.putLong(searchIndex);
buffer.putLong(getEncodedSearchIndex());
subSerialize(buffer);
}

Expand Down Expand Up @@ -700,7 +700,7 @@ private void putDataTypesAndValues(IWALByteBufferView buffer) {
public static InsertRowNode deserializeFromWAL(DataInputStream stream) throws IOException {
long searchIndex = stream.readLong();
InsertRowNode insertNode = subDeserializeFromWAL(stream);
insertNode.setSearchIndex(searchIndex);
insertNode.setSearchIndexFromWAL(searchIndex);
return insertNode;
}

Expand Down Expand Up @@ -791,7 +791,7 @@ public void fillDataTypesAndValuesFromWAL(DataInputStream stream) throws IOExcep
public static InsertRowNode deserializeFromWAL(ByteBuffer buffer) {
long searchIndex = buffer.getLong();
InsertRowNode insertNode = subDeserializeFromWAL(buffer);
insertNode.setSearchIndex(searchIndex);
insertNode.setSearchIndexFromWAL(searchIndex);
return insertNode;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ private int subSerializeSize() {
@Override
public void serializeToWAL(IWALByteBufferView buffer) {
buffer.putShort(getType().getNodeType());
buffer.putLong(searchIndex);
buffer.putLong(getEncodedSearchIndex());
subSerialize(buffer);
}

Expand Down Expand Up @@ -377,7 +377,7 @@ public static InsertRowsNode deserializeFromWAL(DataInputStream stream) throws I
InsertRowNode insertRowNode = InsertRowNode.subDeserializeFromWAL(stream);
insertRowsNode.addOneInsertRowNode(insertRowNode, i);
}
insertRowsNode.setSearchIndex(searchIndex);
insertRowsNode.setSearchIndexFromWAL(searchIndex);
return insertRowsNode;
}

Expand All @@ -397,7 +397,7 @@ public static InsertRowsNode deserializeFromWAL(ByteBuffer buffer) {
InsertRowNode insertRowNode = InsertRowNode.subDeserializeFromWAL(buffer);
insertRowsNode.addOneInsertRowNode(insertRowNode, i);
}
insertRowsNode.setSearchIndex(searchIndex);
insertRowsNode.setSearchIndexFromWAL(searchIndex);
return insertRowsNode;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -880,7 +880,7 @@ public void serializeToWAL(IWALByteBufferView buffer, List<int[]> rangeList) {
}

void subSerialize(IWALByteBufferView buffer, List<int[]> rangeList) {
buffer.putLong(searchIndex);
buffer.putLong(getEncodedSearchIndex());
WALWriteUtils.write(targetPath.getFullPath(), buffer);
// data types are serialized in measurement schemas
writeMeasurementSchemas(buffer);
Expand Down Expand Up @@ -1012,7 +1012,7 @@ public static InsertTabletNode deserializeFromWAL(DataInputStream stream) throws
}

protected void subDeserializeFromWAL(DataInputStream stream) throws IOException {
searchIndex = stream.readLong();
setSearchIndexFromWAL(stream.readLong());
try {
targetPath = readTargetPath(stream);
} catch (IllegalPathException e) {
Expand Down Expand Up @@ -1047,7 +1047,7 @@ public static InsertTabletNode deserializeFromWAL(ByteBuffer buffer) {
}

protected void subDeserializeFromWAL(ByteBuffer buffer) {
searchIndex = buffer.getLong();
setSearchIndexFromWAL(buffer.getLong());
try {
targetPath = readTargetPath(buffer);
} catch (IllegalPathException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,14 +128,14 @@ public static RelationalInsertRowNode deserializeFromWAL(DataInputStream stream)
throws IOException {
long searchIndex = stream.readLong();
RelationalInsertRowNode insertNode = subDeserializeFromWAL(stream);
insertNode.setSearchIndex(searchIndex);
insertNode.setSearchIndexFromWAL(searchIndex);
return insertNode;
}

public static RelationalInsertRowNode deserializeFromWAL(ByteBuffer buffer) {
long searchIndex = buffer.getLong();
RelationalInsertRowNode insertNode = subDeserializeFromWAL(buffer);
insertNode.setSearchIndex(searchIndex);
insertNode.setSearchIndexFromWAL(searchIndex);
return insertNode;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ public static RelationalInsertRowsNode deserializeFromWAL(DataInputStream stream
RelationalInsertRowNode insertRowNode = RelationalInsertRowNode.subDeserializeFromWAL(stream);
insertRowsNode.addOneInsertRowNode(insertRowNode, i);
}
insertRowsNode.setSearchIndex(searchIndex);
insertRowsNode.setSearchIndexFromWAL(searchIndex);
return insertRowsNode;
}

Expand All @@ -144,7 +144,7 @@ public static RelationalInsertRowsNode deserializeFromWAL(ByteBuffer buffer) {
RelationalInsertRowNode insertRowNode = RelationalInsertRowNode.subDeserializeFromWAL(buffer);
insertRowsNode.addOneInsertRowNode(insertRowNode, i);
}
insertRowsNode.setSearchIndex(searchIndex);
insertRowsNode.setSearchIndexFromWAL(searchIndex);
return insertRowsNode;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@

public abstract class SearchNode extends WritePlanNode implements ComparableConsensusRequest {

private static final long LAST_FRAGMENT_MASK = Long.MIN_VALUE;

/** this insert node doesn't need to participate in iot consensus */
public static final long NO_CONSENSUS_INDEX = ConsensusReqReader.DEFAULT_SEARCH_INDEX;

Expand All @@ -37,6 +39,8 @@
*/
protected long searchIndex = NO_CONSENSUS_INDEX;

protected boolean isLastFragment = false;

protected SearchNode(PlanNodeId id) {
super(id);
}
Expand All @@ -51,5 +55,38 @@
return this;
}

public boolean isLastFragment() {
return isLastFragment;
}

public SearchNode setLastFragment(boolean lastFragment) {
isLastFragment = lastFragment;
return this;
}

protected long getEncodedSearchIndex() {
if (searchIndex == NO_CONSENSUS_INDEX || !isLastFragment) {
return searchIndex;
}
return searchIndex | LAST_FRAGMENT_MASK;
}

public static long extractSearchIndex(long encodedSearchIndex) {
if (encodedSearchIndex == NO_CONSENSUS_INDEX) {
return encodedSearchIndex;
}
return encodedSearchIndex & ~LAST_FRAGMENT_MASK;
}

public static boolean isLastFragment(long encodedSearchIndex) {

Check warning on line 81 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/SearchNode.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

All overloaded methods should be placed next to each other. Placing non-overloaded methods in between overloaded methods with the same type is a violation. Previous overloaded method located at line '58'.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ4nRstwIpH-aSK_5fHQ&open=AZ4nRstwIpH-aSK_5fHQ&pullRequest=17670
return encodedSearchIndex != NO_CONSENSUS_INDEX
&& (encodedSearchIndex & LAST_FRAGMENT_MASK) != 0;
}

protected void setSearchIndexFromWAL(long encodedSearchIndex) {

Check warning on line 86 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/SearchNode.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Abbreviation in name 'setSearchIndexFromWAL' must contain no more than '2' consecutive capital letters.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ4nRstwIpH-aSK_5fHR&open=AZ4nRstwIpH-aSK_5fHR&pullRequest=17670
this.searchIndex = extractSearchIndex(encodedSearchIndex);
this.isLastFragment = isLastFragment(encodedSearchIndex);
}

public abstract SearchNode merge(List<SearchNode> searchNodes);
}
Loading
Loading