@@ -304,91 +304,103 @@ private void runSynchronizers(
304304 @ NonNull LDContext context ,
305305 @ NonNull DataSourceUpdateSinkV2 sink
306306 ) {
307- Synchronizer synchronizer = sourceManager .getNextAvailableSynchronizerAndSetActive ();
308- while (synchronizer != null ) {
309- if (stopped .get ()) {
310- return ;
311- }
312- int synchronizerCount = sourceManager .getAvailableSynchronizerCount ();
313- boolean isPrime = sourceManager .isPrimeSynchronizer ();
314- try {
315- boolean running = true ;
316- try (FDv2DataSourceConditions .Conditions conditions =
317- new FDv2DataSourceConditions .Conditions (getConditions (synchronizerCount , isPrime ))) {
318- while (running ) {
319- Future <FDv2SourceResult > nextFuture = synchronizer .next ();
320- Object res = LDFutures .anyOf (conditions .getFuture (), nextFuture ).get ();
307+ try {
308+ Synchronizer synchronizer = sourceManager .getNextAvailableSynchronizerAndSetActive ();
309+ while (synchronizer != null ) {
310+ if (stopped .get ()) {
311+ return ;
312+ }
313+ int synchronizerCount = sourceManager .getAvailableSynchronizerCount ();
314+ boolean isPrime = sourceManager .isPrimeSynchronizer ();
315+ try {
316+ boolean running = true ;
317+ try (FDv2DataSourceConditions .Conditions conditions =
318+ new FDv2DataSourceConditions .Conditions (getConditions (synchronizerCount , isPrime ))) {
319+ while (running ) {
320+ Future <FDv2SourceResult > nextFuture = synchronizer .next ();
321+ // Race the next synchronizer result against any active conditions
322+ // (fallback/recovery timers). Whichever resolves first wins.
323+ Object res = LDFutures .anyOf (conditions .getFuture (), nextFuture ).get ();
321324
322- if (res instanceof FDv2DataSourceConditions .ConditionType ) {
323- FDv2DataSourceConditions .ConditionType ct = (FDv2DataSourceConditions .ConditionType ) res ;
324- switch (ct ) {
325- case FALLBACK :
326- logger .debug ("Synchronizer {} experienced an interruption; falling back to next synchronizer." ,
327- synchronizer .getClass ().getSimpleName ());
328- break ;
329- case RECOVERY :
330- logger .debug ("The data source is attempting to recover to a higher priority synchronizer." );
331- sourceManager .resetSourceIndex ();
332- break ;
325+ if (res instanceof FDv2DataSourceConditions .ConditionType ) {
326+ FDv2DataSourceConditions .ConditionType ct = (FDv2DataSourceConditions .ConditionType ) res ;
327+ switch (ct ) {
328+ case FALLBACK :
329+ logger .debug ("Synchronizer {} experienced an interruption; falling back to next synchronizer." ,
330+ synchronizer .getClass ().getSimpleName ());
331+ break ;
332+ case RECOVERY :
333+ logger .debug ("The data source is attempting to recover to a higher priority synchronizer." );
334+ sourceManager .resetSourceIndex ();
335+ break ;
336+ }
337+ running = false ;
338+ break ;
333339 }
334- running = false ;
335- break ;
336- }
337340
338- if (!(res instanceof FDv2SourceResult )) {
339- logger .error ("Unexpected result type from synchronizer: {}" , res != null ? res .getClass ().getName () : "null" );
340- continue ;
341- }
341+ if (!(res instanceof FDv2SourceResult )) {
342+ logger .error ("Unexpected result type from synchronizer: {}" , res != null ? res .getClass ().getName () : "null" );
343+ continue ;
344+ }
342345
343- FDv2SourceResult result = (FDv2SourceResult ) res ;
344- conditions .inform (result );
346+ FDv2SourceResult result = (FDv2SourceResult ) res ;
347+ // Let conditions observe the result before we act on it so
348+ // they can update their internal state (e.g. reset interruption timers).
349+ conditions .inform (result );
345350
346- switch (result .getResultType ()) {
347- case CHANGE_SET :
348- ChangeSet <Map <String , DataModel .Flag >> changeSet = result .getChangeSet ();
349- if (changeSet != null ) {
350- sink .apply (context , changeSet );
351- sink .setStatus (DataSourceState .VALID , null );
352- tryCompleteStart (true , null );
353- }
354- break ;
355- case STATUS :
356- FDv2SourceResult .Status status = result .getStatus ();
357- if (status != null ) {
358- switch (status .getState ()) {
359- case INTERRUPTED :
360- sink .setStatus (DataSourceState .INTERRUPTED , status .getError ());
361- break ;
362- case SHUTDOWN :
363- running = false ;
364- break ;
365- case TERMINAL_ERROR :
366- sourceManager .blockCurrentSynchronizer ();
367- running = false ;
368- sink .setStatus (DataSourceState .INTERRUPTED , status .getError ());
369- break ;
370- case GOODBYE :
371- break ;
372- default :
373- break ;
351+ switch (result .getResultType ()) {
352+ case CHANGE_SET :
353+ ChangeSet <Map <String , DataModel .Flag >> changeSet = result .getChangeSet ();
354+ if (changeSet != null ) {
355+ sink .apply (context , changeSet );
356+ sink .setStatus (DataSourceState .VALID , null );
357+ tryCompleteStart (true , null );
374358 }
375- }
376- break ;
359+ break ;
360+ case STATUS :
361+ FDv2SourceResult .Status status = result .getStatus ();
362+ if (status != null ) {
363+ switch (status .getState ()) {
364+ case INTERRUPTED :
365+ sink .setStatus (DataSourceState .INTERRUPTED , status .getError ());
366+ break ;
367+ case SHUTDOWN :
368+ // This synchronizer is shutting down cleanly/intentionally
369+ running = false ;
370+ break ;
371+ case TERMINAL_ERROR :
372+ // This synchronizer cannot recover; block it so the outer
373+ // loop advances to the next available synchronizer.
374+ sourceManager .blockCurrentSynchronizer ();
375+ running = false ;
376+ sink .setStatus (DataSourceState .INTERRUPTED , status .getError ());
377+ break ;
378+ case GOODBYE :
379+ // We let the synchronizer handle this internally.
380+ break ;
381+ default :
382+ break ;
383+ }
384+ }
385+ break ;
386+ }
377387 }
378388 }
389+ } catch (ExecutionException e ) {
390+ logger .warn ("Synchronizer error: {}" , e .getCause () != null ? e .getCause ().toString () : e .toString ());
391+ sink .setStatus (DataSourceState .INTERRUPTED , e .getCause () != null ? e .getCause () : e );
392+ } catch (CancellationException e ) {
393+ logger .warn ("Synchronizer cancelled: {}" , e .toString ());
394+ sink .setStatus (DataSourceState .INTERRUPTED , e );
395+ } catch (InterruptedException e ) {
396+ logger .warn ("Synchronizer interrupted: {}" , e .toString ());
397+ sink .setStatus (DataSourceState .INTERRUPTED , e );
398+ return ;
379399 }
380- } catch (ExecutionException e ) {
381- logger .warn ("Synchronizer error: {}" , e .getCause () != null ? e .getCause ().toString () : e .toString ());
382- sink .setStatus (DataSourceState .INTERRUPTED , e .getCause () != null ? e .getCause () : e );
383- } catch (CancellationException e ) {
384- logger .warn ("Synchronizer cancelled: {}" , e .toString ());
385- sink .setStatus (DataSourceState .INTERRUPTED , e );
386- } catch (InterruptedException e ) {
387- logger .warn ("Synchronizer interrupted: {}" , e .toString ());
388- sink .setStatus (DataSourceState .INTERRUPTED , e );
389- return ;
400+ synchronizer = sourceManager .getNextAvailableSynchronizerAndSetActive ();
390401 }
391- synchronizer = sourceManager .getNextAvailableSynchronizerAndSetActive ();
402+ } finally {
403+ sourceManager .close ();
392404 }
393405 }
394406}
0 commit comments