@@ -98,8 +98,10 @@ class LifecycleBucketProcessor {
9898 this . _pausedLocations = new Set ( ) ;
9999 this . _locationStatusStream = null ;
100100
101- // scan tracking for metrics
102- this . _currentScanId = null ;
101+ // Conductor scan ID for metrics; kept across internal queue drains so
102+ // Kafka batches from the same scan do not retrigger onBucketProcessorScanStart.
103+ this . _metricsScanId = null ;
104+ this . _processorScanMetricsActive = false ;
103105
104106 this . clientManager = new ClientManager ( {
105107 id : 'lifecycle' ,
@@ -158,13 +160,13 @@ class LifecycleBucketProcessor {
158160 error : err ,
159161 } ) ;
160162 }
161- if ( this . _currentScanId ) {
162- this . _log . info ( 'bucket processor scan complete ' , {
163+ if ( this . _processorScanMetricsActive ) {
164+ this . _log . info ( 'bucket processor internal task queue drained ' , {
163165 method : 'LifecycleBucketProcessor.drain' ,
164- conductorScanId : this . _currentScanId ,
166+ conductorScanId : this . _metricsScanId ,
165167 } ) ;
166168 LifecycleMetrics . onBucketProcessorScanEnd ( this . _log ) ;
167- this . _currentScanId = null ;
169+ this . _processorScanMetricsActive = false ;
168170 }
169171 } ) ;
170172
@@ -293,11 +295,12 @@ class LifecycleBucketProcessor {
293295 }
294296 const { bucket, owner, accountId, taskVersion } = result . target ;
295297 const conductorScanId = result . contextInfo && result . contextInfo . conductorScanId ;
296- if ( conductorScanId && conductorScanId !== this . _currentScanId ) {
297- this . _currentScanId = conductorScanId ;
298+ if ( conductorScanId && conductorScanId !== this . _metricsScanId ) {
299+ this . _metricsScanId = conductorScanId ;
298300 LifecycleMetrics . onBucketProcessorScanStart (
299301 this . _log , Date . now ( )
300302 ) ;
303+ this . _processorScanMetricsActive = true ;
301304 this . _log . info ( 'new conductor scan detected' , {
302305 method :
303306 'LifecycleBucketProcessor._processBucketEntry' ,
0 commit comments