Skip to content

Commit 4aaa304

Browse files
committed
fix(lifecycle): debounce bucket processor scan-end to prevent false idle signals
The drain callback on _internalTaskScheduler fires every time the queue goes from non-empty to empty. Under light load this happens between Kafka message batches within the same conductor scan, prematurely resetting the scan start time gauge to 0 and setting _processorScanMetricsActive to false. Because subsequent messages carry the same conductorScanId, the new-scan branch is skipped and onBucketProcessorScanStart is never called again, leaving the Bucket Processor Scan Progress dashboard panel hidden for the remainder of the scan. Fix: replace the immediate reset with a 30-second debounced timeout. If new messages from the same scan arrive during the debounce window the timeout is cancelled, keeping the scan active. A new scan also cancels any pending debounce before performing a full metric reset. Issue: BB-740
1 parent 7a649cc commit 4aaa304

1 file changed

Lines changed: 36 additions & 7 deletions

File tree

extensions/lifecycle/bucketProcessor/LifecycleBucketProcessor.js

Lines changed: 36 additions & 7 deletions
```diff
@@ -31,6 +31,11 @@ const locations = require('../../../conf/locationConfig.json');
 
 const PROCESS_OBJECTS_ACTION = 'processObjects';
 
+// Delay before declaring a scan finished after the internal task
+// queue drains. This prevents premature idle signals when the
+// queue empties between Kafka message batches within the same scan.
+const SCAN_END_DEBOUNCE_DELAY_MS = 30000;
+
 /**
  * @class LifecycleBucketProcessor
  *
@@ -102,6 +107,7 @@ class LifecycleBucketProcessor {
         // Kafka batches from the same scan do not retrigger onBucketProcessorScanStart.
         this._metricsScanId = null;
         this._processorScanMetricsActive = false;
+        this._scanEndTimeout = null;
 
         this.clientManager = new ClientManager({
             id: 'lifecycle',
@@ -151,22 +157,31 @@
         }, this._lcConfig.bucketProcessor.concurrency);
 
         // Listen for errors from any task being processed.
-        // When the internal task queue empties, also reset the
-        // scan-in-progress gauge so Prometheus can detect that
-        // this processor is idle.
+        // When the internal task queue empties, schedule a delayed
+        // reset of the scan-in-progress gauge. The delay avoids
+        // false idle signals when the queue drains between Kafka
+        // message batches that belong to the same conductor scan.
         this._internalTaskScheduler.drain(err => {
             if (err) {
                 this._log.error('error occurred during task processing', {
                     error: err,
                 });
             }
-            if (this._processorScanMetricsActive) {
-                this._log.info('bucket processor internal task queue drained', {
+            if (this._processorScanMetricsActive && !this._scanEndTimeout) {
+                this._log.info('bucket processor task queue drained, scheduling scan end', {
                     method: 'LifecycleBucketProcessor.drain',
                     conductorScanId: this._metricsScanId,
+                    debounceMs: SCAN_END_DEBOUNCE_DELAY_MS,
                 });
-                LifecycleMetrics.onBucketProcessorScanEnd(this._log);
-                this._processorScanMetricsActive = false;
+                this._scanEndTimeout = setTimeout(() => {
+                    this._scanEndTimeout = null;
+                    this._log.info('bucket processor scan end confirmed', {
+                        method: 'LifecycleBucketProcessor.drain',
+                        conductorScanId: this._metricsScanId,
+                    });
+                    LifecycleMetrics.onBucketProcessorScanEnd(this._log);
+                    this._processorScanMetricsActive = false;
+                }, SCAN_END_DEBOUNCE_DELAY_MS);
             }
         });
 
@@ -296,6 +311,11 @@
         const { bucket, owner, accountId, taskVersion } = result.target;
         const conductorScanId = result.contextInfo && result.contextInfo.conductorScanId;
         if (conductorScanId && conductorScanId !== this._metricsScanId) {
+            // New scan: cancel any pending debounce, full metric reset
+            if (this._scanEndTimeout) {
+                clearTimeout(this._scanEndTimeout);
+                this._scanEndTimeout = null;
+            }
             this._metricsScanId = conductorScanId;
             LifecycleMetrics.onBucketProcessorScanStart(
                 this._log, Date.now()
@@ -306,6 +326,15 @@
                 'LifecycleBucketProcessor._processBucketEntry',
                 conductorScanId,
             });
+        } else if (conductorScanId && this._scanEndTimeout) {
+            // Same scan, but the debounce was pending: cancel it
+            // because more messages are still arriving.
+            clearTimeout(this._scanEndTimeout);
+            this._scanEndTimeout = null;
+            this._log.debug('cancelled scan end debounce, more messages arriving', {
+                method: 'LifecycleBucketProcessor._processBucketEntry',
+                conductorScanId,
+            });
         }
 
         if (!bucket || !owner || (!accountId && this._authConfig.type === authTypeAssumeRole)) {
```

0 commit comments

Comments (0)