diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeReceiverAutoCreateDisabledIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeReceiverAutoCreateDisabledIT.java new file mode 100644 index 000000000000..6d4f2b80b33c --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeReceiverAutoCreateDisabledIT.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.pipe.it.dual.treemodel.auto.basic; + +import org.apache.iotdb.db.it.utils.TestUtils; +import org.apache.iotdb.it.env.MultiEnvFactory; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.MultiClusterIT2DualTreeAutoBasic; +import org.apache.iotdb.pipe.it.dual.treemodel.auto.AbstractPipeDualTreeModelAutoIT; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.HashSet; +import java.util.Set; + +@RunWith(IoTDBTestRunner.class) +@Category({MultiClusterIT2DualTreeAutoBasic.class}) +public class IoTDBPipeReceiverAutoCreateDisabledIT extends AbstractPipeDualTreeModelAutoIT { + + @Override + @Before + public void setUp() { + MultiEnvFactory.createEnv(2); + senderEnv = MultiEnvFactory.getEnv(0); + receiverEnv = MultiEnvFactory.getEnv(1); + setupConfig(); + senderEnv.initClusterEnvironment(1, 1); + receiverEnv.initClusterEnvironment(1, 1); + } + + @Override + protected void setupConfig() { + super.setupConfig(); + receiverEnv.getConfig().getCommonConfig().setAutoCreateSchemaEnabled(false); + } + + @Test + public void testReceiverAutoCreateSchemaDisabledWithSpecialTimeSeries() throws Exception { + Assert.assertEquals(1, senderEnv.getConfigNodeWrapperList().size()); + Assert.assertEquals(1, senderEnv.getDataNodeWrapperList().size()); + Assert.assertEquals(1, receiverEnv.getConfigNodeWrapperList().size()); + Assert.assertEquals(1, receiverEnv.getDataNodeWrapperList().size()); + + final String createPipeSql = + String.format( + "create pipe test with source ('inclusion'='all','source.realtime.mode'='stream','source.realtime.enable'='true') " + + "with sink ('sink'='iotdb-thrift-sink', 'sink.node-urls'='%s');", + receiverEnv.getDataNodeWrapper(0).getIpAndPortString()); + final String createDatabaseSql = "create database root.test.sg;"; + final String createFirstTimeSeriesSql = + "create timeseries root.test.sg.`1~!@#$%^&*()_+=:'\"/|[]{}`.`~!@#$%^&*()_+=:'\"/|[]{}` float;"; + final String insertFirstSql = + "insert into root.test.sg.`1~!@#$%^&*()_+=:'\"/|[]{}`(time, `~!@#$%^&*()_+=:'\"/|[]{}`) " + + "values (1706659200,3.5),(1706660000, 15.5);"; + final String firstSelectSql = + "select `~!@#$%^&*()_+=:'\"/|[]{}` from root.test.sg.`1~!@#$%^&*()_+=:'\"/|[]{}`;"; + final String createSecondTimeSeriesSql = + "create timeseries root.test.ABCDEFGHIJKLMNOPQRSTUVWXYZ_abcdefghijklmnopqrstuvwxyz1.`~!@#$%^&*().,<>?_-+=:'\"/|[]{}`.`~!@#$%^&*().,<>?_-+=:'\"/|[]{}` int32;"; + final String insertSecondSql = + "insert into root.test.ABCDEFGHIJKLMNOPQRSTUVWXYZ_abcdefghijklmnopqrstuvwxyz1.`~!@#$%^&*().,<>?_-+=:'\"/|[]{}`" + + "(time, `~!@#$%^&*().,<>?_-+=:'\"/|[]{}`) values (1706666400,23456),(1706667400,23456),(1706686400,23456);"; + final String secondSelectSql = + "select * from root.test.ABCDEFGHIJKLMNOPQRSTUVWXYZ_abcdefghijklmnopqrstuvwxyz1.`~!@#$%^&*().,<>?_-+=:'\"/|[]{}`;"; + + try (final Connection connection = senderEnv.getConnection(); + final Statement statement = connection.createStatement()) { + statement.execute(createPipeSql); + statement.execute(createDatabaseSql); + statement.execute(createFirstTimeSeriesSql); + statement.execute(insertFirstSql); + final QueryResult firstQueryResult = queryForResult(statement, firstSelectSql); + statement.execute(createSecondTimeSeriesSql); + statement.execute(insertSecondSql); + final QueryResult secondQueryResult = queryForResult(statement, secondSelectSql); + + awaitUntilFlush(senderEnv); + + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, firstSelectSql, firstQueryResult.header, firstQueryResult.rows); + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, secondSelectSql, secondQueryResult.header, secondQueryResult.rows); + } + } + + private QueryResult queryForResult(final Statement statement, final String sql) + throws SQLException { + try (final ResultSet resultSet = statement.executeQuery(sql)) { + final ResultSetMetaData resultSetMetaData = resultSet.getMetaData(); + final StringBuilder headerBuilder = new StringBuilder(); + for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) { + headerBuilder.append(resultSetMetaData.getColumnName(i)).append(","); + } + + final Set rows = new HashSet<>(); + while (resultSet.next()) { + final StringBuilder rowBuilder = new StringBuilder(); + for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) { + rowBuilder.append(resultSet.getString(i)).append(","); + } + rows.add(rowBuilder.toString()); + } + return new QueryResult(headerBuilder.toString(), rows); + } + } + + private static class QueryResult { + private final String header; + private final Set rows; + + private QueryResult(final String header, final Set rows) { + this.header = header; + this.rows = rows; + } + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java index 6671dad35487..6251a67689a8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java @@ -86,6 +86,7 @@ import java.io.DataOutputStream; import java.io.File; +import java.io.FileNotFoundException; import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -179,11 +180,7 @@ public void start() { final LoadSingleTsFileNode node = tsFileNodeList.get(i); final String filePath = node.getTsFileResource().getTsFilePath(); - if (node.isTableModel()) { - partitionFetcher.setDatabase(node.getDatabase()); - } else { - partitionFetcher.setDatabase(null); - } + partitionFetcher.setDatabase(getPartitionQueryDatabase(node, isGeneratedByPipe)); boolean isLoadSingleTsFileSuccess = true; boolean shouldRemoveFileFromLoadingSet = false; @@ -593,9 +590,10 @@ private void convertFailedTsFilesToTabletsAndRetry() { .orElse(null) : loadTsFileDataTypeConverter .convertForTreeModel( - LoadTsFileStatement.createUnchecked(filePath) - .setDeleteAfterLoad(failedNode.isDeleteAfterLoad()) - .setConvertOnTypeMismatch(true)) + buildRetryTreeLoadStatement( + filePath, + failedNode.isDeleteAfterLoad(), + getPartitionQueryDatabase(failedNode, isGeneratedByPipe))) .orElse(null); if (loadTsFileDataTypeConverter.isSuccessful(status)) { @@ -634,6 +632,27 @@ private void convertFailedTsFilesToTabletsAndRetry() { } } + static String getPartitionQueryDatabase( + final LoadSingleTsFileNode node, final boolean isGeneratedByPipe) { + return node.isTableModel() || isGeneratedByPipe ? node.getDatabase() : null; + } + + private LoadTsFileStatement buildRetryTreeLoadStatement( + final String filePath, final boolean deleteAfterLoad, final String database) + throws FileNotFoundException { + final LoadTsFileStatement statement = + LoadTsFileStatement.createUnchecked(filePath) + .setDeleteAfterLoad(deleteAfterLoad) + .setConvertOnTypeMismatch(true); + if (database != null) { + statement.setDatabase(database); + } + if (isGeneratedByPipe) { + statement.markIsGeneratedByPipe(); + } + return statement; + } + @Override public void stop(Throwable t) { // Do nothing @@ -851,7 +870,8 @@ public List queryDataPartition( subSlotList.stream() .map( pair -> - // (database != null) means this file will be loaded into table-model + // database is an explicit database hint for table-model loads and + // pipe-generated tree-model loads. database != null ? dataPartition.getDataRegionReplicaSetForWriting( pair.left, pair.right, database) @@ -874,7 +894,8 @@ private List toQueryParam( entry -> { DataPartitionQueryParam queryParam = new DataPartitionQueryParam(entry.getKey(), new ArrayList<>(entry.getValue())); - // (database != null) means this file will be loaded into table-model + // database is an explicit database hint for table-model loads and + // pipe-generated tree-model loads. if (database != null) { queryParam.setDatabaseName(database); } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileSchedulerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileSchedulerTest.java index 161a95b4ec9d..2db41c2ccb0f 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileSchedulerTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileSchedulerTest.java @@ -27,6 +27,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.DistributedQueryPlan; import org.apache.iotdb.db.queryengine.plan.planner.plan.PlanFragment; import org.apache.iotdb.db.queryengine.plan.planner.plan.SubPlan; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadSingleTsFileNode; import org.junit.Assert; import org.junit.Before; @@ -67,4 +68,23 @@ public void tt() { Assert.assertNull(t.getTotalCpuTime()); Assert.assertNull(t.getFragmentInfo()); } + + @Test + public void testGetPartitionQueryDatabaseForPipeGeneratedTreeModelLoad() { + final LoadSingleTsFileNode node = mock(LoadSingleTsFileNode.class); + when(node.isTableModel()).thenReturn(false); + when(node.getDatabase()).thenReturn("root.test.sg"); + + Assert.assertEquals("root.test.sg", LoadTsFileScheduler.getPartitionQueryDatabase(node, true)); + Assert.assertNull(LoadTsFileScheduler.getPartitionQueryDatabase(node, false)); + } + + @Test + public void testGetPartitionQueryDatabaseForTableModelLoad() { + final LoadSingleTsFileNode node = mock(LoadSingleTsFileNode.class); + when(node.isTableModel()).thenReturn(true); + when(node.getDatabase()).thenReturn("test"); + + Assert.assertEquals("test", LoadTsFileScheduler.getPartitionQueryDatabase(node, false)); + } }