Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@

private Boolean isTableModel;

private int listenToTsFileSourceCount = 0;
private int listenToInsertNodeSourceCount = 0;

private final PipeEventCounter eventCounter = new PipeDataRegionEventCounter();

public int getDataRegionId() {
Expand Down Expand Up @@ -228,12 +231,34 @@
});
}

public void startAssignTo(final PipeRealtimeDataRegionSource source) {
public synchronized void startAssignTo(final PipeRealtimeDataRegionSource source) {
matcher.register(source);
if (source.isNeedListenToTsFile()) {
listenToTsFileSourceCount++;
}
if (source.isNeedListenToInsertNode()) {
listenToInsertNodeSourceCount++;
}
logSourceAssignmentChange("registered", source);
}

public void stopAssignTo(final PipeRealtimeDataRegionSource source) {
public synchronized void stopAssignTo(final PipeRealtimeDataRegionSource source) {
matcher.deregister(source);
if (source.isNeedListenToTsFile()) {
listenToTsFileSourceCount--;
}
if (source.isNeedListenToInsertNode()) {
listenToInsertNodeSourceCount--;
}
logSourceAssignmentChange("deregistered", source);
}

public synchronized boolean shouldListenToTsFile() {
return listenToTsFileSourceCount > 0;
}

public synchronized boolean shouldListenToInsertNode() {
return listenToInsertNodeSourceCount > 0;
}
Comment on lines +256 to 262
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Necessary to synchronize?


public void invalidateCache() {
Expand Down Expand Up @@ -275,6 +300,21 @@
return eventCounter.getPipeHeartbeatEventCount();
}

private void logSourceAssignmentChange(
final String action, final PipeRealtimeDataRegionSource source) {
LOGGER.info(
"Pipe {}@{} {} realtime source on data region {} (listenToTsFile={}, listenToInsertNode={}, registeredSourceCount={}, tsFileSourceCount={}, insertNodeSourceCount={}).",

Check warning on line 306 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Line is longer than 100 characters (found 176).

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ4nIusLrXVVnvSc7HQQ&open=AZ4nIusLrXVVnvSc7HQQ&pullRequest=17669
source.getPipeName(),
source.getCreationTime(),
action,
dataRegionId,
source.isNeedListenToTsFile(),
source.isNeedListenToInsertNode(),
matcher.getRegisterCount(),
listenToTsFileSourceCount,
listenToInsertNodeSourceCount);
}

public Boolean isTableModel() {
return isTableModel;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

/**
* PipeInsertionEventListener is a singleton in each data node.
Expand All @@ -51,23 +50,18 @@ public class PipeInsertionDataNodeListener {
private final ConcurrentMap<Integer, PipeDataRegionAssigner> dataRegionId2Assigner =
new ConcurrentHashMap<>();

private final AtomicInteger listenToTsFileSourceCount = new AtomicInteger(0);
private final AtomicInteger listenToInsertNodeSourceCount = new AtomicInteger(0);

//////////////////////////// start & stop ////////////////////////////

public synchronized void startListenAndAssign(
final int dataRegionId, final PipeRealtimeDataRegionSource source) {
dataRegionId2Assigner
.computeIfAbsent(dataRegionId, o -> new PipeDataRegionAssigner(dataRegionId))
.startAssignTo(source);

if (source.isNeedListenToTsFile()) {
listenToTsFileSourceCount.incrementAndGet();
}
if (source.isNeedListenToInsertNode()) {
listenToInsertNodeSourceCount.incrementAndGet();
}
dataRegionId2Assigner.compute(
dataRegionId,
(id, assigner) -> {
final PipeDataRegionAssigner actualAssigner =
assigner == null ? new PipeDataRegionAssigner(dataRegionId) : assigner;
actualAssigner.startAssignTo(source);
return actualAssigner;
});
}
Comment on lines +57 to 65
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ComputeIfAbsent?


public synchronized void stopListenAndAssign(
Expand All @@ -82,13 +76,6 @@ public synchronized void stopListenAndAssign(

assigner.stopAssignTo(source);

if (source.isNeedListenToTsFile()) {
listenToTsFileSourceCount.decrementAndGet();
}
if (source.isNeedListenToInsertNode()) {
listenToInsertNodeSourceCount.decrementAndGet();
}

if (assigner.notMoreSourceNeededToBeAssigned()) {
// The removed assigner will is the same as the one referenced by the variable `assigner`
dataRegionId2Assigner.remove(dataRegionId);
Expand All @@ -110,14 +97,10 @@ public void listenToTsFile(
final String databaseName,
final TsFileResource tsFileResource,
final boolean isLoaded) {
// We don't judge whether listenToTsFileSourceCount.get() == 0 here on purpose
// because spirces may use tsfile events when some exceptions occur in the
// insert nodes listening process.

final PipeDataRegionAssigner assigner = dataRegionId2Assigner.get(dataRegionId);

// only events from registered data region will be extracted
if (assigner == null) {
// only events from registered data region with tsfile listeners will be extracted
if (assigner == null || !assigner.shouldListenToTsFile()) {
return;
}

Expand All @@ -131,14 +114,10 @@ public void listenToInsertNode(
final String databaseName,
final InsertNode insertNode,
final TsFileResource tsFileResource) {
if (listenToInsertNodeSourceCount.get() == 0) {
return;
}

final PipeDataRegionAssigner assigner = dataRegionId2Assigner.get(dataRegionId);

// only events from registered data region will be extracted
if (assigner == null) {
// only events from registered data region with insert listeners will be extracted
if (assigner == null || !assigner.shouldListenToInsertNode()) {
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.commons.utils.FileUtils;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
import org.apache.iotdb.db.pipe.source.dataregion.realtime.PipeRealtimeDataRegionHybridSource;
import org.apache.iotdb.db.pipe.source.dataregion.realtime.PipeRealtimeDataRegionLogSource;
import org.apache.iotdb.db.pipe.source.dataregion.realtime.PipeRealtimeDataRegionSource;
Expand All @@ -40,6 +41,7 @@
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;

import org.apache.tsfile.common.constant.TsFileConstant;
import org.apache.tsfile.enums.TSDataType;
Expand All @@ -64,6 +66,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

public class PipeRealtimeExtractTest {
Expand Down Expand Up @@ -261,6 +264,52 @@ public void testRealtimeExtractProcess() {
}
}

@Test
public void testListenToTsFileSkipsAssignerWithoutTsFileSource() throws Exception {
try (final NoTsFileRealtimeDataRegionSource extractor =
new NoTsFileRealtimeDataRegionSource()) {
final PipeParameters parameters =
new PipeParameters(
new HashMap<String, String>() {
{
put(PipeSourceConstant.EXTRACTOR_PATTERN_KEY, pattern1);
}
});
final PipeTaskRuntimeConfiguration configuration =
new PipeTaskRuntimeConfiguration(
new PipeTaskSourceRuntimeEnvironment(
"1", 1, dataRegion1, new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1)));

extractor.validate(new PipeParameterValidator(parameters));
extractor.customize(parameters, configuration);
extractor.start();

final File dataRegionDir =
new File(tsFileDir.getPath() + File.separator + dataRegion1 + File.separator + "0");
final boolean ignored = dataRegionDir.mkdirs();
final File tsFile = new File(dataRegionDir, "0-0-0-0.tsfile");
Assert.assertTrue(tsFile.createNewFile());

final TsFileResource resource = new TsFileResource(tsFile);
resource.updateStartTime(
IDeviceID.Factory.DEFAULT_FACTORY.create(
String.join(TsFileConstant.PATH_SEPARATOR, device)),
0);
resource.close();

PipeInsertionDataNodeListener.getInstance()
.listenToTsFile(dataRegion1, Integer.toString(dataRegion1), resource, false);

final long deadline = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(1);
while (System.currentTimeMillis() < deadline
&& extractor.getObservedTsFileEventCount() == 0) {
TimeUnit.MILLISECONDS.sleep(10);
}

Assert.assertEquals(0, extractor.getObservedTsFileEventCount());
}
}

private Future<?> write2DataRegion(
final int writeNum, final int dataRegionId, final int startNum) {
final File dataRegionDir =
Expand Down Expand Up @@ -348,4 +397,36 @@ private Future<?> listen(
}
});
}

private static class NoTsFileRealtimeDataRegionSource extends PipeRealtimeDataRegionSource {

private final AtomicInteger observedTsFileEventCount = new AtomicInteger(0);

@Override
public Event supply() {
return null;
}

@Override
protected void doExtract(final PipeRealtimeEvent event) {
if (event.getEvent() instanceof TsFileInsertionEvent) {
observedTsFileEventCount.incrementAndGet();
}
event.decreaseReferenceCount(NoTsFileRealtimeDataRegionSource.class.getName(), false);
}

@Override
public boolean isNeedListenToTsFile() {
return false;
}

@Override
public boolean isNeedListenToInsertNode() {
return false;
}

private int getObservedTsFileEventCount() {
return observedTsFileEventCount.get();
}
}
}
Loading