From ea1f8b29d02e3a57b39dd1011c5b8985bf97146b Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Thu, 14 May 2026 21:20:28 +0800 Subject: [PATCH 1/2] down --- .../TsFileInsertionEventQueryParser.java | 86 ++- .../scan/TsFileInsertionEventScanParser.java | 57 +- ...tementDataTypeConvertExecutionVisitor.java | 15 +- .../LoadTreeTsFileTabletIterator.java | 558 ++++++++++++++++++ ...ntDataTypeConvertExecutionVisitorTest.java | 299 ++++++++++ 5 files changed, 990 insertions(+), 25 deletions(-) create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeTsFileTabletIterator.java create mode 100644 iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitorTest.java diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/query/TsFileInsertionEventQueryParser.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/query/TsFileInsertionEventQueryParser.java index 7e7226a61656f..62408eb21bc71 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/query/TsFileInsertionEventQueryParser.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/query/TsFileInsertionEventQueryParser.java @@ -58,9 +58,9 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.NoSuchElementException; @@ -113,6 +113,7 @@ public TsFileInsertionEventQueryParser( null, false, null, + null, isWithMod); } @@ -130,6 +131,37 @@ public TsFileInsertionEventQueryParser( final Map deviceIsAlignedMap, final boolean isWithMod) throws IOException, IllegalPathException { + this( + pipeName, + creationTime, + tsFile, + pattern, + startTime, + endTime, + pipeTaskMeta, + sourceEvent, + entity, + skipIfNoPrivileges, + deviceIsAlignedMap, + null, + isWithMod); + } + + public TsFileInsertionEventQueryParser( + final String pipeName, + final long creationTime, + final File tsFile, + final TreePattern pattern, + final long startTime, + final long endTime, + final PipeTaskMeta pipeTaskMeta, + final PipeInsertionEvent sourceEvent, + final IAuditEntity entity, + final boolean skipIfNoPrivileges, + final Map deviceIsAlignedMap, + final Map> deviceMeasurementsMapOverride, + final boolean isWithMod) + throws IOException, IllegalPathException { super( tsFile, pipeName, @@ -162,7 +194,25 @@ public TsFileInsertionEventQueryParser( tsFileSequenceReader = new TsFileSequenceReader(tsFile.getPath(), true, true); tsFileReader = new TsFileReader(tsFileSequenceReader); - if (tsFileResourceManager.cacheObjectsIfAbsent(tsFile)) { + if (Objects.nonNull(deviceMeasurementsMapOverride)) { + this.deviceIsAlignedMap = + Objects.nonNull(deviceIsAlignedMap) + ? new LinkedHashMap<>(deviceIsAlignedMap) + : readDeviceIsAlignedMap(); + memoryRequiredInBytes += + Objects.nonNull(deviceIsAlignedMap) + ? 0 + : PipeMemoryWeightUtil.memoryOfIDeviceId2Bool(this.deviceIsAlignedMap); + + measurementDataTypeMap = + readFilteredFullPathDataTypeMap(deviceMeasurementsMapOverride.keySet()); + memoryRequiredInBytes += + PipeMemoryWeightUtil.memoryOfStr2TSDataType(measurementDataTypeMap); + + deviceMeasurementsMap = new LinkedHashMap<>(deviceMeasurementsMapOverride); + memoryRequiredInBytes += + PipeMemoryWeightUtil.memoryOfIDeviceID2StrList(deviceMeasurementsMap); + } else if (tsFileResourceManager.cacheObjectsIfAbsent(tsFile)) { // These read-only objects can be found in cache. this.deviceIsAlignedMap = Objects.nonNull(deviceIsAlignedMap) @@ -263,10 +313,34 @@ public TsFileInsertionEventQueryParser( } } + public TsFileInsertionEventQueryParser( + final File tsFile, + final TreePattern pattern, + final long startTime, + final long endTime, + final Map> deviceMeasurementsMapOverride, + final boolean isWithMod) + throws IOException, IllegalPathException { + this( + null, + 0, + tsFile, + pattern, + startTime, + endTime, + null, + null, + null, + false, + null, + deviceMeasurementsMapOverride, + isWithMod); + } + private Map> filterDeviceMeasurementsMapByPattern( final Map> originalDeviceMeasurementsMap) throws IllegalPathException { - final Map> filteredDeviceMeasurementsMap = new HashMap<>(); + final Map> filteredDeviceMeasurementsMap = new LinkedHashMap<>(); for (Map.Entry> entry : originalDeviceMeasurementsMap.entrySet()) { final IDeviceID deviceId = entry.getKey(); @@ -315,7 +389,7 @@ else if (treePattern.mayOverlapWithDevice(deviceId)) { } private Map readDeviceIsAlignedMap() throws IOException { - final Map deviceIsAlignedResultMap = new HashMap<>(); + final Map deviceIsAlignedResultMap = new LinkedHashMap<>(); final TsFileDeviceIterator deviceIsAlignedIterator = tsFileSequenceReader.getAllDevicesIteratorWithIsAligned(); while (deviceIsAlignedIterator.hasNext()) { @@ -345,7 +419,7 @@ private Set filterDevicesByPattern(final Set devices) { */ private Map readFilteredFullPathDataTypeMap(final Set devices) throws IOException { - final Map result = new HashMap<>(); + final Map result = new LinkedHashMap<>(); for (final IDeviceID device : devices) { tsFileSequenceReader @@ -367,7 +441,7 @@ private Map readFilteredFullPathDataTypeMap(final Set> readFilteredDeviceMeasurementsMap( final Set devices) throws IOException { - final Map> result = new HashMap<>(); + final Map> result = new LinkedHashMap<>(); for (final IDeviceID device : devices) { tsFileSequenceReader diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java index faf96e538e70d..74ecf65d9da9c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java @@ -93,6 +93,7 @@ public class TsFileInsertionEventScanParser extends TsFileInsertionEventParser { private IDeviceID currentDevice; private boolean currentIsAligned; private final List currentMeasurements = new ArrayList<>(); + private Exception deferredException; private final List modsInfos = new ArrayList<>(); // Cached time chunk private final List timeChunkList = new ArrayList<>(); @@ -199,6 +200,7 @@ public Iterable toTabletInsertionEvents() { @Override public boolean hasNext() { + throwIfDeferredException(); final boolean hasNext = Objects.nonNull(chunkReader); if (hasNext && !parseStartTimeRecorded) { // Record start time on first hasNext() that returns true @@ -227,7 +229,7 @@ public TabletInsertionEvent next() { final Tablet tablet = getNextTablet(); // Record tablet metrics recordTabletMetrics(tablet); - final boolean hasNext = hasNext(); + final boolean isLast = isLastTabletWithoutDeferredException(); try { return sourceEvent == null ? new PipeRawTabletInsertionEvent( @@ -241,7 +243,7 @@ public TabletInsertionEvent next() { 0, pipeTaskMeta, sourceEvent, - !hasNext) + isLast) : new PipeRawTabletInsertionEvent( sourceEvent.getRawIsTableModelEvent(), sourceEvent.getSourceDatabaseNameFromDataRegion(), @@ -253,9 +255,10 @@ public TabletInsertionEvent next() { sourceEvent.getCreationTime(), pipeTaskMeta, sourceEvent, - !hasNext); + isLast); } finally { - if (!hasNext) { + if (isLast) { + recordParseEndTimeIfNecessary(); close(); } } @@ -270,6 +273,7 @@ public Iterable> toTabletWithIsAligneds() { new Iterator>() { @Override public boolean hasNext() { + throwIfDeferredException(); return Objects.nonNull(chunkReader); } @@ -286,11 +290,10 @@ public Pair next() { // information. final boolean isAligned = currentIsAligned; final Tablet tablet = getNextTablet(); - final boolean hasNext = hasNext(); try { return new Pair<>(tablet, isAligned); } finally { - if (!hasNext) { + if (isLastTabletWithoutDeferredException()) { close(); } } @@ -298,6 +301,22 @@ public Pair next() { }; } + public IDeviceID getCurrentDevice() { + return currentDevice; + } + + public boolean isCurrentAligned() { + return currentIsAligned; + } + + public List getCurrentMeasurements() { + final List measurementIds = new ArrayList<>(currentMeasurements.size()); + for (final IMeasurementSchema schema : currentMeasurements) { + measurementIds.add(schema.getMeasurementName()); + } + return measurementIds; + } + private Tablet getNextTablet() { try { Tablet tablet = null; @@ -352,7 +371,11 @@ private Tablet getNextTablet() { // Switch chunk reader iff current chunk is all consumed if (!data.hasCurrent()) { - prepareData(); + try { + prepareData(); + } catch (final Exception e) { + deferredException = e; + } } return tablet; } catch (final Exception e) { @@ -361,6 +384,26 @@ private Tablet getNextTablet() { } } + private void throwIfDeferredException() { + if (Objects.isNull(deferredException)) { + return; + } + + final Exception exception = deferredException; + deferredException = null; + throw new PipeException("Failed to prepare next tablet insertion event.", exception); + } + + private boolean isLastTabletWithoutDeferredException() { + return Objects.isNull(deferredException) && Objects.isNull(chunkReader); + } + + private void recordParseEndTimeIfNecessary() { + if (parseStartTimeRecorded && !parseEndTimeRecorded) { + recordParseEndTime(); + } + } + private void prepareData() throws IOException, IllegalPathException { do { do { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java index 226966454aaad..f2953716eb95f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java @@ -20,9 +20,7 @@ package org.apache.iotdb.db.storageengine.load.converter; import org.apache.iotdb.common.rpc.thrift.TSStatus; -import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBTreePattern; import org.apache.iotdb.db.conf.IoTDBDescriptor; -import org.apache.iotdb.db.pipe.event.common.tsfile.parser.scan.TsFileInsertionEventScanParser; import org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTabletRawReq; import org.apache.iotdb.db.queryengine.plan.statement.Statement; import org.apache.iotdb.db.queryengine.plan.statement.StatementNode; @@ -89,16 +87,9 @@ public Optional visitLoadFile( try { for (final File file : loadTsFileStatement.getTsFiles()) { - try (final TsFileInsertionEventScanParser parser = - new TsFileInsertionEventScanParser( - file, - new IoTDBTreePattern(null), - Long.MIN_VALUE, - Long.MAX_VALUE, - null, - null, - true)) { - for (final Pair tabletWithIsAligned : parser.toTabletWithIsAligneds()) { + try (final LoadTreeTsFileTabletIterator tabletIterator = + new LoadTreeTsFileTabletIterator(file, true)) { + for (final Pair tabletWithIsAligned : tabletIterator) { final PipeTransferTabletRawReq tabletRawReq = PipeTransferTabletRawReq.toTPipeTransferRawReq( tabletWithIsAligned.getLeft(), tabletWithIsAligned.getRight()); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeTsFileTabletIterator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeTsFileTabletIterator.java new file mode 100644 index 0000000000000..aa5abfbf2086c --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeTsFileTabletIterator.java @@ -0,0 +1,558 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.storageengine.load.converter; + +import org.apache.iotdb.commons.exception.pipe.PipeRuntimeOutOfMemoryCriticalException; +import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBTreePattern; +import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern; +import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; +import org.apache.iotdb.db.pipe.event.common.tsfile.parser.query.TsFileInsertionEventQueryParser; +import org.apache.iotdb.db.pipe.event.common.tsfile.parser.scan.TsFileInsertionEventScanParser; +import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; + +import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.file.metadata.TimeseriesMetadata; +import org.apache.tsfile.read.TsFileDeviceIterator; +import org.apache.tsfile.read.TsFileSequenceReader; +import org.apache.tsfile.utils.Pair; +import org.apache.tsfile.write.record.Tablet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Load uses scan parsing first for throughput. If scan parsing hits corruption, fall back to query + * parsing for the remaining measurements and devices so later data can still be loaded. + */ +class LoadTreeTsFileTabletIterator + implements Iterable>, Iterator>, AutoCloseable { + + private static final Logger LOGGER = LoggerFactory.getLogger(LoadTreeTsFileTabletIterator.class); + + private static final TreePattern LOAD_TREE_PATTERN = new IoTDBTreePattern(null); + + private final File file; + private final boolean isWithMod; + private final ArrayDeque pendingQueryTasks = new ArrayDeque<>(); + + private TsFileInsertionEventScanParser scanParser; + private QueryTask activeQueryTask; + private TsFileInsertionEventQueryParser activeQueryParser; + private Iterator> activeIterator; + private boolean scanInitialized; + private boolean fallbackTriggered; + + private IDeviceID lastEmittedDevice; + private List lastEmittedMeasurements = Collections.emptyList(); + private long lastEmittedTimestamp = Long.MIN_VALUE; + + private IDeviceID lastScanTabletDevice; + private List lastScanTabletMeasurements = Collections.emptyList(); + private final Map> fullyEmittedMeasurementsByDevice = + new LinkedHashMap<>(); + + LoadTreeTsFileTabletIterator(final File file, final boolean isWithMod) { + this.file = file; + this.isWithMod = isWithMod; + } + + @Override + public Iterator> iterator() { + return this; + } + + @Override + public boolean hasNext() { + while (true) { + try { + ensureActiveIterator(); + if (Objects.isNull(activeIterator)) { + close(); + return false; + } + + if (activeIterator.hasNext()) { + return true; + } + + if (!switchToNextIterator()) { + close(); + return false; + } + } catch (final Exception e) { + if (recoverFromIteratorFailure(e)) { + continue; + } + close(); + throw toRuntimeException(e); + } + } + } + + @Override + public Pair next() { + while (true) { + if (!hasNext()) { + close(); + throw new NoSuchElementException(); + } + + try { + final Pair next = activeIterator.next(); + recordProgress(next); + return next; + } catch (final Exception e) { + if (recoverFromIteratorFailure(e)) { + continue; + } + close(); + throw toRuntimeException(e); + } + } + } + + private void ensureActiveIterator() throws Exception { + if (Objects.nonNull(activeIterator)) { + return; + } + + if (!scanInitialized && !fallbackTriggered) { + scanInitialized = true; + try { + scanParser = + new TsFileInsertionEventScanParser( + file, LOAD_TREE_PATTERN, Long.MIN_VALUE, Long.MAX_VALUE, null, null, isWithMod); + activeIterator = scanParser.toTabletWithIsAligneds().iterator(); + return; + } catch (final Exception e) { + if (!switchFromScanToQuery(e)) { + throw toRuntimeException(e); + } + } + } + + activateNextQueryParser(); + } + + private boolean switchToNextIterator() { + if (Objects.nonNull(activeQueryParser)) { + closeActiveQueryParser(); + return activateNextQueryParser(); + } + + closeScanParser(); + return activateNextQueryParser(); + } + + private boolean recoverFromIteratorFailure(final Exception e) { + if (shouldRethrow(e)) { + return false; + } + + if (Objects.nonNull(activeQueryTask)) { + LOGGER.warn( + "Load: Query fallback failed for device {} measurements {} in TsFile {}. " + + "Split or skip this query task and continue.", + activeQueryTask.device, + activeQueryTask.measurements, + file.getAbsolutePath(), + e); + splitOrSkipActiveQueryTask(); + return true; + } + + return switchFromScanToQuery(e); + } + + private boolean switchFromScanToQuery(final Exception e) { + if (fallbackTriggered) { + return false; + } + + fallbackTriggered = true; + final IDeviceID currentDevice = + Objects.nonNull(scanParser) ? scanParser.getCurrentDevice() : null; + final List currentMeasurements = + Objects.nonNull(scanParser) ? scanParser.getCurrentMeasurements() : Collections.emptyList(); + + markLastScanMeasurementsAsCompletedIfNeeded(currentDevice, currentMeasurements); + + closeScanParser(); + + try { + pendingQueryTasks.addAll(buildQueryTasks(currentDevice, currentMeasurements)); + } catch (final Exception queryInitException) { + LOGGER.warn( + "Load: Failed to initialize query fallback for TsFile {} after scan parser failure.", + file.getAbsolutePath(), + queryInitException); + return false; + } + + LOGGER.warn( + "Load: Scan parser detected a corrupted section in TsFile {} at device {}. " + + "Switch to query parsing for remaining devices.", + file.getAbsolutePath(), + currentDevice, + e); + return true; + } + + private ArrayDeque buildQueryTasks( + final IDeviceID currentDevice, final List currentMeasurements) throws IOException { + final LinkedHashMap> deviceMeasurementsMap = + readDeviceMeasurementsInOrder(); + if (deviceMeasurementsMap.isEmpty()) { + return new ArrayDeque<>(); + } + + final ArrayDeque tasks = new ArrayDeque<>(); + boolean includeCurrentAndFollowingDevices = + Objects.isNull(currentDevice) || !deviceMeasurementsMap.containsKey(currentDevice); + + for (final Map.Entry> entry : deviceMeasurementsMap.entrySet()) { + final IDeviceID device = entry.getKey(); + if (!includeCurrentAndFollowingDevices && device.equals(currentDevice)) { + includeCurrentAndFollowingDevices = true; + } + if (!includeCurrentAndFollowingDevices) { + continue; + } + + if (device.equals(currentDevice)) { + addCurrentDeviceQueryTasks(tasks, device, entry.getValue(), currentMeasurements); + } else { + addQueryTaskIfNecessary(tasks, device, entry.getValue(), Long.MIN_VALUE, Long.MAX_VALUE); + } + } + + return tasks; + } + + private LinkedHashMap> readDeviceMeasurementsInOrder() + throws IOException { + final LinkedHashMap> deviceMeasurementsMap = new LinkedHashMap<>(); + try (final TsFileSequenceReader reader = new TsFileSequenceReader(file.getAbsolutePath())) { + final TsFileDeviceIterator deviceIterator = reader.getAllDevicesIteratorWithIsAligned(); + while (deviceIterator.hasNext()) { + final IDeviceID device = deviceIterator.next().getLeft(); + deviceMeasurementsMap.put( + device, + reader.readDeviceMetadata(device).values().stream() + .map(TimeseriesMetadata::getMeasurementId) + .collect(Collectors.toList())); + } + } + return deviceMeasurementsMap; + } + + private void addCurrentDeviceQueryTasks( + final ArrayDeque tasks, + final IDeviceID device, + final List allMeasurements, + final List currentMeasurements) { + final Set completedMeasurements = + fullyEmittedMeasurementsByDevice.getOrDefault(device, Collections.emptySet()); + final Set currentMeasurementSet = new LinkedHashSet<>(currentMeasurements); + + final List currentMeasurementsToResume = new ArrayList<>(); + final List remainingMeasurements = new ArrayList<>(); + for (final String measurement : allMeasurements) { + if (completedMeasurements.contains(measurement)) { + continue; + } + if (currentMeasurementSet.contains(measurement)) { + currentMeasurementsToResume.add(measurement); + } else { + remainingMeasurements.add(measurement); + } + } + + addQueryTaskIfNecessary( + tasks, + device, + currentMeasurementsToResume, + determineTaskResumeStartTime(device, currentMeasurementsToResume, Long.MIN_VALUE), + Long.MAX_VALUE); + addQueryTaskIfNecessary(tasks, device, remainingMeasurements, Long.MIN_VALUE, Long.MAX_VALUE); + } + + private boolean activateNextQueryParser() { + closeActiveQueryParser(); + + while (!pendingQueryTasks.isEmpty()) { + activeQueryTask = pendingQueryTasks.removeFirst(); + try { + activeQueryParser = + new TsFileInsertionEventQueryParser( + file, + LOAD_TREE_PATTERN, + activeQueryTask.startTime, + activeQueryTask.endTime, + activeQueryTask.toDeviceMeasurementsMap(), + isWithMod); + final Iterator tabletIterator = + activeQueryParser.toTabletInsertionEvents().iterator(); + activeIterator = + new Iterator>() { + @Override + public boolean hasNext() { + return tabletIterator.hasNext(); + } + + @Override + public Pair next() { + final TabletInsertionEvent event = tabletIterator.next(); + if (!(event instanceof PipeRawTabletInsertionEvent)) { + throw new IllegalStateException( + "Expected PipeRawTabletInsertionEvent but got " + event.getClass().getName()); + } + + final PipeRawTabletInsertionEvent rawTabletInsertionEvent = + (PipeRawTabletInsertionEvent) event; + return new Pair<>( + rawTabletInsertionEvent.convertToTablet(), rawTabletInsertionEvent.isAligned()); + } + }; + return true; + } catch (final Exception e) { + LOGGER.warn( + "Load: Failed to initialize query fallback for device {} measurements {} in TsFile {}. " + + "Split or skip this query task and continue.", + activeQueryTask.device, + activeQueryTask.measurements, + file.getAbsolutePath(), + e); + splitOrSkipActiveQueryTask(); + } + } + + activeIterator = null; + return false; + } + + private void recordProgress(final Pair tabletWithIsAligned) { + final Tablet tablet = tabletWithIsAligned.getLeft(); + if (Objects.isNull(tablet) || tablet.getRowSize() == 0) { + return; + } + + final IDeviceID device = IDeviceID.Factory.DEFAULT_FACTORY.create(tablet.getDeviceId()); + final List measurements = extractMeasurementNames(tablet); + + if (Objects.isNull(activeQueryParser)) { + recordScanProgress(device, measurements); + } + + lastEmittedDevice = device; + lastEmittedMeasurements = measurements; + lastEmittedTimestamp = tablet.getTimestamp(tablet.getRowSize() - 1); + } + + private boolean shouldRethrow(final Exception e) { + Throwable current = e; + while (Objects.nonNull(current)) { + if (current instanceof InterruptedException + || current instanceof PipeRuntimeOutOfMemoryCriticalException) { + return true; + } + current = current.getCause(); + } + return false; + } + + private RuntimeException toRuntimeException(final Exception e) { + return e instanceof RuntimeException + ? (RuntimeException) e + : new IllegalStateException("Failed to iterate tablets while loading TsFile.", e); + } + + private void closeScanParser() { + activeIterator = null; + if (Objects.nonNull(scanParser)) { + scanParser.close(); + scanParser = null; + } + } + + private void closeActiveQueryParser() { + activeIterator = null; + activeQueryTask = null; + if (Objects.isNull(activeQueryParser)) { + return; + } + + activeQueryParser.close(); + activeQueryParser = null; + } + + @Override + public void close() { + activeIterator = null; + closeScanParser(); + closeActiveQueryParser(); + pendingQueryTasks.clear(); + } + + private void recordScanProgress(final IDeviceID device, final List measurements) { + if (Objects.nonNull(lastScanTabletDevice) + && (!lastScanTabletDevice.equals(device) + || !measurementsEqual(lastScanTabletMeasurements, measurements))) { + markMeasurementsFullyEmitted(lastScanTabletDevice, lastScanTabletMeasurements); + } + + lastScanTabletDevice = device; + lastScanTabletMeasurements = measurements; + } + + private void markLastScanMeasurementsAsCompletedIfNeeded( + final IDeviceID currentDevice, final List currentMeasurements) { + if (Objects.isNull(lastScanTabletDevice) || lastScanTabletMeasurements.isEmpty()) { + return; + } + + if (!lastScanTabletDevice.equals(currentDevice) + || !currentMeasurements.isEmpty() + && !measurementsEqual(lastScanTabletMeasurements, currentMeasurements)) { + markMeasurementsFullyEmitted(lastScanTabletDevice, lastScanTabletMeasurements); + } + } + + private void markMeasurementsFullyEmitted( + final IDeviceID device, final List measurements) { + if (Objects.isNull(device) || measurements.isEmpty()) { + return; + } + + fullyEmittedMeasurementsByDevice + .computeIfAbsent(device, key -> new LinkedHashSet<>()) + .addAll(measurements); + } + + private long determineTaskResumeStartTime( + final IDeviceID device, final List measurements, final long defaultStartTime) { + if (measurements.isEmpty() + || !device.equals(lastEmittedDevice) + || lastEmittedTimestamp == Long.MIN_VALUE + || !measurementsEqual(measurements, lastEmittedMeasurements)) { + return defaultStartTime; + } + + return lastEmittedTimestamp == Long.MAX_VALUE ? Long.MAX_VALUE : lastEmittedTimestamp + 1; + } + + private void addQueryTaskIfNecessary( + final ArrayDeque tasks, + final IDeviceID device, + final List measurements, + final long startTime, + final long endTime) { + if (measurements.isEmpty() || startTime == Long.MAX_VALUE) { + return; + } + + tasks.addLast(new QueryTask(device, measurements, startTime, endTime)); + } + + private void splitOrSkipActiveQueryTask() { + final QueryTask failedTask = activeQueryTask; + closeActiveQueryParser(); + if (Objects.isNull(failedTask)) { + return; + } + + if (failedTask.measurements.size() <= 1) { + return; + } + + final long resumeStartTime = + determineTaskResumeStartTime( + failedTask.device, failedTask.measurements, failedTask.startTime); + final List splitTasks = failedTask.split(resumeStartTime); + for (int i = splitTasks.size() - 1; i >= 0; --i) { + pendingQueryTasks.addFirst(splitTasks.get(i)); + } + } + + private List extractMeasurementNames(final Tablet tablet) { + final List measurements = new ArrayList<>(tablet.getSchemas().size()); + tablet.getSchemas().forEach(schema -> measurements.add(schema.getMeasurementName())); + return measurements; + } + + private boolean measurementsEqual( + final List leftMeasurements, final List rightMeasurements) { + return leftMeasurements.size() == rightMeasurements.size() + && new LinkedHashSet<>(leftMeasurements).equals(new LinkedHashSet<>(rightMeasurements)); + } + + private static class QueryTask { + private final IDeviceID device; + private final List measurements; + private final long startTime; + private final long endTime; + + private QueryTask( + final IDeviceID device, + final List measurements, + final long startTime, + final long endTime) { + this.device = device; + this.measurements = new ArrayList<>(measurements); + this.startTime = startTime; + this.endTime = endTime; + } + + private LinkedHashMap> toDeviceMeasurementsMap() { + final LinkedHashMap> deviceMeasurementsMap = new LinkedHashMap<>(); + deviceMeasurementsMap.put(device, measurements); + return deviceMeasurementsMap; + } + + private List split(final long resumeStartTime) { + final int middle = measurements.size() / 2; + if (middle <= 0) { + return Collections.emptyList(); + } + + final List splitTasks = new ArrayList<>(2); + splitTasks.add( + new QueryTask(device, measurements.subList(0, middle), resumeStartTime, endTime)); + splitTasks.add( + new QueryTask( + device, measurements.subList(middle, measurements.size()), resumeStartTime, endTime)); + return splitTasks; + } + } +} diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitorTest.java new file mode 100644 index 0000000000000..5cd104dfc3a08 --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitorTest.java @@ -0,0 +1,299 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.storageengine.load.converter; + +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.conf.CommonDescriptor; +import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBTreePattern; +import org.apache.iotdb.db.pipe.event.common.tsfile.parser.scan.TsFileInsertionEventScanParser; +import org.apache.iotdb.db.queryengine.plan.statement.Statement; +import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement; +import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement; +import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement; +import org.apache.iotdb.rpc.TSStatusCode; + +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.file.metadata.IChunkMetadata; +import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.file.metadata.enums.TSEncoding; +import org.apache.tsfile.read.TsFileSequenceReader; +import org.apache.tsfile.read.common.Path; +import org.apache.tsfile.write.TsFileWriter; +import org.apache.tsfile.write.record.Tablet; +import org.apache.tsfile.write.schema.IMeasurementSchema; +import org.apache.tsfile.write.schema.MeasurementSchema; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.RandomAccessFile; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +public class LoadTreeStatementDataTypeConvertExecutionVisitorTest { + + private static final String DEVICE_0 = "root.sg.d0"; + private static final String DEVICE_1 = "root.sg.d1"; + private static final String DEVICE_2 = "root.sg.d2"; + private static final String ALIGNED_DEVICE = "root.sg.ad0"; + private static final int ROW_COUNT_PER_DEVICE = 2048; + private File tsFile; + private boolean isPipeMemoryManagementEnabled; + private long pipeMaxReaderChunkSize; + + @Before + public void setUp() { + isPipeMemoryManagementEnabled = PipeConfigAccessor.getPipeMemoryManagementEnabled(); + PipeConfigAccessor.setPipeMemoryManagementEnabled(false); + pipeMaxReaderChunkSize = CommonDescriptor.getInstance().getConfig().getPipeMaxReaderChunkSize(); + } + + @After + public void tearDown() { + PipeConfigAccessor.setPipeMemoryManagementEnabled(isPipeMemoryManagementEnabled); + CommonDescriptor.getInstance().getConfig().setPipeMaxReaderChunkSize(pipeMaxReaderChunkSize); + if (tsFile != null && tsFile.exists()) { + Assert.assertTrue(tsFile.delete()); + } + } + + @Test + public void testFallbackToQueryForRemainingDevicesWhenScanParserHitsCorruption() + throws Exception { + tsFile = new File("load-tree-query-fallback-corrupted.tsfile"); + writeTsFile(tsFile); + corruptMeasurementChunk(tsFile, DEVICE_1, "s0"); + + Assert.assertTrue("Expected scan parser to fail after corruption.", scanParserFails(tsFile)); + + final Map pointCountByDevice = new HashMap<>(); + final LoadTreeStatementDataTypeConvertExecutionVisitor visitor = + new LoadTreeStatementDataTypeConvertExecutionVisitor( + statement -> { + collectLoadedPoints(statement, pointCountByDevice); + return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); + }); + + final Optional status = + visitor.visitLoadFile(LoadTsFileStatement.createUnchecked(tsFile.getAbsolutePath()), null); + + Assert.assertTrue(status.isPresent()); + Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.get().getCode()); + final int loadedPointCountBeforeCorruption = pointCountByDevice.getOrDefault(DEVICE_0, 0); + final int loadedPointCountAfterFallback = pointCountByDevice.getOrDefault(DEVICE_2, 0); + Assert.assertTrue(loadedPointCountBeforeCorruption > 0); + Assert.assertEquals(loadedPointCountBeforeCorruption, loadedPointCountAfterFallback); + } + + @Test + public void testFallbackToQueryForRemainingMeasurementsOfCurrentAlignedDevice() throws Exception { + CommonDescriptor.getInstance().getConfig().setPipeMaxReaderChunkSize(0); + + tsFile = new File("load-tree-query-fallback-corrupted-aligned-current-device.tsfile"); + writeWideAlignedTsFile(tsFile, ALIGNED_DEVICE, 16); + corruptMeasurementChunk(tsFile, ALIGNED_DEVICE, "s8"); + corruptMeasurementChunk(tsFile, ALIGNED_DEVICE, "s12"); + + Assert.assertTrue("Expected scan parser to fail after corruption.", scanParserFails(tsFile)); + + final Map pointCountByTimeseries = new HashMap<>(); + final LoadTreeStatementDataTypeConvertExecutionVisitor visitor = + new LoadTreeStatementDataTypeConvertExecutionVisitor( + statement -> { + collectLoadedPointsByTimeseries(statement, pointCountByTimeseries); + return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); + }); + + final Optional status = + visitor.visitLoadFile(LoadTsFileStatement.createUnchecked(tsFile.getAbsolutePath()), null); + + Assert.assertTrue(status.isPresent()); + Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.get().getCode()); + + for (int measurementIndex = 0; measurementIndex < 8; ++measurementIndex) { + assertMeasurementLoadedCompletely(pointCountByTimeseries, ALIGNED_DEVICE, measurementIndex); + } + for (int measurementIndex : Arrays.asList(9, 10, 11, 13, 14, 15)) { + assertMeasurementLoadedCompletely(pointCountByTimeseries, ALIGNED_DEVICE, measurementIndex); + } + + Assert.assertTrue( + pointCountByTimeseries.getOrDefault(ALIGNED_DEVICE + ".s8", 0) < ROW_COUNT_PER_DEVICE); + Assert.assertTrue( + pointCountByTimeseries.getOrDefault(ALIGNED_DEVICE + ".s12", 0) < ROW_COUNT_PER_DEVICE); + } + + private void writeTsFile(final File file) throws Exception { + if (file.exists()) { + Assert.assertTrue(file.delete()); + } + + final List schemaList = + Arrays.asList( + new MeasurementSchema("s0", TSDataType.INT64, TSEncoding.PLAIN), + new MeasurementSchema("s1", TSDataType.INT64, TSEncoding.PLAIN)); + + try (final TsFileWriter writer = new TsFileWriter(file)) { + writeDevice(writer, schemaList, DEVICE_0, 0); + writeDevice(writer, schemaList, DEVICE_1, 10_000); + writeDevice(writer, schemaList, DEVICE_2, 20_000); + } + } + + private void writeWideAlignedTsFile( + final File file, final String device, final int measurementCount) throws Exception { + if (file.exists()) { + Assert.assertTrue(file.delete()); + } + + final List schemaList = new java.util.ArrayList<>(); + for (int measurementIndex = 0; measurementIndex < measurementCount; ++measurementIndex) { + schemaList.add( + new MeasurementSchema("s" + measurementIndex, TSDataType.INT64, TSEncoding.PLAIN)); + } + + try (final TsFileWriter writer = new TsFileWriter(file)) { + writer.registerAlignedTimeseries(new PartialPath(device), schemaList); + + final Tablet tablet = new Tablet(device, schemaList, ROW_COUNT_PER_DEVICE); + for (int row = 0; row < ROW_COUNT_PER_DEVICE; ++row) { + tablet.addTimestamp(row, row); + for (int measurementIndex = 0; measurementIndex < measurementCount; ++measurementIndex) { + tablet.addValue("s" + measurementIndex, row, (long) measurementIndex * 10_000 + row); + } + } + writer.writeAligned(tablet); + } + } + + private void writeDevice( + final TsFileWriter writer, + final List schemaList, + final String device, + final long valueBase) + throws Exception { + writer.registerTimeseries(new Path(device), schemaList); + + final Tablet tablet = new Tablet(device, schemaList, ROW_COUNT_PER_DEVICE); + for (int row = 0; row < ROW_COUNT_PER_DEVICE; ++row) { + tablet.addTimestamp(row, row); + tablet.addValue("s0", row, valueBase + row); + tablet.addValue("s1", row, valueBase + ROW_COUNT_PER_DEVICE + row); + } + writer.writeTree(tablet); + } + + private void corruptMeasurementChunk( + final File file, final String device, final String measurement) throws Exception { + try (final TsFileSequenceReader reader = new TsFileSequenceReader(file.getAbsolutePath())) { + final IDeviceID deviceId = IDeviceID.Factory.DEFAULT_FACTORY.create(device); + final List chunkMetadataList = + reader.getIChunkMetadataList(deviceId, measurement); + Assert.assertFalse(chunkMetadataList.isEmpty()); + + final long chunkHeaderOffset = chunkMetadataList.get(0).getOffsetOfChunkHeader(); + try (final RandomAccessFile randomAccessFile = new RandomAccessFile(file, "rw")) { + randomAccessFile.seek(chunkHeaderOffset + 64); + randomAccessFile.write(new byte[] {1, 2, 3, 4, 5, 6, 7, 8}); + } + } + } + + private void assertMeasurementLoadedCompletely( + final Map pointCountByTimeseries, + final String device, + final int measurementIndex) { + Assert.assertEquals( + ROW_COUNT_PER_DEVICE, + pointCountByTimeseries.getOrDefault(device + ".s" + measurementIndex, 0).intValue()); + } + + private boolean scanParserFails(final File file) throws Exception { + try (final TsFileInsertionEventScanParser parser = + new TsFileInsertionEventScanParser( + file, new IoTDBTreePattern(null), Long.MIN_VALUE, Long.MAX_VALUE, null, null, true)) { + parser.toTabletWithIsAligneds().forEach(tabletWithIsAligned -> {}); + return false; + } catch (final Exception e) { + return true; + } + } + + private void collectLoadedPointsByTimeseries( + final Statement statement, final Map pointCountByTimeseries) { + Assert.assertTrue(statement instanceof InsertMultiTabletsStatement); + for (final InsertTabletStatement insertTabletStatement : + ((InsertMultiTabletsStatement) statement).getInsertTabletStatementList()) { + for (int row = 0; row < insertTabletStatement.getRowCount(); ++row) { + for (int column = 0; column < insertTabletStatement.getMeasurements().length; ++column) { + final String measurement = insertTabletStatement.getMeasurements()[column]; + if (measurement == null || insertTabletStatement.isNull(row, column)) { + continue; + } + pointCountByTimeseries.merge( + insertTabletStatement.getDevicePath().getFullPath() + "." + measurement, + 1, + Integer::sum); + } + } + } + } + + private void collectLoadedPoints( + final Statement statement, final Map pointCountByDevice) { + Assert.assertTrue(statement instanceof InsertMultiTabletsStatement); + for (final InsertTabletStatement insertTabletStatement : + ((InsertMultiTabletsStatement) statement).getInsertTabletStatementList()) { + pointCountByDevice.merge( + insertTabletStatement.getDevicePath().getFullPath(), + countNonNullPoints(insertTabletStatement), + Integer::sum); + } + } + + private int countNonNullPoints(final InsertTabletStatement insertTabletStatement) { + int pointCount = 0; + for (int row = 0; row < insertTabletStatement.getRowCount(); ++row) { + for (int column = 0; column < insertTabletStatement.getMeasurements().length; ++column) { + if (insertTabletStatement.getMeasurements()[column] != null + && !insertTabletStatement.isNull(row, column)) { + ++pointCount; + } + } + } + return pointCount; + } + + private static class PipeConfigAccessor { + private static boolean getPipeMemoryManagementEnabled() { + return CommonDescriptor.getInstance().getConfig().getPipeMemoryManagementEnabled(); + } + + private static void setPipeMemoryManagementEnabled(final boolean enabled) { + CommonDescriptor.getInstance().getConfig().setPipeMemoryManagementEnabled(enabled); + } + } +} From a8d003cf41955a6fbde84a38300490f45cc0d9c1 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Thu, 14 May 2026 21:45:05 +0800 Subject: [PATCH 2/2] Update LoadTreeStatementDataTypeConvertExecutionVisitorTest.java --- ...ntDataTypeConvertExecutionVisitorTest.java | 61 +++++++++++++++++++ 1 file changed, 61 insertions(+) diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitorTest.java index 5cd104dfc3a08..7cfcfb27d8884 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitorTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitorTest.java @@ -108,6 +108,67 @@ public void testFallbackToQueryForRemainingDevicesWhenScanParserHitsCorruption() Assert.assertEquals(loadedPointCountBeforeCorruption, loadedPointCountAfterFallback); } + @Test + public void testFallbackToQueryWhenFirstNonAlignedDeviceIsCorrupted() throws Exception { + tsFile = new File("load-tree-query-fallback-corrupted-first-non-aligned-device.tsfile"); + writeTsFile(tsFile); + corruptMeasurementChunk(tsFile, DEVICE_0, "s0"); + + Assert.assertTrue("Expected scan parser to fail after corruption.", scanParserFails(tsFile)); + + final Map pointCountByTimeseries = new HashMap<>(); + final LoadTreeStatementDataTypeConvertExecutionVisitor visitor = + new LoadTreeStatementDataTypeConvertExecutionVisitor( + statement -> { + collectLoadedPointsByTimeseries(statement, pointCountByTimeseries); + return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); + }); + + final Optional status = + visitor.visitLoadFile(LoadTsFileStatement.createUnchecked(tsFile.getAbsolutePath()), null); + + Assert.assertTrue(status.isPresent()); + Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.get().getCode()); + + Assert.assertTrue( + pointCountByTimeseries.getOrDefault(DEVICE_0 + ".s0", 0) < ROW_COUNT_PER_DEVICE); + assertMeasurementLoadedCompletely(pointCountByTimeseries, DEVICE_0, 1); + assertMeasurementLoadedCompletely(pointCountByTimeseries, DEVICE_1, 0); + assertMeasurementLoadedCompletely(pointCountByTimeseries, DEVICE_1, 1); + assertMeasurementLoadedCompletely(pointCountByTimeseries, DEVICE_2, 0); + assertMeasurementLoadedCompletely(pointCountByTimeseries, DEVICE_2, 1); + } + + @Test + public void testFallbackDoesNotReloadCompletedMeasurementsOfCurrentNonAlignedDevice() + throws Exception { + tsFile = new File("load-tree-query-fallback-corrupted-current-non-aligned-device.tsfile"); + writeTsFile(tsFile); + corruptMeasurementChunk(tsFile, DEVICE_1, "s1"); + + Assert.assertTrue("Expected scan parser to fail after corruption.", scanParserFails(tsFile)); + + final Map pointCountByTimeseries = new HashMap<>(); + final LoadTreeStatementDataTypeConvertExecutionVisitor visitor = + new LoadTreeStatementDataTypeConvertExecutionVisitor( + statement -> { + collectLoadedPointsByTimeseries(statement, pointCountByTimeseries); + return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); + }); + + final Optional status = + visitor.visitLoadFile(LoadTsFileStatement.createUnchecked(tsFile.getAbsolutePath()), null); + + Assert.assertTrue(status.isPresent()); + Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.get().getCode()); + + assertMeasurementLoadedCompletely(pointCountByTimeseries, DEVICE_1, 0); + Assert.assertTrue( + pointCountByTimeseries.getOrDefault(DEVICE_1 + ".s1", 0) < ROW_COUNT_PER_DEVICE); + assertMeasurementLoadedCompletely(pointCountByTimeseries, DEVICE_2, 0); + assertMeasurementLoadedCompletely(pointCountByTimeseries, DEVICE_2, 1); + } + @Test public void testFallbackToQueryForRemainingMeasurementsOfCurrentAlignedDevice() throws Exception { CommonDescriptor.getInstance().getConfig().setPipeMaxReaderChunkSize(0);