@@ -106,7 +106,6 @@ class LifecycleConductor {
106106 this . _vaultClientCache = null ;
107107 this . _initialized = false ;
108108 this . _batchInProgress = false ;
109- this . _scanId = null ;
110109
111110 // this cache only needs to be the size of one listing.
112111 // worst case scenario is 1 account per bucket:
@@ -310,13 +309,13 @@ class LifecycleConductor {
310309 } ) ;
311310 }
312311
313- _taskToMessage ( task , taskVersion , log ) {
312+ _taskToMessage ( task , taskVersion , scanId , log ) {
314313 return {
315314 message : JSON . stringify ( {
316315 action : 'processObjects' ,
317316 contextInfo : {
318317 reqId : log . getSerializedUids ( ) ,
319- conductorScanId : this . _scanId ,
318+ conductorScanId : scanId ,
320319 } ,
321320 target : {
322321 bucket : task . bucketName ,
@@ -357,28 +356,28 @@ class LifecycleConductor {
357356 cb ) ;
358357 }
359358
360- _createBucketTaskMessages ( tasks , log , cb ) {
359+ _createBucketTaskMessages ( tasks , scanId , log , cb ) {
361360 if ( this . lcConfig . forceLegacyListing ) {
362- return process . nextTick ( cb , null , tasks . map ( t => this . _taskToMessage ( t , lifecycleTaskVersions . v1 , log ) ) ) ;
361+ return process . nextTick ( cb , null , tasks . map ( t => this . _taskToMessage ( t , lifecycleTaskVersions . v1 , scanId , log ) ) ) ;
363362 }
364363
365364 return async . mapLimit ( tasks , 10 , ( t , taskDone ) =>
366365 this . _indexesGetOrCreate ( t , log , ( err , taskVersion ) => {
367366 if ( err ) {
368367 // should not happen as indexes methods would
369368 // ignore the errors and fallback to v1 listing
370- return taskDone ( null , this . _taskToMessage ( t , lifecycleTaskVersions . v1 , log ) ) ;
369+ return taskDone ( null , this . _taskToMessage ( t , lifecycleTaskVersions . v1 , scanId , log ) ) ;
371370 }
372- return taskDone ( null , this . _taskToMessage ( t , taskVersion , log ) ) ;
371+ return taskDone ( null , this . _taskToMessage ( t , taskVersion , scanId , log ) ) ;
373372 } ) , cb ) ;
374373 }
375374
376375 processBuckets ( cb ) {
377376 const log = this . logger . newRequestLogger ( ) ;
378377 const start = Date . now ( ) ;
379- this . _scanId = uuid ( ) ;
378+ const scanId = uuid ( ) ;
380379 if ( typeof log . addDefaultFields === 'function' ) {
381- log . addDefaultFields ( { conductorScanId : this . _scanId } ) ;
380+ log . addDefaultFields ( { conductorScanId : scanId } ) ;
382381 }
383382 LifecycleMetrics . onProcessBuckets ( log ) ;
384383 let nBucketsQueued = 0 ;
@@ -410,7 +409,7 @@ class LifecycleConductor {
410409 }
411410 return true ;
412411 } ) ) ) ,
413- ( tasksWithAccountId , next ) => this . _createBucketTaskMessages ( tasksWithAccountId , log , next ) ,
412+ ( tasksWithAccountId , next ) => this . _createBucketTaskMessages ( tasksWithAccountId , scanId , log , next ) ,
414413 ] ,
415414 ( err , messages ) => {
416415 nBucketsQueued += tasks . length ;
@@ -419,7 +418,7 @@ class LifecycleConductor {
419418 ) . length ;
420419
421420 log . info ( 'bucket push progress' , {
422- conductorScanId : this . _scanId ,
421+ conductorScanId : scanId ,
423422 nBucketsQueued,
424423 bucketsInCargo : tasks . length ,
425424 kafkaBucketMessagesDeliveryReports : messageDeliveryReports ,
@@ -445,9 +444,9 @@ class LifecycleConductor {
445444 this . _batchInProgress = true ;
446445 log . info ( 'starting new lifecycle batch' , {
447446 bucketSource : this . _bucketSource ,
448- conductorScanId : this . _scanId ,
447+ conductorScanId : scanId ,
449448 } ) ;
450- this . listBuckets ( messageSendQueue , log , ( err , listedBucketsCount ) => {
449+ this . listBuckets ( messageSendQueue , scanId , log , ( err , listedBucketsCount ) => {
451450 LifecycleMetrics . onBucketListing ( log , err ) ;
452451 nBucketsListed = listedBucketsCount ;
453452 return next ( err , listedBucketsCount ) ;
@@ -471,7 +470,7 @@ class LifecycleConductor {
471470 if ( err && err . Throttling ) {
472471 log . info ( 'not starting new lifecycle batch' , {
473472 reason : err ,
474- conductorScanId : this . _scanId ,
473+ conductorScanId : scanId ,
475474 fullScanElapsedMs : elapsedMs ,
476475 } ) ;
477476 if ( cb ) {
@@ -489,7 +488,7 @@ class LifecycleConductor {
489488 log . error ( 'lifecycle batch failed' , {
490489 error : err ,
491490 unknownCanonicalIds,
492- conductorScanId : this . _scanId ,
491+ conductorScanId : scanId ,
493492 fullScanElapsedMs : elapsedMs ,
494493 nBucketsListed,
495494 workflowsQueued : messageDeliveryReports ,
@@ -504,7 +503,7 @@ class LifecycleConductor {
504503 nBucketsQueued,
505504 nLifecycledBuckets,
506505 unknownCanonicalIds,
507- conductorScanId : this . _scanId ,
506+ conductorScanId : scanId ,
508507 fullScanElapsedMs : elapsedMs ,
509508 nBucketsListed,
510509 workflowsQueued : messageDeliveryReports ,
@@ -515,25 +514,25 @@ class LifecycleConductor {
515514 } ) ;
516515 }
517516
518- listBuckets ( queue , log , cb ) {
517+ listBuckets ( queue , scanId , log , cb ) {
519518 if ( this . _bucketSource === 'zookeeper' ) {
520- return this . listZookeeperBuckets ( queue , log , cb ) ;
519+ return this . listZookeeperBuckets ( queue , scanId , log , cb ) ;
521520 }
522521
523522 if ( this . _bucketSource === 'mongodb' ) {
524- return this . listMongodbBuckets ( queue , log , cb ) ;
523+ return this . listMongodbBuckets ( queue , scanId , log , cb ) ;
525524 }
526525
527526 return this . restoreBucketCheckpoint ( ( err , marker ) => {
528527 if ( err ) {
529528 return cb ( err ) ;
530529 }
531530
532- return this . listBucketdBuckets ( queue , marker || null , log , cb ) ;
531+ return this . listBucketdBuckets ( queue , marker || null , scanId , log , cb ) ;
533532 } ) ;
534533 }
535534
536- listZookeeperBuckets ( queue , log , cb ) {
535+ listZookeeperBuckets ( queue , scanId , log , cb ) {
537536 const zkBucketsPath = this . getBucketsZkPath ( ) ;
538537 this . _zkClient . getChildren (
539538 zkBucketsPath ,
@@ -616,7 +615,7 @@ class LifecycleConductor {
616615 } ) ;
617616 }
618617
619- listBucketdBuckets ( queue , initMarker , log , cb ) {
618+ listBucketdBuckets ( queue , initMarker , scanId , log , cb ) {
620619 let isTruncated = true ;
621620 let marker = initMarker ;
622621 let nEnqueued = 0 ;
@@ -635,7 +634,7 @@ class LifecycleConductor {
635634 maxInFlight : this . _maxInFlightBatchSize ,
636635 bucketListingPushRateHz : Math . round ( nEnqueued * 1000 / ( new Date ( ) - start ) ) ,
637636 breakerState,
638- conductorScanId : this . _scanId ,
637+ conductorScanId : scanId ,
639638 } ;
640639
641640 if ( queue . length ( ) > this . _maxInFlightBatchSize ||
@@ -717,7 +716,7 @@ class LifecycleConductor {
717716 ) ;
718717 }
719718
720- listMongodbBuckets ( queue , log , cb ) {
719+ listMongodbBuckets ( queue , scanId , log , cb ) {
721720 let nEnqueued = 0 ;
722721 const start = new Date ( ) ;
723722
@@ -791,7 +790,7 @@ class LifecycleConductor {
791790 maxInFlight : this . _maxInFlightBatchSize ,
792791 enqueueRateHz : Math . round ( nEnqueued * 1000 / ( new Date ( ) - start ) ) ,
793792 breakerState,
794- conductorScanId : this . _scanId ,
793+ conductorScanId : scanId ,
795794 } ;
796795
797796 if ( queue . length ( ) > this . _maxInFlightBatchSize ||
0 commit comments