diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java index a224e5b2c358..995ec089d435 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java @@ -68,8 +68,10 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -155,6 +157,9 @@ public class FragmentInstanceContext extends QueryContext { private long closedUnseqFileNum = 0; private boolean highestPriority = false; + // accessed value columns on each referenced AlignedTVList. + private final Map> alignedTVListColumnAccessMap = new ConcurrentHashMap<>(); + public static FragmentInstanceContext createFragmentInstanceContext( FragmentInstanceId id, FragmentInstanceStateMachine stateMachine, SessionInfo sessionInfo) { FragmentInstanceContext instanceContext = @@ -205,6 +210,43 @@ public void setQueryDataSourceType(QueryDataSourceType queryDataSourceType) { this.queryDataSourceType = queryDataSourceType; } + /** + * Record columns of the AlignedTVList accessed by the query. This method is called from + * prepareTvListMapForQuery with tvList.lockQueryList() held. Even though the HashSet inside + * alignedTVListColumnAccessMap is not thread-safe, the calling pattern guarantees thread safety + * without requiring additional synchronization. + * + * @param tvList the TVList being accessed + * @param columnIndexList list of column indices being accessed + */ + public void putAccessedColumns(TVList tvList, List columnIndexList) { + Set accessedColumns = + alignedTVListColumnAccessMap.computeIfAbsent(tvList, ignored -> new HashSet<>()); + columnIndexList.stream() + .filter(Objects::nonNull) + .forEach( + index -> { + if (index >= 0) { + accessedColumns.add(index); + } + }); + } + + /** + * Get columns of the AlignedTVList accessed by the query. This method is called from + * prepareTvListMapForQuery with tvList.lockQueryList() held, ensuring that no other thread can + * change accessed columns for the same TVList concurrently. + * + * @param tvList the TVList being accessed + * @return set of column indices being accessed + */ + public Set getAccessedAlignedColumns(TVList tvList) { + Set accessedColumns = alignedTVListColumnAccessMap.get(tvList); + return accessedColumns == null + ? Collections.emptySet() + : Collections.unmodifiableSet(accessedColumns); + } + @TestOnly public static FragmentInstanceContext createFragmentInstanceContext( FragmentInstanceId id, FragmentInstanceStateMachine stateMachine) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/ResourceByPathUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/ResourceByPathUtils.java index c64d32c1e327..b362b367ef1e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/ResourceByPathUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/ResourceByPathUtils.java @@ -38,6 +38,7 @@ import org.apache.iotdb.db.storageengine.dataregion.modification.Modification; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.db.utils.ModificationUtils; +import org.apache.iotdb.db.utils.datastructure.AlignedTVList; import org.apache.iotdb.db.utils.datastructure.TVList; import org.apache.tsfile.enums.TSDataType; @@ -63,9 +64,11 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.Collections; +import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Set; import static org.apache.iotdb.commons.path.AlignedPath.VECTOR_PLACEHOLDER; @@ -121,7 +124,8 @@ protected Map prepareTvListMapForQuery( QueryContext context, IWritableMemChunk memChunk, boolean isWorkMemTable, - Filter globalTimeFilter) { + Filter globalTimeFilter, + List columnIndexList) { // should copy globalTimeFilter because GroupByMonthFilter is stateful Filter copyTimeFilter = null; if (globalTimeFilter != null) { @@ -149,7 +153,12 @@ protected Map prepareTvListMapForQuery( // mutable tvlist TVList list = memChunk.getWorkingTVList(); TVList cloneList = null; - TVList.RamInfo listRamInfo = list.calculateRamSize(); + Set columnsToClone = getAccessedColumnsForQuery(list); + TVList.RamInfo listRamInfo = + (columnsToClone == null) + ? list.calculateRamSize() + : ((AlignedTVList) list).calculateRamSize(columnsToClone); + list.lockQueryList(); try { if (copyTimeFilter != null @@ -163,6 +172,11 @@ protected Map prepareTvListMapForQuery( list.getQueryContextSet().add(context); tvListQueryMap.put(list, list.rowCount()); } else { + // columnIndexList is to track column-level access for AlignedTVList. + // For TVList (primitive time series), it remains null and column tracking is not needed. + if (columnIndexList != null && context instanceof FragmentInstanceContext) { + ((FragmentInstanceContext) context).putAccessedColumns(list, columnIndexList); + } if (list.isSorted() || list.getQueryContextSet().isEmpty()) { LOGGER.debug( "Working MemTable - add current query context to mutable TVList's query list when it's sorted or no other query on it"); @@ -196,7 +210,11 @@ protected Map prepareTvListMapForQuery( list.setOwnerQuery(firstQuery); // clone TVList - cloneList = list.clone(); + cloneList = + (columnsToClone == null) + ? list.clone() + : ((AlignedTVList) list).clone(columnsToClone); + cloneList.getQueryContextSet().add(context); tvListQueryMap.put(cloneList, cloneList.rowCount()); } @@ -218,6 +236,10 @@ protected Map prepareTvListMapForQuery( } return tvListQueryMap; } + + protected Set getAccessedColumnsForQuery(TVList tvList) { + return null; + } } class AlignedResourceByPathUtils extends ResourceByPathUtils { @@ -364,15 +386,15 @@ public ReadOnlyMemChunk getReadOnlyMemChunkFromMemTable( return null; } - // prepare AlignedTVList for query. It should clone TVList if necessary. - Map alignedTvListQueryMap = - prepareTvListMapForQuery( - context, alignedMemChunk, modsToMemtable == null, globalTimeFilter); - // column index list for the query List columnIndexList = alignedMemChunk.buildColumnIndexList(partialPath.getSchemaList()); + // prepare AlignedTVList for query. It should clone TVList if necessary. + Map alignedTvListQueryMap = + prepareTvListMapForQuery( + context, alignedMemChunk, modsToMemtable == null, globalTimeFilter, columnIndexList); + List> deletionList = null; if (modsToMemtable != null) { deletionList = @@ -383,6 +405,27 @@ public ReadOnlyMemChunk getReadOnlyMemChunkFromMemTable( context, columnIndexList, getMeasurementSchema(), alignedTvListQueryMap, deletionList); } + /** + * @param tvList the TVList to get accessed columns for + * @return set of accessed column indices, or empty set if no columns are tracked + */ + @Override + protected Set getAccessedColumnsForQuery(TVList tvList) { + Set accessedColumns = new HashSet<>(); + tvList.lockQueryList(); + try { + for (QueryContext queryContext : tvList.getQueryContextSet()) { + if (queryContext instanceof FragmentInstanceContext) { + accessedColumns.addAll( + ((FragmentInstanceContext) queryContext).getAccessedAlignedColumns(tvList)); + } + } + } finally { + tvList.unlockQueryList(); + } + return accessedColumns; + } + public VectorMeasurementSchema getMeasurementSchema() { List measurementList = partialPath.getMeasurementList(); TSDataType[] types = new TSDataType[measurementList.size()]; @@ -535,7 +578,7 @@ public ReadOnlyMemChunk getReadOnlyMemChunkFromMemTable( memTableMap.get(deviceID).getMemChunkMap().get(partialPath.getMeasurement()); // prepare TVList for query. It should clone TVList if necessary. Map tvListQueryMap = - prepareTvListMapForQuery(context, memChunk, modsToMemtable == null, globalTimeFilter); + prepareTvListMapForQuery(context, memChunk, modsToMemtable == null, globalTimeFilter, null); List deletionList = null; if (modsToMemtable != null) { deletionList = diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java index 615bfcaf4cab..25b4fbc381b7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java @@ -52,6 +52,7 @@ import java.util.Arrays; import java.util.List; import java.util.Objects; +import java.util.Set; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -141,32 +142,14 @@ public TVList getTvListByColumnIndex(List columnIndex, List public synchronized AlignedTVList clone() { AlignedTVList cloneList = AlignedTVList.newAlignedList(new ArrayList<>(dataTypes)); cloneAs(cloneList); - System.arraycopy( - memoryBinaryChunkSize, 0, cloneList.memoryBinaryChunkSize, 0, dataTypes.size()); - for (int i = 0; i < values.size(); i++) { - // Clone value - List columnValues = values.get(i); - for (Object valueArray : columnValues) { - cloneList.values.get(i).add(cloneValue(dataTypes.get(i), valueArray)); - } - // Clone bitmap in columnIndex - if (bitMaps != null && bitMaps.get(i) != null) { - List columnBitMaps = bitMaps.get(i); - if (cloneList.bitMaps == null) { - cloneList.bitMaps = new ArrayList<>(dataTypes.size()); - for (int j = 0; j < dataTypes.size(); j++) { - cloneList.bitMaps.add(null); - } - } - if (cloneList.bitMaps.get(i) == null) { - List cloneColumnBitMaps = new ArrayList<>(); - for (BitMap bitMap : columnBitMaps) { - cloneColumnBitMaps.add(bitMap == null ? null : bitMap.clone()); - } - cloneList.bitMaps.set(i, cloneColumnBitMaps); - } - } - } + cloneColumnDataTo(cloneList, null); + return cloneList; + } + + public synchronized AlignedTVList clone(Set columnsToClone) { + AlignedTVList cloneList = AlignedTVList.newAlignedList(new ArrayList<>(dataTypes)); + cloneAs(cloneList); + cloneColumnDataTo(cloneList, columnsToClone); return cloneList; } @@ -181,10 +164,13 @@ public synchronized void putAlignedValue(long timestamp, Object[] value) { timestamps.get(arrayIndex)[elementIndex] = timestamp; for (int i = 0; i < values.size(); i++) { Object columnValue = value[i]; - List columnValues = values.get(i); if (columnValue == null) { markNullValue(i, arrayIndex, elementIndex); } + List columnValues = values.get(i); + if (columnValues == null) { + continue; + } switch (dataTypes.get(i)) { case TEXT: case BLOB: @@ -336,13 +322,7 @@ private TsPrimitiveType getAlignedValueByValueIndex( } public void extendColumn(TSDataType dataType) { - if (bitMaps == null) { - List> localBitMaps = new ArrayList<>(values.size()); - for (int i = 0; i < values.size(); i++) { - localBitMaps.add(null); - } - bitMaps = localBitMaps; - } + ensureBitMapsInitialized(); List columnValue = new ArrayList<>(); List columnBitMaps = new ArrayList<>(); for (int i = 0; i < timestamps.size(); i++) { @@ -562,16 +542,14 @@ public Pair delete(long lowerBound, long upperBound, int colum } public void deleteColumn(int columnIndex) { - if (bitMaps == null) { - List> localBitMaps = new ArrayList<>(dataTypes.size()); - for (int j = 0; j < dataTypes.size(); j++) { - localBitMaps.add(null); - } - bitMaps = localBitMaps; + List columnValues = values.get(columnIndex); + if (columnValues == null) { + return; } + ensureBitMapsInitialized(); if (bitMaps.get(columnIndex) == null) { List columnBitMaps = new ArrayList<>(); - for (int i = 0; i < values.get(columnIndex).size(); i++) { + for (int i = 0; i < columnValues.size(); i++) { columnBitMaps.add(new BitMap(ARRAY_SIZE)); } bitMaps.set(columnIndex, columnBitMaps); @@ -625,6 +603,64 @@ protected Object cloneValue(TSDataType type, Object value) { } } + private void cloneColumnDataTo(AlignedTVList cloneList, Set columnsToClone) { + boolean cloneAllColumns = columnsToClone == null; + System.arraycopy( + memoryBinaryChunkSize, 0, cloneList.memoryBinaryChunkSize, 0, dataTypes.size()); + for (int i = 0; i < values.size(); i++) { + List columnValues = values.get(i); + if (columnValues == null) { + continue; + } + boolean shouldCloneColumn = cloneAllColumns || columnsToClone.contains(i); + if (!shouldCloneColumn) { + cloneList.values.set(i, columnValues); + values.set(i, null); + if (bitMaps != null && bitMaps.get(i) != null) { + cloneList.ensureBitMapsInitialized(); + cloneList.bitMaps.set(i, bitMaps.get(i)); + bitMaps.set(i, null); + } + memoryBinaryChunkSize[i] = 0; + continue; + } + + for (Object valueArray : columnValues) { + cloneList.values.get(i).add(cloneValue(dataTypes.get(i), valueArray)); + } + if (bitMaps != null && bitMaps.get(i) != null) { + cloneList.ensureBitMapsInitialized(); + List cloneColumnBitMaps = new ArrayList<>(); + for (BitMap bitMap : bitMaps.get(i)) { + cloneColumnBitMaps.add(bitMap == null ? null : bitMap.clone()); + } + cloneList.bitMaps.set(i, cloneColumnBitMaps); + } + } + } + + /** + * Ensure the bitMaps list is initialized. This method is NOT thread-safe and relies on the caller + * to provide synchronization: + * + *
    + * - Called from synchronized clone methods, which is invoked by query/flush operations. + *
+ * + *
    + * - Called from getBitMap, which is invoked by write/delete operations (putAlignedValue, + * putAlignedValues, extendColumn, deleteColumn). DataRegion write lock is held at this moment. + *
+ */ + private void ensureBitMapsInitialized() { + if (bitMaps == null) { + bitMaps = new ArrayList<>(dataTypes.size()); + for (int i = 0; i < dataTypes.size(); i++) { + bitMaps.add(null); + } + } + } + @Override protected void clearValue() { for (int i = 0; i < dataTypes.size(); i++) { @@ -657,7 +693,11 @@ protected void expandValues() { indices.add((int[]) getPrimitiveArraysByType(TSDataType.INT32)); } for (int i = 0; i < dataTypes.size(); i++) { - values.get(i).add(getPrimitiveArraysByType(dataTypes.get(i))); + List columnValues = values.get(i); + if (columnValues == null) { + continue; + } + columnValues.add(getPrimitiveArraysByType(dataTypes.get(i))); if (bitMaps != null && bitMaps.get(i) != null) { bitMaps.get(i).add(null); } @@ -757,6 +797,9 @@ private void arrayCopy(Object[] value, int idx, int arrayIndex, int elementIndex continue; } List columnValues = values.get(i); + if (columnValues == null) { + continue; + } switch (dataTypes.get(i)) { case TEXT: case BLOB: @@ -799,19 +842,17 @@ private void arrayCopy(Object[] value, int idx, int arrayIndex, int elementIndex } private BitMap getBitMap(int columnIndex, int arrayIndex) { - // init BitMaps if doesn't have - if (bitMaps == null) { - List> localBitMaps = new ArrayList<>(dataTypes.size()); - for (int i = 0; i < dataTypes.size(); i++) { - localBitMaps.add(null); - } - bitMaps = localBitMaps; + List columnValues = values.get(columnIndex); + if (columnValues == null) { + return null; } + // init BitMaps if necessary + ensureBitMapsInitialized(); // if the bitmap in columnIndex is null, init the bitmap of this column from the beginning if (bitMaps.get(columnIndex) == null) { List columnBitMaps = new ArrayList<>(); - for (int i = 0; i < values.get(columnIndex).size(); i++) { + for (int i = 0; i < columnValues.size(); i++) { columnBitMaps.add(new BitMap(ARRAY_SIZE, new byte[ARRAY_SIZE])); } bitMaps.set(columnIndex, columnBitMaps); @@ -841,6 +882,14 @@ public synchronized RamInfo calculateRamSize() { timestamps.size(), alignedTvListArrayMemCost(), rowCount, new ArrayList<>(dataTypes)); } + public synchronized RamInfo calculateRamSize(Set columnsToClone) { + return new RamInfo( + timestamps.size(), + alignedTvListArrayMemCost(columnsToClone), + rowCount, + new ArrayList<>(dataTypes)); + } + /** * Get the single alignedTVList array mem cost by give types. * @@ -875,12 +924,15 @@ public static long alignedTvListArrayMemCost(TSDataType[] types) { * * @return AlignedTvListArrayMemSize */ - public long alignedTvListArrayMemCost() { + public long alignedTvListArrayMemCost(Set columnsToClone) { long size = 0; // value & bitmap array mem size for (int column = 0; column < dataTypes.size(); column++) { + if (columnsToClone != null && !columnsToClone.contains(column)) { + continue; + } TSDataType type = dataTypes.get(column); - if (type != null) { + if (type != null && values.get(column) != null) { size += (long) PrimitiveArrayManager.ARRAY_SIZE * (long) type.getDataTypeSize(); if (bitMaps != null && bitMaps.get(column) != null) { size += (long) PrimitiveArrayManager.ARRAY_SIZE / 8 + 1; @@ -902,6 +954,10 @@ public long alignedTvListArrayMemCost() { return size; } + public long alignedTvListArrayMemCost() { + return alignedTvListArrayMemCost((Set) null); + } + /** * Get the single column array mem cost by give type. * @@ -1106,6 +1162,9 @@ public void serializeToWAL(IWALByteBufferView buffer) { // serialize value and bitmap by column for (int columnIndex = 0; columnIndex < values.size(); columnIndex++) { List columnValues = values.get(columnIndex); + if (columnValues == null) { + continue; + } for (int rowIndex = 0; rowIndex < rowCount; ++rowIndex) { int arrayIndex = rowIndex / ARRAY_SIZE; int elementIndex = rowIndex % ARRAY_SIZE; diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecutionTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecutionTest.java index 22e5c360ae3c..b196ef473c2f 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecutionTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecutionTest.java @@ -22,6 +22,7 @@ import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.exception.MetadataException; +import org.apache.iotdb.commons.path.AlignedPath; import org.apache.iotdb.commons.path.MeasurementPath; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.db.conf.IoTDBDescriptor; @@ -49,6 +50,7 @@ import org.apache.tsfile.file.metadata.enums.CompressionType; import org.apache.tsfile.file.metadata.enums.TSEncoding; import org.apache.tsfile.read.reader.IPointReader; +import org.apache.tsfile.write.schema.IMeasurementSchema; import org.apache.tsfile.write.schema.MeasurementSchema; import org.junit.Test; import org.mockito.Mockito; @@ -57,8 +59,11 @@ import java.io.IOException; import java.io.PrintStream; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; @@ -238,6 +243,70 @@ public void testTVListCloneForQuery() { } } + @Test + public void testAlignedTVListPartialColumnClone() { + IoTDBDescriptor.getInstance().getConfig().setDataNodeId(1); + ExecutorService instanceNotificationExecutor = + IoTDBThreadPoolFactory.newFixedThreadPool(2, "test-aligned-partial-clone"); + + try { + // Create MemTable with AlignedPath + List schemaList = new ArrayList<>(); + for (int i = 0; i < 5; i++) { + schemaList.add(new MeasurementSchema("sensor_" + i, TSDataType.INT64)); + } + String deviceId = "d1"; + IMemTable memTable = createMemTable(deviceId, schemaList); + + // Verify we have unsorted AlignedTVList + assertEquals(1, memTable.getMemTableMap().size()); + IWritableMemChunkGroup memChunkGroup = memTable.getMemTableMap().values().iterator().next(); + assertEquals(1, memChunkGroup.getMemChunkMap().size()); + IWritableMemChunk memChunk = memChunkGroup.getMemChunkMap().values().iterator().next(); + TVList tvList = memChunk.getWorkingTVList(); + assertFalse(tvList.isSorted()); + assertEquals(6424, tvList.calculateRamSize().getRamSize()); + assertEquals(100, tvList.rowCount()); + + // FragmentInstance Context + FragmentInstanceId id1 = new FragmentInstanceId(new PlanFragmentId(MOCK_QUERY_ID, 1), "1"); + FragmentInstanceStateMachine stateMachine1 = + new FragmentInstanceStateMachine(id1, instanceNotificationExecutor); + FragmentInstanceContext context1 = createFragmentInstanceContext(id1, stateMachine1); + + FragmentInstanceId id2 = new FragmentInstanceId(new PlanFragmentId(MOCK_QUERY_ID, 2), "2"); + FragmentInstanceStateMachine stateMachine2 = + new FragmentInstanceStateMachine(id2, instanceNotificationExecutor); + FragmentInstanceContext context2 = createFragmentInstanceContext(id2, stateMachine2); + + // Query 1: sensor_2 and sensor_0 + List measurements1 = Arrays.asList("sensor_2", "sensor_0"); + List schemas1 = Arrays.asList(schemaList.get(2), schemaList.get(0)); + AlignedPath fullPath1 = new AlignedPath(deviceId, measurements1, schemas1); + + ReadOnlyMemChunk readOnlyMemChunk1 = + memTable.query(context1, fullPath1, Long.MIN_VALUE, null, null); + Set accessedColumnsForQuery1 = context1.getAccessedAlignedColumns(tvList); + assertEquals(new HashSet<>(Arrays.asList(0, 2)), accessedColumnsForQuery1); + + // Query 2: sensor_1 and sensor_3 + List measurements2 = Arrays.asList("sensor_1", "sensor_3"); + List schemas2 = Arrays.asList(schemaList.get(1), schemaList.get(3)); + AlignedPath fullPath2 = new AlignedPath(deviceId, measurements2, schemas2); + ReadOnlyMemChunk readOnlyMemChunk2 = + memTable.query(context2, fullPath2, Long.MIN_VALUE, null, null); + + // Only cloned sensor_2 and sensor_0 exist + assertEquals(3352, tvList.calculateRamSize().getRamSize()); + assertEquals(100, tvList.rowCount()); + + } catch (Exception e) { + fail(e.getMessage()); + } finally { + instanceNotificationExecutor.shutdown(); + } + } + private FragmentInstanceExecution createFragmentInstanceExecution(int id, Executor executor) throws CpuNotEnoughException { IDriverScheduler scheduler = Mockito.mock(IDriverScheduler.class); @@ -298,4 +367,24 @@ private IMemTable createMemTable(String deviceId, String measurementId) } return memTable; } + + private IMemTable createMemTable(String deviceId, List schemaList) + throws IllegalPathException { + PrimitiveMemTable memTable = new PrimitiveMemTable("root.test", "1"); + + // Insert data in reverse order to make it unsorted + int rows = 100; + for (int i = rows - 1; i >= 0; i--) { + Object[] values = new Object[5]; + for (int j = 0; j < 5; j++) { + values[j] = (long) i * 100 + j; + } + memTable.writeAlignedRow( + DeviceIDFactory.getInstance().getDeviceID(new PartialPath(deviceId)), + schemaList, + i, + values); + } + return memTable; + } }