diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java index 9c356e8654bf..17fb59bc2ffd 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java @@ -66,6 +66,9 @@ public class PipeDataRegionAssigner implements Closeable { private Boolean isTableModel; + private int listenToTsFileSourceCount = 0; + private int listenToInsertNodeSourceCount = 0; + private final PipeEventCounter eventCounter = new PipeDataRegionEventCounter(); public int getDataRegionId() { @@ -228,12 +231,34 @@ private void assignToSource( }); } - public void startAssignTo(final PipeRealtimeDataRegionSource source) { + public synchronized void startAssignTo(final PipeRealtimeDataRegionSource source) { matcher.register(source); + if (source.isNeedListenToTsFile()) { + listenToTsFileSourceCount++; + } + if (source.isNeedListenToInsertNode()) { + listenToInsertNodeSourceCount++; + } + logSourceAssignmentChange("registered", source); } - public void stopAssignTo(final PipeRealtimeDataRegionSource source) { + public synchronized void stopAssignTo(final PipeRealtimeDataRegionSource source) { matcher.deregister(source); + if (source.isNeedListenToTsFile()) { + listenToTsFileSourceCount--; + } + if (source.isNeedListenToInsertNode()) { + listenToInsertNodeSourceCount--; + } + logSourceAssignmentChange("deregistered", source); + } + + public synchronized boolean shouldListenToTsFile() { + return listenToTsFileSourceCount > 0; + } + + public synchronized boolean shouldListenToInsertNode() { + return listenToInsertNodeSourceCount > 0; } public void invalidateCache() { @@ -275,6 +300,21 @@ public int getPipeHeartbeatEventCount() { return eventCounter.getPipeHeartbeatEventCount(); } + private void logSourceAssignmentChange( + final String action, final PipeRealtimeDataRegionSource source) { + LOGGER.info( + "Pipe {}@{} {} realtime source on data region {} (listenToTsFile={}, listenToInsertNode={}, registeredSourceCount={}, tsFileSourceCount={}, insertNodeSourceCount={}).", + source.getPipeName(), + source.getCreationTime(), + action, + dataRegionId, + source.isNeedListenToTsFile(), + source.isNeedListenToInsertNode(), + matcher.getRegisterCount(), + listenToTsFileSourceCount, + listenToInsertNodeSourceCount); + } + public Boolean isTableModel() { return isTableModel; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java index 157fb0078e32..4472295a5abf 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java @@ -34,7 +34,6 @@ import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicInteger; /** * PipeInsertionEventListener is a singleton in each data node. @@ -51,23 +50,18 @@ public class PipeInsertionDataNodeListener { private final ConcurrentMap dataRegionId2Assigner = new ConcurrentHashMap<>(); - private final AtomicInteger listenToTsFileSourceCount = new AtomicInteger(0); - private final AtomicInteger listenToInsertNodeSourceCount = new AtomicInteger(0); - //////////////////////////// start & stop //////////////////////////// public synchronized void startListenAndAssign( final int dataRegionId, final PipeRealtimeDataRegionSource source) { - dataRegionId2Assigner - .computeIfAbsent(dataRegionId, o -> new PipeDataRegionAssigner(dataRegionId)) - .startAssignTo(source); - - if (source.isNeedListenToTsFile()) { - listenToTsFileSourceCount.incrementAndGet(); - } - if (source.isNeedListenToInsertNode()) { - listenToInsertNodeSourceCount.incrementAndGet(); - } + dataRegionId2Assigner.compute( + dataRegionId, + (id, assigner) -> { + final PipeDataRegionAssigner actualAssigner = + assigner == null ? new PipeDataRegionAssigner(dataRegionId) : assigner; + actualAssigner.startAssignTo(source); + return actualAssigner; + }); } public synchronized void stopListenAndAssign( @@ -82,13 +76,6 @@ public synchronized void stopListenAndAssign( assigner.stopAssignTo(source); - if (source.isNeedListenToTsFile()) { - listenToTsFileSourceCount.decrementAndGet(); - } - if (source.isNeedListenToInsertNode()) { - listenToInsertNodeSourceCount.decrementAndGet(); - } - if (assigner.notMoreSourceNeededToBeAssigned()) { // The removed assigner will is the same as the one referenced by the variable `assigner` dataRegionId2Assigner.remove(dataRegionId); @@ -110,14 +97,10 @@ public void listenToTsFile( final String databaseName, final TsFileResource tsFileResource, final boolean isLoaded) { - // We don't judge whether listenToTsFileSourceCount.get() == 0 here on purpose - // because spirces may use tsfile events when some exceptions occur in the - // insert nodes listening process. - final PipeDataRegionAssigner assigner = dataRegionId2Assigner.get(dataRegionId); - // only events from registered data region will be extracted - if (assigner == null) { + // only events from registered data region with tsfile listeners will be extracted + if (assigner == null || !assigner.shouldListenToTsFile()) { return; } @@ -131,14 +114,10 @@ public void listenToInsertNode( final String databaseName, final InsertNode insertNode, final TsFileResource tsFileResource) { - if (listenToInsertNodeSourceCount.get() == 0) { - return; - } - final PipeDataRegionAssigner assigner = dataRegionId2Assigner.get(dataRegionId); - // only events from registered data region will be extracted - if (assigner == null) { + // only events from registered data region with insert listeners will be extracted + if (assigner == null || !assigner.shouldListenToInsertNode()) { return; } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/PipeRealtimeExtractTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/PipeRealtimeExtractTest.java index d3adbf9aaade..b793e8ca6991 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/PipeRealtimeExtractTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/PipeRealtimeExtractTest.java @@ -29,6 +29,7 @@ import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.commons.utils.FileUtils; import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent; import org.apache.iotdb.db.pipe.source.dataregion.realtime.PipeRealtimeDataRegionHybridSource; import org.apache.iotdb.db.pipe.source.dataregion.realtime.PipeRealtimeDataRegionLogSource; import org.apache.iotdb.db.pipe.source.dataregion.realtime.PipeRealtimeDataRegionSource; @@ -40,6 +41,7 @@ import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; import org.apache.iotdb.pipe.api.event.Event; import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; +import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent; import org.apache.tsfile.common.constant.TsFileConstant; import org.apache.tsfile.enums.TSDataType; @@ -64,6 +66,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; public class PipeRealtimeExtractTest { @@ -261,6 +264,52 @@ public void testRealtimeExtractProcess() { } } + @Test + public void testListenToTsFileSkipsAssignerWithoutTsFileSource() throws Exception { + try (final NoTsFileRealtimeDataRegionSource extractor = + new NoTsFileRealtimeDataRegionSource()) { + final PipeParameters parameters = + new PipeParameters( + new HashMap() { + { + put(PipeSourceConstant.EXTRACTOR_PATTERN_KEY, pattern1); + } + }); + final PipeTaskRuntimeConfiguration configuration = + new PipeTaskRuntimeConfiguration( + new PipeTaskSourceRuntimeEnvironment( + "1", 1, dataRegion1, new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1))); + + extractor.validate(new PipeParameterValidator(parameters)); + extractor.customize(parameters, configuration); + extractor.start(); + + final File dataRegionDir = + new File(tsFileDir.getPath() + File.separator + dataRegion1 + File.separator + "0"); + final boolean ignored = dataRegionDir.mkdirs(); + final File tsFile = new File(dataRegionDir, "0-0-0-0.tsfile"); + Assert.assertTrue(tsFile.createNewFile()); + + final TsFileResource resource = new TsFileResource(tsFile); + resource.updateStartTime( + IDeviceID.Factory.DEFAULT_FACTORY.create( + String.join(TsFileConstant.PATH_SEPARATOR, device)), + 0); + resource.close(); + + PipeInsertionDataNodeListener.getInstance() + .listenToTsFile(dataRegion1, Integer.toString(dataRegion1), resource, false); + + final long deadline = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(1); + while (System.currentTimeMillis() < deadline + && extractor.getObservedTsFileEventCount() == 0) { + TimeUnit.MILLISECONDS.sleep(10); + } + + Assert.assertEquals(0, extractor.getObservedTsFileEventCount()); + } + } + private Future write2DataRegion( final int writeNum, final int dataRegionId, final int startNum) { final File dataRegionDir = @@ -348,4 +397,36 @@ private Future listen( } }); } + + private static class NoTsFileRealtimeDataRegionSource extends PipeRealtimeDataRegionSource { + + private final AtomicInteger observedTsFileEventCount = new AtomicInteger(0); + + @Override + public Event supply() { + return null; + } + + @Override + protected void doExtract(final PipeRealtimeEvent event) { + if (event.getEvent() instanceof TsFileInsertionEvent) { + observedTsFileEventCount.incrementAndGet(); + } + event.decreaseReferenceCount(NoTsFileRealtimeDataRegionSource.class.getName(), false); + } + + @Override + public boolean isNeedListenToTsFile() { + return false; + } + + @Override + public boolean isNeedListenToInsertNode() { + return false; + } + + private int getObservedTsFileEventCount() { + return observedTsFileEventCount.get(); + } + } }