Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,9 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
Expand Down Expand Up @@ -113,6 +113,7 @@
null,
false,
null,
null,
isWithMod);
}

Expand All @@ -130,6 +131,37 @@
final Map<IDeviceID, Boolean> deviceIsAlignedMap,
final boolean isWithMod)
throws IOException, IllegalPathException {
this(
pipeName,
creationTime,
tsFile,
pattern,
startTime,
endTime,
pipeTaskMeta,
sourceEvent,
entity,
skipIfNoPrivileges,
deviceIsAlignedMap,
null,
isWithMod);
}

public TsFileInsertionEventQueryParser(

Check warning on line 150 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/query/TsFileInsertionEventQueryParser.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Constructor has 13 parameters, which is greater than 7 authorized.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ4mt0irOyQc2LVr1y67&open=AZ4mt0irOyQc2LVr1y67&pullRequest=17674
final String pipeName,
final long creationTime,
final File tsFile,
final TreePattern pattern,
final long startTime,
final long endTime,
final PipeTaskMeta pipeTaskMeta,
final PipeInsertionEvent sourceEvent,
final IAuditEntity entity,
final boolean skipIfNoPrivileges,
final Map<IDeviceID, Boolean> deviceIsAlignedMap,
final Map<IDeviceID, List<String>> deviceMeasurementsMapOverride,
final boolean isWithMod)
throws IOException, IllegalPathException {
super(
tsFile,
pipeName,
Expand Down Expand Up @@ -162,7 +194,25 @@
tsFileSequenceReader = new TsFileSequenceReader(tsFile.getPath(), true, true);
tsFileReader = new TsFileReader(tsFileSequenceReader);

if (tsFileResourceManager.cacheObjectsIfAbsent(tsFile)) {
if (Objects.nonNull(deviceMeasurementsMapOverride)) {
this.deviceIsAlignedMap =
Objects.nonNull(deviceIsAlignedMap)
? new LinkedHashMap<>(deviceIsAlignedMap)
: readDeviceIsAlignedMap();
memoryRequiredInBytes +=
Objects.nonNull(deviceIsAlignedMap)
? 0
: PipeMemoryWeightUtil.memoryOfIDeviceId2Bool(this.deviceIsAlignedMap);

measurementDataTypeMap =
readFilteredFullPathDataTypeMap(deviceMeasurementsMapOverride.keySet());
memoryRequiredInBytes +=
PipeMemoryWeightUtil.memoryOfStr2TSDataType(measurementDataTypeMap);

deviceMeasurementsMap = new LinkedHashMap<>(deviceMeasurementsMapOverride);
memoryRequiredInBytes +=
PipeMemoryWeightUtil.memoryOfIDeviceID2StrList(deviceMeasurementsMap);
} else if (tsFileResourceManager.cacheObjectsIfAbsent(tsFile)) {
// These read-only objects can be found in cache.
this.deviceIsAlignedMap =
Objects.nonNull(deviceIsAlignedMap)
Expand Down Expand Up @@ -263,10 +313,34 @@
}
}

public TsFileInsertionEventQueryParser(
final File tsFile,
final TreePattern pattern,
final long startTime,
final long endTime,
final Map<IDeviceID, List<String>> deviceMeasurementsMapOverride,
final boolean isWithMod)
throws IOException, IllegalPathException {
this(
null,
0,
tsFile,
pattern,
startTime,
endTime,
null,
null,
null,
false,
null,
deviceMeasurementsMapOverride,
isWithMod);
}

private Map<IDeviceID, List<String>> filterDeviceMeasurementsMapByPattern(
final Map<IDeviceID, List<String>> originalDeviceMeasurementsMap)
throws IllegalPathException {
final Map<IDeviceID, List<String>> filteredDeviceMeasurementsMap = new HashMap<>();
final Map<IDeviceID, List<String>> filteredDeviceMeasurementsMap = new LinkedHashMap<>();
for (Map.Entry<IDeviceID, List<String>> entry : originalDeviceMeasurementsMap.entrySet()) {
final IDeviceID deviceId = entry.getKey();

Expand Down Expand Up @@ -315,7 +389,7 @@
}

private Map<IDeviceID, Boolean> readDeviceIsAlignedMap() throws IOException {
final Map<IDeviceID, Boolean> deviceIsAlignedResultMap = new HashMap<>();
final Map<IDeviceID, Boolean> deviceIsAlignedResultMap = new LinkedHashMap<>();
final TsFileDeviceIterator deviceIsAlignedIterator =
tsFileSequenceReader.getAllDevicesIteratorWithIsAligned();
while (deviceIsAlignedIterator.hasNext()) {
Expand Down Expand Up @@ -345,7 +419,7 @@
*/
private Map<String, TSDataType> readFilteredFullPathDataTypeMap(final Set<IDeviceID> devices)
throws IOException {
final Map<String, TSDataType> result = new HashMap<>();
final Map<String, TSDataType> result = new LinkedHashMap<>();

for (final IDeviceID device : devices) {
tsFileSequenceReader
Expand All @@ -367,7 +441,7 @@
*/
private Map<IDeviceID, List<String>> readFilteredDeviceMeasurementsMap(
final Set<IDeviceID> devices) throws IOException {
final Map<IDeviceID, List<String>> result = new HashMap<>();
final Map<IDeviceID, List<String>> result = new LinkedHashMap<>();

for (final IDeviceID device : devices) {
tsFileSequenceReader
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@
private IDeviceID currentDevice;
private boolean currentIsAligned;
private final List<IMeasurementSchema> currentMeasurements = new ArrayList<>();
private Exception deferredException;
private final List<ModsOperationUtil.ModsInfo> modsInfos = new ArrayList<>();
// Cached time chunk
private final List<Chunk> timeChunkList = new ArrayList<>();
Expand Down Expand Up @@ -199,6 +200,7 @@

@Override
public boolean hasNext() {
throwIfDeferredException();
final boolean hasNext = Objects.nonNull(chunkReader);
if (hasNext && !parseStartTimeRecorded) {
// Record start time on first hasNext() that returns true
Expand Down Expand Up @@ -227,7 +229,7 @@
final Tablet tablet = getNextTablet();
// Record tablet metrics
recordTabletMetrics(tablet);
final boolean hasNext = hasNext();
final boolean isLast = isLastTabletWithoutDeferredException();
try {
return sourceEvent == null
? new PipeRawTabletInsertionEvent(
Expand All @@ -241,7 +243,7 @@
0,
pipeTaskMeta,
sourceEvent,
!hasNext)
isLast)
: new PipeRawTabletInsertionEvent(
sourceEvent.getRawIsTableModelEvent(),
sourceEvent.getSourceDatabaseNameFromDataRegion(),
Expand All @@ -253,9 +255,10 @@
sourceEvent.getCreationTime(),
pipeTaskMeta,
sourceEvent,
!hasNext);
isLast);
} finally {
if (!hasNext) {
if (isLast) {
recordParseEndTimeIfNecessary();
close();
}
}
Expand All @@ -270,6 +273,7 @@
new Iterator<Pair<Tablet, Boolean>>() {
@Override
public boolean hasNext() {
throwIfDeferredException();
return Objects.nonNull(chunkReader);
}

Expand All @@ -286,18 +290,33 @@
// information.
final boolean isAligned = currentIsAligned;
final Tablet tablet = getNextTablet();
final boolean hasNext = hasNext();
try {
return new Pair<>(tablet, isAligned);
} finally {
if (!hasNext) {
if (isLastTabletWithoutDeferredException()) {
close();
}
}
}
};
}

public IDeviceID getCurrentDevice() {
return currentDevice;
}

public boolean isCurrentAligned() {
return currentIsAligned;
}

public List<String> getCurrentMeasurements() {
final List<String> measurementIds = new ArrayList<>(currentMeasurements.size());
for (final IMeasurementSchema schema : currentMeasurements) {
measurementIds.add(schema.getMeasurementName());
}
return measurementIds;
}

private Tablet getNextTablet() {
try {
Tablet tablet = null;
Expand Down Expand Up @@ -352,7 +371,11 @@

// Switch chunk reader iff current chunk is all consumed
if (!data.hasCurrent()) {
prepareData();
try {

Check warning on line 374 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Extract this nested try block into a separate method.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ4mt0jeOyQc2LVr1y68&open=AZ4mt0jeOyQc2LVr1y68&pullRequest=17674
prepareData();
} catch (final Exception e) {
deferredException = e;
}
}
return tablet;
} catch (final Exception e) {
Expand All @@ -361,6 +384,26 @@
}
}

private void throwIfDeferredException() {
if (Objects.isNull(deferredException)) {
return;
}

final Exception exception = deferredException;
deferredException = null;
throw new PipeException("Failed to prepare next tablet insertion event.", exception);
}

private boolean isLastTabletWithoutDeferredException() {
return Objects.isNull(deferredException) && Objects.isNull(chunkReader);
}

private void recordParseEndTimeIfNecessary() {
if (parseStartTimeRecorded && !parseEndTimeRecorded) {
recordParseEndTime();
}
}

private void prepareData() throws IOException, IllegalPathException {
do {
do {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,7 @@
package org.apache.iotdb.db.storageengine.load.converter;

import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBTreePattern;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.pipe.event.common.tsfile.parser.scan.TsFileInsertionEventScanParser;
import org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTabletRawReq;
import org.apache.iotdb.db.queryengine.plan.statement.Statement;
import org.apache.iotdb.db.queryengine.plan.statement.StatementNode;
Expand Down Expand Up @@ -89,16 +87,9 @@ public Optional<TSStatus> visitLoadFile(

try {
for (final File file : loadTsFileStatement.getTsFiles()) {
try (final TsFileInsertionEventScanParser parser =
new TsFileInsertionEventScanParser(
file,
new IoTDBTreePattern(null),
Long.MIN_VALUE,
Long.MAX_VALUE,
null,
null,
true)) {
for (final Pair<Tablet, Boolean> tabletWithIsAligned : parser.toTabletWithIsAligneds()) {
try (final LoadTreeTsFileTabletIterator tabletIterator =
new LoadTreeTsFileTabletIterator(file, true)) {
for (final Pair<Tablet, Boolean> tabletWithIsAligned : tabletIterator) {
final PipeTransferTabletRawReq tabletRawReq =
PipeTransferTabletRawReq.toTPipeTransferRawReq(
tabletWithIsAligned.getLeft(), tabletWithIsAligned.getRight());
Expand Down
Loading
Loading