Skip to content

Commit ad1cf71

Browse files
committed
Fix pipe tree database creation on receiver (#17991)
(cherry picked from commit 28c4e68)
1 parent eb41f56 commit ad1cf71

5 files changed

Lines changed: 189 additions & 6 deletions

File tree

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import org.apache.iotdb.common.rpc.thrift.TSStatus;
2323
import org.apache.iotdb.commons.conf.IoTDBConstant;
2424
import org.apache.iotdb.commons.exception.IllegalPathException;
25+
import org.apache.iotdb.commons.exception.IoTDBException;
26+
import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
2527
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeOutOfMemoryCriticalException;
2628
import org.apache.iotdb.commons.path.PartialPath;
2729
import org.apache.iotdb.commons.pipe.config.PipeConfig;
@@ -74,7 +76,9 @@
7476
import org.apache.iotdb.db.queryengine.plan.Coordinator;
7577
import org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher;
7678
import org.apache.iotdb.db.queryengine.plan.analyze.schema.ClusterSchemaFetcher;
79+
import org.apache.iotdb.db.queryengine.plan.execution.config.ConfigTaskResult;
7780
import org.apache.iotdb.db.queryengine.plan.execution.config.executor.ClusterConfigTaskExecutor;
81+
import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.DatabaseSchemaTask;
7882
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.view.AlterLogicalViewNode;
7983
import org.apache.iotdb.db.queryengine.plan.statement.Statement;
8084
import org.apache.iotdb.db.queryengine.plan.statement.StatementType;
@@ -97,6 +101,7 @@
97101
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
98102
import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
99103

104+
import com.google.common.util.concurrent.ListenableFuture;
100105
import org.apache.tsfile.utils.Pair;
101106
import org.slf4j.Logger;
102107
import org.slf4j.LoggerFactory;
@@ -113,11 +118,15 @@
113118
import java.util.Objects;
114119
import java.util.Optional;
115120
import java.util.Set;
121+
import java.util.concurrent.ConcurrentHashMap;
122+
import java.util.concurrent.ExecutionException;
116123
import java.util.concurrent.atomic.AtomicLong;
117124
import java.util.concurrent.atomic.AtomicReference;
118125
import java.util.stream.Collectors;
119126
import java.util.stream.Stream;
120127

128+
import static org.apache.iotdb.commons.utils.ErrorHandlingCommonUtils.getRootCause;
129+
121130
public class IoTDBDataNodeReceiver extends IoTDBFileReceiver {
122131

123132
private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBDataNodeReceiver.class);
@@ -152,6 +161,14 @@ public class IoTDBDataNodeReceiver extends IoTDBFileReceiver {
152161
private static final SessionManager SESSION_MANAGER = SessionManager.getInstance();
153162

154163
private PipeMemoryBlock allocatedMemoryBlock;
164+
private final Set<String> autoCreatedTreeDatabases = ConcurrentHashMap.newKeySet();
165+
private final Set<String> conflictedTreeDatabases = ConcurrentHashMap.newKeySet();
166+
167+
private enum TreeDatabaseCreationResult {
168+
SKIPPED,
169+
CREATED_OR_EXISTED,
170+
CONFLICTED
171+
}
155172

156173
static {
157174
try {
@@ -876,6 +893,11 @@ private TSStatus executeStatement(final Statement statement) {
876893
return RpcUtils.getStatus(status.getCode(), status.getMessage());
877894
}
878895

896+
if (autoCreateTreeDatabaseIfNecessary(getTreeDatabaseName(statement))
897+
== TreeDatabaseCreationResult.CONFLICTED) {
898+
clearTreeDatabaseName(statement);
899+
}
900+
879901
return Coordinator.getInstance()
880902
.executeForTreeModel(
881903
shouldMarkAsPipeRequest.get() ? new PipeEnrichedStatement(statement) : statement,
@@ -889,6 +911,91 @@ private TSStatus executeStatement(final Statement statement) {
889911
.status;
890912
}
891913

914+
private TreeDatabaseCreationResult autoCreateTreeDatabaseIfNecessary(final String database) {
915+
if (database == null
916+
|| LoadTsFileStatement.getDatabaseLevelByTreeDatabase(database) == null
917+
|| !IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled()) {
918+
return TreeDatabaseCreationResult.SKIPPED;
919+
}
920+
if (autoCreatedTreeDatabases.contains(database)) {
921+
return TreeDatabaseCreationResult.CREATED_OR_EXISTED;
922+
}
923+
if (conflictedTreeDatabases.contains(database)) {
924+
return TreeDatabaseCreationResult.CONFLICTED;
925+
}
926+
927+
try {
928+
final DatabaseSchemaStatement statement =
929+
new DatabaseSchemaStatement(DatabaseSchemaStatement.DatabaseSchemaStatementType.CREATE);
930+
statement.setDatabasePath(new PartialPath(database));
931+
statement.setEnablePrintExceptionLog(false);
932+
933+
final TSStatus permissionStatus = statement.checkPermissionBeforeProcess(username);
934+
if (permissionStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
935+
throw new PipeException(permissionStatus.getMessage());
936+
}
937+
938+
final DatabaseSchemaTask task = new DatabaseSchemaTask(statement);
939+
final ListenableFuture<ConfigTaskResult> future =
940+
task.execute(ClusterConfigTaskExecutor.getInstance());
941+
final ConfigTaskResult result = future.get();
942+
final int statusCode = result.getStatusCode().getStatusCode();
943+
if (statusCode == TSStatusCode.SUCCESS_STATUS.getStatusCode()
944+
|| statusCode == TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode()) {
945+
autoCreatedTreeDatabases.add(database);
946+
return TreeDatabaseCreationResult.CREATED_OR_EXISTED;
947+
}
948+
if (statusCode == TSStatusCode.DATABASE_CONFLICT.getStatusCode()) {
949+
conflictedTreeDatabases.add(database);
950+
return TreeDatabaseCreationResult.CONFLICTED;
951+
}
952+
throw new PipeException(
953+
String.format(
954+
"Auto create tree database failed: %s, status: %s",
955+
database, result.getStatus() == null ? result.getStatusCode() : result.getStatus()));
956+
} catch (final IllegalPathException e) {
957+
throw new PipeException(String.format("Illegal tree database %s.", database), e);
958+
} catch (final ExecutionException | InterruptedException e) {
959+
if (e instanceof InterruptedException) {
960+
Thread.currentThread().interrupt();
961+
}
962+
final Throwable rootCause = getRootCause(e);
963+
final int errorCode;
964+
if (rootCause instanceof IoTDBException) {
965+
errorCode = ((IoTDBException) rootCause).getErrorCode();
966+
} else if (rootCause instanceof IoTDBRuntimeException) {
967+
errorCode = ((IoTDBRuntimeException) rootCause).getErrorCode();
968+
} else {
969+
errorCode = TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode();
970+
}
971+
if (errorCode == TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode()) {
972+
autoCreatedTreeDatabases.add(database);
973+
return TreeDatabaseCreationResult.CREATED_OR_EXISTED;
974+
}
975+
if (errorCode == TSStatusCode.DATABASE_CONFLICT.getStatusCode()) {
976+
conflictedTreeDatabases.add(database);
977+
return TreeDatabaseCreationResult.CONFLICTED;
978+
}
979+
throw new PipeException("Auto create tree database failed because " + e.getMessage(), e);
980+
}
981+
}
982+
983+
private String getTreeDatabaseName(final Statement statement) {
984+
if (statement instanceof LoadTsFileStatement) {
985+
return ((LoadTsFileStatement) statement).getDatabase();
986+
}
987+
return null;
988+
}
989+
990+
static void clearTreeDatabaseName(final Statement statement) {
991+
if (statement instanceof LoadTsFileStatement) {
992+
final LoadTsFileStatement loadTsFileStatement = (LoadTsFileStatement) statement;
993+
loadTsFileStatement.setDatabase(null);
994+
loadTsFileStatement.setDatabaseLevel(
995+
IoTDBDescriptor.getInstance().getConfig().getDefaultDatabaseLevel());
996+
}
997+
}
998+
892999
@Override
8931000
protected TSStatus login() {
8941001
final IClientSession session = SESSION_MANAGER.getCurrSession();

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TreeModelPlanner.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,14 +112,19 @@ public IScheduler doSchedule(
112112
&& ((PipeEnrichedStatement) statement).getInnerStatement()
113113
instanceof LoadTsFileStatement;
114114
if (statement instanceof LoadTsFileStatement || isPipeEnrichedTsFileLoad) {
115+
final LoadTsFileStatement loadTsFileStatement =
116+
statement instanceof LoadTsFileStatement
117+
? (LoadTsFileStatement) statement
118+
: (LoadTsFileStatement) ((PipeEnrichedStatement) statement).getInnerStatement();
115119
scheduler =
116120
new LoadTsFileScheduler(
117121
distributedPlan,
118122
context,
119123
stateMachine,
120124
syncInternalServiceClientManager,
121125
partitionFetcher,
122-
isPipeEnrichedTsFileLoad);
126+
isPipeEnrichedTsFileLoad,
127+
loadTsFileStatement.getDatabase());
123128
} else {
124129
scheduler =
125130
new ClusterScheduler(

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@
8282

8383
import java.io.DataOutputStream;
8484
import java.io.File;
85+
import java.io.FileNotFoundException;
8586
import java.io.IOException;
8687
import java.nio.ByteBuffer;
8788
import java.util.ArrayList;
@@ -136,6 +137,7 @@ public class LoadTsFileScheduler implements IScheduler {
136137
private final PlanFragmentId fragmentId;
137138
private final Set<TRegionReplicaSet> allReplicaSets;
138139
private final boolean isGeneratedByPipe;
140+
private final String treeDatabaseForRetry;
139141
private final Map<TTimePartitionSlot, ProgressIndex> timePartitionSlotToProgressIndex;
140142
private final LoadTsFileDataCacheMemoryBlock block;
141143

@@ -145,7 +147,8 @@ public LoadTsFileScheduler(
145147
QueryStateMachine stateMachine,
146148
IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> internalServiceClientManager,
147149
IPartitionFetcher partitionFetcher,
148-
boolean isGeneratedByPipe) {
150+
boolean isGeneratedByPipe,
151+
String treeDatabaseForRetry) {
149152
this.queryContext = queryContext;
150153
this.stateMachine = stateMachine;
151154
this.tsFileNodeList = new ArrayList<>();
@@ -155,6 +158,7 @@ public LoadTsFileScheduler(
155158
this.partitionFetcher = new DataPartitionBatchFetcher(partitionFetcher);
156159
this.allReplicaSets = new HashSet<>();
157160
this.isGeneratedByPipe = isGeneratedByPipe;
161+
this.treeDatabaseForRetry = treeDatabaseForRetry;
158162
this.timePartitionSlotToProgressIndex = new HashMap<>();
159163
this.block = LoadTsFileMemoryManager.getInstance().allocateDataCacheMemoryBlock();
160164

@@ -551,9 +555,7 @@ private void convertFailedTsFilesToTabletsAndRetry() {
551555
final TSStatus status =
552556
loadTsFileDataTypeConverter
553557
.convertForTreeModel(
554-
LoadTsFileStatement.createUnchecked(filePath)
555-
.setDeleteAfterLoad(failedNode.isDeleteAfterLoad())
556-
.setConvertOnTypeMismatch(true))
558+
buildRetryTreeLoadStatement(filePath, failedNode.isDeleteAfterLoad()))
557559
.orElse(null);
558560

559561
if (loadTsFileDataTypeConverter.isSuccessful(status)) {
@@ -592,6 +594,22 @@ private void convertFailedTsFilesToTabletsAndRetry() {
592594
}
593595
}
594596

597+
private LoadTsFileStatement buildRetryTreeLoadStatement(
598+
final String filePath, final boolean deleteAfterLoad) throws FileNotFoundException {
599+
final LoadTsFileStatement statement =
600+
LoadTsFileStatement.createUnchecked(filePath)
601+
.setDeleteAfterLoad(deleteAfterLoad)
602+
.setConvertOnTypeMismatch(true);
603+
if (treeDatabaseForRetry != null) {
604+
statement.setDatabase(treeDatabaseForRetry);
605+
statement.updateDatabaseLevelByTreeDatabase();
606+
}
607+
if (isGeneratedByPipe) {
608+
statement.markIsGeneratedByPipe();
609+
}
610+
return statement;
611+
}
612+
595613
@Override
596614
public void stop(Throwable t) {
597615
// Do nothing

iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverTest.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,4 +152,23 @@ public void testLoadTsFileSyncStatementCanSkipVerifySchemaWhenNotConvertingType(
152152
Files.deleteIfExists(tsFile);
153153
}
154154
}
155+
156+
@Test
157+
public void testClearTreeDatabaseNameForLoadTsFileStatement() throws Exception {
158+
final Path tsFile = Files.createTempFile("pipe-load-clear-tree-database", ".tsfile");
159+
try {
160+
final LoadTsFileStatement statement =
161+
IoTDBDataNodeReceiver.buildLoadTsFileStatementForSync(
162+
"root.test.sg_0", tsFile.toString(), true, true);
163+
164+
IoTDBDataNodeReceiver.clearTreeDatabaseName(statement);
165+
166+
Assert.assertNull(statement.getDatabase());
167+
Assert.assertEquals(
168+
IoTDBDescriptor.getInstance().getConfig().getDefaultDatabaseLevel(),
169+
statement.getDatabaseLevel());
170+
} finally {
171+
Files.deleteIfExists(tsFile);
172+
}
173+
}
155174
}

iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileSchedulerTest.java

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,18 @@
2727
import org.apache.iotdb.db.queryengine.plan.planner.plan.DistributedQueryPlan;
2828
import org.apache.iotdb.db.queryengine.plan.planner.plan.PlanFragment;
2929
import org.apache.iotdb.db.queryengine.plan.planner.plan.SubPlan;
30+
import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
3031

3132
import org.junit.Assert;
3233
import org.junit.Before;
3334
import org.junit.Test;
3435
import org.mockito.Mock;
3536
import org.mockito.MockitoAnnotations;
3637

38+
import java.io.File;
39+
import java.lang.reflect.Method;
40+
import java.util.Collections;
41+
3742
import static org.mockito.Mockito.mock;
3843
import static org.mockito.Mockito.spy;
3944
import static org.mockito.Mockito.when;
@@ -50,6 +55,7 @@ public void before() {
5055
when(distributedQueryPlan.getRootSubPlan()).thenReturn(subPlan);
5156
when(subPlan.getPlanFragment()).thenReturn(planFragment);
5257
when(planFragment.getId()).thenReturn(new PlanFragmentId("test", 0));
58+
when(distributedQueryPlan.getInstances()).thenReturn(Collections.emptyList());
5359
}
5460

5561
@Test
@@ -62,9 +68,37 @@ public void tt() {
6268
mock(QueryStateMachine.class),
6369
mock(IClientManager.class),
6470
mock(IPartitionFetcher.class),
65-
false));
71+
false,
72+
null));
6673
t.start();
6774
Assert.assertNull(t.getTotalCpuTime());
6875
Assert.assertNull(t.getFragmentInfo());
6976
}
77+
78+
@Test
79+
public void testBuildRetryTreeLoadStatementUpdatesDatabaseLevel() throws Exception {
80+
final LoadTsFileScheduler scheduler =
81+
new LoadTsFileScheduler(
82+
distributedQueryPlan,
83+
mock(MPPQueryContext.class),
84+
mock(QueryStateMachine.class),
85+
mock(IClientManager.class),
86+
mock(IPartitionFetcher.class),
87+
true,
88+
"root.test.sg_0");
89+
final Method method =
90+
LoadTsFileScheduler.class.getDeclaredMethod(
91+
"buildRetryTreeLoadStatement", String.class, boolean.class);
92+
method.setAccessible(true);
93+
94+
final File tsFile = File.createTempFile("test", ".tsfile");
95+
tsFile.deleteOnExit();
96+
97+
final LoadTsFileStatement statement =
98+
(LoadTsFileStatement) method.invoke(scheduler, tsFile.getAbsolutePath(), true);
99+
100+
Assert.assertEquals("root.test.sg_0", statement.getDatabase());
101+
Assert.assertEquals(2, statement.getDatabaseLevel());
102+
Assert.assertTrue(statement.isGeneratedByPipe());
103+
}
70104
}

0 commit comments

Comments
 (0)