Skip to content

Commit 5c0fec2

Browse files
doleyzidoleyzi
andauthored
[INLONG-12064][Audit] Static ScheduledExecutorService in PulsarSink causes ClassLoader leaks and shared state corruption (#12085)
Co-authored-by: doleyzi <doleyzi@tencent.com>
1 parent 5b50d13 commit 5c0fec2

1 file changed

Lines changed: 54 additions & 14 deletions

File tree

  • inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/sink

inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/sink/PulsarSink.java

Lines changed: 54 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@
4747
import java.util.concurrent.LinkedBlockingQueue;
4848
import java.util.concurrent.ScheduledExecutorService;
4949
import java.util.concurrent.TimeUnit;
50+
import java.util.concurrent.atomic.AtomicBoolean;
51+
import java.util.concurrent.atomic.AtomicInteger;
5052
import java.util.concurrent.atomic.AtomicLong;
5153

5254
/**
@@ -145,22 +147,16 @@ public class PulsarSink extends AbstractSink
145147

146148
private static final PulsarPerformanceTask pulsarPerformanceTask = new PulsarPerformanceTask();
147149

148-
private static ScheduledExecutorService scheduledExecutorService = Executors
149-
.newScheduledThreadPool(1, new HighPriorityThreadFactory("pulsarPerformance-Printer-thread"));
150+
private static volatile ScheduledExecutorService scheduledExecutorService;
151+
152+
private static final AtomicBoolean schedulerStarted = new AtomicBoolean(false);
153+
154+
private static final AtomicInteger activeInstances = new AtomicInteger(0);
150155

151156
private String topic;
152157

153158
private Context context;
154159

155-
static {
156-
/*
157-
* stat pulsar performance
158-
*/
159-
logger.info("init pulsarPerformanceTask");
160-
scheduledExecutorService.scheduleWithFixedDelay(pulsarPerformanceTask, 0L,
161-
PRINT_INTERVAL, TimeUnit.SECONDS);
162-
}
163-
164160
public PulsarSink() {
165161
super();
166162
logger.debug("new instance of PulsarSink!");
@@ -199,6 +195,46 @@ public void configure(Context context) {
199195
}
200196
}
201197

198+
/**
199+
* Start the performance scheduler if not already started.
200+
* Uses compareAndSet to ensure the scheduler is only initialized once across all instances.
201+
*/
202+
private static synchronized void startPerformanceScheduler() {
203+
if (!schedulerStarted.compareAndSet(false, true)) {
204+
return;
205+
}
206+
scheduledExecutorService = Executors.newScheduledThreadPool(1,
207+
new HighPriorityThreadFactory("pulsarPerformance-Printer-thread"));
208+
scheduledExecutorService.scheduleWithFixedDelay(pulsarPerformanceTask, PRINT_INTERVAL,
209+
PRINT_INTERVAL, TimeUnit.SECONDS);
210+
logger.info("PulsarPerformanceTask scheduler started");
211+
}
212+
213+
/**
214+
* Stop the performance scheduler when the last active instance is stopped.
215+
* Uses reference counting to ensure the scheduler is only shut down
216+
* when no more active instances remain.
217+
*/
218+
private static synchronized void stopPerformanceScheduler() {
219+
if (activeInstances.get() <= 0) {
220+
logger.warn("No active instances to stop, skipping scheduler shutdown");
221+
return;
222+
}
223+
224+
if (activeInstances.decrementAndGet() > 0) {
225+
logger.info("Still have active instances, not shutting down scheduler");
226+
return;
227+
}
228+
229+
if (scheduledExecutorService != null && !scheduledExecutorService.isShutdown()) {
230+
logger.info("Shutting down pulsarPerformanceTask scheduler");
231+
scheduledExecutorService.shutdownNow();
232+
scheduledExecutorService = null;
233+
}
234+
schedulerStarted.set(false);
235+
logger.info("PulsarPerformanceTask scheduler stopped");
236+
}
237+
202238
private void initTopic() throws Exception {
203239
long startTime = System.currentTimeMillis();
204240
if (topic != null) {
@@ -230,6 +266,10 @@ public void start() {
230266
+ i);
231267
sinkThreadPool[i].start();
232268
}
269+
270+
activeInstances.incrementAndGet();
271+
startPerformanceScheduler();
272+
233273
logger.debug("meta sink started");
234274
}
235275

@@ -258,9 +298,9 @@ public void stop() {
258298
sinkThreadPool = null;
259299
}
260300
super.stop();
261-
if (!scheduledExecutorService.isShutdown()) {
262-
scheduledExecutorService.shutdown();
263-
}
301+
302+
stopPerformanceScheduler();
303+
264304
sinkCounter.stop();
265305
logger.debug("pulsar sink stopped. Metrics:{}", sinkCounter);
266306
}

0 commit comments

Comments
 (0)