From ade25db4c1064cd81caed53a43d9e78fbe2f0dc1 Mon Sep 17 00:00:00 2001 From: shizy Date: Thu, 30 Apr 2026 14:33:44 +0800 Subject: [PATCH 1/5] clone partial columns of AlignedTVList during query --- .../fragment/FragmentInstanceContext.java | 21 +++++ .../utils/ResourceByPathUtils.java | 45 +++++++++-- .../db/utils/datastructure/AlignedTVList.java | 79 ++++++++++++------- 3 files changed, 110 insertions(+), 35 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java index a224e5b2c3586..1e09c6ab4edeb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java @@ -70,6 +70,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -155,6 +156,9 @@ public class FragmentInstanceContext extends QueryContext { private long closedUnseqFileNum = 0; private boolean highestPriority = false; + // accessed value columns on each referenced AlignedTVList. + private final Map> alignedTVListColumnAccessMap = new ConcurrentHashMap<>(); + public static FragmentInstanceContext createFragmentInstanceContext( FragmentInstanceId id, FragmentInstanceStateMachine stateMachine, SessionInfo sessionInfo) { FragmentInstanceContext instanceContext = @@ -205,6 +209,23 @@ public void setQueryDataSourceType(QueryDataSourceType queryDataSourceType) { this.queryDataSourceType = queryDataSourceType; } + public void putAccessedColumns(TVList tvList, List columnIndexList) { + Set accessedColumns = + alignedTVListColumnAccessMap.computeIfAbsent(tvList, ignored -> new HashSet<>()); + for (Integer columnIndex : columnIndexList) { + if (columnIndex != null && columnIndex >= 0) { + accessedColumns.add(columnIndex); + } + } + } + + public Set getAccessedAlignedColumns(TVList tvList) { + Set accessedColumns = alignedTVListColumnAccessMap.get(tvList); + return accessedColumns == null + ? Collections.emptySet() + : Collections.unmodifiableSet(accessedColumns); + } + @TestOnly public static FragmentInstanceContext createFragmentInstanceContext( FragmentInstanceId id, FragmentInstanceStateMachine stateMachine) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/ResourceByPathUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/ResourceByPathUtils.java index c64d32c1e327b..9c82259481090 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/ResourceByPathUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/ResourceByPathUtils.java @@ -38,6 +38,7 @@ import org.apache.iotdb.db.storageengine.dataregion.modification.Modification; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.db.utils.ModificationUtils; +import org.apache.iotdb.db.utils.datastructure.AlignedTVList; import org.apache.iotdb.db.utils.datastructure.TVList; import org.apache.tsfile.enums.TSDataType; @@ -63,9 +64,11 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.Collections; +import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Set; import static org.apache.iotdb.commons.path.AlignedPath.VECTOR_PLACEHOLDER; @@ -121,7 +124,8 @@ protected Map prepareTvListMapForQuery( QueryContext context, IWritableMemChunk memChunk, boolean isWorkMemTable, - Filter globalTimeFilter) { + Filter globalTimeFilter, + List columnIndexList) { // should copy globalTimeFilter because GroupByMonthFilter is stateful Filter copyTimeFilter = null; if (globalTimeFilter != null) { @@ -163,6 +167,11 @@ protected Map prepareTvListMapForQuery( list.getQueryContextSet().add(context); tvListQueryMap.put(list, list.rowCount()); } else { + // columnIndexList is only provided for AlignedTVList to track column-level access. + // For TVList (primitive timeseries), it remains null and column tracking is not needed. + if (columnIndexList != null && context instanceof FragmentInstanceContext) { + ((FragmentInstanceContext) context).putAccessedColumns(list, columnIndexList); + } if (list.isSorted() || list.getQueryContextSet().isEmpty()) { LOGGER.debug( "Working MemTable - add current query context to mutable TVList's query list when it's sorted or no other query on it"); @@ -196,7 +205,11 @@ protected Map prepareTvListMapForQuery( list.setOwnerQuery(firstQuery); // clone TVList - cloneList = list.clone(); + cloneList = + columnIndexList == null + ? list.clone() + : ((AlignedTVList) list).clone(this.getAccessedColumnsForQuery(list)); + cloneList.getQueryContextSet().add(context); tvListQueryMap.put(cloneList, cloneList.rowCount()); } @@ -218,6 +231,10 @@ protected Map prepareTvListMapForQuery( } return tvListQueryMap; } + + protected Set getAccessedColumnsForQuery(TVList tvList) { + return null; + } } class AlignedResourceByPathUtils extends ResourceByPathUtils { @@ -364,15 +381,15 @@ public ReadOnlyMemChunk getReadOnlyMemChunkFromMemTable( return null; } - // prepare AlignedTVList for query. It should clone TVList if necessary. - Map alignedTvListQueryMap = - prepareTvListMapForQuery( - context, alignedMemChunk, modsToMemtable == null, globalTimeFilter); - // column index list for the query List columnIndexList = alignedMemChunk.buildColumnIndexList(partialPath.getSchemaList()); + // prepare AlignedTVList for query. It should clone TVList if necessary. + Map alignedTvListQueryMap = + prepareTvListMapForQuery( + context, alignedMemChunk, modsToMemtable == null, globalTimeFilter, columnIndexList); + List> deletionList = null; if (modsToMemtable != null) { deletionList = @@ -383,6 +400,18 @@ public ReadOnlyMemChunk getReadOnlyMemChunkFromMemTable( context, columnIndexList, getMeasurementSchema(), alignedTvListQueryMap, deletionList); } + @Override + protected Set getAccessedColumnsForQuery(TVList tvList) { + Set accessedColumns = new HashSet<>(); + for (QueryContext queryContext : tvList.getQueryContextSet()) { + if (queryContext instanceof FragmentInstanceContext) { + accessedColumns.addAll( + ((FragmentInstanceContext) queryContext).getAccessedAlignedColumns(tvList)); + } + } + return accessedColumns; + } + public VectorMeasurementSchema getMeasurementSchema() { List measurementList = partialPath.getMeasurementList(); TSDataType[] types = new TSDataType[measurementList.size()]; @@ -535,7 +564,7 @@ public ReadOnlyMemChunk getReadOnlyMemChunkFromMemTable( memTableMap.get(deviceID).getMemChunkMap().get(partialPath.getMeasurement()); // prepare TVList for query. It should clone TVList if necessary. Map tvListQueryMap = - prepareTvListMapForQuery(context, memChunk, modsToMemtable == null, globalTimeFilter); + prepareTvListMapForQuery(context, memChunk, modsToMemtable == null, globalTimeFilter, null); List deletionList = null; if (modsToMemtable != null) { deletionList = diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java index 615bfcaf4cab0..4bacaf3a9bbe3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java @@ -52,6 +52,7 @@ import java.util.Arrays; import java.util.List; import java.util.Objects; +import java.util.Set; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -141,32 +142,14 @@ public TVList getTvListByColumnIndex(List columnIndex, List public synchronized AlignedTVList clone() { AlignedTVList cloneList = AlignedTVList.newAlignedList(new ArrayList<>(dataTypes)); cloneAs(cloneList); - System.arraycopy( - memoryBinaryChunkSize, 0, cloneList.memoryBinaryChunkSize, 0, dataTypes.size()); - for (int i = 0; i < values.size(); i++) { - // Clone value - List columnValues = values.get(i); - for (Object valueArray : columnValues) { - cloneList.values.get(i).add(cloneValue(dataTypes.get(i), valueArray)); - } - // Clone bitmap in columnIndex - if (bitMaps != null && bitMaps.get(i) != null) { - List columnBitMaps = bitMaps.get(i); - if (cloneList.bitMaps == null) { - cloneList.bitMaps = new ArrayList<>(dataTypes.size()); - for (int j = 0; j < dataTypes.size(); j++) { - cloneList.bitMaps.add(null); - } - } - if (cloneList.bitMaps.get(i) == null) { - List cloneColumnBitMaps = new ArrayList<>(); - for (BitMap bitMap : columnBitMaps) { - cloneColumnBitMaps.add(bitMap == null ? null : bitMap.clone()); - } - cloneList.bitMaps.set(i, cloneColumnBitMaps); - } - } - } + cloneColumnDataTo(cloneList, null); + return cloneList; + } + + public synchronized AlignedTVList clone(Set columnsToClone) { + AlignedTVList cloneList = AlignedTVList.newAlignedList(new ArrayList<>(dataTypes)); + cloneAs(cloneList); + cloneColumnDataTo(cloneList, columnsToClone); return cloneList; } @@ -625,6 +608,48 @@ protected Object cloneValue(TSDataType type, Object value) { } } + private void cloneColumnDataTo(AlignedTVList cloneList, Set columnsToClone) { + boolean cloneAllColumns = columnsToClone == null; + System.arraycopy( + memoryBinaryChunkSize, 0, cloneList.memoryBinaryChunkSize, 0, dataTypes.size()); + for (int i = 0; i < values.size(); i++) { + List columnValues = values.get(i); + boolean shouldCloneColumn = cloneAllColumns || columnsToClone.contains(i); + if (!shouldCloneColumn) { + cloneList.values.set(i, columnValues); + values.set(i, null); + if (bitMaps != null && bitMaps.get(i) != null) { + ensureBitMapsInitialized(cloneList); + cloneList.bitMaps.set(i, bitMaps.get(i)); + bitMaps.set(i, null); + } + memoryBinaryChunkSize[i] = 0; + continue; + } + + for (Object valueArray : columnValues) { + cloneList.values.get(i).add(cloneValue(dataTypes.get(i), valueArray)); + } + if (bitMaps != null && bitMaps.get(i) != null) { + ensureBitMapsInitialized(cloneList); + List cloneColumnBitMaps = new ArrayList<>(); + for (BitMap bitMap : bitMaps.get(i)) { + cloneColumnBitMaps.add(bitMap == null ? null : bitMap.clone()); + } + cloneList.bitMaps.set(i, cloneColumnBitMaps); + } + } + } + + private void ensureBitMapsInitialized(AlignedTVList cloneList) { + if (cloneList.bitMaps == null) { + cloneList.bitMaps = new ArrayList<>(dataTypes.size()); + for (int i = 0; i < dataTypes.size(); i++) { + cloneList.bitMaps.add(null); + } + } + } + @Override protected void clearValue() { for (int i = 0; i < dataTypes.size(); i++) { @@ -880,7 +905,7 @@ public long alignedTvListArrayMemCost() { // value & bitmap array mem size for (int column = 0; column < dataTypes.size(); column++) { TSDataType type = dataTypes.get(column); - if (type != null) { + if (type != null && values.get(column) != null) { size += (long) PrimitiveArrayManager.ARRAY_SIZE * (long) type.getDataTypeSize(); if (bitMaps != null && bitMaps.get(column) != null) { size += (long) PrimitiveArrayManager.ARRAY_SIZE / 8 + 1; From 448418c36a87af73c418897a5ff5d839c69927c8 Mon Sep 17 00:00:00 2001 From: shizy Date: Fri, 1 May 2026 09:46:08 +0800 Subject: [PATCH 2/5] add test case --- .../FragmentInstanceExecutionTest.java | 89 +++++++++++++++++++ 1 file changed, 89 insertions(+) diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecutionTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecutionTest.java index 22e5c360ae3c5..68b6c9182f4ae 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecutionTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecutionTest.java @@ -22,6 +22,7 @@ import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.exception.MetadataException; +import org.apache.iotdb.commons.path.AlignedPath; import org.apache.iotdb.commons.path.MeasurementPath; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.db.conf.IoTDBDescriptor; @@ -49,6 +50,7 @@ import org.apache.tsfile.file.metadata.enums.CompressionType; import org.apache.tsfile.file.metadata.enums.TSEncoding; import org.apache.tsfile.read.reader.IPointReader; +import org.apache.tsfile.write.schema.IMeasurementSchema; import org.apache.tsfile.write.schema.MeasurementSchema; import org.junit.Test; import org.mockito.Mockito; @@ -57,8 +59,11 @@ import java.io.IOException; import java.io.PrintStream; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; @@ -238,6 +243,70 @@ public void testTVListCloneForQuery() { } } + @Test + public void testAlignedTVListPartialColumnClone() { + IoTDBDescriptor.getInstance().getConfig().setDataNodeId(1); + ExecutorService instanceNotificationExecutor = + IoTDBThreadPoolFactory.newFixedThreadPool(2, "test-aligned-partial-clone"); + + try { + // Create MemTable with AlignedPath + List schemaList = new ArrayList<>(); + for (int i = 0; i < 5; i++) { + schemaList.add(new MeasurementSchema("sensor_" + i, TSDataType.INT64)); + } + String deviceId = "d1"; + IMemTable memTable = createMemTable(deviceId, schemaList); + + // Verify we have unsorted AlignedTVList + assertEquals(1, memTable.getMemTableMap().size()); + IWritableMemChunkGroup memChunkGroup = memTable.getMemTableMap().values().iterator().next(); + assertEquals(1, memChunkGroup.getMemChunkMap().size()); + IWritableMemChunk memChunk = memChunkGroup.getMemChunkMap().values().iterator().next(); + TVList tvList = memChunk.getWorkingTVList(); + assertFalse(tvList.isSorted()); + assertEquals(6424, tvList.calculateRamSize()); + assertEquals(100, tvList.rowCount()); + + // FragmentInstance Context + FragmentInstanceId id1 = new FragmentInstanceId(new PlanFragmentId(MOCK_QUERY_ID, 1), "1"); + FragmentInstanceStateMachine stateMachine1 = + new FragmentInstanceStateMachine(id1, instanceNotificationExecutor); + FragmentInstanceContext context1 = createFragmentInstanceContext(id1, stateMachine1); + + FragmentInstanceId id2 = new FragmentInstanceId(new PlanFragmentId(MOCK_QUERY_ID, 2), "2"); + FragmentInstanceStateMachine stateMachine2 = + new FragmentInstanceStateMachine(id2, instanceNotificationExecutor); + FragmentInstanceContext context2 = createFragmentInstanceContext(id2, stateMachine2); + + // Query 1: sensor_2 and sensor_0 + List measurements1 = Arrays.asList("sensor_2", "sensor_0"); + List schemas1 = Arrays.asList(schemaList.get(2), schemaList.get(0)); + AlignedPath fullPath1 = new AlignedPath(deviceId, measurements1, schemas1); + + ReadOnlyMemChunk readOnlyMemChunk1 = + memTable.query(context1, fullPath1, Long.MIN_VALUE, null, null); + Set accessedColumnsForQuery1 = context1.getAccessedAlignedColumns(tvList); + assertEquals(new HashSet<>(Arrays.asList(0, 2)), accessedColumnsForQuery1); + + // Query 2: sensor_1 and sensor_3 + List measurements2 = Arrays.asList("sensor_1", "sensor_3"); + List schemas2 = Arrays.asList(schemaList.get(1), schemaList.get(3)); + AlignedPath fullPath2 = new AlignedPath(deviceId, measurements2, schemas2); + ReadOnlyMemChunk readOnlyMemChunk2 = + memTable.query(context2, fullPath2, Long.MIN_VALUE, null, null); + + // Only cloned sensor_2 and sensor_0 exist + assertEquals(3352, tvList.calculateRamSize()); + assertEquals(100, tvList.rowCount()); + + } catch (Exception e) { + fail(e.getMessage()); + } finally { + instanceNotificationExecutor.shutdown(); + } + } + private FragmentInstanceExecution createFragmentInstanceExecution(int id, Executor executor) throws CpuNotEnoughException { IDriverScheduler scheduler = Mockito.mock(IDriverScheduler.class); @@ -298,4 +367,24 @@ private IMemTable createMemTable(String deviceId, String measurementId) } return memTable; } + + private IMemTable createMemTable(String deviceId, List schemaList) + throws IllegalPathException { + PrimitiveMemTable memTable = new PrimitiveMemTable("root.test", "1"); + + // Insert data in reverse order to make it unsorted + int rows = 100; + for (int i = rows - 1; i >= 0; i--) { + Object[] values = new Object[5]; + for (int j = 0; j < 5; j++) { + values[j] = (long) i * 100 + j; + } + memTable.writeAlignedRow( + DeviceIDFactory.getInstance().getDeviceID(new PartialPath(deviceId)), + schemaList, + i, + values); + } + return memTable; + } } From baf49caea804db146c430550b39ef664d14c880f Mon Sep 17 00:00:00 2001 From: shizy Date: Tue, 12 May 2026 14:42:26 +0800 Subject: [PATCH 3/5] some fix and comments --- .../fragment/FragmentInstanceContext.java | 31 +++++-- .../utils/ResourceByPathUtils.java | 21 +++-- .../db/utils/datastructure/AlignedTVList.java | 81 ++++++++++++------- 3 files changed, 91 insertions(+), 42 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java index 1e09c6ab4edeb..995ec089d4357 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java @@ -68,6 +68,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -209,16 +210,36 @@ public void setQueryDataSourceType(QueryDataSourceType queryDataSourceType) { this.queryDataSourceType = queryDataSourceType; } + /** + * Record columns of the AlignedTVList accessed by the query. This method is called from + * prepareTvListMapForQuery with tvList.lockQueryList() held. Even though the HashSet inside + * alignedTVListColumnAccessMap is not thread-safe, the calling pattern guarantees thread safety + * without requiring additional synchronization. + * + * @param tvList the TVList being accessed + * @param columnIndexList list of column indices being accessed + */ public void putAccessedColumns(TVList tvList, List columnIndexList) { Set accessedColumns = alignedTVListColumnAccessMap.computeIfAbsent(tvList, ignored -> new HashSet<>()); - for (Integer columnIndex : columnIndexList) { - if (columnIndex != null && columnIndex >= 0) { - accessedColumns.add(columnIndex); - } - } + columnIndexList.stream() + .filter(Objects::nonNull) + .forEach( + index -> { + if (index >= 0) { + accessedColumns.add(index); + } + }); } + /** + * Get columns of the AlignedTVList accessed by the query. This method is called from + * prepareTvListMapForQuery with tvList.lockQueryList() held, ensuring that no other thread can + * change accessed columns for the same TVList concurrently. + * + * @param tvList the TVList being accessed + * @return set of column indices being accessed + */ public Set getAccessedAlignedColumns(TVList tvList) { Set accessedColumns = alignedTVListColumnAccessMap.get(tvList); return accessedColumns == null diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/ResourceByPathUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/ResourceByPathUtils.java index 9c82259481090..b1e841a90f974 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/ResourceByPathUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/ResourceByPathUtils.java @@ -167,8 +167,8 @@ protected Map prepareTvListMapForQuery( list.getQueryContextSet().add(context); tvListQueryMap.put(list, list.rowCount()); } else { - // columnIndexList is only provided for AlignedTVList to track column-level access. - // For TVList (primitive timeseries), it remains null and column tracking is not needed. + // columnIndexList is to track column-level access for AlignedTVList. + // For TVList (primitive time series), it remains null and column tracking is not needed. if (columnIndexList != null && context instanceof FragmentInstanceContext) { ((FragmentInstanceContext) context).putAccessedColumns(list, columnIndexList); } @@ -205,10 +205,12 @@ protected Map prepareTvListMapForQuery( list.setOwnerQuery(firstQuery); // clone TVList - cloneList = - columnIndexList == null - ? list.clone() - : ((AlignedTVList) list).clone(this.getAccessedColumnsForQuery(list)); + Set columnsToClone = getAccessedColumnsForQuery(list); + if (columnsToClone == null) { + cloneList = list.clone(); + } else { + cloneList = ((AlignedTVList) list).clone(columnsToClone); + } cloneList.getQueryContextSet().add(context); tvListQueryMap.put(cloneList, cloneList.rowCount()); @@ -400,6 +402,13 @@ public ReadOnlyMemChunk getReadOnlyMemChunkFromMemTable( context, columnIndexList, getMeasurementSchema(), alignedTvListQueryMap, deletionList); } + /** + * This method is called from prepareTvListMapForQuery with tvList.lockQueryList() held, ensuring + * thread-safe access to queryContextSet. + * + * @param tvList the TVList to get accessed columns for + * @return set of accessed column indices, or empty set if no columns are tracked + */ @Override protected Set getAccessedColumnsForQuery(TVList tvList) { Set accessedColumns = new HashSet<>(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java index 4bacaf3a9bbe3..edfe7dbad2997 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java @@ -164,10 +164,13 @@ public synchronized void putAlignedValue(long timestamp, Object[] value) { timestamps.get(arrayIndex)[elementIndex] = timestamp; for (int i = 0; i < values.size(); i++) { Object columnValue = value[i]; - List columnValues = values.get(i); if (columnValue == null) { markNullValue(i, arrayIndex, elementIndex); } + List columnValues = values.get(i); + if (columnValues == null) { + continue; + } switch (dataTypes.get(i)) { case TEXT: case BLOB: @@ -319,13 +322,7 @@ private TsPrimitiveType getAlignedValueByValueIndex( } public void extendColumn(TSDataType dataType) { - if (bitMaps == null) { - List> localBitMaps = new ArrayList<>(values.size()); - for (int i = 0; i < values.size(); i++) { - localBitMaps.add(null); - } - bitMaps = localBitMaps; - } + ensureBitMapsInitialized(); List columnValue = new ArrayList<>(); List columnBitMaps = new ArrayList<>(); for (int i = 0; i < timestamps.size(); i++) { @@ -545,16 +542,14 @@ public Pair delete(long lowerBound, long upperBound, int colum } public void deleteColumn(int columnIndex) { - if (bitMaps == null) { - List> localBitMaps = new ArrayList<>(dataTypes.size()); - for (int j = 0; j < dataTypes.size(); j++) { - localBitMaps.add(null); - } - bitMaps = localBitMaps; + List columnValues = values.get(columnIndex); + if (columnValues == null) { + return; } + ensureBitMapsInitialized(); if (bitMaps.get(columnIndex) == null) { List columnBitMaps = new ArrayList<>(); - for (int i = 0; i < values.get(columnIndex).size(); i++) { + for (int i = 0; i < columnValues.size(); i++) { columnBitMaps.add(new BitMap(ARRAY_SIZE)); } bitMaps.set(columnIndex, columnBitMaps); @@ -609,17 +604,20 @@ protected Object cloneValue(TSDataType type, Object value) { } private void cloneColumnDataTo(AlignedTVList cloneList, Set columnsToClone) { - boolean cloneAllColumns = columnsToClone == null; + boolean cloneAllColumns = columnsToClone == null || columnsToClone.isEmpty(); System.arraycopy( memoryBinaryChunkSize, 0, cloneList.memoryBinaryChunkSize, 0, dataTypes.size()); for (int i = 0; i < values.size(); i++) { List columnValues = values.get(i); + if (columnValues == null) { + continue; + } boolean shouldCloneColumn = cloneAllColumns || columnsToClone.contains(i); if (!shouldCloneColumn) { cloneList.values.set(i, columnValues); values.set(i, null); if (bitMaps != null && bitMaps.get(i) != null) { - ensureBitMapsInitialized(cloneList); + cloneList.ensureBitMapsInitialized(); cloneList.bitMaps.set(i, bitMaps.get(i)); bitMaps.set(i, null); } @@ -631,7 +629,7 @@ private void cloneColumnDataTo(AlignedTVList cloneList, Set columnsToCl cloneList.values.get(i).add(cloneValue(dataTypes.get(i), valueArray)); } if (bitMaps != null && bitMaps.get(i) != null) { - ensureBitMapsInitialized(cloneList); + cloneList.ensureBitMapsInitialized(); List cloneColumnBitMaps = new ArrayList<>(); for (BitMap bitMap : bitMaps.get(i)) { cloneColumnBitMaps.add(bitMap == null ? null : bitMap.clone()); @@ -641,11 +639,24 @@ private void cloneColumnDataTo(AlignedTVList cloneList, Set columnsToCl } } - private void ensureBitMapsInitialized(AlignedTVList cloneList) { - if (cloneList.bitMaps == null) { - cloneList.bitMaps = new ArrayList<>(dataTypes.size()); + /** + * Ensure the bitMaps list is initialized. This method is NOT thread-safe and relies on the caller + * to provide synchronization: + * + *
    + * - Called from synchronized clone methods, which is invoked by query/flush operations. + *
+ * + *
    + * - Called from getBitMap, which is invoked by write/delete operations (putAlignedValue, + * putAlignedValues, extendColumn, deleteColumn). DataRegion write lock is held at this moment. + *
+ */ + private void ensureBitMapsInitialized() { + if (bitMaps == null) { + bitMaps = new ArrayList<>(dataTypes.size()); for (int i = 0; i < dataTypes.size(); i++) { - cloneList.bitMaps.add(null); + bitMaps.add(null); } } } @@ -682,7 +693,11 @@ protected void expandValues() { indices.add((int[]) getPrimitiveArraysByType(TSDataType.INT32)); } for (int i = 0; i < dataTypes.size(); i++) { - values.get(i).add(getPrimitiveArraysByType(dataTypes.get(i))); + List columnValues = values.get(i); + if (columnValues == null) { + continue; + } + columnValues.add(getPrimitiveArraysByType(dataTypes.get(i))); if (bitMaps != null && bitMaps.get(i) != null) { bitMaps.get(i).add(null); } @@ -782,6 +797,9 @@ private void arrayCopy(Object[] value, int idx, int arrayIndex, int elementIndex continue; } List columnValues = values.get(i); + if (columnValues == null) { + continue; + } switch (dataTypes.get(i)) { case TEXT: case BLOB: @@ -824,19 +842,17 @@ private void arrayCopy(Object[] value, int idx, int arrayIndex, int elementIndex } private BitMap getBitMap(int columnIndex, int arrayIndex) { - // init BitMaps if doesn't have - if (bitMaps == null) { - List> localBitMaps = new ArrayList<>(dataTypes.size()); - for (int i = 0; i < dataTypes.size(); i++) { - localBitMaps.add(null); - } - bitMaps = localBitMaps; + List columnValues = values.get(columnIndex); + if (columnValues == null) { + return null; } + // init BitMaps if necessary + ensureBitMapsInitialized(); // if the bitmap in columnIndex is null, init the bitmap of this column from the beginning if (bitMaps.get(columnIndex) == null) { List columnBitMaps = new ArrayList<>(); - for (int i = 0; i < values.get(columnIndex).size(); i++) { + for (int i = 0; i < columnValues.size(); i++) { columnBitMaps.add(new BitMap(ARRAY_SIZE, new byte[ARRAY_SIZE])); } bitMaps.set(columnIndex, columnBitMaps); @@ -1131,6 +1147,9 @@ public void serializeToWAL(IWALByteBufferView buffer) { // serialize value and bitmap by column for (int columnIndex = 0; columnIndex < values.size(); columnIndex++) { List columnValues = values.get(columnIndex); + if (columnValues == null) { + continue; + } for (int rowIndex = 0; rowIndex < rowCount; ++rowIndex) { int arrayIndex = rowIndex / ARRAY_SIZE; int elementIndex = rowIndex % ARRAY_SIZE; From 90242978eb69d58c52a93a88c8ac678878abc7b6 Mon Sep 17 00:00:00 2001 From: shizy Date: Tue, 12 May 2026 15:28:58 +0800 Subject: [PATCH 4/5] fix ut test --- .../execution/fragment/FragmentInstanceExecutionTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecutionTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecutionTest.java index 68b6c9182f4ae..b196ef473c2f7 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecutionTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecutionTest.java @@ -265,7 +265,7 @@ public void testAlignedTVListPartialColumnClone() { IWritableMemChunk memChunk = memChunkGroup.getMemChunkMap().values().iterator().next(); TVList tvList = memChunk.getWorkingTVList(); assertFalse(tvList.isSorted()); - assertEquals(6424, tvList.calculateRamSize()); + assertEquals(6424, tvList.calculateRamSize().getRamSize()); assertEquals(100, tvList.rowCount()); // FragmentInstance Context @@ -297,7 +297,7 @@ public void testAlignedTVListPartialColumnClone() { memTable.query(context2, fullPath2, Long.MIN_VALUE, null, null); // Only cloned sensor_2 and sensor_0 exist - assertEquals(3352, tvList.calculateRamSize()); + assertEquals(3352, tvList.calculateRamSize().getRamSize()); assertEquals(100, tvList.rowCount()); } catch (Exception e) { From 58dcfc8f9e847d99f466e5a5deb1b5144ab42622 Mon Sep 17 00:00:00 2001 From: shizy Date: Wed, 13 May 2026 15:39:13 +0800 Subject: [PATCH 5/5] alignedTvListArrayMemCost for cloned list --- .../utils/ResourceByPathUtils.java | 33 +++++++++++-------- .../db/utils/datastructure/AlignedTVList.java | 19 +++++++++-- 2 files changed, 36 insertions(+), 16 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/ResourceByPathUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/ResourceByPathUtils.java index b1e841a90f974..b362b367ef1ed 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/ResourceByPathUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/ResourceByPathUtils.java @@ -153,7 +153,12 @@ protected Map prepareTvListMapForQuery( // mutable tvlist TVList list = memChunk.getWorkingTVList(); TVList cloneList = null; - TVList.RamInfo listRamInfo = list.calculateRamSize(); + Set columnsToClone = getAccessedColumnsForQuery(list); + TVList.RamInfo listRamInfo = + (columnsToClone == null) + ? list.calculateRamSize() + : ((AlignedTVList) list).calculateRamSize(columnsToClone); + list.lockQueryList(); try { if (copyTimeFilter != null @@ -205,12 +210,10 @@ protected Map prepareTvListMapForQuery( list.setOwnerQuery(firstQuery); // clone TVList - Set columnsToClone = getAccessedColumnsForQuery(list); - if (columnsToClone == null) { - cloneList = list.clone(); - } else { - cloneList = ((AlignedTVList) list).clone(columnsToClone); - } + cloneList = + (columnsToClone == null) + ? list.clone() + : ((AlignedTVList) list).clone(columnsToClone); cloneList.getQueryContextSet().add(context); tvListQueryMap.put(cloneList, cloneList.rowCount()); @@ -403,20 +406,22 @@ public ReadOnlyMemChunk getReadOnlyMemChunkFromMemTable( } /** - * This method is called from prepareTvListMapForQuery with tvList.lockQueryList() held, ensuring - * thread-safe access to queryContextSet. - * * @param tvList the TVList to get accessed columns for * @return set of accessed column indices, or empty set if no columns are tracked */ @Override protected Set getAccessedColumnsForQuery(TVList tvList) { Set accessedColumns = new HashSet<>(); - for (QueryContext queryContext : tvList.getQueryContextSet()) { - if (queryContext instanceof FragmentInstanceContext) { - accessedColumns.addAll( - ((FragmentInstanceContext) queryContext).getAccessedAlignedColumns(tvList)); + tvList.lockQueryList(); + try { + for (QueryContext queryContext : tvList.getQueryContextSet()) { + if (queryContext instanceof FragmentInstanceContext) { + accessedColumns.addAll( + ((FragmentInstanceContext) queryContext).getAccessedAlignedColumns(tvList)); + } } + } finally { + tvList.unlockQueryList(); } return accessedColumns; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java index edfe7dbad2997..25b4fbc381b72 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java @@ -604,7 +604,7 @@ protected Object cloneValue(TSDataType type, Object value) { } private void cloneColumnDataTo(AlignedTVList cloneList, Set columnsToClone) { - boolean cloneAllColumns = columnsToClone == null || columnsToClone.isEmpty(); + boolean cloneAllColumns = columnsToClone == null; System.arraycopy( memoryBinaryChunkSize, 0, cloneList.memoryBinaryChunkSize, 0, dataTypes.size()); for (int i = 0; i < values.size(); i++) { @@ -882,6 +882,14 @@ public synchronized RamInfo calculateRamSize() { timestamps.size(), alignedTvListArrayMemCost(), rowCount, new ArrayList<>(dataTypes)); } + public synchronized RamInfo calculateRamSize(Set columnsToClone) { + return new RamInfo( + timestamps.size(), + alignedTvListArrayMemCost(columnsToClone), + rowCount, + new ArrayList<>(dataTypes)); + } + /** * Get the single alignedTVList array mem cost by give types. * @@ -916,10 +924,13 @@ public static long alignedTvListArrayMemCost(TSDataType[] types) { * * @return AlignedTvListArrayMemSize */ - public long alignedTvListArrayMemCost() { + public long alignedTvListArrayMemCost(Set columnsToClone) { long size = 0; // value & bitmap array mem size for (int column = 0; column < dataTypes.size(); column++) { + if (columnsToClone != null && !columnsToClone.contains(column)) { + continue; + } TSDataType type = dataTypes.get(column); if (type != null && values.get(column) != null) { size += (long) PrimitiveArrayManager.ARRAY_SIZE * (long) type.getDataTypeSize(); @@ -943,6 +954,10 @@ public long alignedTvListArrayMemCost() { return size; } + public long alignedTvListArrayMemCost() { + return alignedTvListArrayMemCost((Set) null); + } + /** * Get the single column array mem cost by give type. *