diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBDeletionTableIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBDeletionTableIT.java index 66d0e3a6a0de1..0b59c0f6cc6ee 100644 --- a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBDeletionTableIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBDeletionTableIT.java @@ -248,14 +248,6 @@ public void testUnsupportedValueFilter() throws SQLException { assertEquals("701: The column 's1' does not exist or is not a tag column", e.getMessage()); } - try { - statement.execute("DELETE FROM vehicle1 WHERE attr1 = 'text'"); - fail("should not reach here!"); - } catch (SQLException e) { - assertEquals( - "701: The column 'attr1' does not exist or is not a tag column", e.getMessage()); - } - try { statement.execute("DELETE FROM vehicle1 WHERE s3 = 'text'"); fail("should not reach here!"); @@ -277,14 +269,7 @@ public void testUnsupportedValueFilter() throws SQLException { assertEquals("701: The operator of tag predicate must be '=' for 'd0'", e.getMessage()); } - try { - statement.execute("DELETE FROM vehicle1 WHERE time < 10 and deviceId is not null"); - fail("should not reach here!"); - } catch (SQLException e) { - assertEquals( - "701: Unsupported expression: (deviceId IS NOT NULL) in ((time < 10) AND (deviceId IS NOT NULL))", - e.getMessage()); - } + statement.execute("DELETE FROM vehicle1 WHERE time < 10 and deviceId is not null"); try { statement.execute("DELETE FROM vehicle1 WHERE time < 10 and deviceId = null"); @@ -335,6 +320,345 @@ public void testUnsupportedValueFilter() throws SQLException { } } + @Test + public void testDeleteDataByAttributeFilter() throws SQLException { + try (Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT); + Statement statement = connection.createStatement()) { + statement.execute("use test"); + statement.execute( + "CREATE TABLE delete_by_attr(deviceId STRING TAG, attr1 ATTRIBUTE, attr2 ATTRIBUTE, s1 INT32 FIELD)"); + statement.execute( + "INSERT INTO delete_by_attr(time, deviceId, attr1, attr2, s1) VALUES " + + "(1, 'd1', 'red', 'small', 1)," + + "(2, 'd1', 'red', 'small', 2)," + + "(1, 'd2', 'red', 'large', 3)," + + "(2, 'd2', 'red', 'large', 4)," + + "(1, 'd3', 'blue', 'small', 5)," + + "(2, 'd3', 'blue', 'small', 6)"); + + statement.execute("DELETE FROM delete_by_attr WHERE attr1 = 'red' AND attr2 = 'small'"); + + try (ResultSet resultSet = + statement.executeQuery("SELECT deviceId, s1 FROM delete_by_attr ORDER BY deviceId, s1")) { + assertTrue(resultSet.next()); + assertEquals("d2", resultSet.getString(1)); + assertEquals(3, resultSet.getInt(2)); + assertTrue(resultSet.next()); + assertEquals("d2", resultSet.getString(1)); + assertEquals(4, resultSet.getInt(2)); + assertTrue(resultSet.next()); + assertEquals("d3", resultSet.getString(1)); + assertEquals(5, resultSet.getInt(2)); + assertTrue(resultSet.next()); + assertEquals("d3", resultSet.getString(1)); + assertEquals(6, resultSet.getInt(2)); + assertFalse(resultSet.next()); + } + } + } + + @Test + public void testDeleteDataByAttributeFilterWithTagAndTimeRange() throws SQLException { + try (Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT); + Statement statement = connection.createStatement()) { + statement.execute("use test"); + statement.execute( + "CREATE TABLE delete_by_attr_tag_time(" + + "deviceId STRING TAG, site STRING TAG, attr1 ATTRIBUTE, attr2 ATTRIBUTE, " + + "s1 INT32 FIELD)"); + statement.execute( + "INSERT INTO delete_by_attr_tag_time(time, deviceId, site, attr1, attr2, s1) VALUES " + + "(1, 'd1', 'north', 'red', 'small', 11)," + + "(2, 'd1', 'north', 'red', 'small', 12)," + + "(3, 'd1', 'north', 'red', 'small', 13)," + + "(4, 'd1', 'north', 'red', 'small', 14)," + + "(1, 'd2', 'south', 'red', 'small', 21)," + + "(2, 'd2', 'south', 'red', 'small', 22)," + + "(2, 'd3', 'north', 'blue', 'small', 32)," + + "(3, 'd4', 'north', 'red', 'large', 43)"); + + statement.execute( + "DELETE FROM delete_by_attr_tag_time " + + "WHERE time >= 2 AND time <= 3 " + + "AND site = 'north' AND attr1 = 'red' AND attr2 = 'small'"); + + final List actual = new ArrayList<>(); + try (ResultSet resultSet = + statement.executeQuery( + "SELECT deviceId, time, s1 FROM delete_by_attr_tag_time ORDER BY deviceId, time")) { + while (resultSet.next()) { + actual.add( + resultSet.getString(1) + "," + resultSet.getLong(2) + "," + resultSet.getInt(3)); + } + } + assertEquals( + List.of("d1,1,11", "d1,4,14", "d2,1,21", "d2,2,22", "d3,2,32", "d4,3,43"), actual); + } + } + + @Test + public void testDeleteDataByAttributeFilterWithOrAndNull() throws SQLException { + try (Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT); + Statement statement = connection.createStatement()) { + statement.execute("use test"); + statement.execute( + "CREATE TABLE delete_by_attr_or_null(" + + "deviceId STRING TAG, site STRING TAG, attr1 ATTRIBUTE, attr2 ATTRIBUTE, " + + "s1 INT32 FIELD)"); + statement.execute( + "INSERT INTO delete_by_attr_or_null(time, deviceId, site, attr1, attr2, s1) VALUES " + + "(1, 'd1', 'north', 'red', 'small', 11)," + + "(2, 'd1', 'north', 'red', 'small', 12)," + + "(1, 'd2', 'north', 'blue', 'small', 21)," + + "(2, 'd2', 'north', 'blue', 'small', 22)," + + "(1, 'd3', 'south', 'red', 'large', 31)," + + "(2, 'd3', 'south', 'red', 'large', 32)"); + statement.execute( + "INSERT INTO delete_by_attr_or_null(time, deviceId, site, attr1, s1) VALUES " + + "(1, 'd4', 'north', 'red', 41)," + + "(2, 'd4', 'north', 'red', 42)"); + + statement.execute( + "DELETE FROM delete_by_attr_or_null " + + "WHERE (attr1 = 'red' AND site = 'north' AND time = 1) " + + "OR (attr2 IS NULL AND time = 2)"); + + final List actual = new ArrayList<>(); + try (ResultSet resultSet = + statement.executeQuery( + "SELECT deviceId, time, s1 FROM delete_by_attr_or_null ORDER BY deviceId, time")) { + while (resultSet.next()) { + actual.add( + resultSet.getString(1) + "," + resultSet.getLong(2) + "," + resultSet.getInt(3)); + } + } + assertEquals(List.of("d1,2,12", "d2,1,21", "d2,2,22", "d3,1,31", "d3,2,32"), actual); + } + } + + @Test + public void testDeleteDataByAttributeFilterWithOtherOperators() throws SQLException { + try (Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT); + Statement statement = connection.createStatement()) { + statement.execute("use test"); + statement.execute( + "CREATE TABLE delete_by_attr_other_ops(" + + "deviceId STRING TAG, attr1 ATTRIBUTE, attr2 ATTRIBUTE, s1 INT32 FIELD)"); + statement.execute( + "INSERT INTO delete_by_attr_other_ops(time, deviceId, attr1, attr2, s1) VALUES " + + "(1, 'd1', 'a', 'x', 11)," + + "(2, 'd1', 'a', 'x', 12)," + + "(1, 'd2', 'b', 'y', 21)," + + "(2, 'd2', 'b', 'y', 22)," + + "(1, 'd3', 'c', 'z', 31)," + + "(2, 'd3', 'c', 'z', 32)"); + statement.execute( + "INSERT INTO delete_by_attr_other_ops(time, deviceId, attr2, s1) VALUES " + + "(1, 'd4', 'null_attr1', 41)," + + "(2, 'd4', 'null_attr1', 42)"); + + statement.execute("DELETE FROM delete_by_attr_other_ops WHERE attr1 > 'b' AND time = 1"); + + List actual = new ArrayList<>(); + try (ResultSet resultSet = + statement.executeQuery( + "SELECT deviceId, time, s1 FROM delete_by_attr_other_ops ORDER BY deviceId, time")) { + while (resultSet.next()) { + actual.add( + resultSet.getString(1) + "," + resultSet.getLong(2) + "," + resultSet.getInt(3)); + } + } + assertEquals( + List.of("d1,1,11", "d1,2,12", "d2,1,21", "d2,2,22", "d3,2,32", "d4,1,41", "d4,2,42"), + actual); + + statement.execute("DELETE FROM delete_by_attr_other_ops WHERE attr1 != 'a' AND time = 2"); + + actual = new ArrayList<>(); + try (ResultSet resultSet = + statement.executeQuery( + "SELECT deviceId, time, s1 FROM delete_by_attr_other_ops ORDER BY deviceId, time")) { + while (resultSet.next()) { + actual.add( + resultSet.getString(1) + "," + resultSet.getLong(2) + "," + resultSet.getInt(3)); + } + } + assertEquals(List.of("d1,1,11", "d1,2,12", "d2,1,21", "d4,1,41", "d4,2,42"), actual); + + statement.execute("DELETE FROM delete_by_attr_other_ops WHERE attr1 <= 'a' AND time = 1"); + + actual = new ArrayList<>(); + try (ResultSet resultSet = + statement.executeQuery( + "SELECT deviceId, time, s1 FROM delete_by_attr_other_ops ORDER BY deviceId, time")) { + while (resultSet.next()) { + actual.add( + resultSet.getString(1) + "," + resultSet.getLong(2) + "," + resultSet.getInt(3)); + } + } + assertEquals(List.of("d1,2,12", "d2,1,21", "d4,1,41", "d4,2,42"), actual); + + statement.execute("DELETE FROM delete_by_attr_other_ops WHERE attr1 IS NOT NULL"); + + actual = new ArrayList<>(); + try (ResultSet resultSet = + statement.executeQuery( + "SELECT deviceId, time, s1 FROM delete_by_attr_other_ops ORDER BY deviceId, time")) { + while (resultSet.next()) { + actual.add( + resultSet.getString(1) + "," + resultSet.getLong(2) + "," + resultSet.getInt(3)); + } + } + assertEquals(List.of("d4,1,41", "d4,2,42"), actual); + } + } + + @Test + public void testDeleteDataByAttributeFilterWithInvalidComparison() throws SQLException { + try (Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT); + Statement statement = connection.createStatement()) { + statement.execute("use test"); + statement.execute( + "CREATE TABLE delete_by_attr_invalid_ops(" + + "deviceId STRING TAG, attr1 ATTRIBUTE, s1 INT32 FIELD)"); + statement.execute( + "INSERT INTO delete_by_attr_invalid_ops(time, deviceId, attr1, s1) VALUES " + + "(1, 'd1', 'a', 11)," + + "(1, 'd2', 'b', 21)"); + + try { + statement.execute("DELETE FROM delete_by_attr_invalid_ops WHERE attr1 = 1"); + fail("should not reach here!"); + } catch (SQLException e) { + assertEquals( + "701: The right hand value of attribute predicate must be a string: 1", e.getMessage()); + } + + try { + statement.execute("DELETE FROM delete_by_attr_invalid_ops WHERE attr1 = null"); + fail("should not reach here!"); + } catch (SQLException e) { + assertEquals( + "701: The right hand value of attribute predicate cannot be null with comparison operator, " + + "please use IS NULL or IS NOT NULL instead", + e.getMessage()); + } + + try { + statement.execute( + "DELETE FROM delete_by_attr_invalid_ops WHERE attr1 IS DISTINCT FROM 'a'"); + fail("should not reach here!"); + } catch (SQLException e) { + assertEquals( + "701: The operator of attribute predicate must be =, !=, <, <=, >, or >= for 'a'", + e.getMessage()); + } + } + } + + @Test + public void testDeleteDataByTagIsNotNullFilter() throws SQLException { + try (Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT); + Statement statement = connection.createStatement()) { + statement.execute("use test"); + statement.execute( + "CREATE TABLE delete_by_tag_not_null(" + + "deviceId STRING TAG, site STRING TAG, attr1 ATTRIBUTE, s1 INT32 FIELD)"); + statement.execute( + "INSERT INTO delete_by_tag_not_null(time, deviceId, site, attr1, s1) VALUES " + + "(1, 'd1', 'north', 'red', 11)," + + "(2, 'd1', 'north', 'red', 12)," + + "(1, 'd2', 'south', 'blue', 21)," + + "(2, 'd2', 'south', 'blue', 22)"); + statement.execute( + "INSERT INTO delete_by_tag_not_null(time, deviceId, attr1, s1) VALUES " + + "(1, 'd3', 'green', 31)," + + "(2, 'd3', 'green', 32)," + + "(1, 'd4', 'red', 41)," + + "(2, 'd4', 'red', 42)"); + + statement.execute("DELETE FROM delete_by_tag_not_null WHERE site IS NOT NULL AND time = 1"); + + final List actual = new ArrayList<>(); + try (ResultSet resultSet = + statement.executeQuery( + "SELECT deviceId, site, time, s1 FROM delete_by_tag_not_null " + + "ORDER BY deviceId, time")) { + while (resultSet.next()) { + actual.add( + resultSet.getString(1) + + "," + + resultSet.getString(2) + + "," + + resultSet.getLong(3) + + "," + + resultSet.getInt(4)); + } + } + assertEquals( + List.of( + "d1,north,2,12", + "d2,south,2,22", + "d3,null,1,31", + "d3,null,2,32", + "d4,null,1,41", + "d4,null,2,42"), + actual); + + statement.execute( + "DELETE FROM delete_by_tag_not_null WHERE site IS NOT NULL AND attr1='red'"); + + actual.clear(); + try (ResultSet resultSet = + statement.executeQuery( + "SELECT deviceId, site, time, s1 FROM delete_by_tag_not_null " + + "ORDER BY deviceId, time")) { + while (resultSet.next()) { + actual.add( + resultSet.getString(1) + + "," + + resultSet.getString(2) + + "," + + resultSet.getLong(3) + + "," + + resultSet.getInt(4)); + } + } + assertEquals( + List.of("d2,south,2,22", "d3,null,1,31", "d3,null,2,32", "d4,null,1,41", "d4,null,2,42"), + actual); + } + } + + @Test + public void testDeleteDataByAttributeFilterRejectsTooManyDevices() throws SQLException { + try (Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT); + Statement statement = connection.createStatement()) { + statement.execute("use test"); + statement.execute( + "CREATE TABLE delete_by_attr_limit(deviceId STRING TAG, attr1 ATTRIBUTE, s1 INT32 FIELD)"); + final StringBuilder builder = + new StringBuilder("INSERT INTO delete_by_attr_limit(time, deviceId, attr1, s1) VALUES "); + for (int i = 0; i <= 1000; i++) { + if (i > 0) { + builder.append(','); + } + builder.append("(1, 'd").append(i).append("', 'same', ").append(i).append(')'); + } + statement.execute(builder.toString()); + + try { + statement.execute("DELETE FROM delete_by_attr_limit WHERE attr1 = 'same'"); + fail("should not reach here!"); + } catch (SQLException e) { + assertTrue(e.getMessage().contains("Too many devices (1001)")); + assertTrue(e.getMessage().contains("limit is 1000")); + assertTrue(e.getMessage().contains("attr1")); + assertTrue(e.getMessage().contains("Please remove all attribute filters")); + } + } + } + @Test public void test() throws SQLException { int testId = 2; diff --git a/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodeQueryMessages.java b/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodeQueryMessages.java index f870290c09a82..8e70fe0a609bd 100644 --- a/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodeQueryMessages.java +++ b/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodeQueryMessages.java @@ -271,6 +271,19 @@ public final class DataNodeQueryMessages { "The table '%s' does not contain a time column"; public static final String THE_OPERATOR_OF_TAG_PREDICATE_MUST_BE_FOR = "The operator of tag predicate must be '=' for "; + public static final String THE_OPERATOR_OF_ATTRIBUTE_PREDICATE_MUST_BE_FOR = + "The operator of attribute predicate must be =, !=, <, <=, >, or >= for "; + public static final String + THE_RIGHT_HAND_VALUE_OF_ATTRIBUTE_PREDICATE_CANNOT_BE_NULL_WITH_COMPARISON_OPERATOR = + "The right hand value of attribute predicate cannot be null with comparison operator, " + + "please use IS NULL or IS NOT NULL instead"; + public static final String THE_RIGHT_HAND_VALUE_OF_ATTRIBUTE_PREDICATE_MUST_BE_A_STRING = + "The right hand value of attribute predicate must be a string: "; + public static final String THE_COLUMN_S_DOES_NOT_EXIST_OR_IS_NOT_A_TAG_COLUMN = + "The column '%s' does not exist or is not a tag column"; + public static final String TOO_MANY_DEVICES_MATCHED_BY_ATTRIBUTE_FILTERS_IN_DELETION = + "Too many devices (%d) matched by attribute filters in deletion, limit is %d. " + + "Please remove all attribute filters (%s) or add more attribute filters."; public static final String ONLY_TIME_FILTERS_ARE_SUPPORTED_IN_LAST_QUERY = "Only time filters are supported in LAST query"; public static final String VIEWS_CANNOT_BE_USED_IN_GROUP_BY_TAGS = diff --git a/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/StorageEngineMessages.java b/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/StorageEngineMessages.java index cc708e7e48c30..a7a5bc6c9c35e 100644 --- a/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/StorageEngineMessages.java +++ b/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/StorageEngineMessages.java @@ -80,6 +80,8 @@ private StorageEngineMessages() {} public static final String INTERRUPTED_WAITING_THREAD_POOL_EXIT = "Interrupted while waiting {} thread pool to exit. "; public static final String BUFFERED_ARRAY_SIZE_THRESHOLD = "BufferedArraySizeThreshold is {}"; public static final String CURRENT_SG_COST = "Current Sg cost is {}"; + public static final String DIRECT_BUFFER_MEMORY_ALLOCATION_FAILED = + "Total allocated memory for direct buffer will be %d, which is greater than limit mem cost: %d"; public static final String FORCE_DEGRADE_TSFILE_RESOURCE = "Force degrade tsfile resource {}"; public static final String CANNOT_DEGRADE_TIME_INDEX_ALL_FILE_LEVEL = "Can't degrade time index any more because all time index are file level."; public static final String DEGRADE_TSFILE_RESOURCE = "Degrade tsfile resource {}"; diff --git a/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodeQueryMessages.java b/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodeQueryMessages.java index b0dae59e19861..52da7b214a90d 100644 --- a/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodeQueryMessages.java +++ b/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodeQueryMessages.java @@ -269,6 +269,17 @@ public final class DataNodeQueryMessages { "表 '%s' 不包含时间列"; public static final String THE_OPERATOR_OF_TAG_PREDICATE_MUST_BE_FOR = "标签谓词的运算符必须为 '=',目标:"; + public static final String THE_OPERATOR_OF_ATTRIBUTE_PREDICATE_MUST_BE_FOR = + "属性谓词的运算符必须为 =, !=, <, <=, > 或 >=,目标:"; + public static final String + THE_RIGHT_HAND_VALUE_OF_ATTRIBUTE_PREDICATE_CANNOT_BE_NULL_WITH_COMPARISON_OPERATOR = + "属性谓词的右侧值不能在比较运算符中为 null,请改用 IS NULL 或 IS NOT NULL"; + public static final String THE_RIGHT_HAND_VALUE_OF_ATTRIBUTE_PREDICATE_MUST_BE_A_STRING = + "属性谓词的右侧值必须为字符串:"; + public static final String THE_COLUMN_S_DOES_NOT_EXIST_OR_IS_NOT_A_TAG_COLUMN = + "列 '%s' 不存在或不是标签列"; + public static final String TOO_MANY_DEVICES_MATCHED_BY_ATTRIBUTE_FILTERS_IN_DELETION = + "删除语句中的属性过滤条件匹配了过多设备(%d),限制为 %d。请移除所有属性过滤条件(%s),或进一步增加属性过滤条件。"; public static final String ONLY_TIME_FILTERS_ARE_SUPPORTED_IN_LAST_QUERY = "LAST 查询中仅支持时间过滤器"; public static final String VIEWS_CANNOT_BE_USED_IN_GROUP_BY_TAGS = diff --git a/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/StorageEngineMessages.java b/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/StorageEngineMessages.java index 792909a97e056..4addaeeaa6c29 100644 --- a/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/StorageEngineMessages.java +++ b/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/StorageEngineMessages.java @@ -80,6 +80,8 @@ private StorageEngineMessages() {} public static final String INTERRUPTED_WAITING_THREAD_POOL_EXIT = "等待 {} 线程池退出时被中断。 "; public static final String BUFFERED_ARRAY_SIZE_THRESHOLD = "BufferedArraySizeThreshold 为 {}"; public static final String CURRENT_SG_COST = "当前存储组内存开销为 {}"; + public static final String DIRECT_BUFFER_MEMORY_ALLOCATION_FAILED = + "DirectBuffer 总分配内存将达到 %d,超过内存限制:%d"; public static final String FORCE_DEGRADE_TSFILE_RESOURCE = "强制降级 TsFile 资源 {}"; public static final String CANNOT_DEGRADE_TIME_INDEX_ALL_FILE_LEVEL = "无法继续降级时间索引,所有时间索引已为文件级别。"; public static final String DEGRADE_TSFILE_RESOURCE = "降级 TsFile 资源 {}"; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/DirectBufferMemoryAllocationException.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/DirectBufferMemoryAllocationException.java new file mode 100644 index 0000000000000..0e29e76b880bd --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/DirectBufferMemoryAllocationException.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.exception; + +import org.apache.iotdb.db.i18n.StorageEngineMessages; + +public class DirectBufferMemoryAllocationException extends DataRegionException { + + private static final long serialVersionUID = -8268138833145785933L; + + public DirectBufferMemoryAllocationException( + long allocatedDirectBufferMemory, long directBufferMemoryLimit) { + super( + String.format( + StorageEngineMessages.DIRECT_BUFFER_MEMORY_ALLOCATION_FAILED, + allocatedDirectBufferMemory, + directBufferMemoryLimit)); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java index c588c94c7b5ab..42e69fd3ec72e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java @@ -212,8 +212,8 @@ import org.apache.iotdb.db.storageengine.dataregion.compaction.settle.SettleRequestHandler; import org.apache.iotdb.db.storageengine.dataregion.flush.CompressionRatio; import org.apache.iotdb.db.storageengine.dataregion.modification.DeletionPredicate; -import org.apache.iotdb.db.storageengine.dataregion.modification.IDPredicate; import org.apache.iotdb.db.storageengine.dataregion.modification.TableDeletionEntry; +import org.apache.iotdb.db.storageengine.dataregion.modification.TagPredicate; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.db.storageengine.rescon.quotas.DataNodeSpaceQuotaManager; @@ -2148,7 +2148,7 @@ public TSStatus deleteColumnData(final TDeleteColumnDataReq req) { new TableDeletionEntry( new DeletionPredicate( req.getTableName(), - new IDPredicate.NOP(), + new TagPredicate.NOP(), Collections.singletonList(req.getColumnName())), new TimeRange(Long.MIN_VALUE, Long.MAX_VALUE)), // the request is only sent to associated region diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeUtils.java index 6d619914c09f8..b781d9bfdeea0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeUtils.java @@ -29,13 +29,16 @@ import org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.ComparisonExpression; import org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.Expression; import org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.Identifier; +import org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.IsNotNullPredicate; import org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.IsNullPredicate; import org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.LogicalExpression; import org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.LogicalExpression.Operator; import org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.LongLiteral; import org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.NullLiteral; import org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.StringLiteral; +import org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.SymbolReference; import org.apache.iotdb.commons.schema.table.TsTable; +import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory; import org.apache.iotdb.commons.schema.table.column.TsTableColumnSchema; import org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics; import org.apache.iotdb.confignode.rpc.thrift.TGetRegionGroupsByTimeReq; @@ -45,6 +48,10 @@ import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager; import org.apache.iotdb.db.protocol.client.ConfigNodeInfo; import org.apache.iotdb.db.queryengine.common.MPPQueryContext; +import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry; +import org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.TableDeviceSchemaFetcher; +import org.apache.iotdb.db.queryengine.plan.relational.planner.ir.ExpressionRewriter; +import org.apache.iotdb.db.queryengine.plan.relational.planner.ir.ExpressionTreeRewriter; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Delete; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.TimeRange; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement; @@ -55,10 +62,12 @@ import org.apache.iotdb.db.schemaengine.table.DataNodeTableCache; import org.apache.iotdb.db.schemaengine.table.DataNodeTreeViewSchemaUtils; import org.apache.iotdb.db.storageengine.dataregion.modification.DeletionPredicate; -import org.apache.iotdb.db.storageengine.dataregion.modification.IDPredicate; -import org.apache.iotdb.db.storageengine.dataregion.modification.IDPredicate.And; -import org.apache.iotdb.db.storageengine.dataregion.modification.IDPredicate.SegmentExactMatch; import org.apache.iotdb.db.storageengine.dataregion.modification.TableDeletionEntry; +import org.apache.iotdb.db.storageengine.dataregion.modification.TagPredicate; +import org.apache.iotdb.db.storageengine.dataregion.modification.TagPredicate.And; +import org.apache.iotdb.db.storageengine.dataregion.modification.TagPredicate.DeviceIn; +import org.apache.iotdb.db.storageengine.dataregion.modification.TagPredicate.SegmentExactMatch; +import org.apache.iotdb.db.storageengine.dataregion.modification.TagPredicate.SegmentNotNull; import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; @@ -82,6 +91,8 @@ public class AnalyzeUtils { + private static final int ATTRIBUTE_FILTER_DELETE_DEVICE_IN_LIMIT = 1000; + private static final PerformanceOverviewMetrics PERFORMANCE_OVERVIEW_METRICS = PerformanceOverviewMetrics.getInstance(); private static final Logger LOGGER = LoggerFactory.getLogger(AnalyzeUtils.class); @@ -372,14 +383,20 @@ private static void validateSchema(final Delete node, final MPPQueryContext quer // Maybe set by pipe transfer if (Objects.isNull(node.getTableDeletionEntries())) { node.setTableDeletionEntries( - parseExpressions2ModEntries(node.getWhere().orElse(null), table)); + parseExpressions2ModEntries( + node.getWhere().orElse(null), table, databaseName, queryContext)); } } public static List parseExpressions2ModEntries( - final Expression expression, final TsTable table) { + final Expression expression, + final TsTable table, + final String databaseName, + final MPPQueryContext queryContext) { return toDisjunctiveNormalForms(expression).stream() - .map(disjunctiveNormalForm -> parsePredicate(disjunctiveNormalForm, table)) + .map( + disjunctiveNormalForm -> + parsePredicate(disjunctiveNormalForm, table, databaseName, queryContext)) .collect(Collectors.toList()); } @@ -447,7 +464,11 @@ private static List crossProductOfDisjunctiveNormalForms( return results; } - private static TableDeletionEntry parsePredicate(Expression expression, TsTable table) { + private static TableDeletionEntry parsePredicate( + Expression expression, + TsTable table, + final String databaseName, + final MPPQueryContext queryContext) { if (expression == null) { return new TableDeletionEntry( new DeletionPredicate(table.getTableName()), @@ -457,24 +478,102 @@ private static TableDeletionEntry parsePredicate(Expression expression, TsTable Queue expressionQueue = new LinkedList<>(); expressionQueue.add(expression); DeletionPredicate predicate = new DeletionPredicate(table.getTableName()); - IDPredicate tagPredicate = null; + TagPredicate tagPredicate = null; + List deviceFilterExpressions = null; + List attributeColumns = null; TimeRange timeRange = new TimeRange(Long.MIN_VALUE, Long.MAX_VALUE, true); while (!expressionQueue.isEmpty()) { Expression currExp = expressionQueue.remove(); if (currExp instanceof LogicalExpression) { parseAndPredicate(((LogicalExpression) currExp), expressionQueue); } else if (currExp instanceof ComparisonExpression) { - tagPredicate = + final PredicateParseResult parseResult = parseComparison(((ComparisonExpression) currExp), timeRange, tagPredicate, table); + tagPredicate = parseResult.tagPredicate; + if (parseResult.shouldQueryDevice()) { + if (Objects.isNull(deviceFilterExpressions)) { + deviceFilterExpressions = new ArrayList<>(); + } + if (Objects.isNull(attributeColumns)) { + attributeColumns = new ArrayList<>(); + } + deviceFilterExpressions.add(toSymbolReferenceExpression(currExp)); + collectAttributeColumn(attributeColumns, parseResult.attributeColumn); + } else if (parseResult.shouldFilterDevice()) { + if (Objects.isNull(deviceFilterExpressions)) { + deviceFilterExpressions = new ArrayList<>(); + } + deviceFilterExpressions.add(toSymbolReferenceExpression(currExp)); + } } else if (currExp instanceof IsNullPredicate) { - tagPredicate = parseIsNull((IsNullPredicate) currExp, tagPredicate, table); + final PredicateParseResult parseResult = + parseIsNull((IsNullPredicate) currExp, tagPredicate, table); + tagPredicate = parseResult.tagPredicate; + if (parseResult.shouldQueryDevice()) { + if (Objects.isNull(deviceFilterExpressions)) { + deviceFilterExpressions = new ArrayList<>(); + } + if (Objects.isNull(attributeColumns)) { + attributeColumns = new ArrayList<>(); + } + deviceFilterExpressions.add(toSymbolReferenceExpression(currExp)); + collectAttributeColumn(attributeColumns, parseResult.attributeColumn); + } else if (parseResult.shouldFilterDevice()) { + if (Objects.isNull(deviceFilterExpressions)) { + deviceFilterExpressions = new ArrayList<>(); + } + deviceFilterExpressions.add(toSymbolReferenceExpression(currExp)); + } + } else if (currExp instanceof IsNotNullPredicate) { + final PredicateParseResult parseResult = + parseIsNotNull((IsNotNullPredicate) currExp, tagPredicate, table); + tagPredicate = parseResult.tagPredicate; + if (parseResult.shouldQueryDevice()) { + if (Objects.isNull(deviceFilterExpressions)) { + deviceFilterExpressions = new ArrayList<>(); + } + if (Objects.isNull(attributeColumns)) { + attributeColumns = new ArrayList<>(); + } + deviceFilterExpressions.add(toSymbolReferenceExpression(currExp)); + collectAttributeColumn(attributeColumns, parseResult.attributeColumn); + } else if (parseResult.shouldFilterDevice()) { + if (Objects.isNull(deviceFilterExpressions)) { + deviceFilterExpressions = new ArrayList<>(); + } + deviceFilterExpressions.add(toSymbolReferenceExpression(currExp)); + } } else { throw new SemanticException( DataNodeQueryMessages.UNSUPPORTED_EXPRESSION + currExp + " in " + expression); } } + if (Objects.nonNull(attributeColumns)) { + final Set deviceIDs = + TableDeviceSchemaFetcher.getInstance() + .fetchDeviceSchemaForDataQuery( + databaseName, + table.getTableName(), + deviceFilterExpressions, + attributeColumns, + queryContext) + .values() + .stream() + .flatMap(List::stream) + .map(DeviceEntry::getDeviceID) + .collect(Collectors.toSet()); + if (deviceIDs.size() > ATTRIBUTE_FILTER_DELETE_DEVICE_IN_LIMIT) { + throw new SemanticException( + String.format( + DataNodeQueryMessages.TOO_MANY_DEVICES_MATCHED_BY_ATTRIBUTE_FILTERS_IN_DELETION, + deviceIDs.size(), + ATTRIBUTE_FILTER_DELETE_DEVICE_IN_LIMIT, + attributeColumns)); + } + tagPredicate = new DeviceIn(deviceIDs); + } if (tagPredicate != null) { - predicate.setIdPredicate(tagPredicate); + predicate.setTagPredicate(tagPredicate); } if (timeRange.getStartTime() > timeRange.getEndTime()) { throw new SemanticException( @@ -494,40 +593,81 @@ private static void parseAndPredicate( expressionQueue.addAll(expression.getTerms()); } - private static IDPredicate parseIsNull( - IsNullPredicate isNullPredicate, IDPredicate oldPredicate, TsTable table) { + private static void collectAttributeColumn( + final List attributeColumns, final String attributeColumn) { + if (Objects.nonNull(attributeColumn) && !attributeColumns.contains(attributeColumn)) { + attributeColumns.add(attributeColumn); + } + } + + private static PredicateParseResult parseIsNull( + IsNullPredicate isNullPredicate, TagPredicate oldPredicate, TsTable table) { Expression leftHandExp = isNullPredicate.getValue(); if (!(leftHandExp instanceof Identifier)) { throw new SemanticException( DataNodeQueryMessages.LEFT_HAND_EXPRESSION_IS_NOT_AN_IDENTIFIER + leftHandExp); } String columnName = ((Identifier) leftHandExp).getValue(); + final TsTableColumnSchema columnSchema = table.getColumnSchema(columnName); + if (Objects.nonNull(columnSchema) + && columnSchema.getColumnCategory().equals(TsTableColumnCategory.ATTRIBUTE)) { + return PredicateParseResult.attribute(columnName, oldPredicate); + } + int tagColumnOrdinal = table.getTagColumnOrdinal(columnName); + if (tagColumnOrdinal == -1) { + throw new SemanticException( + String.format( + DataNodeQueryMessages.THE_COLUMN_S_DOES_NOT_EXIST_OR_IS_NOT_A_TAG_COLUMN, + columnName)); + } + + // the first segment is the table name, so + 1 + TagPredicate newPredicate = new SegmentExactMatch(null, tagColumnOrdinal + 1); + return PredicateParseResult.tag(combinePredicates(oldPredicate, newPredicate)); + } + + private static PredicateParseResult parseIsNotNull( + IsNotNullPredicate isNotNullPredicate, TagPredicate oldPredicate, TsTable table) { + Expression leftHandExp = isNotNullPredicate.getValue(); + if (!(leftHandExp instanceof Identifier)) { + throw new SemanticException( + DataNodeQueryMessages.LEFT_HAND_EXPRESSION_IS_NOT_AN_IDENTIFIER + leftHandExp); + } + String columnName = ((Identifier) leftHandExp).getValue(); + final TsTableColumnSchema columnSchema = table.getColumnSchema(columnName); + if (Objects.nonNull(columnSchema) + && columnSchema.getColumnCategory().equals(TsTableColumnCategory.ATTRIBUTE)) { + return PredicateParseResult.attribute(columnName, oldPredicate); + } int tagColumnOrdinal = table.getTagColumnOrdinal(columnName); if (tagColumnOrdinal == -1) { throw new SemanticException( - "The column '" + columnName + "' does not exist or is not a tag column"); + String.format( + DataNodeQueryMessages.THE_COLUMN_S_DOES_NOT_EXIST_OR_IS_NOT_A_TAG_COLUMN, + columnName)); } // the first segment is the table name, so + 1 - IDPredicate newPredicate = new SegmentExactMatch(null, tagColumnOrdinal + 1); - return combinePredicates(oldPredicate, newPredicate); + TagPredicate newPredicate = new SegmentNotNull(tagColumnOrdinal + 1); + return PredicateParseResult.tag(combinePredicates(oldPredicate, newPredicate)); } - private static IDPredicate combinePredicates(IDPredicate oldPredicate, IDPredicate newPredicate) { + private static TagPredicate combinePredicates( + TagPredicate oldPredicate, TagPredicate newPredicate) { if (oldPredicate == null) { return newPredicate; } - if (oldPredicate instanceof IDPredicate.And) { + if (oldPredicate instanceof TagPredicate.And) { ((And) oldPredicate).add(newPredicate); return oldPredicate; } - return new IDPredicate.And(oldPredicate, newPredicate); + return new TagPredicate.And(oldPredicate, newPredicate); } - private static IDPredicate parseComparison( + private static PredicateParseResult parseComparison( ComparisonExpression comparisonExpression, TimeRange timeRange, - IDPredicate oldPredicate, + TagPredicate oldPredicate, TsTable table) { Expression left = comparisonExpression.getLeft(); Expression right = comparisonExpression.getRight(); @@ -570,18 +710,55 @@ private static IDPredicate parseComparison( "The operator of time predicate must be <, <=, >, or >=: " + right); } - return oldPredicate; + return PredicateParseResult.time(oldPredicate); } // tag predicate String columnName = identifier.getValue(); + final TsTableColumnSchema columnSchema = table.getColumnSchema(columnName); + if (Objects.nonNull(columnSchema) + && columnSchema.getColumnCategory().equals(TsTableColumnCategory.ATTRIBUTE)) { + validateAttributeComparison(comparisonExpression); + return PredicateParseResult.attribute(columnName, oldPredicate); + } int tagColumnOrdinal = table.getTagColumnOrdinal(columnName); if (tagColumnOrdinal == -1) { throw new SemanticException( - "The column '" + columnName + "' does not exist or is not a tag column"); + String.format( + DataNodeQueryMessages.THE_COLUMN_S_DOES_NOT_EXIST_OR_IS_NOT_A_TAG_COLUMN, + columnName)); + } + + TagPredicate newPredicate = getTagPredicate(comparisonExpression, right, tagColumnOrdinal); + return PredicateParseResult.tag(combinePredicates(oldPredicate, newPredicate)); + } + + private static void validateAttributeComparison(final ComparisonExpression comparisonExpression) { + switch (comparisonExpression.getOperator()) { + case EQUAL: + case NOT_EQUAL: + case LESS_THAN: + case LESS_THAN_OR_EQUAL: + case GREATER_THAN: + case GREATER_THAN_OR_EQUAL: + break; + case IS_DISTINCT_FROM: + default: + throw new SemanticException( + DataNodeQueryMessages.THE_OPERATOR_OF_ATTRIBUTE_PREDICATE_MUST_BE_FOR + + comparisonExpression.getRight()); } - IDPredicate newPredicate = getTagPredicate(comparisonExpression, right, tagColumnOrdinal); - return combinePredicates(oldPredicate, newPredicate); + final Expression right = comparisonExpression.getRight(); + if (right instanceof NullLiteral) { + throw new SemanticException( + DataNodeQueryMessages + .THE_RIGHT_HAND_VALUE_OF_ATTRIBUTE_PREDICATE_CANNOT_BE_NULL_WITH_COMPARISON_OPERATOR); + } + if (!(right instanceof StringLiteral)) { + throw new SemanticException( + DataNodeQueryMessages.THE_RIGHT_HAND_VALUE_OF_ATTRIBUTE_PREDICATE_MUST_BE_A_STRING + + right); + } } private static String getTimeColumnName(final TsTable table) { @@ -595,7 +772,7 @@ private static String getTimeColumnName(final TsTable table) { return timeColumnSchema.getColumnName(); } - private static IDPredicate getTagPredicate( + private static TagPredicate getTagPredicate( ComparisonExpression comparisonExpression, Expression right, int tagColumnOrdinal) { if (comparisonExpression.getOperator() != ComparisonExpression.Operator.EQUAL) { throw new SemanticException( @@ -616,6 +793,54 @@ private static IDPredicate getTagPredicate( return new SegmentExactMatch(rightHandValue, tagColumnOrdinal + 1); } + private static Expression toSymbolReferenceExpression(final Expression expression) { + return ExpressionTreeRewriter.rewriteWith( + new ExpressionRewriter<>() { + @Override + public Expression rewriteIdentifier( + final Identifier node, + final Void context, + final ExpressionTreeRewriter treeRewriter) { + return new SymbolReference(node.getValue()); + } + }, + expression); + } + + private static class PredicateParseResult { + private final TagPredicate tagPredicate; + private final String attributeColumn; + private final boolean filterDevice; + + private PredicateParseResult( + final TagPredicate tagPredicate, final String attributeColumn, final boolean filterDevice) { + this.tagPredicate = tagPredicate; + this.attributeColumn = attributeColumn; + this.filterDevice = filterDevice; + } + + private static PredicateParseResult time(final TagPredicate tagPredicate) { + return new PredicateParseResult(tagPredicate, null, false); + } + + private static PredicateParseResult tag(final TagPredicate tagPredicate) { + return new PredicateParseResult(tagPredicate, null, true); + } + + private static PredicateParseResult attribute( + final String attributeColumn, final TagPredicate tagPredicate) { + return new PredicateParseResult(tagPredicate, attributeColumn, true); + } + + private boolean shouldQueryDevice() { + return Objects.nonNull(attributeColumn); + } + + private boolean shouldFilterDevice() { + return filterDevice; + } + } + public interface DataPartitionQueryFunc { DataPartition queryDataPartition( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java index 6bc5e2ab4f2bc..e9295240b7b38 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java @@ -686,7 +686,7 @@ public Scope visitDeleteDevice(final DeleteDevice node, final Optional co final TsTable table = DataNodeTableCache.getInstance().getTable(node.getDatabase(), node.getTableName()); DataNodeTreeViewSchemaUtils.checkTableInWrite(node.getDatabase(), table); - node.parseModEntries(table); + node.parseModEntries(table, node.getDatabase(), queryContext); analyzeTraverseDevice(node, context, node.getWhere().isPresent()); node.parseWhere( null, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/DeleteDevice.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/DeleteDevice.java index d2fa9bbabde4a..39bc2ecfed5dd 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/DeleteDevice.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/DeleteDevice.java @@ -94,8 +94,9 @@ public boolean isMayDeleteDevice() { return mayDeleteDevice; } - public void parseModEntries(final TsTable table) { - modEntries = AnalyzeUtils.parseExpressions2ModEntries(where, table); + public void parseModEntries( + final TsTable table, final String databaseName, final MPPQueryContext context) { + modEntries = AnalyzeUtils.parseExpressions2ModEntries(where, table, databaseName, context); } public void serializeModEntries(final DataOutputStream stream) throws IOException { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java index 532b687857714..44d5397c7bbc4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java @@ -49,6 +49,7 @@ import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.DataRegionException; +import org.apache.iotdb.db.exception.DirectBufferMemoryAllocationException; import org.apache.iotdb.db.exception.StorageEngineException; import org.apache.iotdb.db.exception.TsFileProcessorException; import org.apache.iotdb.db.exception.WriteProcessRejectException; @@ -258,8 +259,7 @@ private void asyncRecover(List> futures) { try { dataRegion = buildNewDataRegion(sgName, dataRegionId); } catch (DataRegionException e) { - LOGGER.error( - "Failed to recover data region {}[{}]", sgName, dataRegionId.getId(), e); + handleDataRegionRecoverFailure(sgName, dataRegionId, e); return null; } dataRegionMap.put(dataRegionId, dataRegion); @@ -274,6 +274,17 @@ private void asyncRecover(List> futures) { } } + void handleDataRegionRecoverFailure( + String sgName, DataRegionId dataRegionId, DataRegionException e) throws DataRegionException { + LOGGER.error("Failed to recover data region {}[{}]", sgName, dataRegionId.getId(), e); + WALRecoverManager.getInstance() + .getAllDataRegionScannedLatch() + .countDownWithException(e.getMessage()); + if (e instanceof DirectBufferMemoryAllocationException) { + throw e; + } + } + /** get StorageGroup -> DataRegionIdList map from data/system directory. */ public Map> getLocalDataRegionInfo() { File system = SystemFileFactory.INSTANCE.getFile(systemDir); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java index 03807c60d74c1..e41dea4f774de 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java @@ -61,6 +61,7 @@ import org.apache.iotdb.db.exception.BatchProcessException; import org.apache.iotdb.db.exception.DataRegionException; import org.apache.iotdb.db.exception.DataTypeInconsistentException; +import org.apache.iotdb.db.exception.DirectBufferMemoryAllocationException; import org.apache.iotdb.db.exception.TsFileProcessorException; import org.apache.iotdb.db.exception.WriteProcessException; import org.apache.iotdb.db.exception.WriteProcessRejectException; @@ -119,10 +120,10 @@ import org.apache.iotdb.db.storageengine.dataregion.memtable.IMemTable; import org.apache.iotdb.db.storageengine.dataregion.memtable.TsFileProcessor; import org.apache.iotdb.db.storageengine.dataregion.memtable.TsFileProcessorInfo; -import org.apache.iotdb.db.storageengine.dataregion.modification.IDPredicate; import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry; import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile; import org.apache.iotdb.db.storageengine.dataregion.modification.TableDeletionEntry; +import org.apache.iotdb.db.storageengine.dataregion.modification.TagPredicate; import org.apache.iotdb.db.storageengine.dataregion.modification.TreeDeletionEntry; import org.apache.iotdb.db.storageengine.dataregion.modification.v1.ModificationFileV1; import org.apache.iotdb.db.storageengine.dataregion.read.IQueryDataSource; @@ -594,10 +595,6 @@ private void recover() throws DataRegionException { try { recoverCompaction(); } catch (Exception e) { - // signal wal recover manager to recover this region's files - WALRecoverManager.getInstance() - .getAllDataRegionScannedLatch() - .countDownWithException(e.getMessage()); throw new DataRegionException(e); } @@ -768,10 +765,6 @@ private void recover() throws DataRegionException { updatePartitionFileVersion(partitionNum, resource.getVersion()); } } catch (IOException e) { - // signal wal recover manager to recover this region's files - WALRecoverManager.getInstance() - .getAllDataRegionScannedLatch() - .countDownWithException(e.getMessage()); throw new DataRegionException(e); } @@ -3494,12 +3487,12 @@ private void deleteDataInSealedFiles(Collection sealedTsFiles, M List deletedByMods = new ArrayList<>(); List deletedByFiles = new ArrayList<>(); boolean isDropMeasurementExist = false; - IDPredicate.IDPredicateType idPredicateType = null; + TagPredicate.TagPredicateType tagPredicateType = null; if (deletion instanceof TableDeletionEntry) { TableDeletionEntry tableDeletionEntry = (TableDeletionEntry) deletion; isDropMeasurementExist = !tableDeletionEntry.getPredicate().getMeasurementNames().isEmpty(); - idPredicateType = tableDeletionEntry.getPredicate().getIdPredicateType(); + tagPredicateType = tableDeletionEntry.getPredicate().getTagPredicateType(); } for (TsFileResource sealedTsFile : sealedTsFiles) { @@ -3572,7 +3565,7 @@ private void deleteDataInSealedFiles(Collection sealedTsFiles, M fileEndTime); } if (isFileFullyMatchedByTime(deletion, fileStartTime, fileEndTime) - && idPredicateType.equals(IDPredicate.IDPredicateType.NOP) + && tagPredicateType.equals(TagPredicate.TagPredicateType.NOP) && !isDropMeasurementExist) { ++matchSize; } else { @@ -5117,11 +5110,9 @@ public void markDeleted() { private void acquireDirectBufferMemory() throws DataRegionException { long acquireDirectBufferMemCost = getAcquireDirectBufferMemCost(); if (!SystemInfo.getInstance().addDirectBufferMemoryCost(acquireDirectBufferMemCost)) { - throw new DataRegionException( - "Total allocated memory for direct buffer will be " - + (SystemInfo.getInstance().getDirectBufferMemoryCost() + acquireDirectBufferMemCost) - + ", which is greater than limit mem cost: " - + SystemInfo.getInstance().getTotalDirectBufferMemorySizeLimit()); + throw new DirectBufferMemoryAllocationException( + SystemInfo.getInstance().getDirectBufferMemoryCost() + acquireDirectBufferMemCost, + SystemInfo.getInstance().getTotalDirectBufferMemorySizeLimit()); } this.directBufferMemoryCost = acquireDirectBufferMemCost; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java index 7bdd492fa74ac..72ac3f544c8d3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java @@ -39,11 +39,11 @@ import org.apache.iotdb.db.storageengine.dataregion.compaction.constant.CompactionTaskType; import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionTaskManager; import org.apache.iotdb.db.storageengine.dataregion.modification.DeletionPredicate; -import org.apache.iotdb.db.storageengine.dataregion.modification.IDPredicate.FullExactMatch; import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry; import org.apache.iotdb.db.storageengine.dataregion.modification.ModFileManagement; import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile; import org.apache.iotdb.db.storageengine.dataregion.modification.TableDeletionEntry; +import org.apache.iotdb.db.storageengine.dataregion.modification.TagPredicate.FullExactMatch; import org.apache.iotdb.db.storageengine.dataregion.modification.TreeDeletionEntry; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.ArrayDeviceTimeIndex; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/DeletionPredicate.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/DeletionPredicate.java index fcccaecc60f4c..b8722f6b41409 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/DeletionPredicate.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/DeletionPredicate.java @@ -18,7 +18,7 @@ */ package org.apache.iotdb.db.storageengine.dataregion.modification; -import org.apache.iotdb.db.storageengine.dataregion.modification.IDPredicate.NOP; +import org.apache.iotdb.db.storageengine.dataregion.modification.TagPredicate.NOP; import org.apache.iotdb.db.utils.io.BufferSerializable; import org.apache.iotdb.db.utils.io.StreamSerializable; @@ -42,7 +42,7 @@ public class DeletionPredicate implements StreamSerializable, BufferSerializable public static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(DeletionPredicate.class); private String tableName; - private IDPredicate idPredicate = new NOP(); + private TagPredicate tagPredicate = new NOP(); // an empty list means affecting all columns private List measurementNames = Collections.emptyList(); @@ -52,32 +52,32 @@ public DeletionPredicate(String tableName) { this.tableName = tableName; } - public DeletionPredicate(String tableName, IDPredicate idPredicate) { + public DeletionPredicate(String tableName, TagPredicate tagPredicate) { this.tableName = tableName; - this.idPredicate = idPredicate; + this.tagPredicate = tagPredicate; } public DeletionPredicate( - String tableName, IDPredicate idPredicate, List measurementNames) { + String tableName, TagPredicate tagPredicate, List measurementNames) { this.tableName = tableName; - this.idPredicate = idPredicate; + this.tagPredicate = tagPredicate; this.measurementNames = measurementNames; } public boolean matches(IDeviceID deviceID) { - return tableName.equals(deviceID.getTableName()) && idPredicate.matches(deviceID); + return tableName.equals(deviceID.getTableName()) && tagPredicate.matches(deviceID); } - public void setIdPredicate(IDPredicate idPredicate) { - this.idPredicate = idPredicate; + public void setTagPredicate(TagPredicate tagPredicate) { + this.tagPredicate = tagPredicate; } - public IDPredicate getIdPredicate() { - return idPredicate; + public TagPredicate getTagPredicate() { + return tagPredicate; } - public IDPredicate.IDPredicateType getIdPredicateType() { - return this.idPredicate.type; + public TagPredicate.TagPredicateType getTagPredicateType() { + return this.tagPredicate.type; } public String getTableName() { @@ -95,7 +95,7 @@ public boolean affects(String measurementName) { @Override public long serialize(OutputStream stream) throws IOException { long size = ReadWriteIOUtils.writeVar(tableName, stream); - size += idPredicate.serialize(stream); + size += tagPredicate.serialize(stream); size += ReadWriteForEncodingUtils.writeVarInt(measurementNames.size(), stream); for (String measurementName : measurementNames) { size += ReadWriteIOUtils.writeVar(measurementName, stream); @@ -106,7 +106,7 @@ public long serialize(OutputStream stream) throws IOException { @Override public long serialize(ByteBuffer buffer) { long size = ReadWriteIOUtils.writeVar(tableName, buffer); - size += idPredicate.serialize(buffer); + size += tagPredicate.serialize(buffer); size += ReadWriteForEncodingUtils.writeVarInt(measurementNames.size(), buffer); for (String measurementName : measurementNames) { size += ReadWriteIOUtils.writeVar(measurementName, buffer); @@ -117,7 +117,7 @@ public long serialize(ByteBuffer buffer) { @Override public void deserialize(InputStream stream) throws IOException { tableName = ReadWriteIOUtils.readVarIntString(stream); - idPredicate = IDPredicate.createFrom(stream); + tagPredicate = TagPredicate.createFrom(stream); int measurementLength = ReadWriteForEncodingUtils.readVarInt(stream); if (measurementLength > 0) { @@ -133,7 +133,7 @@ public void deserialize(InputStream stream) throws IOException { @Override public void deserialize(ByteBuffer buffer) { tableName = ReadWriteIOUtils.readVarIntString(buffer); - idPredicate = IDPredicate.createFrom(buffer); + tagPredicate = TagPredicate.createFrom(buffer); int measurementLength = ReadWriteForEncodingUtils.readVarInt(buffer); if (measurementLength > 0) { @@ -150,7 +150,7 @@ public int serializedSize() { // table name + id predicate + measurement names int size = ModEntry.sizeToWriteVarString(tableName) - + idPredicate.serializedSize() + + tagPredicate.serializedSize() + ReadWriteForEncodingUtils.varIntSize(measurementNames.size()); for (String measurementName : measurementNames) { size += ModEntry.sizeToWriteVarString(measurementName); @@ -168,13 +168,13 @@ public boolean equals(Object o) { } DeletionPredicate that = (DeletionPredicate) o; return Objects.equals(tableName, that.tableName) - && Objects.equals(idPredicate, that.idPredicate) + && Objects.equals(tagPredicate, that.tagPredicate) && Objects.equals(measurementNames, that.measurementNames); } @Override public int hashCode() { - return Objects.hash(tableName, idPredicate, measurementNames); + return Objects.hash(tableName, tagPredicate, measurementNames); } @Override @@ -183,8 +183,8 @@ public String toString() { + "tableName='" + tableName + '\'' - + ", idPredicate=" - + idPredicate + + ", tagPredicate=" + + tagPredicate + ", measurementNames=" + measurementNames + '}'; @@ -194,7 +194,7 @@ public String toString() { public long ramBytesUsed() { return SHALLOW_SIZE + RamUsageEstimator.sizeOf(tableName) - + RamUsageEstimator.sizeOfObject(idPredicate) + + RamUsageEstimator.sizeOfObject(tagPredicate) + RamUsageEstimator.sizeOfArrayList(measurementNames); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/TableDeletionEntry.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/TableDeletionEntry.java index 952bcc2eaf6ec..b2c7ccee4e5b4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/TableDeletionEntry.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/TableDeletionEntry.java @@ -21,7 +21,7 @@ import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.queryengine.execution.MemoryEstimationHelper; -import org.apache.iotdb.db.storageengine.dataregion.modification.IDPredicate.IDPredicateType; +import org.apache.iotdb.db.storageengine.dataregion.modification.TagPredicate.TagPredicateType; import org.apache.iotdb.db.utils.ModificationUtils; import org.apache.tsfile.file.metadata.IDeviceID; @@ -138,8 +138,8 @@ public String getTableName() { } public boolean isDroppingTable() { - IDPredicate idPredicate = predicate.getIdPredicate(); - return idPredicate.type == IDPredicateType.NOP + TagPredicate tagPredicate = predicate.getTagPredicate(); + return tagPredicate.type == TagPredicateType.NOP && predicate.getMeasurementNames().isEmpty() && timeRange.getMin() == Long.MIN_VALUE && timeRange.getMax() == Long.MAX_VALUE; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/IDPredicate.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/TagPredicate.java similarity index 58% rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/IDPredicate.java rename to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/TagPredicate.java index 487689d83b98b..8d41d23ca06e2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/IDPredicate.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/TagPredicate.java @@ -35,11 +35,14 @@ import java.io.OutputStream; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Objects; +import java.util.Set; -public abstract class IDPredicate implements StreamSerializable, BufferSerializable, Accountable { +public abstract class TagPredicate implements StreamSerializable, BufferSerializable, Accountable { public int serializedSize() { // type @@ -47,11 +50,13 @@ public int serializedSize() { } @SuppressWarnings("java:S6548") - public enum IDPredicateType { + public enum TagPredicateType { NOP, FULL_EXACT_MATCH, SEGMENT_EXACT_MATCH, - AND; + AND, + DEVICE_IN, + SEGMENT_NOT_NULL; public long serialize(OutputStream stream) throws IOException { stream.write((byte) ordinal()); @@ -63,7 +68,7 @@ public long serialize(ByteBuffer buffer) { return 1; } - public static IDPredicateType deserialize(InputStream stream) throws IOException { + public static TagPredicateType deserialize(InputStream stream) throws IOException { int typeNum = stream.read(); if (typeNum == -1) { throw new EOFException(); @@ -71,14 +76,14 @@ public static IDPredicateType deserialize(InputStream stream) throws IOException return values()[typeNum]; } - public static IDPredicateType deserialize(ByteBuffer buffer) { + public static TagPredicateType deserialize(ByteBuffer buffer) { return values()[buffer.get()]; } } - protected final IDPredicateType type; + protected final TagPredicateType type; - protected IDPredicate(IDPredicateType type) { + protected TagPredicate(TagPredicateType type) { this.type = type; } @@ -94,17 +99,21 @@ public long serialize(ByteBuffer buffer) { return type.serialize(buffer); } - public static IDPredicate createFrom(ByteBuffer buffer) { - IDPredicateType type = IDPredicateType.deserialize(buffer); - IDPredicate predicate; - if (Objects.requireNonNull(type) == IDPredicateType.NOP) { + public static TagPredicate createFrom(ByteBuffer buffer) { + TagPredicateType type = TagPredicateType.deserialize(buffer); + TagPredicate predicate; + if (Objects.requireNonNull(type) == TagPredicateType.NOP) { predicate = new NOP(); - } else if (Objects.requireNonNull(type) == IDPredicateType.FULL_EXACT_MATCH) { + } else if (Objects.requireNonNull(type) == TagPredicateType.FULL_EXACT_MATCH) { predicate = new FullExactMatch(); - } else if (Objects.requireNonNull(type) == IDPredicateType.SEGMENT_EXACT_MATCH) { + } else if (Objects.requireNonNull(type) == TagPredicateType.SEGMENT_EXACT_MATCH) { predicate = new SegmentExactMatch(); - } else if (Objects.requireNonNull(type) == IDPredicateType.AND) { + } else if (Objects.requireNonNull(type) == TagPredicateType.AND) { predicate = new And(); + } else if (Objects.requireNonNull(type) == TagPredicateType.DEVICE_IN) { + predicate = new DeviceIn(); + } else if (Objects.requireNonNull(type) == TagPredicateType.SEGMENT_NOT_NULL) { + predicate = new SegmentNotNull(); } else { throw new IllegalArgumentException(StorageEngineMessages.UNRECOGNIZED_PREDICATE_TYPE + type); } @@ -112,17 +121,21 @@ public static IDPredicate createFrom(ByteBuffer buffer) { return predicate; } - public static IDPredicate createFrom(InputStream stream) throws IOException { - IDPredicateType type = IDPredicateType.deserialize(stream); - IDPredicate predicate; - if (Objects.requireNonNull(type) == IDPredicateType.NOP) { + public static TagPredicate createFrom(InputStream stream) throws IOException { + TagPredicateType type = TagPredicateType.deserialize(stream); + TagPredicate predicate; + if (Objects.requireNonNull(type) == TagPredicateType.NOP) { predicate = new NOP(); - } else if (Objects.requireNonNull(type) == IDPredicateType.FULL_EXACT_MATCH) { + } else if (Objects.requireNonNull(type) == TagPredicateType.FULL_EXACT_MATCH) { predicate = new FullExactMatch(); - } else if (Objects.requireNonNull(type) == IDPredicateType.SEGMENT_EXACT_MATCH) { + } else if (Objects.requireNonNull(type) == TagPredicateType.SEGMENT_EXACT_MATCH) { predicate = new SegmentExactMatch(); - } else if (Objects.requireNonNull(type) == IDPredicateType.AND) { + } else if (Objects.requireNonNull(type) == TagPredicateType.AND) { predicate = new And(); + } else if (Objects.requireNonNull(type) == TagPredicateType.DEVICE_IN) { + predicate = new DeviceIn(); + } else if (Objects.requireNonNull(type) == TagPredicateType.SEGMENT_NOT_NULL) { + predicate = new SegmentNotNull(); } else { throw new IllegalArgumentException(StorageEngineMessages.UNRECOGNIZED_PREDICATE_TYPE + type); } @@ -130,11 +143,11 @@ public static IDPredicate createFrom(InputStream stream) throws IOException { return predicate; } - public static class NOP extends IDPredicate { + public static class NOP extends TagPredicate { public static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(NOP.class); public NOP() { - super(IDPredicateType.NOP); + super(TagPredicateType.NOP); } @Override @@ -173,19 +186,19 @@ public long ramBytesUsed() { } } - public static class FullExactMatch extends IDPredicate { + public static class FullExactMatch extends TagPredicate { public static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(FullExactMatch.class); private IDeviceID deviceID; public FullExactMatch(IDeviceID deviceID) { - super(IDPredicateType.FULL_EXACT_MATCH); + super(TagPredicateType.FULL_EXACT_MATCH); this.deviceID = deviceID; } public FullExactMatch() { - super(IDPredicateType.FULL_EXACT_MATCH); + super(TagPredicateType.FULL_EXACT_MATCH); } @Override @@ -250,7 +263,7 @@ public long ramBytesUsed() { } } - public static class SegmentExactMatch extends IDPredicate { + public static class SegmentExactMatch extends TagPredicate { public static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(SegmentExactMatch.class); @@ -258,13 +271,13 @@ public static class SegmentExactMatch extends IDPredicate { private int segmentIndex; public SegmentExactMatch(String pattern, int segmentIndex) { - super(IDPredicateType.SEGMENT_EXACT_MATCH); + super(TagPredicateType.SEGMENT_EXACT_MATCH); this.pattern = pattern; this.segmentIndex = segmentIndex; } public SegmentExactMatch() { - super(IDPredicateType.SEGMENT_EXACT_MATCH); + super(TagPredicateType.SEGMENT_EXACT_MATCH); } @Override @@ -341,17 +354,187 @@ public long ramBytesUsed() { } } - public static class And extends IDPredicate { + public static class DeviceIn extends TagPredicate { + + public static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(DeviceIn.class); + private final Set deviceIDs = new HashSet<>(); + + public DeviceIn(Collection deviceIDs) { + super(TagPredicateType.DEVICE_IN); + this.deviceIDs.addAll(deviceIDs); + } + + public DeviceIn() { + super(TagPredicateType.DEVICE_IN); + } + + @Override + public int serializedSize() { + int serializedSize = super.serializedSize(); + serializedSize += ReadWriteForEncodingUtils.varIntSize(deviceIDs.size()); + for (IDeviceID deviceID : deviceIDs) { + serializedSize += deviceID.serializedSize(); + } + return serializedSize; + } + + @Override + public long serialize(OutputStream stream) throws IOException { + long size = super.serialize(stream); + size += ReadWriteForEncodingUtils.writeVarInt(deviceIDs.size(), stream); + for (IDeviceID deviceID : deviceIDs) { + size += deviceID.serialize(stream); + } + return size; + } + + @Override + public long serialize(ByteBuffer buffer) { + long size = super.serialize(buffer); + size += ReadWriteForEncodingUtils.writeVarInt(deviceIDs.size(), buffer); + for (IDeviceID deviceID : deviceIDs) { + size += deviceID.serialize(buffer); + } + return size; + } + + @Override + public void deserialize(InputStream stream) throws IOException { + int size = ReadWriteForEncodingUtils.readVarInt(stream); + for (int i = 0; i < size; i++) { + deviceIDs.add(Deserializer.DEFAULT_DESERIALIZER.deserializeFrom(stream)); + } + } + + @Override + public void deserialize(ByteBuffer buffer) { + int size = ReadWriteForEncodingUtils.readVarInt(buffer); + for (int i = 0; i < size; i++) { + deviceIDs.add(Deserializer.DEFAULT_DESERIALIZER.deserializeFrom(buffer)); + } + } + + @Override + public boolean matches(IDeviceID deviceID) { + return deviceIDs.contains(deviceID); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + DeviceIn deviceIn = (DeviceIn) o; + return Objects.equals(deviceIDs, deviceIn.deviceIDs); + } + + @Override + public int hashCode() { + return Objects.hashCode(deviceIDs); + } + + @Override + public String toString() { + return "DeviceIn{" + "deviceIDs=" + deviceIDs + '}'; + } + + @Override + public long ramBytesUsed() { + return SHALLOW_SIZE + RamUsageEstimator.sizeOfHashSet(deviceIDs); + } + } + + public static class SegmentNotNull extends TagPredicate { + + public static final long SHALLOW_SIZE = + RamUsageEstimator.shallowSizeOfInstance(SegmentNotNull.class); + private int segmentIndex; + + public SegmentNotNull(int segmentIndex) { + super(TagPredicateType.SEGMENT_NOT_NULL); + this.segmentIndex = segmentIndex; + } + + public SegmentNotNull() { + super(TagPredicateType.SEGMENT_NOT_NULL); + } + + @Override + public int serializedSize() { + return super.serializedSize() + ReadWriteForEncodingUtils.varIntSize(segmentIndex); + } + + @Override + public long serialize(OutputStream stream) throws IOException { + long size = super.serialize(stream); + size += ReadWriteForEncodingUtils.writeVarInt(segmentIndex, stream); + return size; + } + + @Override + public long serialize(ByteBuffer buffer) { + long size = super.serialize(buffer); + size += ReadWriteForEncodingUtils.writeVarInt(segmentIndex, buffer); + return size; + } + + @Override + public void deserialize(InputStream stream) throws IOException { + segmentIndex = ReadWriteForEncodingUtils.readVarInt(stream); + } + + @Override + public void deserialize(ByteBuffer buffer) { + segmentIndex = ReadWriteForEncodingUtils.readVarInt(buffer); + } + + @Override + public boolean matches(IDeviceID deviceID) { + return deviceID.segment(segmentIndex) != null; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + SegmentNotNull that = (SegmentNotNull) o; + return segmentIndex == that.segmentIndex; + } + + @Override + public int hashCode() { + return Objects.hashCode(segmentIndex); + } + + @Override + public String toString() { + return "SegmentNotNull{" + "segmentIndex=" + segmentIndex + '}'; + } + + @Override + public long ramBytesUsed() { + return SHALLOW_SIZE; + } + } + + public static class And extends TagPredicate { public static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(And.class); - private final List predicates = new ArrayList<>(); + private final List predicates = new ArrayList<>(); - public And(IDPredicate... predicates) { - super(IDPredicateType.AND); + public And(TagPredicate... predicates) { + super(TagPredicateType.AND); Collections.addAll(this.predicates, predicates); } - public void add(IDPredicate predicate) { + public void add(TagPredicate predicate) { predicates.add(predicate); } @@ -359,7 +542,7 @@ public void add(IDPredicate predicate) { public int serializedSize() { int serializedSize = super.serializedSize(); serializedSize += ReadWriteForEncodingUtils.varIntSize(predicates.size()); - for (IDPredicate predicate : predicates) { + for (TagPredicate predicate : predicates) { serializedSize += predicate.serializedSize(); } return serializedSize; @@ -369,7 +552,7 @@ public int serializedSize() { public long serialize(OutputStream stream) throws IOException { long size = super.serialize(stream); size += ReadWriteForEncodingUtils.writeVarInt(predicates.size(), stream); - for (IDPredicate predicate : predicates) { + for (TagPredicate predicate : predicates) { size += predicate.serialize(stream); } return size; @@ -379,7 +562,7 @@ public long serialize(OutputStream stream) throws IOException { public long serialize(ByteBuffer buffer) { long size = super.serialize(buffer); size += ReadWriteForEncodingUtils.writeVarInt(predicates.size(), buffer); - for (IDPredicate predicate : predicates) { + for (TagPredicate predicate : predicates) { size += predicate.serialize(buffer); } return size; @@ -389,7 +572,7 @@ public long serialize(ByteBuffer buffer) { public void deserialize(InputStream stream) throws IOException { int size = ReadWriteForEncodingUtils.readVarInt(stream); for (int i = 0; i < size; i++) { - predicates.add(IDPredicate.createFrom(stream)); + predicates.add(TagPredicate.createFrom(stream)); } } @@ -397,7 +580,7 @@ public void deserialize(InputStream stream) throws IOException { public void deserialize(ByteBuffer buffer) { int size = ReadWriteForEncodingUtils.readVarInt(buffer); for (int i = 0; i < size; i++) { - predicates.add(IDPredicate.createFrom(buffer)); + predicates.add(TagPredicate.createFrom(buffer)); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/UnsealedTsFileRecoverPerformer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/UnsealedTsFileRecoverPerformer.java index 2effd009ab6ec..8d09eb2de18de 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/UnsealedTsFileRecoverPerformer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/UnsealedTsFileRecoverPerformer.java @@ -35,9 +35,9 @@ import org.apache.iotdb.db.storageengine.dataregion.memtable.IWritableMemChunk; import org.apache.iotdb.db.storageengine.dataregion.memtable.IWritableMemChunkGroup; import org.apache.iotdb.db.storageengine.dataregion.modification.DeletionPredicate; -import org.apache.iotdb.db.storageengine.dataregion.modification.IDPredicate.FullExactMatch; import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry; import org.apache.iotdb.db.storageengine.dataregion.modification.TableDeletionEntry; +import org.apache.iotdb.db.storageengine.dataregion.modification.TagPredicate.FullExactMatch; import org.apache.iotdb.db.storageengine.dataregion.modification.TreeDeletionEntry; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.FileTimeIndexCacheRecorder; diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/path/PatternTreeMapTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/path/PatternTreeMapTest.java index 7a7d71bef5127..6f14ac8638467 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/path/PatternTreeMapTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/path/PatternTreeMapTest.java @@ -24,9 +24,9 @@ import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.path.PatternTreeMap; import org.apache.iotdb.db.storageengine.dataregion.modification.DeletionPredicate; -import org.apache.iotdb.db.storageengine.dataregion.modification.IDPredicate.NOP; import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry; import org.apache.iotdb.db.storageengine.dataregion.modification.TableDeletionEntry; +import org.apache.iotdb.db.storageengine.dataregion.modification.TagPredicate.NOP; import org.apache.iotdb.db.storageengine.dataregion.modification.TreeDeletionEntry; import org.apache.iotdb.db.utils.datastructure.PatternTreeMapFactory; import org.apache.iotdb.db.utils.datastructure.PatternTreeMapFactory.ModsSerializer; diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/consensus/DeletionRecoverTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/consensus/DeletionRecoverTest.java index bcdc84a1fd02c..be35ae8c881a9 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/consensus/DeletionRecoverTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/consensus/DeletionRecoverTest.java @@ -30,8 +30,8 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalDeleteDataNode; import org.apache.iotdb.db.storageengine.dataregion.modification.DeletionPredicate; -import org.apache.iotdb.db.storageengine.dataregion.modification.IDPredicate.NOP; import org.apache.iotdb.db.storageengine.dataregion.modification.TableDeletionEntry; +import org.apache.iotdb.db.storageengine.dataregion.modification.TagPredicate.NOP; import org.apache.tsfile.read.common.TimeRange; import org.junit.After; diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/consensus/DeletionResourceTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/consensus/DeletionResourceTest.java index 8eb43cab0771a..da0d02a33113a 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/consensus/DeletionResourceTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/consensus/DeletionResourceTest.java @@ -43,8 +43,8 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalDeleteDataNode; import org.apache.iotdb.db.storageengine.dataregion.modification.DeletionPredicate; -import org.apache.iotdb.db.storageengine.dataregion.modification.IDPredicate.NOP; import org.apache.iotdb.db.storageengine.dataregion.modification.TableDeletionEntry; +import org.apache.iotdb.db.storageengine.dataregion.modification.TagPredicate.NOP; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; import org.apache.tsfile.read.common.TimeRange; diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/PipePlanTablePatternParseVisitorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/PipePlanTablePatternParseVisitorTest.java index 0a7fbfb699dc9..a4da1fc2fdf8b 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/PipePlanTablePatternParseVisitorTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/PipePlanTablePatternParseVisitorTest.java @@ -29,8 +29,8 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.node.schema.TableDeviceAttributeUpdateNode; import org.apache.iotdb.db.storageengine.dataregion.memtable.DeviceIDFactory; import org.apache.iotdb.db.storageengine.dataregion.modification.DeletionPredicate; -import org.apache.iotdb.db.storageengine.dataregion.modification.IDPredicate; import org.apache.iotdb.db.storageengine.dataregion.modification.TableDeletionEntry; +import org.apache.iotdb.db.storageengine.dataregion.modification.TagPredicate; import org.apache.tsfile.read.common.TimeRange; import org.junit.Assert; @@ -115,12 +115,12 @@ public void testDeleteData() { new TableDeletionEntry( new DeletionPredicate( "ac", - new IDPredicate.And( - new IDPredicate.FullExactMatch( + new TagPredicate.And( + new TagPredicate.FullExactMatch( DeviceIDFactory.getInstance() .getDeviceID( new PartialPath(new String[] {"ac", "device1"}))), - new IDPredicate.SegmentExactMatch("device2", 1))), + new TagPredicate.SegmentExactMatch("device2", 1))), new TimeRange(0, 1))), "db1"), tablePattern) diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeUtilsTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeUtilsTest.java index bbeaa629d4f72..915006a828663 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeUtilsTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeUtilsTest.java @@ -34,6 +34,8 @@ import org.apache.iotdb.confignode.rpc.thrift.TGetRegionGroupsByTimeReq; import org.apache.iotdb.confignode.rpc.thrift.TGetRegionGroupsByTimeResp; import org.apache.iotdb.db.protocol.client.ConfigNodeClient; +import org.apache.iotdb.db.queryengine.common.MPPQueryContext; +import org.apache.iotdb.db.queryengine.common.QueryId; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Delete; import org.apache.iotdb.db.storageengine.dataregion.modification.DeletionPredicate; import org.apache.iotdb.db.storageengine.dataregion.modification.TableDeletionEntry; @@ -66,7 +68,9 @@ public void testParseDeletePredicateWithRenamedTimeColumn() { new Identifier("ts"), new LongLiteral("100")); - List entries = AnalyzeUtils.parseExpressions2ModEntries(expression, table); + List entries = + AnalyzeUtils.parseExpressions2ModEntries( + expression, table, "db", new MPPQueryContext(new QueryId("1"))); assertEquals(1, entries.size()); assertEquals(Long.MIN_VALUE, entries.get(0).getStartTime()); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalDeleteDataNodeTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalDeleteDataNodeTest.java index e97c11c1b47a1..910c38f44eb2d 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalDeleteDataNodeTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalDeleteDataNodeTest.java @@ -24,11 +24,11 @@ import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeType; import org.apache.iotdb.db.storageengine.dataregion.modification.DeletionPredicate; -import org.apache.iotdb.db.storageengine.dataregion.modification.IDPredicate.And; -import org.apache.iotdb.db.storageengine.dataregion.modification.IDPredicate.FullExactMatch; -import org.apache.iotdb.db.storageengine.dataregion.modification.IDPredicate.NOP; -import org.apache.iotdb.db.storageengine.dataregion.modification.IDPredicate.SegmentExactMatch; import org.apache.iotdb.db.storageengine.dataregion.modification.TableDeletionEntry; +import org.apache.iotdb.db.storageengine.dataregion.modification.TagPredicate.And; +import org.apache.iotdb.db.storageengine.dataregion.modification.TagPredicate.FullExactMatch; +import org.apache.iotdb.db.storageengine.dataregion.modification.TagPredicate.NOP; +import org.apache.iotdb.db.storageengine.dataregion.modification.TagPredicate.SegmentExactMatch; import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALByteBufferForTest; import org.apache.tsfile.file.metadata.IDeviceID.Factory; diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/StorageEngineTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/StorageEngineTest.java index 4c4ac20e9aeba..b0b9ce2880ab8 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/StorageEngineTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/StorageEngineTest.java @@ -18,9 +18,13 @@ */ package org.apache.iotdb.db.storageengine; +import org.apache.iotdb.commons.concurrent.ExceptionalCountDownLatch; import org.apache.iotdb.commons.consensus.DataRegionId; import org.apache.iotdb.commons.utils.TimePartitionUtils; +import org.apache.iotdb.db.exception.DataRegionException; +import org.apache.iotdb.db.exception.DirectBufferMemoryAllocationException; import org.apache.iotdb.db.storageengine.dataregion.DataRegion; +import org.apache.iotdb.db.storageengine.dataregion.wal.recover.WALRecoverManager; import com.google.common.collect.Lists; import org.junit.After; @@ -84,4 +88,45 @@ public void testGetTimePartitionId() { Assert.assertEquals(1, TimePartitionUtils.getTimePartitionId(timePartitionInterval * 2 - 1)); Assert.assertEquals(2, TimePartitionUtils.getTimePartitionId(timePartitionInterval * 2 + 1)); } + + @Test + public void testNotifyWALRecoverManagerWhenDirectBufferAllocationFailed() throws Exception { + DirectBufferMemoryAllocationException directBufferMemoryAllocationException = + new DirectBufferMemoryAllocationException(2, 1); + WALRecoverManager.getInstance().setAllDataRegionScannedLatch(new ExceptionalCountDownLatch(1)); + try { + try { + storageEngine.handleDataRegionRecoverFailure( + "root.sg", new DataRegionId(0), directBufferMemoryAllocationException); + Assert.fail("Expected data region recovery to fail."); + } catch (DataRegionException e) { + Assert.assertSame(directBufferMemoryAllocationException, e); + } + + ExceptionalCountDownLatch latch = + WALRecoverManager.getInstance().getAllDataRegionScannedLatch(); + Assert.assertTrue(latch.hasException()); + Assert.assertEquals( + directBufferMemoryAllocationException.getMessage(), latch.getExceptionMessage()); + } finally { + WALRecoverManager.getInstance().clear(); + } + } + + @Test + public void testNotifyWALRecoverManagerButContinueForOtherDataRegionException() throws Exception { + DataRegionException dataRegionException = new DataRegionException("other recovery failure"); + WALRecoverManager.getInstance().setAllDataRegionScannedLatch(new ExceptionalCountDownLatch(1)); + try { + storageEngine.handleDataRegionRecoverFailure( + "root.sg", new DataRegionId(0), dataRegionException); + + ExceptionalCountDownLatch latch = + WALRecoverManager.getInstance().getAllDataRegionScannedLatch(); + Assert.assertTrue(latch.hasException()); + Assert.assertEquals(dataRegionException.getMessage(), latch.getExceptionMessage()); + } finally { + WALRecoverManager.getInstance().clear(); + } + } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java index 152189332b59d..f2487eed6c948 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java @@ -68,9 +68,9 @@ import org.apache.iotdb.db.storageengine.dataregion.memtable.ReadOnlyMemChunk; import org.apache.iotdb.db.storageengine.dataregion.memtable.TsFileProcessor; import org.apache.iotdb.db.storageengine.dataregion.modification.DeletionPredicate; -import org.apache.iotdb.db.storageengine.dataregion.modification.IDPredicate.NOP; import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile; import org.apache.iotdb.db.storageengine.dataregion.modification.TableDeletionEntry; +import org.apache.iotdb.db.storageengine.dataregion.modification.TagPredicate.NOP; import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus; diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tablemodel/CompactionWithAllNullRowsTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tablemodel/CompactionWithAllNullRowsTest.java index 9ff8a401150b8..69e83e769a3ce 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tablemodel/CompactionWithAllNullRowsTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tablemodel/CompactionWithAllNullRowsTest.java @@ -35,9 +35,9 @@ import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.ReadPointCompactionPerformer; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.InnerSpaceCompactionTask; import org.apache.iotdb.db.storageengine.dataregion.modification.DeletionPredicate; -import org.apache.iotdb.db.storageengine.dataregion.modification.IDPredicate; -import org.apache.iotdb.db.storageengine.dataregion.modification.IDPredicate.FullExactMatch; import org.apache.iotdb.db.storageengine.dataregion.modification.TableDeletionEntry; +import org.apache.iotdb.db.storageengine.dataregion.modification.TagPredicate; +import org.apache.iotdb.db.storageengine.dataregion.modification.TagPredicate.FullExactMatch; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.tsfile.exception.write.WriteProcessException; @@ -333,7 +333,7 @@ public void testCompactionWithAllValueColumnDeletion() throws IOException, Illeg new TableDeletionEntry( new DeletionPredicate( "t1", - new IDPredicate.FullExactMatch(deviceID), + new TagPredicate.FullExactMatch(deviceID), Collections.singletonList("s0")), new TimeRange(Long.MIN_VALUE, 11))); resource1 @@ -342,7 +342,7 @@ public void testCompactionWithAllValueColumnDeletion() throws IOException, Illeg new TableDeletionEntry( new DeletionPredicate( "t1", - new IDPredicate.FullExactMatch(deviceID), + new TagPredicate.FullExactMatch(deviceID), Collections.singletonList("s1")), new TimeRange(Long.MIN_VALUE, 11))); resource1 @@ -351,7 +351,7 @@ public void testCompactionWithAllValueColumnDeletion() throws IOException, Illeg new TableDeletionEntry( new DeletionPredicate( "t1", - new IDPredicate.FullExactMatch(deviceID), + new TagPredicate.FullExactMatch(deviceID), Collections.singletonList("s2")), new TimeRange(Long.MIN_VALUE, 11))); resource1 @@ -360,7 +360,7 @@ public void testCompactionWithAllValueColumnDeletion() throws IOException, Illeg new TableDeletionEntry( new DeletionPredicate( "t1", - new IDPredicate.FullExactMatch(deviceID), + new TagPredicate.FullExactMatch(deviceID), Collections.singletonList("s3")), new TimeRange(Long.MIN_VALUE, 11))); resource1.getModFileForWrite().close(); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModificationFileTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModificationFileTest.java index fb4ce06fc33ca..58fe5e90245f2 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModificationFileTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModificationFileTest.java @@ -23,9 +23,10 @@ import org.apache.iotdb.commons.path.MeasurementPath; import org.apache.iotdb.db.service.metrics.FileMetrics; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.recover.CompactionRecoverManager; -import org.apache.iotdb.db.storageengine.dataregion.modification.IDPredicate.FullExactMatch; -import org.apache.iotdb.db.storageengine.dataregion.modification.IDPredicate.NOP; -import org.apache.iotdb.db.storageengine.dataregion.modification.IDPredicate.SegmentExactMatch; +import org.apache.iotdb.db.storageengine.dataregion.modification.TagPredicate.DeviceIn; +import org.apache.iotdb.db.storageengine.dataregion.modification.TagPredicate.FullExactMatch; +import org.apache.iotdb.db.storageengine.dataregion.modification.TagPredicate.NOP; +import org.apache.iotdb.db.storageengine.dataregion.modification.TagPredicate.SegmentExactMatch; import org.apache.iotdb.db.utils.constant.TestConstant; import org.apache.tsfile.file.metadata.IDeviceID.Factory; @@ -131,6 +132,14 @@ public void readMyWrite() { new FullExactMatch(Factory.DEFAULT_FACTORY.create(new String[] {"id1", "id2"}))), new TimeRange(5, 6)), new TableDeletionEntry(new DeletionPredicate("table4"), new TimeRange(7, 8)), + new TableDeletionEntry( + new DeletionPredicate( + "table5", + new DeviceIn( + Arrays.asList( + Factory.DEFAULT_FACTORY.create(new String[] {"table5", "id1", "id2"}), + Factory.DEFAULT_FACTORY.create(new String[] {"table5", "id3", "id4"})))), + new TimeRange(9, 10)), }; try (ModificationFile mFile = new ModificationFile(tempFileName, false)) { for (int i = 0; i < 4; i++) { @@ -141,11 +150,11 @@ public void readMyWrite() { assertEquals(modifications[i], modificationList.get(i)); } - for (int i = 4; i < 8; i++) { + for (int i = 4; i < modifications.length; i++) { mFile.write(modifications[i]); } modificationList = mFile.getAllMods(); - for (int i = 0; i < 8; i++) { + for (int i = 0; i < modifications.length; i++) { assertEquals(modifications[i], modificationList.get(i)); } } catch (IOException e) { @@ -172,6 +181,14 @@ public void writeVerifyTest() { new FullExactMatch(Factory.DEFAULT_FACTORY.create(new String[] {"id1", "id2"}))), new TimeRange(5, 6)), new TableDeletionEntry(new DeletionPredicate("table4"), new TimeRange(7, 8)), + new TableDeletionEntry( + new DeletionPredicate( + "table5", + new DeviceIn( + Arrays.asList( + Factory.DEFAULT_FACTORY.create(new String[] {"table5", "id1", "id2"}), + Factory.DEFAULT_FACTORY.create(new String[] {"table5", "id3", "id4"})))), + new TimeRange(9, 10)), }; try (ModificationFile mFile = new ModificationFile(tempFileName, false)) { mFile.write(Arrays.asList(modifications)); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/modification/TableDeletionEntryTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/modification/TableDeletionEntryTest.java index f4e9095794584..99a889e2e5f2a 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/modification/TableDeletionEntryTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/modification/TableDeletionEntryTest.java @@ -18,10 +18,11 @@ */ package org.apache.iotdb.db.storageengine.dataregion.modification; -import org.apache.iotdb.db.storageengine.dataregion.modification.IDPredicate.And; -import org.apache.iotdb.db.storageengine.dataregion.modification.IDPredicate.FullExactMatch; -import org.apache.iotdb.db.storageengine.dataregion.modification.IDPredicate.NOP; -import org.apache.iotdb.db.storageengine.dataregion.modification.IDPredicate.SegmentExactMatch; +import org.apache.iotdb.db.storageengine.dataregion.modification.TagPredicate.And; +import org.apache.iotdb.db.storageengine.dataregion.modification.TagPredicate.DeviceIn; +import org.apache.iotdb.db.storageengine.dataregion.modification.TagPredicate.FullExactMatch; +import org.apache.iotdb.db.storageengine.dataregion.modification.TagPredicate.NOP; +import org.apache.iotdb.db.storageengine.dataregion.modification.TagPredicate.SegmentExactMatch; import org.apache.tsfile.file.metadata.IDeviceID; import org.apache.tsfile.file.metadata.IDeviceID.Factory; @@ -69,7 +70,28 @@ public void testSerialization() throws IOException { public void testDeserializePredicateTypeFromEmptyStream() { assertThrows( EOFException.class, - () -> IDPredicate.IDPredicateType.deserialize(new ByteArrayInputStream(new byte[0]))); + () -> TagPredicate.TagPredicateType.deserialize(new ByteArrayInputStream(new byte[0]))); + } + + @Test + public void testDeviceInSerialization() throws IOException { + DeviceIn predicate = + new DeviceIn( + Arrays.asList( + Factory.DEFAULT_FACTORY.create(new String[] {"table1", "id1", "id2"}), + Factory.DEFAULT_FACTORY.create(new String[] {"table1", "id3", "id4"}))); + + ByteBuffer buffer = ByteBuffer.allocate(predicate.serializedSize()); + predicate.serialize(buffer); + assertEquals(predicate.serializedSize(), buffer.position()); + buffer.flip(); + assertEquals(predicate, TagPredicate.createFrom(buffer)); + + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + predicate.serialize(bos); + byte[] byteArray = bos.toByteArray(); + assertEquals(predicate.serializedSize(), byteArray.length); + assertEquals(predicate, TagPredicate.createFrom(new ByteArrayInputStream(byteArray))); } @Test @@ -92,6 +114,15 @@ public void testAffectDevice() { "table1", new And(new SegmentExactMatch("id1", 1), new SegmentExactMatch("id2", 2))), new TimeRange(1, 5)); + TableDeletionEntry entry5 = + new TableDeletionEntry( + new DeletionPredicate( + "table1", + new DeviceIn( + Arrays.asList( + Factory.DEFAULT_FACTORY.create(new String[] {"table1", "id1", "id2"}), + Factory.DEFAULT_FACTORY.create(new String[] {"table1", "id1"})))), + new TimeRange(1, 5)); IDeviceID deviceID1 = Factory.DEFAULT_FACTORY.create(new String[] {"table1", "id1", "id2"}); IDeviceID deviceID2 = @@ -128,6 +159,13 @@ public void testAffectDevice() { assertFalse(entry4.affects(deviceID4)); assertFalse(entry4.affects(deviceID5)); assertFalse(entry4.affects(deviceID6)); + + assertTrue(entry5.affects(deviceID1)); + assertFalse(entry5.affects(deviceID2)); + assertFalse(entry5.affects(deviceID3)); + assertTrue(entry5.affects(deviceID4)); + assertFalse(entry5.affects(deviceID5)); + assertFalse(entry5.affects(deviceID6)); } @Test