Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -843,13 +843,20 @@ public TSStatus visitPipeEnrichedNonWritePlanNode(
public TSStatus visitPipeOperateSchemaQueueNode(
final PipeOperateSchemaQueueNode node, final ISchemaRegion schemaRegion) {
final SchemaRegionId id = schemaRegion.getSchemaRegionId();
final SchemaRegionListeningQueue queue = PipeDataNodeAgent.runtime().schemaListener(id);
if (node.isOpen() && !queue.isOpened()) {
logger.info("Opened pipe listening queue on schema region {}", id);
queue.open();
} else if (!node.isOpen() && queue.isOpened()) {
logger.info("Closed pipe listening queue on schema region {}", id);
queue.close();
if (node.isOpen()) {
final SchemaRegionListeningQueue queue = PipeDataNodeAgent.runtime().schemaListener(id);
if (!queue.isOpened()) {
logger.info("Opened pipe listening queue on schema region {}", id);
queue.open();
}
} else {
final SchemaRegionListeningQueue queue =
PipeDataNodeAgent.runtime().schemaListenerIfPresent(id);
if (queue != null && queue.isOpened()) {
logger.info("Closed pipe listening queue on schema region {}", id);
queue.close();
PipeDataNodeAgent.runtime().cleanupSchemaListenerIfUnused(id);
}
}
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,25 +125,34 @@ public boolean isReadOnly() {

@Override
public boolean takeSnapshot(final File snapshotDir) {
if (schemaRegion.createSnapshot(snapshotDir)
&& PipeDataNodeAgent.runtime()
.schemaListener(schemaRegion.getSchemaRegionId())
.createSnapshot(snapshotDir)) {
listen2Snapshot4PipeListener(true);
if (!schemaRegion.createSnapshot(snapshotDir)) {
return false;
}

final SchemaRegionListeningQueue listener =
PipeDataNodeAgent.runtime().schemaListenerIfPresent(schemaRegion.getSchemaRegionId());
if (listener == null) {
return true;
}
return false;

if (!listener.createSnapshot(snapshotDir)) {
return false;
}
listen2Snapshot4PipeListener(true);
return true;
}

@Override
public void loadSnapshot(final File latestSnapshotRootDir) {
schemaRegion.loadSnapshot(latestSnapshotRootDir);
PipeDataNodeAgent.runtime()
.schemaListener(schemaRegion.getSchemaRegionId())
.loadSnapshot(latestSnapshotRootDir);
// We recompute the snapshot for pipe listener when loading snapshot
// to recover the newest snapshot in cache
listen2Snapshot4PipeListener(false);
final SchemaRegionListeningQueue listener =
PipeDataNodeAgent.runtime().schemaListenerIfPresent(schemaRegion.getSchemaRegionId());
if (listener != null) {
listener.loadSnapshot(latestSnapshotRootDir);
// We recompute the snapshot for pipe listener when loading snapshot
// to recover the newest snapshot in cache
listen2Snapshot4PipeListener(false);
}
}

public void listen2Snapshot4PipeListener(final boolean isTmp) {
Expand All @@ -154,7 +163,10 @@ public void listen2Snapshot4PipeListener(final boolean isTmp) {
.toString(),
isTmp);
final SchemaRegionListeningQueue listener =
PipeDataNodeAgent.runtime().schemaListener(schemaRegion.getSchemaRegionId());
PipeDataNodeAgent.runtime().schemaListenerIfPresent(schemaRegion.getSchemaRegionId());
if (listener == null) {
return;
}
if (Objects.isNull(snapshotPaths) || Objects.isNull(snapshotPaths.get(0))) {
if (listener.isOpened()) {
logger.warn(
Expand All @@ -181,9 +193,11 @@ public TSStatus write(final IConsensusRequest request) {
final TSStatus result =
((PlanNode) request).accept(new SchemaExecutionVisitor(), schemaRegion);
if (result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
PipeDataNodeAgent.runtime()
.schemaListener(schemaRegion.getSchemaRegionId())
.tryListenToNode((PlanNode) request);
final SchemaRegionListeningQueue listener =
PipeDataNodeAgent.runtime().schemaListenerIfPresent(schemaRegion.getSchemaRegionId());
if (listener != null) {
listener.tryListenToNode((PlanNode) request);
}
}
return result;
} catch (final IllegalArgumentException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,10 @@ public SchemaRegionListeningQueue schemaListener(SchemaRegionId schemaRegionId)
return regionListenerManager.listener(schemaRegionId);
}

public SchemaRegionListeningQueue schemaListenerIfPresent(SchemaRegionId schemaRegionId) {
return regionListenerManager.listenerIfPresent(schemaRegionId);
}

public int increaseAndGetSchemaListenerReferenceCount(SchemaRegionId schemaRegionId) {
return regionListenerManager.increaseAndGetReferenceCount(schemaRegionId);
}
Expand All @@ -156,6 +160,14 @@ public int decreaseAndGetSchemaListenerReferenceCount(SchemaRegionId schemaRegio
return regionListenerManager.decreaseAndGetReferenceCount(schemaRegionId);
}

public void cleanupSchemaListenerIfUnused(SchemaRegionId schemaRegionId) {
regionListenerManager.cleanupListenerIfUnused(schemaRegionId);
}

public void clearSchemaRegionState(SchemaRegionId schemaRegionId) {
regionListenerManager.clearSchemaRegionState(schemaRegionId);
}

public void notifySchemaLeaderReady(SchemaRegionId schemaRegionId) {
regionListenerManager.notifyLeaderReady(schemaRegionId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
package org.apache.iotdb.db.pipe.agent.runtime;

import org.apache.iotdb.commons.consensus.SchemaRegionId;
import org.apache.iotdb.commons.pipe.agent.task.PipeTask;
import org.apache.iotdb.db.pipe.metric.schema.PipeSchemaRegionListenerMetrics;
import org.apache.iotdb.db.pipe.source.schemaregion.SchemaRegionListeningQueue;

Expand All @@ -34,6 +33,7 @@

private final Map<SchemaRegionId, PipeSchemaRegionListener> id2ListenerMap =
new ConcurrentHashMap<>();
private final Map<SchemaRegionId, AtomicBoolean> id2LeaderReadyMap = new ConcurrentHashMap<>();

public synchronized Set<SchemaRegionId> regionIds() {
return id2ListenerMap.keySet();
Expand All @@ -44,6 +44,12 @@
.listeningQueue;
}

public synchronized SchemaRegionListeningQueue listenerIfPresent(
final SchemaRegionId schemaRegionId) {
final PipeSchemaRegionListener listener = id2ListenerMap.get(schemaRegionId);
return listener == null ? null : listener.listeningQueue;
}

public synchronized int increaseAndGetReferenceCount(final SchemaRegionId schemaRegionId) {
return id2ListenerMap
.computeIfAbsent(schemaRegionId, PipeSchemaRegionListener::new)
Expand All @@ -52,62 +58,65 @@
}

public synchronized int decreaseAndGetReferenceCount(final SchemaRegionId schemaRegionId) {
return id2ListenerMap
.computeIfAbsent(schemaRegionId, PipeSchemaRegionListener::new)
.listeningQueueReferenceCount
.updateAndGet(v -> v > 0 ? v - 1 : 0);
final PipeSchemaRegionListener listener = id2ListenerMap.get(schemaRegionId);
if (listener == null) {
return 0;
}

final int referenceCount =
listener.listeningQueueReferenceCount.updateAndGet(v -> v > 0 ? v - 1 : 0);
if (referenceCount == 0 && !listener.listeningQueue.isOpened()) {
cleanupListenerIfUnused(schemaRegionId, listener);
}
return referenceCount;
}

public synchronized void notifyLeaderReady(final SchemaRegionId schemaRegionId) {
id2ListenerMap
.computeIfAbsent(schemaRegionId, PipeSchemaRegionListener::new)
.notifyLeaderReady();
id2LeaderReadyMap.computeIfAbsent(schemaRegionId, id -> new AtomicBoolean()).set(true);
}

public synchronized void notifyLeaderUnavailable(final SchemaRegionId schemaRegionId) {
id2ListenerMap
.computeIfAbsent(schemaRegionId, PipeSchemaRegionListener::new)
.notifyLeaderUnavailable();
id2LeaderReadyMap.computeIfAbsent(schemaRegionId, id -> new AtomicBoolean()).set(false);
}

public synchronized boolean isLeaderReady(final SchemaRegionId schemaRegionId) {
return id2ListenerMap
.computeIfAbsent(schemaRegionId, PipeSchemaRegionListener::new)
.isLeaderReady();
final AtomicBoolean isLeaderReady = id2LeaderReadyMap.get(schemaRegionId);
return isLeaderReady != null && isLeaderReady.get();
}

public synchronized void cleanupListenerIfUnused(final SchemaRegionId schemaRegionId) {
final PipeSchemaRegionListener listener = id2ListenerMap.get(schemaRegionId);
if (listener != null) {
cleanupListenerIfUnused(schemaRegionId, listener);
}
}

public synchronized void clearSchemaRegionState(final SchemaRegionId schemaRegionId) {
final PipeSchemaRegionListener listener = id2ListenerMap.remove(schemaRegionId);
if (listener != null) {
PipeSchemaRegionListenerMetrics.getInstance().deregister(schemaRegionId.getId());
}
id2LeaderReadyMap.remove(schemaRegionId);
}

private void cleanupListenerIfUnused(

Check warning on line 102 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeSchemaRegionListenerManager.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

All overloaded methods should be placed next to each other. Placing non-overloaded methods in between overloaded methods with the same type is a violation. Previous overloaded method located at line '87'.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ4loJtz_9oVCvgy5GkA&open=AZ4loJtz_9oVCvgy5GkA&pullRequest=17666
final SchemaRegionId schemaRegionId, final PipeSchemaRegionListener listener) {
if (listener.listeningQueueReferenceCount.get() > 0 || listener.listeningQueue.isOpened()) {
return;
}
if (id2ListenerMap.remove(schemaRegionId, listener)) {
PipeSchemaRegionListenerMetrics.getInstance().deregister(schemaRegionId.getId());
}
}

private static class PipeSchemaRegionListener {

private final SchemaRegionListeningQueue listeningQueue = new SchemaRegionListeningQueue();
private final AtomicInteger listeningQueueReferenceCount = new AtomicInteger(0);

private final AtomicBoolean isLeaderReady = new AtomicBoolean(false);

protected PipeSchemaRegionListener(final SchemaRegionId schemaRegionId) {
PipeSchemaRegionListenerMetrics.getInstance()
.register(listeningQueue, schemaRegionId.getId());
}

/**
* Get leader ready state, DO NOT use consensus layer's leader ready flag because
* SimpleConsensus' ready flag is always {@code true}. Note that this flag has nothing to do
* with listening and a {@link PipeTask} starts only iff the current node is a leader and ready.
*
* @return {@code true} iff the current node is a leader and ready
*/
private boolean isLeaderReady() {
return isLeaderReady.get();
}

// Leader ready flag has the following effect
// 1. The linked list starts serving only after leader gets ready
// 2. Config pipe task is only created after leader gets ready
private void notifyLeaderReady() {
isLeaderReady.set(true);
}

private void notifyLeaderUnavailable() {
isLeaderReady.set(false);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeType;
import org.apache.iotdb.commons.pipe.agent.task.progress.PipeEventCommitManager;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant;
import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant;
Expand All @@ -59,6 +60,7 @@
import org.apache.iotdb.db.pipe.source.dataregion.DataRegionListeningFilter;
import org.apache.iotdb.db.pipe.source.dataregion.realtime.listener.PipeInsertionDataNodeListener;
import org.apache.iotdb.db.pipe.source.schemaregion.SchemaRegionListeningFilter;
import org.apache.iotdb.db.pipe.source.schemaregion.SchemaRegionListeningQueue;
import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
Expand Down Expand Up @@ -267,10 +269,14 @@ private Set<Integer> clearSchemaRegionListeningQueueIfNecessary(
}

schemaRegionId2ListeningQueueNewFirstIndex.forEach(
(schemaRegionId, listeningQueueNewFirstIndex) ->
PipeDataNodeAgent.runtime()
.schemaListener(new SchemaRegionId(schemaRegionId))
.removeBefore(listeningQueueNewFirstIndex));
(schemaRegionId, listeningQueueNewFirstIndex) -> {
final SchemaRegionListeningQueue listeningQueue =
PipeDataNodeAgent.runtime()
.schemaListenerIfPresent(new SchemaRegionId(schemaRegionId));
if (listeningQueue != null) {
listeningQueue.removeBefore(listeningQueueNewFirstIndex);
}
});

return schemaRegionId2ListeningQueueNewFirstIndex.keySet();
}
Expand Down Expand Up @@ -323,6 +329,7 @@ protected boolean dropPipe(final String pipeName, final long creationTime) {
final String taskId = pipeName + "_" + creationTime;
PipeTsFileToTabletsMetrics.getInstance().deregister(taskId);
PipeDataNodeSinglePipeMetrics.getInstance().deregister(taskId);
PipeEventCommitManager.getInstance().clear(pipeName, creationTime);

return true;
}
Expand Down Expand Up @@ -351,6 +358,7 @@ protected boolean dropPipe(final String pipeName) {
final String taskId = pipeName + "_" + creationTime;
PipeTsFileToTabletsMetrics.getInstance().deregister(taskId);
PipeDataNodeSinglePipeMetrics.getInstance().deregister(taskId);
PipeEventCommitManager.getInstance().clear(pipeName, creationTime);
// When the pipe contains no pipe tasks, there is no corresponding prefetching queue for the
// subscribed pipe, so the subscription needs to be manually marked as completed.
if (!hasPipeTasks && PipeStaticMeta.isSubscriptionPipe(pipeName)) {
Expand Down
Loading
Loading