Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -87,16 +87,29 @@ public DisruptorQueue(
}

public void publish(final PipeRealtimeEvent event) {
publishOrDrop(event);
}

public boolean publishOrDrop(final PipeRealtimeEvent event) {
final EnrichedEvent innerEvent = event.getEvent();
if (innerEvent instanceof PipeHeartbeatEvent) {
((PipeHeartbeatEvent) innerEvent).recordDisruptorSize(ringBuffer);
}
ringBuffer.publishEvent((container, sequence, o) -> container.setEvent(event), event);
mayPrintExceedingLog();
final boolean published =
ringBuffer.publishEvent(
(container, sequence, o) -> container.setEvent(event), event, this::isClosed);
if (published) {
mayPrintExceedingLog();
}
return published;
}

public void shutdown() {
public void closeInput() {
isClosed = true;
}

public void shutdown() {
closeInput();
// use shutdown instead of halt to ensure all published events have been handled
disruptor.shutdown();
allocatedMemoryBlock.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
private Boolean isTableModel;

private final PipeEventCounter eventCounter = new PipeDataRegionEventCounter();
private int inFlightPublishCount = 0;

public int getDataRegionId() {
return dataRegionId;
Expand Down Expand Up @@ -101,14 +102,28 @@
((PipeHeartbeatEvent) innerEvent).onPublished();
}

// use synchronized here for completely preventing reference count leaks under extreme thread
// scheduling when closing
synchronized (this) {
if (!disruptor.isClosed()) {
disruptor.publish(event);
} else {
if (disruptor.isClosed()) {
onAssignedHook(event);
return;
}
inFlightPublishCount++;
}

boolean isPublished = false;
try {
isPublished = disruptor.publishOrDrop(event);
} finally {
synchronized (this) {
inFlightPublishCount--;
if (inFlightPublishCount == 0) {
notifyAll();
}
}
}

if (!isPublished) {
onAssignedHook(event);
}
}

Expand Down Expand Up @@ -254,9 +269,25 @@
public synchronized void close() {
PipeAssignerMetrics.getInstance().deregister(dataRegionId);

boolean interrupted = false;
disruptor.closeInput();
while (inFlightPublishCount > 0) {
try {
wait();
} catch (final InterruptedException e) {

Check warning on line 277 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Either re-interrupt this method or rethrow the "InterruptedException" that can be caught here.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ4muwOaMSN_QC76m6ma&open=AZ4muwOaMSN_QC76m6ma&pullRequest=17673
interrupted = true;
LOGGER.warn(
"Interrupted while waiting for in-flight publishes to finish when closing assigner on data region {}.",

Check warning on line 280 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Line is longer than 100 characters (found 115).

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ4muwOaMSN_QC76m6mb&open=AZ4muwOaMSN_QC76m6mb&pullRequest=17673
dataRegionId);
}
}

final long startTime = System.currentTimeMillis();
disruptor.shutdown();
matcher.clear();
if (interrupted) {
Thread.currentThread().interrupt();
}
LOGGER.info(
"Pipe: Assigner on data region {} shutdown internal disruptor within {} ms",
dataRegionId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,15 @@
// Batch process all available events
nextSequence = processAvailableEvents(nextSequence, availableSequence);

} catch (final InterruptedException ex) {

Check warning on line 82 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/BatchEventProcessor.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Either re-interrupt this method or rethrow the "InterruptedException" that can be caught here.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ4muwHNMSN_QC76m6mY&open=AZ4muwHNMSN_QC76m6mY&pullRequest=17673
if (running) {
Thread.currentThread().interrupt();
LOGGER.info("Processor interrupted");
if (!running) {
break;
}
break;
// A transient interrupt should not permanently stop the consumer thread. Otherwise the
// gating sequence will stop advancing and producers may block forever on a full ring
// buffer, making the later close path appear stuck.
Thread.interrupted();
LOGGER.warn("Processor interrupted unexpectedly, continue running");
} catch (final Throwable ex) {
exceptionHandler.handleEventException(ex, nextSequence, ringBuffer.get(nextSequence));
sequence.set(nextSequence);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.concurrent.atomic.AtomicIntegerArray;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;
import java.util.function.BooleanSupplier;

/**
* Multi-producer sequencer for coordinating concurrent publishers
Expand Down Expand Up @@ -110,14 +111,31 @@
* @return highest claimed sequence number
*/
public long next(int n) {
return next(n, () -> false);
}

/**
* Claim next n sequences for publishing, or abort if the caller is closing.
*
* @param n number of sequences to claim
* @param abortCondition returns {@code true} if the claim should be abandoned
* @return highest claimed sequence number, or {@link Sequence#INITIAL_VALUE} if aborted
*/
public long next(final int n, final BooleanSupplier abortCondition) {

Check failure on line 124 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/MultiProducerSequencer.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Refactor this method to reduce its Cognitive Complexity from 16 to the 15 allowed.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ4muwNsMSN_QC76m6mZ&open=AZ4muwNsMSN_QC76m6mZ&pullRequest=17673
if (n < 1) {
throw new IllegalArgumentException("n must be > 0");
}

final BooleanSupplier effectiveAbortCondition =
abortCondition != null ? abortCondition : () -> false;
long current;
long next;

do {
if (effectiveAbortCondition.getAsBoolean()) {
return Sequence.INITIAL_VALUE;
}

current = cursor.get();
next = current + n;

Expand All @@ -128,6 +146,9 @@
long gatingSequence = Sequence.getMinimumSequence(gatingSequences.get(), current);

if (wrapPoint > gatingSequence) {
if (effectiveAbortCondition.getAsBoolean()) {
return Sequence.INITIAL_VALUE;
}
LockSupport.parkNanos(1);
continue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package org.apache.iotdb.db.pipe.source.dataregion.realtime.disruptor;

import java.util.function.BooleanSupplier;

/**
* Left-hand side padding for cache line alignment
*
Expand Down Expand Up @@ -205,8 +207,26 @@ public void publish(long lo, long hi) {
* @param <A> argument type
*/
public <A> void publishEvent(EventTranslator<E, A> translator, A arg0) {
final long sequence = sequencer.next(1);
publishEvent(translator, arg0, () -> false);
}

/**
* Publish event using a translator function, or abort if the caller is closing.
*
* @param translator function to populate the event
* @param arg0 argument passed to translator
* @param abortCondition returns {@code true} if the publish should be abandoned
* @param <A> argument type
* @return {@code true} if the event is published, {@code false} if the publish is aborted
*/
public <A> boolean publishEvent(
final EventTranslator<E, A> translator, final A arg0, final BooleanSupplier abortCondition) {
final long sequence = sequencer.next(1, abortCondition);
if (sequence == Sequence.INITIAL_VALUE) {
return false;
}
translateAndPublish(translator, sequence, arg0);
return true;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

Expand Down Expand Up @@ -113,6 +114,61 @@ public void testDisruptorShutdownInterruptsWaitingProcessor() throws Exception {
Assert.assertFalse(processorThread.isAlive());
}

@Test
public void testUnexpectedInterruptDoesNotStopProcessor() throws Exception {
final AtomicReference<Thread> processorThreadReference = new AtomicReference<>();
final ThreadFactory threadFactory =
runnable -> {
final Thread thread = new Thread(runnable, "pipe-disruptor-unexpected-interrupt-test");
processorThreadReference.set(thread);
return thread;
};

final CountDownLatch handled = new CountDownLatch(1);
final Disruptor<TestEvent> disruptor = new Disruptor<>(TestEvent::new, 32, threadFactory);
final RingBuffer<TestEvent> ringBuffer =
disruptor.handleEventsWith((event, sequence, endOfBatch) -> handled.countDown()).start();

final Thread processorThread = processorThreadReference.get();
Assert.assertNotNull(processorThread);

TimeUnit.MILLISECONDS.sleep(50);
processorThread.interrupt();

ringBuffer.publishEvent((event, sequence, value) -> event.value = value, 1);
Assert.assertTrue(handled.await(5, TimeUnit.SECONDS));
Assert.assertTrue(processorThread.isAlive());

disruptor.shutdown();
Assert.assertFalse(processorThread.isAlive());
}

@Test
public void testPublishEventCanAbortWhenClosingWhileBufferIsFull() throws Exception {
final RingBuffer<TestEvent> ringBuffer = RingBuffer.createMultiProducer(TestEvent::new, 1);
final Sequence gatingSequence = new Sequence();
ringBuffer.addGatingSequences(gatingSequence);
ringBuffer.publishEvent((event, sequence, value) -> event.value = value, 1);

final AtomicBoolean isClosed = new AtomicBoolean(false);
final AtomicBoolean published = new AtomicBoolean(true);
final Thread publisherThread =
new Thread(
() ->
published.set(
ringBuffer.publishEvent(
(event, sequence, value) -> event.value = value, 2, isClosed::get)),
"pipe-disruptor-publish-abort-test");

publisherThread.start();
TimeUnit.MILLISECONDS.sleep(50);
isClosed.set(true);
publisherThread.join(TimeUnit.SECONDS.toMillis(5));

Assert.assertFalse(publisherThread.isAlive());
Assert.assertFalse(published.get());
}

private static class TestEvent {
private int value;
}
Expand Down
Loading