From ca9a8a7613569a421abef8ca7e5d0948f5fc0fba Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Thu, 14 May 2026 21:13:37 +0800 Subject: [PATCH 1/2] Fix interrupt --- .../realtime/assigner/DisruptorQueue.java | 19 ++++++- .../assigner/PipeDataRegionAssigner.java | 41 +++++++++++-- .../disruptor/BatchEventProcessor.java | 11 ++-- .../disruptor/MultiProducerSequencer.java | 21 +++++++ .../realtime/disruptor/RingBuffer.java | 24 +++++++- .../disruptor/DisruptorShutdownTest.java | 57 +++++++++++++++++++ 6 files changed, 160 insertions(+), 13 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java index 2019eba85603..d3fc92a677a2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java @@ -87,16 +87,29 @@ public DisruptorQueue( } public void publish(final PipeRealtimeEvent event) { + publishOrDrop(event); + } + + public boolean publishOrDrop(final PipeRealtimeEvent event) { final EnrichedEvent innerEvent = event.getEvent(); if (innerEvent instanceof PipeHeartbeatEvent) { ((PipeHeartbeatEvent) innerEvent).recordDisruptorSize(ringBuffer); } - ringBuffer.publishEvent((container, sequence, o) -> container.setEvent(event), event); - mayPrintExceedingLog(); + final boolean published = + ringBuffer.publishEvent( + (container, sequence, o) -> container.setEvent(event), event, this::isClosed); + if (published) { + mayPrintExceedingLog(); + } + return published; } - public void shutdown() { + public void closeInput() { isClosed = true; + } + + public void shutdown() { + closeInput(); // use shutdown instead of halt to ensure all published events have been handled disruptor.shutdown(); allocatedMemoryBlock.close(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java index 9c356e8654bf..c9e65bc59c8e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java @@ -67,6 +67,7 @@ public class PipeDataRegionAssigner implements Closeable { private Boolean isTableModel; private final PipeEventCounter eventCounter = new PipeDataRegionEventCounter(); + private int inFlightPublishCount = 0; public int getDataRegionId() { return dataRegionId; @@ -101,14 +102,28 @@ public void publishToAssign(final PipeRealtimeEvent event) { ((PipeHeartbeatEvent) innerEvent).onPublished(); } - // use synchronized here for completely preventing reference count leaks under extreme thread - // scheduling when closing synchronized (this) { - if (!disruptor.isClosed()) { - disruptor.publish(event); - } else { + if (disruptor.isClosed()) { onAssignedHook(event); + return; } + inFlightPublishCount++; + } + + boolean isPublished = false; + try { + isPublished = disruptor.publishOrDrop(event); + } finally { + synchronized (this) { + inFlightPublishCount--; + if (inFlightPublishCount == 0) { + notifyAll(); + } + } + } + + if (!isPublished) { + onAssignedHook(event); } } @@ -254,9 +269,25 @@ public boolean notMoreSourceNeededToBeAssigned() { public synchronized void close() { PipeAssignerMetrics.getInstance().deregister(dataRegionId); + boolean interrupted = false; + disruptor.closeInput(); + while (inFlightPublishCount > 0) { + try { + wait(); + } catch (final InterruptedException e) { + interrupted = true; + LOGGER.warn( + "Interrupted while waiting for in-flight publishes to finish when closing assigner on data region {}.", + dataRegionId); + } + } + final long startTime = System.currentTimeMillis(); disruptor.shutdown(); matcher.clear(); + if (interrupted) { + Thread.currentThread().interrupt(); + } LOGGER.info( "Pipe: Assigner on data region {} shutdown internal disruptor within {} ms", dataRegionId, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/BatchEventProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/BatchEventProcessor.java index d0432821cf77..6a3b2cc62fd9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/BatchEventProcessor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/BatchEventProcessor.java @@ -80,11 +80,14 @@ public void run() { nextSequence = processAvailableEvents(nextSequence, availableSequence); } catch (final InterruptedException ex) { - if (running) { - Thread.currentThread().interrupt(); - LOGGER.info("Processor interrupted"); + if (!running) { + break; } - break; + // A transient interrupt should not permanently stop the consumer thread. Otherwise the + // gating sequence will stop advancing and producers may block forever on a full ring + // buffer, making the later close path appear stuck. + Thread.interrupted(); + LOGGER.warn("Processor interrupted unexpectedly, continue running"); } catch (final Throwable ex) { exceptionHandler.handleEventException(ex, nextSequence, ringBuffer.get(nextSequence)); sequence.set(nextSequence); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/MultiProducerSequencer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/MultiProducerSequencer.java index 9aa7716a7a54..457593ca979e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/MultiProducerSequencer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/MultiProducerSequencer.java @@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicIntegerArray; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.LockSupport; +import java.util.function.BooleanSupplier; /** * Multi-producer sequencer for coordinating concurrent publishers @@ -110,14 +111,31 @@ public MultiProducerSequencer(int bufferSize, Sequence[] gatingSequences) { * @return highest claimed sequence number */ public long next(int n) { + return next(n, () -> false); + } + + /** + * Claim next n sequences for publishing, or abort if the caller is closing. + * + * @param n number of sequences to claim + * @param abortCondition returns {@code true} if the claim should be abandoned + * @return highest claimed sequence number, or {@link Sequence#INITIAL_VALUE} if aborted + */ + public long next(final int n, final BooleanSupplier abortCondition) { if (n < 1) { throw new IllegalArgumentException("n must be > 0"); } + final BooleanSupplier effectiveAbortCondition = + abortCondition != null ? abortCondition : () -> false; long current; long next; do { + if (effectiveAbortCondition.getAsBoolean()) { + return Sequence.INITIAL_VALUE; + } + current = cursor.get(); next = current + n; @@ -128,6 +146,9 @@ public long next(int n) { long gatingSequence = Sequence.getMinimumSequence(gatingSequences.get(), current); if (wrapPoint > gatingSequence) { + if (effectiveAbortCondition.getAsBoolean()) { + return Sequence.INITIAL_VALUE; + } LockSupport.parkNanos(1); continue; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/RingBuffer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/RingBuffer.java index 2af784b603d6..0b9e98400fb5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/RingBuffer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/RingBuffer.java @@ -19,6 +19,8 @@ package org.apache.iotdb.db.pipe.source.dataregion.realtime.disruptor; +import java.util.function.BooleanSupplier; + /** * Left-hand side padding for cache line alignment * @@ -205,8 +207,28 @@ public void publish(long lo, long hi) { * @param argument type */ public void publishEvent(EventTranslator translator, A arg0) { - final long sequence = sequencer.next(1); + publishEvent(translator, arg0, () -> false); + } + + /** + * Publish event using a translator function, or abort if the caller is closing. + * + * @param translator function to populate the event + * @param arg0 argument passed to translator + * @param abortCondition returns {@code true} if the publish should be abandoned + * @param argument type + * @return {@code true} if the event is published, {@code false} if the publish is aborted + */ + public boolean publishEvent( + final EventTranslator translator, + final A arg0, + final BooleanSupplier abortCondition) { + final long sequence = sequencer.next(1, abortCondition); + if (sequence == Sequence.INITIAL_VALUE) { + return false; + } translateAndPublish(translator, sequence, arg0); + return true; } /** diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/DisruptorShutdownTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/DisruptorShutdownTest.java index 3fd40c4d4f2b..b2bebde2010e 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/DisruptorShutdownTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/DisruptorShutdownTest.java @@ -25,6 +25,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -113,6 +114,62 @@ public void testDisruptorShutdownInterruptsWaitingProcessor() throws Exception { Assert.assertFalse(processorThread.isAlive()); } + @Test + public void testUnexpectedInterruptDoesNotStopProcessor() throws Exception { + final AtomicReference processorThreadReference = new AtomicReference<>(); + final ThreadFactory threadFactory = + runnable -> { + final Thread thread = + new Thread(runnable, "pipe-disruptor-unexpected-interrupt-test"); + processorThreadReference.set(thread); + return thread; + }; + + final CountDownLatch handled = new CountDownLatch(1); + final Disruptor disruptor = new Disruptor<>(TestEvent::new, 32, threadFactory); + final RingBuffer ringBuffer = + disruptor.handleEventsWith((event, sequence, endOfBatch) -> handled.countDown()).start(); + + final Thread processorThread = processorThreadReference.get(); + Assert.assertNotNull(processorThread); + + TimeUnit.MILLISECONDS.sleep(50); + processorThread.interrupt(); + + ringBuffer.publishEvent((event, sequence, value) -> event.value = value, 1); + Assert.assertTrue(handled.await(5, TimeUnit.SECONDS)); + Assert.assertTrue(processorThread.isAlive()); + + disruptor.shutdown(); + Assert.assertFalse(processorThread.isAlive()); + } + + @Test + public void testPublishEventCanAbortWhenClosingWhileBufferIsFull() throws Exception { + final RingBuffer ringBuffer = RingBuffer.createMultiProducer(TestEvent::new, 1); + final Sequence gatingSequence = new Sequence(); + ringBuffer.addGatingSequences(gatingSequence); + ringBuffer.publishEvent((event, sequence, value) -> event.value = value, 1); + + final AtomicBoolean isClosed = new AtomicBoolean(false); + final AtomicBoolean published = new AtomicBoolean(true); + final Thread publisherThread = + new Thread( + () -> + published.set( + ringBuffer.publishEvent( + (event, sequence, value) -> event.value = value, 2, isClosed::get)), + "pipe-disruptor-publish-abort-test"); + + publisherThread.start(); + TimeUnit.MILLISECONDS.sleep(50); + isClosed.set(true); + publisherThread.join(TimeUnit.SECONDS.toMillis(5)); + + Assert.assertFalse(publisherThread.isAlive()); + Assert.assertFalse(published.get()); + } + private static class TestEvent { private int value; } From 696a5f65df249e08d5e2f4a80c8659e60469f034 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Thu, 14 May 2026 21:36:43 +0800 Subject: [PATCH 2/2] spotless --- .../pipe/source/dataregion/realtime/disruptor/RingBuffer.java | 4 +--- .../dataregion/realtime/disruptor/DisruptorShutdownTest.java | 3 +-- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/RingBuffer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/RingBuffer.java index 0b9e98400fb5..25f7fe7d1b91 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/RingBuffer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/RingBuffer.java @@ -220,9 +220,7 @@ public void publishEvent(EventTranslator translator, A arg0) { * @return {@code true} if the event is published, {@code false} if the publish is aborted */ public boolean publishEvent( - final EventTranslator translator, - final A arg0, - final BooleanSupplier abortCondition) { + final EventTranslator translator, final A arg0, final BooleanSupplier abortCondition) { final long sequence = sequencer.next(1, abortCondition); if (sequence == Sequence.INITIAL_VALUE) { return false; diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/DisruptorShutdownTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/DisruptorShutdownTest.java index b2bebde2010e..ef57ea625aef 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/DisruptorShutdownTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/DisruptorShutdownTest.java @@ -119,8 +119,7 @@ public void testUnexpectedInterruptDoesNotStopProcessor() throws Exception { final AtomicReference processorThreadReference = new AtomicReference<>(); final ThreadFactory threadFactory = runnable -> { - final Thread thread = - new Thread(runnable, "pipe-disruptor-unexpected-interrupt-test"); + final Thread thread = new Thread(runnable, "pipe-disruptor-unexpected-interrupt-test"); processorThreadReference.set(thread); return thread; };