diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/IoTDBJDBCDataSet.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/IoTDBJDBCDataSet.java index 43064ecdaa9f..0b07e1556aaa 100644 --- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/IoTDBJDBCDataSet.java +++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/IoTDBJDBCDataSet.java @@ -127,20 +127,15 @@ public IoTDBJDBCDataSet( // deduplicate and map if (columnNameIndex != null) { - int deduplicatedColumnSize = (int) columnNameIndex.values().stream().distinct().count(); - this.columnTypeDeduplicatedList = new ArrayList<>(deduplicatedColumnSize); - for (int i = 0; i < deduplicatedColumnSize; i++) { - columnTypeDeduplicatedList.add(null); - } + this.columnTypeDeduplicatedList = + initDeduplicatedColumnTypes(getDeduplicatedColumnSize(columnNameIndex)); for (int i = 0; i < columnNameList.size(); i++) { String name = columnNameList.get(i); this.columnNameList.add(name); this.columnTypeList.add(columnTypeList.get(i)); if (!columnOrdinalMap.containsKey(name)) { int index = columnNameIndex.get(name); - if (!columnOrdinalMap.containsValue(index + START_INDEX)) { - columnTypeDeduplicatedList.set(index, TSDataType.valueOf(columnTypeList.get(i))); - } + setColumnTypeIfAbsent(columnTypeDeduplicatedList, index, columnTypeList.get(i)); columnOrdinalMap.put(name, index + START_INDEX); } } @@ -242,11 +237,8 @@ public IoTDBJDBCDataSet( // deduplicate and map if (columnNameIndex != null) { - int deduplicatedColumnSize = (int) columnNameIndex.values().stream().distinct().count(); - this.columnTypeDeduplicatedList = new ArrayList<>(deduplicatedColumnSize); - for (int i = 0; i < deduplicatedColumnSize; i++) { - columnTypeDeduplicatedList.add(null); - } + this.columnTypeDeduplicatedList = + initDeduplicatedColumnTypes(getDeduplicatedColumnSize(columnNameIndex)); for (int i = 0; i < columnNameList.size(); i++) { String name = ""; if (sgList != null @@ -262,9 +254,7 @@ public IoTDBJDBCDataSet( // "Time".equals(name) -> to allow the Time column appear in value columns if (!columnOrdinalMap.containsKey(name) || "Time".equals(name)) { int index = columnNameIndex.get(name); - if (!columnOrdinalMap.containsValue(index + START_INDEX)) { - columnTypeDeduplicatedList.set(index, TSDataType.valueOf(columnTypeList.get(i))); - } + setColumnTypeIfAbsent(columnTypeDeduplicatedList, index, columnTypeList.get(i)); columnOrdinalMap.put(name, index + START_INDEX); } } @@ -320,6 +310,31 @@ public IoTDBJDBCDataSet( this.emptyResultSet = (queryDataSet == null || !queryDataSet.time.hasRemaining()); } + private static int getDeduplicatedColumnSize(Map columnNameIndex) { + int deduplicatedColumnSize = 0; + for (Integer index : columnNameIndex.values()) { + if (index != null && index + 1 > deduplicatedColumnSize) { + deduplicatedColumnSize = index + 1; + } + } + return deduplicatedColumnSize; + } + + private static List initDeduplicatedColumnTypes(int deduplicatedColumnSize) { + List columnTypes = new ArrayList<>(deduplicatedColumnSize); + for (int i = 0; i < deduplicatedColumnSize; i++) { + columnTypes.add(null); + } + return columnTypes; + } + + private static void setColumnTypeIfAbsent( + List columnTypeDeduplicatedList, int index, String columnType) { + if (columnTypeDeduplicatedList.get(index) == null) { + columnTypeDeduplicatedList.set(index, TSDataType.valueOf(columnType)); + } + } + public void close() throws StatementExecutionException, TException { if (isClosed) { return; diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/stmt/PreparedParameterSerde.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/stmt/PreparedParameterSerde.java index 51a23a6af4c6..3dbf156ef781 100644 --- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/stmt/PreparedParameterSerde.java +++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/stmt/PreparedParameterSerde.java @@ -34,6 +34,8 @@ /** Serializer for PreparedStatement parameters. */ public class PreparedParameterSerde { + private static final char[] HEX_DIGITS = "0123456789ABCDEF".toCharArray(); + public static class DeserializedParam { public final TSDataType type; public final Object value; @@ -166,10 +168,13 @@ private static Object deserializeValue(ByteBuffer buffer, TSDataType type) { /** Convert byte array to hexadecimal string representation. */ public static String bytesToHex(byte[] bytes) { - StringBuilder sb = new StringBuilder(bytes.length * 2); - for (byte b : bytes) { - sb.append(String.format("%02X", b)); + char[] chars = new char[bytes.length * 2]; + for (int i = 0; i < bytes.length; i++) { + int value = bytes[i] & 0xFF; + int index = i * 2; + chars[index] = HEX_DIGITS[value >>> 4]; + chars[index + 1] = HEX_DIGITS[value & 0x0F]; } - return sb.toString(); + return new String(chars); } } diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java index c38bff5e9d4a..457465959a86 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java @@ -852,31 +852,12 @@ private TSCreateAlignedTimeseriesReq getTSCreateAlignedTimeseriesReq( TSCreateAlignedTimeseriesReq request = new TSCreateAlignedTimeseriesReq(); request.setPrefixPath(prefixPath); request.setMeasurements(measurements); - request.setDataTypes(dataTypes.stream().map(TSDataType::ordinal).collect(Collectors.toList())); - request.setEncodings(encodings.stream().map(TSEncoding::ordinal).collect(Collectors.toList())); - request.setCompressors( - compressors.stream().map(i -> (int) i.serialize()).collect(Collectors.toList())); - if (measurementAliasList != null) { - measurementAliasList = - measurementAliasList.stream() - .map(value -> value != null ? value : "") - .collect(Collectors.toList()); - } - request.setMeasurementAlias(measurementAliasList); - if (tagsList != null) { - tagsList = - tagsList.stream() - .map(value -> value != null ? value : new HashMap()) - .collect(Collectors.toList()); - } - request.setTagsList(tagsList); - if (attributesList != null) { - attributesList = - attributesList.stream() - .map(value -> value != null ? value : new HashMap()) - .collect(Collectors.toList()); - } - request.setAttributesList(attributesList); + request.setDataTypes(toDataTypeOrdinals(dataTypes)); + request.setEncodings(toEncodingOrdinals(encodings)); + request.setCompressors(toCompressionOrdinals(compressors)); + request.setMeasurementAlias(replaceNullStrings(measurementAliasList)); + request.setTagsList(replaceNullMaps(tagsList)); + request.setAttributesList(replaceNullMaps(attributesList)); return request; } @@ -916,47 +897,14 @@ private TSCreateMultiTimeseriesReq genTSCreateMultiTimeseriesReq( TSCreateMultiTimeseriesReq request = new TSCreateMultiTimeseriesReq(); request.setPaths(paths); - - List dataTypeOrdinals = new ArrayList<>(dataTypes.size()); - for (TSDataType dataType : dataTypes) { - dataTypeOrdinals.add(dataType.ordinal()); - } - request.setDataTypes(dataTypeOrdinals); - - List encodingOrdinals = new ArrayList<>(dataTypes.size()); - for (TSEncoding encoding : encodings) { - encodingOrdinals.add(encoding.ordinal()); - } - request.setEncodings(encodingOrdinals); - - List compressionOrdinals = new ArrayList<>(paths.size()); - for (CompressionType compression : compressors) { - compressionOrdinals.add((int) compression.serialize()); - } - request.setCompressors(compressionOrdinals); + request.setDataTypes(toDataTypeOrdinals(dataTypes)); + request.setEncodings(toEncodingOrdinals(encodings)); + request.setCompressors(toCompressionOrdinals(compressors)); request.setPropsList(propsList); - if (tagsList != null) { - tagsList = - tagsList.stream() - .map(value -> value != null ? value : new HashMap()) - .collect(Collectors.toList()); - } - request.setTagsList(tagsList); - if (attributesList != null) { - attributesList = - attributesList.stream() - .map(value -> value != null ? value : new HashMap()) - .collect(Collectors.toList()); - } - request.setAttributesList(attributesList); - if (measurementAliasList != null) { - measurementAliasList = - measurementAliasList.stream() - .map(value -> value != null ? value : "") - .collect(Collectors.toList()); - } - request.setMeasurementAliasList(measurementAliasList); + request.setTagsList(replaceNullMaps(tagsList)); + request.setAttributesList(replaceNullMaps(attributesList)); + request.setMeasurementAliasList(replaceNullStrings(measurementAliasList)); return request; } @@ -1819,19 +1767,23 @@ private boolean filterNullValueAndMeasurement( List measurementsList, List types, List valuesList) { - Map nullMap = new HashMap<>(); + Map nullMap = logger.isInfoEnabled() ? new HashMap<>() : null; for (int i = valuesList.size() - 1; i >= 0; i--) { if (valuesList.get(i) == null) { - nullMap.put(measurementsList.get(i), valuesList.get(i)); + if (nullMap != null) { + nullMap.put(measurementsList.get(i), valuesList.get(i)); + } valuesList.remove(i); measurementsList.remove(i); types.remove(i); } } if (valuesList.isEmpty()) { - logger.info("All values of the {} are null,null values are {}", deviceId, nullMap); + if (nullMap != null) { + logger.info("All values of the {} are null,null values are {}", deviceId, nullMap); + } return true; - } else { + } else if (nullMap != null) { logger.info("Some values of {} are null,null values are {}", deviceId, nullMap); } return false; @@ -1867,18 +1819,22 @@ private void filterNullValueAndMeasurementWithStringType( */ private boolean filterNullValueAndMeasurementWithStringType( List valuesList, String deviceId, List measurementsList) { - Map nullMap = new HashMap<>(); + Map nullMap = logger.isInfoEnabled() ? new HashMap<>() : null; for (int i = valuesList.size() - 1; i >= 0; i--) { if (valuesList.get(i) == null) { - nullMap.put(measurementsList.get(i), valuesList.get(i)); + if (nullMap != null) { + nullMap.put(measurementsList.get(i), valuesList.get(i)); + } valuesList.remove(i); measurementsList.remove(i); } } if (valuesList.isEmpty()) { - logger.info("All values of the {} are null,null values are {}", deviceId, nullMap); + if (nullMap != null) { + logger.info("All values of the {} are null,null values are {}", deviceId, nullMap); + } return true; - } else { + } else if (nullMap != null) { logger.info("Some values of {} are null,null values are {}", deviceId, nullMap); } return false; @@ -2622,7 +2578,65 @@ private TSInsertStringRecordsOfOneDeviceReq genTSInsertStringRecordsOfOneDeviceR * @return ordered list */ private static List sortList(List source, Integer[] index) { - return Arrays.stream(index).map(source::get).collect(Collectors.toList()); + List sortedList = new ArrayList<>(index.length); + for (int position : index) { + sortedList.add(source.get(position)); + } + return sortedList; + } + + private static List toDataTypeOrdinals(List dataTypes) { + List ordinals = new ArrayList<>(dataTypes.size()); + for (TSDataType dataType : dataTypes) { + ordinals.add(dataType.ordinal()); + } + return ordinals; + } + + private static List toEncodingOrdinals(List encodings) { + List ordinals = new ArrayList<>(encodings.size()); + for (TSEncoding encoding : encodings) { + ordinals.add(encoding.ordinal()); + } + return ordinals; + } + + private static List toCompressionOrdinals(List compressors) { + List ordinals = new ArrayList<>(compressors.size()); + for (CompressionType compression : compressors) { + ordinals.add((int) compression.serialize()); + } + return ordinals; + } + + private static List toEnumOrdinalsAsBytes(List> enumValues) { + List ordinals = new ArrayList<>(enumValues.size()); + for (Enum enumValue : enumValues) { + ordinals.add((byte) enumValue.ordinal()); + } + return ordinals; + } + + private static List replaceNullStrings(List values) { + if (values == null) { + return null; + } + List replacedValues = new ArrayList<>(values.size()); + for (String value : values) { + replacedValues.add(value != null ? value : ""); + } + return replacedValues; + } + + private static List> replaceNullMaps(List> values) { + if (values == null) { + return null; + } + List> replacedValues = new ArrayList<>(values.size()); + for (Map value : values) { + replacedValues.add(value != null ? value : new HashMap<>()); + } + return replacedValues; } private List objectValuesListToByteBufferList( @@ -2820,10 +2834,7 @@ public void insertRelationalTablet(Tablet tablet) } else { TSInsertTabletReq request = genTSInsertTabletReq(tablet, false, false); request.setWriteToTable(true); - request.setColumnCategories( - tablet.getColumnTypes().stream() - .map(t -> (byte) t.ordinal()) - .collect(Collectors.toList())); + request.setColumnCategories(toEnumOrdinalsAsBytes(tablet.getColumnTypes())); try { getDefaultSessionConnection().insertTablet(request); } catch (RedirectException ignored) { @@ -2889,8 +2900,7 @@ private void insertRelationalTabletOnce(Map relationa Tablet tablet = entry.getValue(); TSInsertTabletReq request = genTSInsertTabletReq(tablet, false, false); request.setWriteToTable(true); - request.setColumnCategories( - tablet.getColumnTypes().stream().map(t -> (byte) t.ordinal()).collect(Collectors.toList())); + request.setColumnCategories(toEnumOrdinalsAsBytes(tablet.getColumnTypes())); try { connection.insertTablet(request); } catch (RedirectException e) { @@ -2932,9 +2942,7 @@ private void insertRelationalTabletByGroup(Map relati TSInsertTabletReq request = genTSInsertTabletReq(subTablet, false, false); request.setWriteToTable(true); request.setColumnCategories( - subTablet.getColumnTypes().stream() - .map(t -> (byte) t.ordinal()) - .collect(Collectors.toList())); + toEnumOrdinalsAsBytes(subTablet.getColumnTypes())); InsertConsumer insertConsumer = SessionConnection::insertTablet; try { @@ -3904,10 +3912,9 @@ public void addAlignedMeasurementsInTemplate( TSAppendSchemaTemplateReq req = new TSAppendSchemaTemplateReq(); req.setName(templateName); req.setMeasurements(measurementsPath); - req.setDataTypes(dataTypes.stream().map(TSDataType::ordinal).collect(Collectors.toList())); - req.setEncodings(encodings.stream().map(TSEncoding::ordinal).collect(Collectors.toList())); - req.setCompressors( - compressors.stream().map(i -> (int) i.serialize()).collect(Collectors.toList())); + req.setDataTypes(toDataTypeOrdinals(dataTypes)); + req.setEncodings(toEncodingOrdinals(encodings)); + req.setCompressors(toCompressionOrdinals(compressors)); req.setIsAligned(true); getDefaultSessionConnection().appendSchemaTemplate(req); } @@ -3950,10 +3957,9 @@ public void addUnalignedMeasurementsInTemplate( TSAppendSchemaTemplateReq req = new TSAppendSchemaTemplateReq(); req.setName(templateName); req.setMeasurements(measurementsPath); - req.setDataTypes(dataTypes.stream().map(TSDataType::ordinal).collect(Collectors.toList())); - req.setEncodings(encodings.stream().map(TSEncoding::ordinal).collect(Collectors.toList())); - req.setCompressors( - compressors.stream().map(i -> (int) i.serialize()).collect(Collectors.toList())); + req.setDataTypes(toDataTypeOrdinals(dataTypes)); + req.setEncodings(toEncodingOrdinals(encodings)); + req.setCompressors(toCompressionOrdinals(compressors)); req.setIsAligned(false); getDefaultSessionConnection().appendSchemaTemplate(req); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceSchemaCache.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceSchemaCache.java index eed26dadc295..c667e5f51703 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceSchemaCache.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceSchemaCache.java @@ -482,7 +482,7 @@ void invalidateLastCache(final PartialPath devicePath, final String measurement) }, cachedDeviceID -> { try { - return new PartialPath(cachedDeviceID).matchFullPath(devicePath); + return devicePath.matchFullPath(cachedDeviceID); } catch (final IllegalPathException e) { logger.warn( "Illegal deviceID {} found in cache when invalidating by path {}, invalidate it anyway", @@ -521,8 +521,8 @@ void invalidateCache( cachedDeviceID -> { try { return isMultiLevelWildcardMeasurement - ? devicePath.matchPrefixPath(new PartialPath(cachedDeviceID)) - : devicePath.matchFullPath(new PartialPath(cachedDeviceID)); + ? devicePath.matchPrefixPath(cachedDeviceID) + : devicePath.matchFullPath(cachedDeviceID); } catch (final IllegalPathException e) { logger.warn( "Illegal deviceID {} found in cache when invalidating by path {}, invalidate it anyway", diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/SchemaEngine.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/SchemaEngine.java index ede0ba13a851..6ffec5d89c32 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/SchemaEngine.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/SchemaEngine.java @@ -368,11 +368,13 @@ public int getSchemaRegionNumber() { public Map countDeviceNumBySchemaRegion(final List schemaIds) { final Map deviceNum = new HashMap<>(); + final java.util.Collection targetSchemaIds = + schemaIds.size() > 1 ? new java.util.HashSet<>(schemaIds) : schemaIds; schemaRegionMap.entrySet().stream() .filter( entry -> - schemaIds.contains(entry.getKey().getId()) + targetSchemaIds.contains(entry.getKey().getId()) && SchemaRegionConsensusImpl.getInstance().isLeader(entry.getKey())) .forEach( entry -> @@ -384,10 +386,12 @@ public Map countDeviceNumBySchemaRegion(final List schem public Map countTimeSeriesNumBySchemaRegion(final List schemaIds) { final Map timeSeriesNum = new HashMap<>(); + final java.util.Collection targetSchemaIds = + schemaIds.size() > 1 ? new java.util.HashSet<>(schemaIds) : schemaIds; schemaRegionMap.entrySet().stream() .filter( entry -> - schemaIds.contains(entry.getKey().getId()) + targetSchemaIds.contains(entry.getKey().getId()) && SchemaRegionConsensusImpl.getInstance().isLeader(entry.getKey()) && !entry .getValue() diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java index ff294b36ed35..fbf984de1755 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java @@ -1086,12 +1086,14 @@ private void stopTimedServiceAndThrow(ScheduledExecutorService pool, String pool public void getDiskSizeByDataRegion( Map dataRegionDisk, List dataRegionIds) { - dataRegionMap.forEach( - (dataRegionId, dataRegion) -> { - if (dataRegionIds.contains(dataRegionId.getId())) { - dataRegionDisk.put(dataRegionId.getId(), dataRegion.countRegionDiskSize()); - } - }); + final java.util.Collection targetDataRegionIds = + dataRegionIds.size() > 1 ? new java.util.HashSet<>(dataRegionIds) : dataRegionIds; + for (Integer dataRegionId : targetDataRegionIds) { + final DataRegion dataRegion = dataRegionMap.get(new DataRegionId(dataRegionId)); + if (dataRegion != null) { + dataRegionDisk.put(dataRegionId, dataRegion.countRegionDiskSize()); + } + } } public static File getDataRegionSystemDir(String dataBaseName, String dataRegionId) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/filescan/impl/ClosedFileScanHandleImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/filescan/impl/ClosedFileScanHandleImpl.java index 6ebb6f7073bb..f7878bad98ee 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/filescan/impl/ClosedFileScanHandleImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/filescan/impl/ClosedFileScanHandleImpl.java @@ -87,10 +87,18 @@ public boolean isDeviceTimeDeleted(IDeviceID deviceID, long timestamp) curFileModEntries != null ? curFileModEntries : queryContext.loadAllModificationsFromDisk(tsFileResource); - List modifications = queryContext.getPathModifications(curFileModEntries, deviceID); List timeRangeList = - modifications.stream().map(ModEntry::getTimeRange).collect(Collectors.toList()); - return ModificationUtils.isPointDeletedWithoutOrderedRange(timestamp, timeRangeList); + getMergedTimeRanges(queryContext.getPathModifications(curFileModEntries, deviceID)); + return ModificationUtils.isPointDeleted(timestamp, timeRangeList); + } + + private static List getMergedTimeRanges(List modifications) { + List timeRangeList = new ArrayList<>(modifications.size()); + for (ModEntry modification : modifications) { + timeRangeList.add(modification.getTimeRange()); + } + TimeRange.sortAndMerge(timeRangeList); + return timeRangeList; } @Override @@ -107,9 +115,7 @@ public boolean isTimeSeriesTimeDeleted(IDeviceID deviceID, String timeSeriesName List modifications = queryContext.getPathModifications(curFileModEntries, deviceID, timeSeriesName); - List timeRangeList = - modifications.stream().map(ModEntry::getTimeRange).collect(Collectors.toList()); - TimeRange.sortAndMerge(timeRangeList); + List timeRangeList = getMergedTimeRanges(modifications); deviceToModifications .computeIfAbsent(deviceID, k -> new HashMap<>()) .put(timeSeriesName, timeRangeList); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/filescan/impl/UnclosedFileScanHandleImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/filescan/impl/UnclosedFileScanHandleImpl.java index ba03f69a11a5..e1379f46a057 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/filescan/impl/UnclosedFileScanHandleImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/filescan/impl/UnclosedFileScanHandleImpl.java @@ -29,6 +29,7 @@ import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.ArrayDeviceTimeIndex; import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.ITimeIndex; import org.apache.iotdb.db.storageengine.dataregion.utils.TsFileDeviceStartEndTimeIterator; +import org.apache.iotdb.db.utils.ModificationUtils; import org.apache.tsfile.file.metadata.AlignedChunkMetadata; import org.apache.tsfile.file.metadata.IChunkMetadata; @@ -39,6 +40,8 @@ import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -48,6 +51,8 @@ public class UnclosedFileScanHandleImpl implements IFileScanHandle { private final TsFileResource tsFileResource; private final Map>> deviceToChunkMetadataMap; private final Map>> deviceToMemChunkHandleMap; + private final Map> deviceToDeletionRanges; + private final Map>> deviceToTimeSeriesDeletionRanges; public UnclosedFileScanHandleImpl( Map>> deviceToChunkMetadataMap, @@ -56,6 +61,8 @@ public UnclosedFileScanHandleImpl( this.deviceToChunkMetadataMap = deviceToChunkMetadataMap; this.deviceToMemChunkHandleMap = deviceToMemChunkHandleMap; this.tsFileResource = tsFileResource; + this.deviceToDeletionRanges = new HashMap<>(); + this.deviceToTimeSeriesDeletionRanges = new HashMap<>(); } @Override @@ -68,19 +75,12 @@ public TsFileDeviceStartEndTimeIterator getDeviceStartEndTimeIterator() throws I @Override public boolean isDeviceTimeDeleted(IDeviceID deviceID, long timeArray) { - Map> chunkMetadataMap = deviceToChunkMetadataMap.get(deviceID); - for (List chunkMetadataList : chunkMetadataMap.values()) { - for (IChunkMetadata chunkMetadata : chunkMetadataList) { - if (chunkMetadata.getDeleteIntervalList() != null) { - for (TimeRange deleteInterval : chunkMetadata.getDeleteIntervalList()) { - if (deleteInterval.contains(timeArray)) { - return true; - } - } - } - } + List deletionRanges = deviceToDeletionRanges.get(deviceID); + if (deletionRanges == null) { + deletionRanges = collectDeviceDeletionRanges(deviceID); + deviceToDeletionRanges.put(deviceID, deletionRanges); } - return false; + return ModificationUtils.isPointDeleted(timeArray, deletionRanges); } @Override @@ -121,19 +121,14 @@ public Iterator getAllDeviceChunkMetaData() throws @Override public boolean isTimeSeriesTimeDeleted( IDeviceID deviceID, String timeSeriesName, long timestamp) { - List chunkMetadataList = - deviceToChunkMetadataMap.get(deviceID).get(timeSeriesName); - // check if timestamp is deleted by deleteInterval - for (IChunkMetadata chunkMetadata : chunkMetadataList) { - if (chunkMetadata.getDeleteIntervalList() != null) { - for (TimeRange deleteInterval : chunkMetadata.getDeleteIntervalList()) { - if (deleteInterval.contains(timestamp)) { - return true; - } - } - } + Map> timeSeriesDeletionRanges = + deviceToTimeSeriesDeletionRanges.computeIfAbsent(deviceID, key -> new HashMap<>()); + List deletionRanges = timeSeriesDeletionRanges.get(timeSeriesName); + if (deletionRanges == null) { + deletionRanges = collectTimeSeriesDeletionRanges(deviceID, timeSeriesName); + timeSeriesDeletionRanges.put(timeSeriesName, deletionRanges); } - return false; + return ModificationUtils.isPointDeleted(timestamp, deletionRanges); } @Override @@ -167,4 +162,42 @@ public boolean isDeleted() { public TsFileResource getTsResource() { return tsFileResource; } + + private List collectDeviceDeletionRanges(IDeviceID deviceID) { + Map> chunkMetadataMap = deviceToChunkMetadataMap.get(deviceID); + if (chunkMetadataMap == null || chunkMetadataMap.isEmpty()) { + return Collections.emptyList(); + } + List deletionRanges = new ArrayList<>(); + for (List chunkMetadataList : chunkMetadataMap.values()) { + appendDeletionRanges(deletionRanges, chunkMetadataList); + } + TimeRange.sortAndMerge(deletionRanges); + return deletionRanges; + } + + private List collectTimeSeriesDeletionRanges( + IDeviceID deviceID, String timeSeriesName) { + Map> chunkMetadataMap = deviceToChunkMetadataMap.get(deviceID); + if (chunkMetadataMap == null) { + return Collections.emptyList(); + } + List chunkMetadataList = chunkMetadataMap.get(timeSeriesName); + if (chunkMetadataList == null || chunkMetadataList.isEmpty()) { + return Collections.emptyList(); + } + List deletionRanges = new ArrayList<>(); + appendDeletionRanges(deletionRanges, chunkMetadataList); + TimeRange.sortAndMerge(deletionRanges); + return deletionRanges; + } + + private void appendDeletionRanges( + List deletionRanges, List chunkMetadataList) { + for (IChunkMetadata chunkMetadata : chunkMetadataList) { + if (chunkMetadata.getDeleteIntervalList() != null) { + deletionRanges.addAll(chunkMetadata.getDeleteIntervalList()); + } + } + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileManager.java index 86890370c313..b2cb78860109 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileManager.java @@ -206,14 +206,7 @@ public Iterator getIterator(boolean sequence) { public void remove(TsFileResource tsFileResource, boolean sequence) { writeLock("remove"); try { - Map selectedMap = sequence ? sequenceFiles : unsequenceFiles; - for (Map.Entry entry : selectedMap.entrySet()) { - if (entry.getValue().contains(tsFileResource)) { - entry.getValue().remove(tsFileResource); - TsFileResourceManager.getInstance().removeTsFileResource(tsFileResource); - break; - } - } + removeFromPartitionFileList(tsFileResource, sequence); } finally { writeUnlock(); } @@ -223,10 +216,18 @@ public void removeAll(List tsFileResourceList, boolean sequence) writeLock("removeAll"); try { for (TsFileResource resource : tsFileResourceList) { - remove(resource, sequence); + removeFromPartitionFileList(resource, sequence); } } finally { - writeLock("removeAll"); + writeUnlock(); + } + } + + private void removeFromPartitionFileList(TsFileResource tsFileResource, boolean sequence) { + Map selectedMap = sequence ? sequenceFiles : unsequenceFiles; + TsFileResourceList tsFileResources = selectedMap.get(tsFileResource.getTimePartition()); + if (tsFileResources != null && tsFileResources.remove(tsFileResource)) { + TsFileResourceManager.getInstance().removeTsFileResource(tsFileResource); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/ArrayDeviceTimeIndex.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/ArrayDeviceTimeIndex.java index caca8e9fdba4..f03d57c8344b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/ArrayDeviceTimeIndex.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/ArrayDeviceTimeIndex.java @@ -467,7 +467,7 @@ public Pair getPossibleStartTimeAndEndTime( endTime = endTimes[entry.getValue()]; } } else { - if (devicePattern.matchFullPath(new PartialPath(entry.getKey()))) { + if (devicePattern.matchFullPath(entry.getKey())) { deviceMatchInfo.add(entry.getKey()); hasMatchedDevice = true; if (startTimes[entry.getValue()] < startTime) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ModificationUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ModificationUtils.java index 351e64650a7d..142e5b9d1884 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ModificationUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ModificationUtils.java @@ -81,7 +81,11 @@ public static void modifyChunkMetaData( if (range.contains(metaData.getStartTime(), metaData.getEndTime())) { return true; } else { - if (range.overlaps(new TimeRange(metaData.getStartTime(), metaData.getEndTime()))) { + if (overlap( + metaData.getStartTime(), + metaData.getEndTime(), + range.getMin(), + range.getMax())) { metaData.setModified(true); } } @@ -143,9 +147,11 @@ private static boolean areAllValueColumnsDeleted( currentRemoved = true; break; } else { - if (range.overlaps( - new TimeRange( - valueChunkMetadata.getStartTime(), valueChunkMetadata.getEndTime()))) { + if (overlap( + valueChunkMetadata.getStartTime(), + valueChunkMetadata.getEndTime(), + range.getMin(), + range.getMax())) { valueChunkMetadata.setModified(true); modified = true; } @@ -193,10 +199,11 @@ public static void modifyAlignedChunkMetaData( // all rows are deleted return true; } else { - if (range.overlaps( - new TimeRange( - timeColumnChunkMetadata.getStartTime(), - timeColumnChunkMetadata.getEndTime()))) { + if (overlap( + timeColumnChunkMetadata.getStartTime(), + timeColumnChunkMetadata.getEndTime(), + range.getMin(), + range.getMax())) { timeColumnChunkMetadata.setModified(true); modified = true; } @@ -328,16 +335,17 @@ public static List> constructDeletionList( if (measurementList.isEmpty()) { return Collections.emptyList(); } - List modifications = - ModificationUtils.getModificationsForMemtable(memTable, modsToMemtable); - List> deletionList = new ArrayList<>(); + List deviceModifications = + filterDeviceModifications( + deviceID, + ModificationUtils.getModificationsForMemtable(memTable, modsToMemtable), + timeLowerBound); + List> deletionList = new ArrayList<>(measurementList.size()); for (String measurement : measurementList) { List columnDeletionList = new ArrayList<>(); columnDeletionList.add(new TimeRange(Long.MIN_VALUE, timeLowerBound)); - for (ModEntry modification : modifications) { - if (modification.affects(deviceID) - && modification.affects(measurement) - && modification.getEndTime() > timeLowerBound) { + for (ModEntry modification : deviceModifications) { + if (modification.affects(measurement)) { long lowerBound = Math.max(modification.getStartTime(), timeLowerBound); columnDeletionList.add(new TimeRange(lowerBound, modification.getEndTime())); } @@ -361,10 +369,10 @@ public static List constructDeletionList( long timeLowerBound) { List deletionList = new ArrayList<>(); deletionList.add(new TimeRange(Long.MIN_VALUE, timeLowerBound)); - for (ModEntry modification : getModificationsForMemtable(memTable, modsToMemtable)) { - if (modification.affects(deviceID) - && modification.affects(measurement) - && modification.getEndTime() > timeLowerBound) { + for (ModEntry modification : + filterDeviceModifications( + deviceID, getModificationsForMemtable(memTable, modsToMemtable), timeLowerBound)) { + if (modification.affects(measurement)) { long lowerBound = Math.max(modification.getStartTime(), timeLowerBound); deletionList.add(new TimeRange(lowerBound, modification.getEndTime())); } @@ -385,6 +393,17 @@ private static List getModificationsForMemtable( return modifications; } + private static List filterDeviceModifications( + IDeviceID deviceID, List modifications, long timeLowerBound) { + List deviceModifications = new ArrayList<>(); + for (ModEntry modification : modifications) { + if (modification.affects(deviceID) && modification.getEndTime() > timeLowerBound) { + deviceModifications.add(modification); + } + } + return deviceModifications; + } + public static boolean canMerge(TimeRange left, TimeRange right) { // [1,3] can merge with [4, 5] // [1,3] cannot merge with [5,6] diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java index 6220623dd3b2..5bfb18c52cc1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java @@ -32,15 +32,14 @@ import org.apache.tsfile.utils.BitMap; import org.apache.tsfile.utils.BytesUtils; import org.apache.tsfile.utils.Pair; +import org.apache.tsfile.utils.PublicBAOS; import org.apache.tsfile.write.UnSupportedDataTypeException; -import java.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; -import java.util.LinkedList; import java.util.List; import java.util.Optional; @@ -65,9 +64,9 @@ public static Pair convertTsBlockByFetchSize( // indicate whether it is a null int columnNumWithTime = columnNum * 2 + 1; DataOutputStream[] dataOutputStreams = new DataOutputStream[columnNumWithTime]; - ByteArrayOutputStream[] byteArrayOutputStreams = new ByteArrayOutputStream[columnNumWithTime]; + PublicBAOS[] byteArrayOutputStreams = new PublicBAOS[columnNumWithTime]; for (int i = 0; i < columnNumWithTime; i++) { - byteArrayOutputStreams[i] = new ByteArrayOutputStream(); + byteArrayOutputStreams[i] = new PublicBAOS(); dataOutputStreams[i] = new DataOutputStream(byteArrayOutputStreams[i]); } @@ -117,9 +116,9 @@ public static TSQueryDataSet convertTsBlockByFetchSize(List tsBlocks) int columnNum = 1; int columnNumWithTime = columnNum * 2 + 1; DataOutputStream[] dataOutputStreams = new DataOutputStream[columnNumWithTime]; - ByteArrayOutputStream[] byteArrayOutputStreams = new ByteArrayOutputStream[columnNumWithTime]; + PublicBAOS[] byteArrayOutputStreams = new PublicBAOS[columnNumWithTime]; for (int i = 0; i < columnNumWithTime; i++) { - byteArrayOutputStreams[i] = new ByteArrayOutputStream(); + byteArrayOutputStreams[i] = new PublicBAOS(); dataOutputStreams[i] = new DataOutputStream(byteArrayOutputStreams[i]); } @@ -277,27 +276,14 @@ public static TSQueryDataSet convertTsBlockByFetchSize(List tsBlocks) } // calculate the time buffer size - int timeOccupation = rowCount * 8; - ByteBuffer timeBuffer = ByteBuffer.allocate(timeOccupation); - timeBuffer.put(byteArrayOutputStreams[0].toByteArray()); - timeBuffer.flip(); - tsQueryDataSet.setTime(timeBuffer); + tsQueryDataSet.setTime(wrapBuffer(byteArrayOutputStreams[0])); // calculate the bitmap buffer size - int bitmapOccupation = (rowCount + 7) / 8; - - List bitmapList = new LinkedList<>(); - List valueList = new LinkedList<>(); + List bitmapList = new ArrayList<>(columnNum); + List valueList = new ArrayList<>(columnNum); for (int i = 1; i < byteArrayOutputStreams.length; i += 2) { - ByteBuffer valueBuffer = ByteBuffer.allocate(valueOccupation[(i - 1) / 2]); - valueBuffer.put(byteArrayOutputStreams[i].toByteArray()); - valueBuffer.flip(); - valueList.add(valueBuffer); - - ByteBuffer bitmapBuffer = ByteBuffer.allocate(bitmapOccupation); - bitmapBuffer.put(byteArrayOutputStreams[i + 1].toByteArray()); - bitmapBuffer.flip(); - bitmapList.add(bitmapBuffer); + valueList.add(wrapBuffer(byteArrayOutputStreams[i])); + bitmapList.add(wrapBuffer(byteArrayOutputStreams[i + 1])); } tsQueryDataSet.setBitmapList(bitmapList); tsQueryDataSet.setValueList(valueList); @@ -572,40 +558,30 @@ private static void fillRemainingBitMap( } private static void fillTimeColumn( - int rowCount, ByteArrayOutputStream[] byteArrayOutputStreams, TSQueryDataSet tsQueryDataSet) { - // calculate the time buffer size - int timeOccupation = rowCount * 8; - ByteBuffer timeBuffer = ByteBuffer.allocate(timeOccupation); - timeBuffer.put(byteArrayOutputStreams[0].toByteArray()); - timeBuffer.flip(); - tsQueryDataSet.setTime(timeBuffer); + int rowCount, PublicBAOS[] byteArrayOutputStreams, TSQueryDataSet tsQueryDataSet) { + tsQueryDataSet.setTime(wrapBuffer(byteArrayOutputStreams[0])); } private static void fillValueColumnsAndBitMaps( int rowCount, - ByteArrayOutputStream[] byteArrayOutputStreams, + PublicBAOS[] byteArrayOutputStreams, int[] valueOccupation, TSQueryDataSet tsQueryDataSet) { - // calculate the bitmap buffer size - int bitmapOccupation = (rowCount + 7) / 8; - - List bitmapList = new LinkedList<>(); - List valueList = new LinkedList<>(); + int columnNum = valueOccupation.length; + List bitmapList = new ArrayList<>(columnNum); + List valueList = new ArrayList<>(columnNum); for (int i = 1; i < byteArrayOutputStreams.length; i += 2) { - ByteBuffer valueBuffer = ByteBuffer.allocate(valueOccupation[(i - 1) / 2]); - valueBuffer.put(byteArrayOutputStreams[i].toByteArray()); - valueBuffer.flip(); - valueList.add(valueBuffer); - - ByteBuffer bitmapBuffer = ByteBuffer.allocate(bitmapOccupation); - bitmapBuffer.put(byteArrayOutputStreams[i + 1].toByteArray()); - bitmapBuffer.flip(); - bitmapList.add(bitmapBuffer); + valueList.add(wrapBuffer(byteArrayOutputStreams[i])); + bitmapList.add(wrapBuffer(byteArrayOutputStreams[i + 1])); } tsQueryDataSet.setBitmapList(bitmapList); tsQueryDataSet.setValueList(valueList); } + private static ByteBuffer wrapBuffer(PublicBAOS outputStream) { + return ByteBuffer.wrap(outputStream.getBuf(), 0, outputStream.size()); + } + /** * To fetch required amounts of data and combine them through List * @@ -668,9 +644,7 @@ public static Optional readBitMapsFromBuffer(ByteBuffer buffer, int co boolean hasBitMap = BytesUtils.byteToBool(buffer.get()); if (hasBitMap) { byte[] bytes = new byte[BitMap.getSizeOfBytes(size)]; - for (int j = 0; j < bytes.length; j++) { - bytes[j] = buffer.get(); - } + buffer.get(bytes); bitMaps[i] = new BitMap(size, bytes); } } @@ -687,9 +661,7 @@ public static Optional readBitMapsFromStream( boolean hasBitMap = BytesUtils.byteToBool(stream.readByte()); if (hasBitMap) { byte[] bytes = new byte[BitMap.getSizeOfBytes(size)]; - for (int j = 0; j < bytes.length; j++) { - bytes[j] = stream.readByte(); - } + stream.readFully(bytes); bitMaps[i] = new BitMap(size, bytes); } } @@ -865,11 +837,7 @@ private static void parseTextColumn( for (int index = 0; index < size; index++) { int binarySize = stream.readInt(); byte[] binaryValue = new byte[binarySize]; - int actualReadSize = stream.read(binaryValue); - if (actualReadSize != binarySize) { - throw new IllegalStateException( - "Expect to read " + binarySize + " bytes, actually read " + actualReadSize + "bytes."); - } + stream.readFully(binaryValue); binaryValues[index] = new Binary(binaryValue); } values[columnIndex] = binaryValues; diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java index b0f2c3fe0519..902a14333d05 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java @@ -424,16 +424,19 @@ public boolean matchFullPath(PartialPath rPath) { return matchPath(rPath.getNodes(), 0, 0, false, false); } + public boolean matchFullPath(IDeviceID deviceID) throws IllegalPathException { + return matchPath(getDeviceNodes(deviceID), 0, 0, false, false); + } + public boolean matchFullPath(IDeviceID deviceID, String measurement) { - // TODO change this way - PartialPath devicePath; try { - devicePath = new PartialPath(deviceID.toString()); + String[] deviceNodes = getDeviceNodes(deviceID); + String[] fullPathNodes = Arrays.copyOf(deviceNodes, deviceNodes.length + 1); + fullPathNodes[deviceNodes.length] = measurement; + return matchPath(fullPathNodes, 0, 0, false, false); } catch (IllegalPathException e) { throw new RuntimeException(e); } - return matchPath( - devicePath.concatAsMeasurementPath(measurement).getNodes(), 0, 0, false, false); } /** @@ -470,6 +473,21 @@ public boolean matchPrefixPath(PartialPath prefixPath) { return matchPath(prefixPath.getNodes(), 0, 0, false, true); } + public boolean matchPrefixPath(IDeviceID deviceID) throws IllegalPathException { + return matchPath(getDeviceNodes(deviceID), 0, 0, false, true); + } + + private static String[] getDeviceNodes(IDeviceID deviceID) throws IllegalPathException { + String[] tableNameSegments = PathUtils.splitPathToDetachedNodes(deviceID.getTableName()); + String[] deviceNodes = new String[deviceID.segmentNum() - 1 + tableNameSegments.length]; + System.arraycopy(tableNameSegments, 0, deviceNodes, 0, tableNameSegments.length); + for (int i = 0; i < deviceID.segmentNum() - 1; i++) { + deviceNodes[i + tableNameSegments.length] = + deviceID.segment(i + 1) != null ? deviceID.segment(i + 1).toString() : null; + } + return deviceNodes; + } + private boolean matchPath( String[] pathNodes, int pathIndex,