diff --git a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/FDv2DataSource.java b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/FDv2DataSource.java index 906c484..dc9423d 100644 --- a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/FDv2DataSource.java +++ b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/FDv2DataSource.java @@ -1,6 +1,7 @@ package com.launchdarkly.sdk.server; import com.google.common.collect.ImmutableList; +import com.launchdarkly.logging.LDLogger; import com.launchdarkly.sdk.server.datasources.FDv2SourceResult; import com.launchdarkly.sdk.server.datasources.Initializer; import com.launchdarkly.sdk.server.datasources.Synchronizer; @@ -8,83 +9,89 @@ import com.launchdarkly.sdk.server.subsystems.DataSource; import com.launchdarkly.sdk.server.subsystems.DataSourceUpdateSinkV2; -import java.io.Closeable; -import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; -import java.util.concurrent.CancellationException; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; -class FDv2DataSource implements DataSource { - private final List> initializers; - private final List synchronizers; - - private final DataSourceUpdateSinkV2 dataSourceUpdates; - - private final CompletableFuture startFuture = new CompletableFuture<>(); - private final AtomicBoolean started = new AtomicBoolean(false); +import static com.launchdarkly.sdk.server.FDv2DataSourceConditions.Condition; +import static com.launchdarkly.sdk.server.FDv2DataSourceConditions.ConditionFactory; +import static com.launchdarkly.sdk.server.FDv2DataSourceConditions.FallbackCondition; +import static com.launchdarkly.sdk.server.FDv2DataSourceConditions.RecoveryCondition; +class FDv2DataSource implements DataSource { /** - * Lock for active sources and shutdown state. + * Default fallback timeout of 2 minutes. The timeout is only configurable for testing. */ - private final Object activeSourceLock = new Object(); - private Closeable activeSource; - private boolean isShutdown = false; - - private static class SynchronizerFactoryWithState { - public enum State { - /** - * This synchronizer is available to use. - */ - Available, - - /** - * This synchronizer is no longer available to use. - */ - Blocked - } + private static final int defaultFallbackTimeout = 2 * 60; - private final DataSourceFactory factory; + /** + * Default recovery timeout of 5 minutes. The timeout is only configurable for testing. + */ + private static final long defaultRecoveryTimeout = 5 * 60; - private State state = State.Available; + private final List> initializers; + private final SynchronizerStateManager synchronizerStateManager; + private final List conditionFactories; - public SynchronizerFactoryWithState(DataSourceFactory factory) { - this.factory = factory; - } + private final DataSourceUpdateSinkV2 dataSourceUpdates; - public State getState() { - return state; - } + private final CompletableFuture startFuture = new CompletableFuture<>(); + private final AtomicBoolean started = new AtomicBoolean(false); - public void block() { - state = State.Blocked; - } + private final int threadPriority; - public Synchronizer build() { - return factory.build(); - } - } + private final LDLogger logger; public interface DataSourceFactory { T build(); } + public FDv2DataSource( + ImmutableList> initializers, + ImmutableList> synchronizers, + DataSourceUpdateSinkV2 dataSourceUpdates, + int threadPriority, + LDLogger logger, + ScheduledExecutorService sharedExecutor + ) { + this(initializers, + synchronizers, + dataSourceUpdates, + threadPriority, + logger, + sharedExecutor, + defaultFallbackTimeout, + defaultRecoveryTimeout + ); + } + public FDv2DataSource( - ImmutableList> initializers, - ImmutableList> synchronizers, - DataSourceUpdateSinkV2 dataSourceUpdates + ImmutableList> initializers, + ImmutableList> synchronizers, + DataSourceUpdateSinkV2 dataSourceUpdates, + int threadPriority, + LDLogger logger, + ScheduledExecutorService sharedExecutor, + long fallbackTimeout, + long recoveryTimeout ) { this.initializers = initializers; - this.synchronizers = synchronizers - .stream() - .map(SynchronizerFactoryWithState::new) - .collect(Collectors.toList()); + List synchronizerFactories = synchronizers + .stream() + .map(SynchronizerFactoryWithState::new) + .collect(Collectors.toList()); + this.synchronizerStateManager = new SynchronizerStateManager(synchronizerFactories); this.dataSourceUpdates = dataSourceUpdates; + this.threadPriority = threadPriority; + this.logger = logger; + this.conditionFactories = new ArrayList<>(); + this.conditionFactories.add(new FallbackCondition.Factory(sharedExecutor, fallbackTimeout)); + this.conditionFactories.add(new RecoveryCondition.Factory(sharedExecutor, recoveryTimeout)); } private void run() { @@ -92,33 +99,27 @@ private void run() { if (!initializers.isEmpty()) { runInitializers(); } - runSynchronizers(); + boolean fdv1Fallback = runSynchronizers(); + if (fdv1Fallback) { + // TODO: Run FDv1 fallback. + } // TODO: Handle. We have ran out of sources or we are shutting down. + + // If we had initialized at some point, then the future will already be complete and this will be ignored. + startFuture.complete(false); }); runThread.setDaemon(true); - // TODO: Thread priority. - //thread.setPriority(threadPriority); + runThread.setPriority(threadPriority); runThread.start(); } - private SynchronizerFactoryWithState getFirstAvailableSynchronizer() { - synchronized (synchronizers) { - for (SynchronizerFactoryWithState synchronizer : synchronizers) { - if (synchronizer.getState() == SynchronizerFactoryWithState.State.Available) { - return synchronizer; - } - } - - return null; - } - } private void runInitializers() { boolean anyDataReceived = false; for (DataSourceFactory factory : initializers) { try { Initializer initializer = factory.build(); - if (setActiveSource(initializer)) return; + if (synchronizerStateManager.setActiveSource(initializer)) return; FDv2SourceResult result = initializer.run().get(); switch (result.getResultType()) { case CHANGE_SET: @@ -136,7 +137,9 @@ private void runInitializers() { break; } } catch (ExecutionException | InterruptedException | CancellationException e) { - // TODO: Log. + // TODO: Better messaging? + // TODO: Data source status? + logger.warn("Error running initializer: {}", e.toString()); } } // We received data without a selector, and we have exhausted initializers, so we are going to @@ -147,74 +150,122 @@ private void runInitializers() { } } - private void runSynchronizers() { - SynchronizerFactoryWithState availableSynchronizer = getFirstAvailableSynchronizer(); - // TODO: Add recovery handling. If there are no available synchronizers, but there are - // recovering ones, then we likely will want to wait for them to be available (or bypass recovery). - while (availableSynchronizer != null) { - Synchronizer synchronizer = availableSynchronizer.build(); - // Returns true if shutdown. - if (setActiveSource(synchronizer)) return; - try { - boolean running = true; - while (running) { - FDv2SourceResult result = synchronizer.next().get(); - switch (result.getResultType()) { - case CHANGE_SET: - dataSourceUpdates.apply(result.getChangeSet()); - // This could have been completed by any data source. But if it has not been completed before - // now, then we complete it. - startFuture.complete(true); - break; - case STATUS: - FDv2SourceResult.Status status = result.getStatus(); - switch (status.getState()) { - case INTERRUPTED: - // TODO: Track how long we are interrupted. - break; - case SHUTDOWN: - // We should be overall shutting down. - // TODO: We may need logging or to do a little more. - return; - case TERMINAL_ERROR: - availableSynchronizer.block(); - running = false; - break; - case GOODBYE: - // We let the synchronizer handle this internally. - break; - } - break; - } - } - } catch (ExecutionException | InterruptedException | CancellationException e) { - // TODO: Log. - // Move to next synchronizer. - } - availableSynchronizer = getFirstAvailableSynchronizer(); + /** + * Determine conditions for the current synchronizer. Synchronizers require different conditions depending on if + * they are the 'prime' synchronizer or if there are other available synchronizers to use. + * + * @return a list of conditions to apply to the synchronizer + */ + private List getConditions() { + int availableSynchronizers = synchronizerStateManager.getAvailableSynchronizerCount(); + boolean isPrimeSynchronizer = synchronizerStateManager.isPrimeSynchronizer(); + + if (availableSynchronizers == 1) { + // If there is only 1 synchronizer, then we cannot fall back or recover, so we don't need any conditions. + return Collections.emptyList(); + } + if (isPrimeSynchronizer) { + // If there isn't a synchronizer to recover to, then don't add and recovery conditions. + return conditionFactories.stream() + .filter((ConditionFactory factory) -> factory.getType() != Condition.ConditionType.RECOVERY) + .map(ConditionFactory::build).collect(Collectors.toList()); } + // The synchronizer can both fall back and recover. + return conditionFactories.stream().map(ConditionFactory::build).collect(Collectors.toList()); } - private void safeClose(Closeable synchronizer) { + private boolean runSynchronizers() { + // When runSynchronizers exists, no matter how it exits, the synchronizerStateManager will be closed. try { - synchronizer.close(); - } catch (IOException e) { - // Ignore close exceptions. - } - } + SynchronizerFactoryWithState availableSynchronizer = synchronizerStateManager.getNextAvailableSynchronizer(); - private boolean setActiveSource(Closeable synchronizer) { - synchronized (activeSourceLock) { - if (activeSource != null) { - safeClose(activeSource); - } - if (isShutdown) { - safeClose(synchronizer); - return true; + // We want to continue running synchronizers for as long as any are available. + while (availableSynchronizer != null) { + Synchronizer synchronizer = availableSynchronizer.build(); + + // Returns true if shutdown. + if (synchronizerStateManager.setActiveSource(synchronizer)) return false; + + try { + boolean running = true; + + try (Conditions conditions = new Conditions(getConditions())) { + while (running) { + CompletableFuture nextResultFuture = synchronizer.next(); + + // The conditionsFuture will complete if any condition is met. Meeting any condition means we will + // switch to a different synchronizer. + Object res = CompletableFuture.anyOf(conditions.getFuture(), nextResultFuture).get(); + + if (res instanceof Condition) { + Condition c = (Condition) res; + switch (c.getType()) { + case FALLBACK: + // For fallback, we will move to the next available synchronizer, which may loop. + // This is the default behavior of exiting the run loop, so we don't need to take + // any action. + break; + case RECOVERY: + // For recovery, we will start at the first available synchronizer. + // So we reset the source index, and finding the source will start at the beginning. + synchronizerStateManager.resetSourceIndex(); + break; + } + // A running synchronizer will only have fallback and recovery conditions that it can act on. + // So, if there are no synchronizers to recover to or fallback to, then we will not have + // those conditions. + break; + } + + + FDv2SourceResult result = (FDv2SourceResult) res; + conditions.inform(result); + + switch (result.getResultType()) { + case CHANGE_SET: + dataSourceUpdates.apply(result.getChangeSet()); + // This could have been completed by any data source. But if it has not been completed before + // now, then we complete it. + startFuture.complete(true); + break; + case STATUS: + FDv2SourceResult.Status status = result.getStatus(); + switch (status.getState()) { + case INTERRUPTED: + // Handled by conditions. + // TODO: Data source status. + break; + case SHUTDOWN: + // We should be overall shutting down. + // TODO: We may need logging or to do a little more. + return false; + case TERMINAL_ERROR: + availableSynchronizer.block(); + running = false; + break; + case GOODBYE: + // We let the synchronizer handle this internally. + break; + } + break; + } + // We have been requested to fall back to FDv1. We handle whatever message was associated, + // close the synchronizer, and then fallback. + if (result.isFdv1Fallback()) { + return true; + } + } + } + } catch (ExecutionException | InterruptedException | CancellationException e) { + // TODO: Log. + // Move to next synchronizer. + } + availableSynchronizer = synchronizerStateManager.getNextAvailableSynchronizer(); } - activeSource = synchronizer; + return false; + } finally { + synchronizerStateManager.close(); } - return false; } @Override @@ -235,19 +286,43 @@ public boolean isInitialized() { } @Override - public void close() throws IOException { + public void close() { // If there is an active source, we will shut it down, and that will result in the loop handling that source // exiting. // If we do not have an active source, then the loop will check isShutdown when attempting to set one. When - // it detects shutdown it will exit the loop. - synchronized (activeSourceLock) { - isShutdown = true; - if (activeSource != null) { - activeSource.close(); - } - } + // it detects shutdown, it will exit the loop. + synchronizerStateManager.close(); // If this is already set, then this has no impact. startFuture.complete(false); } + + /** + * Helper class to manage the lifecycle of conditions with automatic cleanup. + */ + private static class Conditions implements AutoCloseable { + private final List conditions; + private final CompletableFuture conditionsFuture; + + public Conditions(List conditions) { + this.conditions = conditions; + this.conditionsFuture = conditions.isEmpty() + ? new CompletableFuture<>() // Never completes if no conditions + : CompletableFuture.anyOf( + conditions.stream().map(Condition::execute).toArray(CompletableFuture[]::new)); + } + + public CompletableFuture getFuture() { + return conditionsFuture; + } + + public void inform(FDv2SourceResult result) { + conditions.forEach(c -> c.inform(result)); + } + + @Override + public void close() { + conditions.forEach(Condition::close); + } + } } diff --git a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/FDv2DataSourceConditions.java b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/FDv2DataSourceConditions.java new file mode 100644 index 0000000..b5ed8b7 --- /dev/null +++ b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/FDv2DataSourceConditions.java @@ -0,0 +1,184 @@ +package com.launchdarkly.sdk.server; + +import com.launchdarkly.sdk.server.datasources.FDv2SourceResult; + +import java.io.Closeable; +import java.io.IOException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +/** + * Container class for FDv2 data source conditions and related types. + *

+ * This class is non-constructable and serves only as a namespace for condition-related types. + * Package-private for internal use and testing. + */ +class FDv2DataSourceConditions { + /** + * Private constructor to prevent instantiation. + */ + private FDv2DataSourceConditions() { + } + + /** + * Package-private for testing. + */ + interface Condition extends Closeable { + enum ConditionType { + FALLBACK, + RECOVERY, + } + + CompletableFuture execute(); + + void inform(FDv2SourceResult sourceResult); + + void close(); + + ConditionType getType(); + } + + interface ConditionFactory { + Condition build(); + + Condition.ConditionType getType(); + } + + static abstract class TimedCondition implements Condition { + protected final CompletableFuture resultFuture = new CompletableFuture<>(); + + protected final ScheduledExecutorService sharedExecutor; + + /** + * Future for the timeout task, if any. Will be null when no timeout is active. + */ + protected ScheduledFuture timerFuture; + + /** + * Timeout duration for the fallback operation. + */ + protected final long timeoutSeconds; + + public TimedCondition(ScheduledExecutorService sharedExecutor, long timeoutSeconds) { + this.sharedExecutor = sharedExecutor; + this.timeoutSeconds = timeoutSeconds; + } + + @Override + public CompletableFuture execute() { + return resultFuture; + } + + @Override + public void close() { + if (timerFuture != null) { + timerFuture.cancel(false); + timerFuture = null; + } + } + + static abstract class Factory implements ConditionFactory { + protected final ScheduledExecutorService sharedExecutor; + protected final long timeoutSeconds; + + public Factory(ScheduledExecutorService sharedExecutor, long timeout) { + this.sharedExecutor = sharedExecutor; + this.timeoutSeconds = timeout; + } + } + } + + /** + * This condition is used to determine if a fallback should be performed. It is informed of each data source result + * via {@link #inform(FDv2SourceResult)}. Based on the results, it updates its internal state. When the fallback + * condition is met, then the {@link java.util.concurrent.Future} returned by {@link #execute()} will complete. + *

+ * This is package-private, instead of private, for ease of testing. + */ + static class FallbackCondition extends TimedCondition { + static class Factory extends TimedCondition.Factory { + public Factory(ScheduledExecutorService sharedExecutor, long timeout) { + super(sharedExecutor, timeout); + } + + @Override + public Condition build() { + return new FallbackCondition(sharedExecutor, timeoutSeconds); + } + + @Override + public ConditionType getType() { + return ConditionType.FALLBACK; + } + } + + public FallbackCondition(ScheduledExecutorService sharedExecutor, long timeoutSeconds) { + super(sharedExecutor, timeoutSeconds); + } + + @Override + public void inform(FDv2SourceResult sourceResult) { + if (sourceResult == null) { + return; + } + if (sourceResult.getResultType() == FDv2SourceResult.ResultType.CHANGE_SET) { + if (timerFuture != null) { + timerFuture.cancel(false); + timerFuture = null; + } + } + if (sourceResult.getResultType() == FDv2SourceResult.ResultType.STATUS && sourceResult.getStatus().getState() == FDv2SourceResult.State.INTERRUPTED) { + if (timerFuture == null) { + timerFuture = sharedExecutor.schedule(() -> { + resultFuture.complete(this); + return null; + }, timeoutSeconds, TimeUnit.SECONDS); + } + } + } + + @Override + public ConditionType getType() { + return ConditionType.FALLBACK; + } + } + + static class RecoveryCondition extends TimedCondition { + + static class Factory extends TimedCondition.Factory { + public Factory(ScheduledExecutorService sharedExecutor, long timeout) { + super(sharedExecutor, timeout); + } + + @Override + public Condition build() { + return new RecoveryCondition(sharedExecutor, timeoutSeconds); + } + + @Override + public ConditionType getType() { + return ConditionType.RECOVERY; + } + } + + public RecoveryCondition(ScheduledExecutorService sharedExecutor, long timeoutSeconds) { + super(sharedExecutor, timeoutSeconds); + timerFuture = sharedExecutor.schedule(() -> { + resultFuture.complete(this); + return null; + }, timeoutSeconds, TimeUnit.SECONDS); + } + + @Override + public void inform(FDv2SourceResult sourceResult) { + // Time-based recovery. + } + + @Override + public ConditionType getType() { + return ConditionType.RECOVERY; + } + } +} diff --git a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/FDv2DataSystem.java b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/FDv2DataSystem.java index 0562ce6..77aeb4e 100644 --- a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/FDv2DataSystem.java +++ b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/FDv2DataSystem.java @@ -157,7 +157,10 @@ static FDv2DataSystem create( DataSource dataSource = new FDv2DataSource( initializerFactories, synchronizerFactories, - dataSourceUpdates + dataSourceUpdates, + config.threadPriority, + clientContext.getBaseLogger().subLogger(Loggers.DATA_SOURCE_LOGGER_NAME), + clientContext.sharedExecutor ); DataSourceStatusProvider dataSourceStatusProvider = new DataSourceStatusProviderImpl( dataSourceStatusBroadcaster, diff --git a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/Loggers.java b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/Loggers.java index 823aa6a..588ba3f 100644 --- a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/Loggers.java +++ b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/Loggers.java @@ -24,4 +24,7 @@ private Loggers() {} static final String EVALUATION_LOGGER_NAME = "Evaluation"; static final String EVENTS_LOGGER_NAME = "Events"; static final String HOOKS_LOGGER_NAME = "Hooks"; + static final String STREAMING_SYNCHRONIZER = "StreamingSynchronizer"; + static final String POLLING_SYNCHRONIZER = "PollingSynchronizer"; + static final String POLLING_INITIALIZER = "PollingInitializer"; } diff --git a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/PollingInitializerImpl.java b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/PollingInitializerImpl.java index 856a118..2e3b368 100644 --- a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/PollingInitializerImpl.java +++ b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/PollingInitializerImpl.java @@ -12,7 +12,7 @@ class PollingInitializerImpl extends PollingBase implements Initializer { private final SelectorSource selectorSource; public PollingInitializerImpl(FDv2Requestor requestor, LDLogger logger, SelectorSource selectorSource) { - super(requestor, logger); + super(requestor, logger.subLogger(Loggers.POLLING_INITIALIZER)); this.selectorSource = selectorSource; } diff --git a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/PollingSynchronizerImpl.java b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/PollingSynchronizerImpl.java index 8bbc6a4..43c95ee 100644 --- a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/PollingSynchronizerImpl.java +++ b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/PollingSynchronizerImpl.java @@ -23,7 +23,7 @@ public PollingSynchronizerImpl( ScheduledExecutorService sharedExecutor, Duration pollInterval ) { - super(requestor, logger); + super(requestor, logger.subLogger(Loggers.POLLING_SYNCHRONIZER)); this.selectorSource = selectorSource; synchronized (this) { diff --git a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/StreamingSynchronizerImpl.java b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/StreamingSynchronizerImpl.java index 504b6e6..0145396 100644 --- a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/StreamingSynchronizerImpl.java +++ b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/StreamingSynchronizerImpl.java @@ -71,7 +71,7 @@ public StreamingSynchronizerImpl( ) { this.httpProperties = httpProperties; this.selectorSource = selectorSource; - this.logger = logger; + this.logger = logger.subLogger(Loggers.STREAMING_SYNCHRONIZER); this.payloadFilter = payloadFilter; this.streamUri = HttpHelpers.concatenateUriPath(baseUri, requestPath); this.initialReconnectDelay = initialReconnectDelaySeconds; diff --git a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/SynchronizerFactoryWithState.java b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/SynchronizerFactoryWithState.java new file mode 100644 index 0000000..c0afa64 --- /dev/null +++ b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/SynchronizerFactoryWithState.java @@ -0,0 +1,38 @@ +package com.launchdarkly.sdk.server; + +import com.launchdarkly.sdk.server.datasources.Synchronizer; + +class SynchronizerFactoryWithState { + public enum State { + /** + * This synchronizer is available to use. + */ + Available, + + /** + * This synchronizer is no longer available to use. + */ + Blocked + } + + private final FDv2DataSource.DataSourceFactory factory; + + private State state = State.Available; + + + public SynchronizerFactoryWithState(FDv2DataSource.DataSourceFactory factory) { + this.factory = factory; + } + + public State getState() { + return state; + } + + public void block() { + state = State.Blocked; + } + + public Synchronizer build() { + return factory.build(); + } +} diff --git a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/SynchronizerStateManager.java b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/SynchronizerStateManager.java new file mode 100644 index 0000000..fe8db81 --- /dev/null +++ b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/SynchronizerStateManager.java @@ -0,0 +1,165 @@ +package com.launchdarkly.sdk.server; + +import java.io.Closeable; +import java.io.IOException; +import java.util.List; + +/** + * Manages the state of synchronizers including tracking which synchronizer is active, + * managing the list of available synchronizers, and handling source transitions. + *

+ * Package-private for internal use. + */ +class SynchronizerStateManager implements Closeable { + private final List synchronizers; + + /** + * Lock for active sources and shutdown state. + */ + private final Object activeSourceLock = new Object(); + private Closeable activeSource; + private boolean isShutdown = false; + + /** + * We start at -1, so finding the next synchronizer can non-conditionally increment the index. + */ + private int sourceIndex = -1; + + public SynchronizerStateManager(List synchronizers) { + this.synchronizers = synchronizers; + } + + /** + * Reset the source index to -1, indicating that we should start from the first synchronizer when looking for + * the next one to use. This is used when recovering from a non-primary synchronizer. + */ + public void resetSourceIndex() { + synchronized (activeSourceLock) { + sourceIndex = -1; + } + } + + /** + * Get the next synchronizer to use. This operates based on tracking the index of the currently active synchronizer, + * which will loop through all available synchronizers handling interruptions. Then a non-prime synchronizer recovers + * the source index will be reset, and we start at the beginning. + *

+ * Any given synchronizer can be marked as blocked, in which case that synchronizer is not eligible to be used again. + * Synchronizers that are not blocked are available, and this function will only return available synchronizers. + * @return the next synchronizer factory to use, or null if there are no more available synchronizers. + */ + public SynchronizerFactoryWithState getNextAvailableSynchronizer() { + synchronized (synchronizers) { + SynchronizerFactoryWithState factory = null; + + // There is at least one available factory. + if(synchronizers.stream().anyMatch(s -> s.getState() == SynchronizerFactoryWithState.State.Available)) { + // Look for the next synchronizer starting at the position after the current one. (avoiding just re-using the same synchronizer.) + while(factory == null) { + sourceIndex++; + // We aren't using module here because we want to keep the stored index within range instead + // of increasing indefinitely. + if(sourceIndex >= synchronizers.size()) { + sourceIndex = 0; + } + SynchronizerFactoryWithState candidate = synchronizers.get(sourceIndex); + if (candidate.getState() == SynchronizerFactoryWithState.State.Available) { + factory = candidate; + } + + } + } + + return factory; + } + } + + /** + * Determine if the currently active synchronizer is the prime (first available) synchronizer. + * @return true if the current synchronizer is the prime synchronizer, false otherwise + */ + public boolean isPrimeSynchronizer() { + synchronized (activeSourceLock) { + boolean firstAvailableSynchronizer = true; + for (int index = 0; index < synchronizers.size(); index++) { + if (synchronizers.get(index).getState() == SynchronizerFactoryWithState.State.Available) { + if (firstAvailableSynchronizer && sourceIndex == index) { + // This is the first synchronizer that is available, and it also is the current one. + return true; + } + // Subsequently encountered synchronizers that are available are not the first one. + firstAvailableSynchronizer = false; + } + } + } + return false; + } + + /** + * Get the count of available synchronizers. + * @return the number of available synchronizers + */ + public int getAvailableSynchronizerCount() { + synchronized (activeSourceLock) { + int count = 0; + for (int index = 0; index < synchronizers.size(); index++) { + if (synchronizers.get(index).getState() == SynchronizerFactoryWithState.State.Available) { + count++; + } + } + return count; + } + } + + /** + * Set the active source. If shutdown has been initiated, the source will be closed immediately. + * Any previously active source will be closed. + * @param source the source to set as active + * @return true if shutdown has been initiated, false otherwise + */ + public boolean setActiveSource(Closeable source) { + synchronized (activeSourceLock) { + if (activeSource != null) { + safeClose(activeSource); + } + if (isShutdown) { + safeClose(source); + return true; + } + activeSource = source; + } + return false; + } + + /** + * Close the state manager and shut down any active source. + * Implements AutoCloseable to enable try-with-resources usage. + */ + @Override + public void close() { + synchronized (activeSourceLock) { + isShutdown = true; + if (activeSource != null) { + try { + activeSource.close(); + } catch (IOException e) { + // We are done with this synchronizer, so we don't care if it encounters + // an error condition. + } + activeSource = null; + } + } + } + + /** + * Safely close a closeable, ignoring any exceptions. + * @param closeable the closeable to close + */ + private void safeClose(Closeable closeable) { + try { + closeable.close(); + } catch (IOException e) { + // Ignore close exceptions. + } + } +} diff --git a/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/FDv2DataSourceFallbackConditionTest.java b/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/FDv2DataSourceFallbackConditionTest.java new file mode 100644 index 0000000..ca3fb5f --- /dev/null +++ b/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/FDv2DataSourceFallbackConditionTest.java @@ -0,0 +1,438 @@ +package com.launchdarkly.sdk.server; + +import com.launchdarkly.sdk.internal.fdv2.sources.Selector; +import com.launchdarkly.sdk.server.FDv2DataSourceConditions.Condition; +import com.launchdarkly.sdk.server.FDv2DataSourceConditions.FallbackCondition; +import com.launchdarkly.sdk.server.datasources.FDv2SourceResult; +import com.launchdarkly.sdk.server.interfaces.DataSourceStatusProvider; +import com.launchdarkly.sdk.server.subsystems.DataStoreTypes; + +import org.junit.After; +import org.junit.Test; + +import java.time.Instant; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; + +public class FDv2DataSourceFallbackConditionTest extends BaseTest { + + private ScheduledExecutorService executor; + + @After + public void tearDown() { + if (executor != null && !executor.isShutdown()) { + executor.shutdownNow(); + } + } + + private DataStoreTypes.ChangeSet makeChangeSet() { + return new DataStoreTypes.ChangeSet<>( + DataStoreTypes.ChangeSetType.None, + Selector.EMPTY, + null, + null + ); + } + + @Test + public void executeReturnsCompletableFuture() { + executor = Executors.newScheduledThreadPool(1); + FallbackCondition condition = new FallbackCondition(executor, 120); + + CompletableFuture result = condition.execute(); + + assertFalse(result.isDone()); + } + + @Test + public void getTypeReturnsFallback() { + executor = Executors.newScheduledThreadPool(1); + FallbackCondition condition = new FallbackCondition(executor, 120); + + assertEquals(Condition.ConditionType.FALLBACK, condition.getType()); + } + + @Test + public void interruptedStatusStartsTimerThatCompletesResultFuture() throws Exception { + executor = Executors.newScheduledThreadPool(1); + FallbackCondition condition = new FallbackCondition(executor, 1); + + CompletableFuture resultFuture = condition.execute(); + assertFalse(resultFuture.isDone()); + + // Inform with INTERRUPTED status + condition.inform( + FDv2SourceResult.interrupted( + new DataSourceStatusProvider.ErrorInfo(DataSourceStatusProvider.ErrorKind.NETWORK_ERROR, 500, null, Instant.now()), + false + ) + ); + + // Future should still not be done immediately + assertFalse(resultFuture.isDone()); + + // Wait for timeout to fire + Condition result = resultFuture.get(2, TimeUnit.SECONDS); + + // Now it should be done and return the condition instance + assertTrue(resultFuture.isDone()); + assertSame(condition, result); + } + + @Test + public void changeSetCancelsActiveTimer() throws Exception { + executor = Executors.newScheduledThreadPool(1); + FallbackCondition condition = new FallbackCondition(executor, 1); + + CompletableFuture resultFuture = condition.execute(); + + // Start timer with INTERRUPTED + condition.inform( + FDv2SourceResult.interrupted( + new DataSourceStatusProvider.ErrorInfo(DataSourceStatusProvider.ErrorKind.NETWORK_ERROR, 500, null, Instant.now()), + false + ) + ); + + // Cancel timer with CHANGE_SET + DataStoreTypes.ChangeSet changeSet = makeChangeSet(); + condition.inform(FDv2SourceResult.changeSet(changeSet, false)); + + // Wait longer than the timeout period + Thread.sleep(1500); + + // Future should still not be complete (timer was cancelled) + assertFalse(resultFuture.isDone()); + } + + @Test + public void changeSetWithoutActiveTimerDoesNothing() throws Exception { + executor = Executors.newScheduledThreadPool(1); + FallbackCondition condition = new FallbackCondition(executor, 1); + + CompletableFuture resultFuture = condition.execute(); + + // Inform with CHANGE_SET without starting a timer first + DataStoreTypes.ChangeSet changeSet = makeChangeSet(); + condition.inform(FDv2SourceResult.changeSet(changeSet, false)); + + // Wait to ensure nothing happens + Thread.sleep(100); + + // Future should still not be complete + assertFalse(resultFuture.isDone()); + } + + @Test + public void multipleInterruptedStatusesDoNotStartMultipleTimers() throws Exception { + executor = Executors.newScheduledThreadPool(1); + FallbackCondition condition = new FallbackCondition(executor, 2); + + CompletableFuture resultFuture = condition.execute(); + + // Inform with INTERRUPTED multiple times + condition.inform( + FDv2SourceResult.interrupted( + new DataSourceStatusProvider.ErrorInfo(DataSourceStatusProvider.ErrorKind.NETWORK_ERROR, 500, null, Instant.now()), + false + ) + ); + + Thread.sleep(100); + + condition.inform( + FDv2SourceResult.interrupted( + new DataSourceStatusProvider.ErrorInfo(DataSourceStatusProvider.ErrorKind.NETWORK_ERROR, 500, null, Instant.now()), + false + ) + ); + + Thread.sleep(100); + + condition.inform( + FDv2SourceResult.interrupted( + new DataSourceStatusProvider.ErrorInfo(DataSourceStatusProvider.ErrorKind.NETWORK_ERROR, 500, null, Instant.now()), + false + ) + ); + + // Wait for the timer (should only fire once) + Condition result = resultFuture.get(3, TimeUnit.SECONDS); + + assertTrue(resultFuture.isDone()); + assertSame(condition, result); + } + + @Test + public void terminalErrorStatusDoesNotStartTimer() throws Exception { + executor = Executors.newScheduledThreadPool(1); + FallbackCondition condition = new FallbackCondition(executor, 1); + + CompletableFuture resultFuture = condition.execute(); + + // Inform with TERMINAL_ERROR status + condition.inform( + FDv2SourceResult.terminalError( + new DataSourceStatusProvider.ErrorInfo(DataSourceStatusProvider.ErrorKind.ERROR_RESPONSE, 401, null, Instant.now()), + false + ) + ); + + // Wait longer than timeout + Thread.sleep(1500); + + // Future should still not be complete (no timer started) + assertFalse(resultFuture.isDone()); + } + + @Test + public void shutdownStatusDoesNotStartTimer() throws Exception { + executor = Executors.newScheduledThreadPool(1); + FallbackCondition condition = new FallbackCondition(executor, 1); + + CompletableFuture resultFuture = condition.execute(); + + // Inform with SHUTDOWN status + condition.inform(FDv2SourceResult.shutdown()); + + // Wait longer than timeout + Thread.sleep(1500); + + // Future should still not be complete (no timer started) + assertFalse(resultFuture.isDone()); + } + + @Test + public void goodbyeStatusDoesNotStartTimer() throws Exception { + executor = Executors.newScheduledThreadPool(1); + FallbackCondition condition = new FallbackCondition(executor, 1); + + CompletableFuture resultFuture = condition.execute(); + + // Inform with GOODBYE status + condition.inform(FDv2SourceResult.goodbye("server-requested", false)); + + // Wait longer than timeout + Thread.sleep(1500); + + // Future should still not be complete (no timer started) + assertFalse(resultFuture.isDone()); + } + + @Test + public void closeCancelsActiveTimer() throws Exception { + executor = Executors.newScheduledThreadPool(1); + FallbackCondition condition = new FallbackCondition(executor, 1); + + CompletableFuture resultFuture = condition.execute(); + + // Start timer with INTERRUPTED + condition.inform( + FDv2SourceResult.interrupted( + new DataSourceStatusProvider.ErrorInfo(DataSourceStatusProvider.ErrorKind.NETWORK_ERROR, 500, null, Instant.now()), + false + ) + ); + + // Close the condition + condition.close(); + + // Wait longer than the timeout period + Thread.sleep(1500); + + // Future should still not be complete (timer was cancelled) + assertFalse(resultFuture.isDone()); + } + + @Test + public void closeWithoutActiveTimerDoesNotFail() throws Exception { + executor = Executors.newScheduledThreadPool(1); + FallbackCondition condition = new FallbackCondition(executor, 120); + + // Close without starting a timer + condition.close(); + + // Should not throw exception + } + + @Test + public void timerCanBeStartedAfterBeingCancelled() throws Exception { + executor = Executors.newScheduledThreadPool(1); + FallbackCondition condition = new FallbackCondition(executor, 1); + + CompletableFuture resultFuture = condition.execute(); + + // Start timer + condition.inform( + FDv2SourceResult.interrupted( + new DataSourceStatusProvider.ErrorInfo(DataSourceStatusProvider.ErrorKind.NETWORK_ERROR, 500, null, Instant.now()), + false + ) + ); + + // Cancel timer with CHANGE_SET + DataStoreTypes.ChangeSet changeSet = makeChangeSet(); + condition.inform(FDv2SourceResult.changeSet(changeSet, false)); + + // Start timer again + condition.inform( + FDv2SourceResult.interrupted( + new DataSourceStatusProvider.ErrorInfo(DataSourceStatusProvider.ErrorKind.NETWORK_ERROR, 500, null, Instant.now()), + false + ) + ); + + // Wait for second timer to fire + Condition result = resultFuture.get(2, TimeUnit.SECONDS); + + assertTrue(resultFuture.isDone()); + assertSame(condition, result); + } + + @Test + public void changeSetAfterTimerFiresDoesNotAffectCompletedFuture() throws Exception { + executor = Executors.newScheduledThreadPool(1); + FallbackCondition condition = new FallbackCondition(executor, 1); + + CompletableFuture resultFuture = condition.execute(); + + // Start timer + condition.inform( + FDv2SourceResult.interrupted( + new DataSourceStatusProvider.ErrorInfo(DataSourceStatusProvider.ErrorKind.NETWORK_ERROR, 500, null, Instant.now()), + false + ) + ); + + // Wait for timer to fire + Condition result = resultFuture.get(2, TimeUnit.SECONDS); + assertTrue(resultFuture.isDone()); + assertSame(condition, result); + + // Inform with CHANGE_SET after timer has fired + DataStoreTypes.ChangeSet changeSet = makeChangeSet(); + condition.inform(FDv2SourceResult.changeSet(changeSet, false)); + + // Future should remain complete + assertTrue(resultFuture.isDone()); + } + + @Test + public void factoryCreatesFallbackCondition() throws Exception { + executor = Executors.newScheduledThreadPool(1); + FallbackCondition.Factory factory = new FallbackCondition.Factory(executor, 1); + + FallbackCondition condition = (FallbackCondition) factory.build(); + + // Verify it works by using it + CompletableFuture resultFuture = condition.execute(); + assertFalse(resultFuture.isDone()); + + condition.inform( + FDv2SourceResult.interrupted( + new DataSourceStatusProvider.ErrorInfo(DataSourceStatusProvider.ErrorKind.NETWORK_ERROR, 500, null, Instant.now()), + false + ) + ); + + Condition result = resultFuture.get(2, TimeUnit.SECONDS); + assertTrue(resultFuture.isDone()); + assertSame(condition, result); + } + + @Test + public void executeReturnsTheSameFutureOnMultipleCalls() { + executor = Executors.newScheduledThreadPool(1); + FallbackCondition condition = new FallbackCondition(executor, 120); + + CompletableFuture first = condition.execute(); + CompletableFuture second = condition.execute(); + + assertSame(first, second); + } + + @Test + public void changeSetDuringTimerExecutionCancelsTimer() throws Exception { + executor = Executors.newScheduledThreadPool(1); + FallbackCondition condition = new FallbackCondition(executor, 1); + + CompletableFuture resultFuture = condition.execute(); + + // Start timer + condition.inform( + FDv2SourceResult.interrupted( + new DataSourceStatusProvider.ErrorInfo(DataSourceStatusProvider.ErrorKind.NETWORK_ERROR, 500, null, Instant.now()), + false + ) + ); + + // Wait partway through timeout period + Thread.sleep(500); + + // Cancel with CHANGE_SET + DataStoreTypes.ChangeSet changeSet = makeChangeSet(); + condition.inform(FDv2SourceResult.changeSet(changeSet, false)); + + // Wait past the original timeout + Thread.sleep(1000); + + // Future should still not be complete + assertFalse(resultFuture.isDone()); + } + + @Test + public void multipleChangeSetCallsWithActiveTimerAreHandled() throws Exception { + executor = Executors.newScheduledThreadPool(1); + FallbackCondition condition = new FallbackCondition(executor, 1); + + CompletableFuture resultFuture = condition.execute(); + + // Start timer + condition.inform( + FDv2SourceResult.interrupted( + new DataSourceStatusProvider.ErrorInfo(DataSourceStatusProvider.ErrorKind.NETWORK_ERROR, 500, null, Instant.now()), + false + ) + ); + + // Cancel with multiple CHANGE_SETs + DataStoreTypes.ChangeSet changeSet = makeChangeSet(); + condition.inform(FDv2SourceResult.changeSet(changeSet, false)); + condition.inform(FDv2SourceResult.changeSet(changeSet, false)); + condition.inform(FDv2SourceResult.changeSet(changeSet, false)); + + // Wait longer than timeout + Thread.sleep(1500); + + // Future should still not be complete + assertFalse(resultFuture.isDone()); + } + + @Test + public void closeCanBeCalledMultipleTimes() throws Exception { + executor = Executors.newScheduledThreadPool(1); + FallbackCondition condition = new FallbackCondition(executor, 1); + + // Start timer + condition.inform( + FDv2SourceResult.interrupted( + new DataSourceStatusProvider.ErrorInfo(DataSourceStatusProvider.ErrorKind.NETWORK_ERROR, 500, null, Instant.now()), + false + ) + ); + + // Close multiple times + condition.close(); + condition.close(); + condition.close(); + + // Should not throw exception + } +} \ No newline at end of file diff --git a/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/FDv2DataSourceRecoveryConditionTest.java b/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/FDv2DataSourceRecoveryConditionTest.java new file mode 100644 index 0000000..2c584f0 --- /dev/null +++ b/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/FDv2DataSourceRecoveryConditionTest.java @@ -0,0 +1,322 @@ +package com.launchdarkly.sdk.server; + +import com.launchdarkly.sdk.internal.fdv2.sources.Selector; +import com.launchdarkly.sdk.server.FDv2DataSourceConditions.Condition; +import com.launchdarkly.sdk.server.FDv2DataSourceConditions.RecoveryCondition; +import com.launchdarkly.sdk.server.datasources.FDv2SourceResult; +import com.launchdarkly.sdk.server.interfaces.DataSourceStatusProvider; +import com.launchdarkly.sdk.server.subsystems.DataStoreTypes; + +import org.junit.After; +import org.junit.Test; + +import java.time.Instant; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; + +@SuppressWarnings("javadoc") +public class FDv2DataSourceRecoveryConditionTest extends BaseTest { + + private ScheduledExecutorService executor; + + @After + public void tearDown() { + if (executor != null && !executor.isShutdown()) { + executor.shutdownNow(); + } + } + + private DataStoreTypes.ChangeSet makeChangeSet() { + return new DataStoreTypes.ChangeSet<>( + DataStoreTypes.ChangeSetType.None, + Selector.EMPTY, + null, + null + ); + } + + @Test + public void getTypeReturnsRecovery() { + executor = Executors.newScheduledThreadPool(1); + RecoveryCondition condition = new RecoveryCondition(executor, 120); + + assertEquals(Condition.ConditionType.RECOVERY, condition.getType()); + } + + @Test + public void timerStartsImmediatelyAndCompletesResultFuture() throws Exception { + executor = Executors.newScheduledThreadPool(1); + RecoveryCondition condition = new RecoveryCondition(executor, 1); + + CompletableFuture resultFuture = condition.execute(); + + // Future should not be done immediately + assertFalse(resultFuture.isDone()); + + // Wait for timeout to fire + Condition result = resultFuture.get(2, TimeUnit.SECONDS); + + // Now it should be done and return the condition instance + assertTrue(resultFuture.isDone()); + assertSame(condition, result); + } + + @Test + public void closeCancelsActiveTimer() throws Exception { + executor = Executors.newScheduledThreadPool(1); + RecoveryCondition condition = new RecoveryCondition(executor, 1); + + CompletableFuture resultFuture = condition.execute(); + + // Close the condition before timeout + condition.close(); + + // Wait longer than the timeout period + Thread.sleep(1500); + + // Future should still not be complete (timer was cancelled) + assertFalse(resultFuture.isDone()); + } + + @Test + public void closeAfterTimerFiresDoesNotCauseError() throws Exception { + executor = Executors.newScheduledThreadPool(1); + RecoveryCondition condition = new RecoveryCondition(executor, 1); + + CompletableFuture resultFuture = condition.execute(); + + // Wait for timer to fire + Condition result = resultFuture.get(2, TimeUnit.SECONDS); + assertTrue(resultFuture.isDone()); + assertSame(condition, result); + + // Close after timer has fired + condition.close(); + + // Should not throw exception + } + + @Test + public void closeCanBeCalledMultipleTimes() throws Exception { + executor = Executors.newScheduledThreadPool(1); + RecoveryCondition condition = new RecoveryCondition(executor, 1); + + // Close multiple times before timer fires + condition.close(); + condition.close(); + condition.close(); + + // Should not throw exception + } + + @Test + public void informWithChangeSetDoesNothing() throws Exception { + executor = Executors.newScheduledThreadPool(1); + RecoveryCondition condition = new RecoveryCondition(executor, 1); + + CompletableFuture resultFuture = condition.execute(); + + // Inform with CHANGE_SET + DataStoreTypes.ChangeSet changeSet = makeChangeSet(); + condition.inform(FDv2SourceResult.changeSet(changeSet, false)); + + // Timer should still fire after timeout (inform does nothing) + Condition result = resultFuture.get(2, TimeUnit.SECONDS); + assertTrue(resultFuture.isDone()); + assertSame(condition, result); + } + + @Test + public void informWithInterruptedStatusDoesNothing() throws Exception { + executor = Executors.newScheduledThreadPool(1); + RecoveryCondition condition = new RecoveryCondition(executor, 1); + + CompletableFuture resultFuture = condition.execute(); + + // Inform with INTERRUPTED status + condition.inform( + FDv2SourceResult.interrupted( + new DataSourceStatusProvider.ErrorInfo(DataSourceStatusProvider.ErrorKind.NETWORK_ERROR, 500, null, Instant.now()), + false + ) + ); + + // Timer should still fire after timeout (inform does nothing) + Condition result = resultFuture.get(2, TimeUnit.SECONDS); + assertTrue(resultFuture.isDone()); + assertSame(condition, result); + } + + @Test + public void informWithTerminalErrorStatusDoesNothing() throws Exception { + executor = Executors.newScheduledThreadPool(1); + RecoveryCondition condition = new RecoveryCondition(executor, 1); + + CompletableFuture resultFuture = condition.execute(); + + // Inform with TERMINAL_ERROR status + condition.inform( + FDv2SourceResult.terminalError( + new DataSourceStatusProvider.ErrorInfo(DataSourceStatusProvider.ErrorKind.ERROR_RESPONSE, 401, null, Instant.now()), + false + ) + ); + + // Timer should still fire after timeout (inform does nothing) + Condition result = resultFuture.get(2, TimeUnit.SECONDS); + assertTrue(resultFuture.isDone()); + assertSame(condition, result); + } + + @Test + public void informWithShutdownStatusDoesNothing() throws Exception { + executor = Executors.newScheduledThreadPool(1); + RecoveryCondition condition = new RecoveryCondition(executor, 1); + + CompletableFuture resultFuture = condition.execute(); + + // Inform with SHUTDOWN status + condition.inform(FDv2SourceResult.shutdown()); + + // Timer should still fire after timeout (inform does nothing) + Condition result = resultFuture.get(2, TimeUnit.SECONDS); + assertTrue(resultFuture.isDone()); + assertSame(condition, result); + } + + @Test + public void informWithGoodbyeStatusDoesNothing() throws Exception { + executor = Executors.newScheduledThreadPool(1); + RecoveryCondition condition = new RecoveryCondition(executor, 1); + + CompletableFuture resultFuture = condition.execute(); + + // Inform with GOODBYE status + condition.inform(FDv2SourceResult.goodbye("server-requested", false)); + + // Timer should still fire after timeout (inform does nothing) + Condition result = resultFuture.get(2, TimeUnit.SECONDS); + assertTrue(resultFuture.isDone()); + assertSame(condition, result); + } + + @Test + public void informWithNullDoesNothing() throws Exception { + executor = Executors.newScheduledThreadPool(1); + RecoveryCondition condition = new RecoveryCondition(executor, 1); + + CompletableFuture resultFuture = condition.execute(); + + // Inform with null + condition.inform(null); + + // Timer should still fire after timeout (inform does nothing) + Condition result = resultFuture.get(2, TimeUnit.SECONDS); + assertTrue(resultFuture.isDone()); + assertSame(condition, result); + } + + @Test + public void multipleInformCallsDoNotAffectTimer() throws Exception { + executor = Executors.newScheduledThreadPool(1); + RecoveryCondition condition = new RecoveryCondition(executor, 1); + + CompletableFuture resultFuture = condition.execute(); + + // Multiple inform calls + DataStoreTypes.ChangeSet changeSet = makeChangeSet(); + condition.inform(FDv2SourceResult.changeSet(changeSet, false)); + condition.inform( + FDv2SourceResult.interrupted( + new DataSourceStatusProvider.ErrorInfo(DataSourceStatusProvider.ErrorKind.NETWORK_ERROR, 500, null, Instant.now()), + false + ) + ); + condition.inform(FDv2SourceResult.shutdown()); + + // Timer should still fire after timeout (all inform calls do nothing) + Condition result = resultFuture.get(2, TimeUnit.SECONDS); + assertTrue(resultFuture.isDone()); + assertSame(condition, result); + } + + @Test + public void factoryCreatesRecoveryCondition() throws Exception { + executor = Executors.newScheduledThreadPool(1); + RecoveryCondition.Factory factory = new RecoveryCondition.Factory(executor, 1); + + RecoveryCondition condition = (RecoveryCondition) factory.build(); + + // Verify it works by using it + CompletableFuture resultFuture = condition.execute(); + assertFalse(resultFuture.isDone()); + + Condition result = resultFuture.get(2, TimeUnit.SECONDS); + assertTrue(resultFuture.isDone()); + assertSame(condition, result); + } + + @Test + public void factoryGetTypeReturnsRecovery() { + executor = Executors.newScheduledThreadPool(1); + RecoveryCondition.Factory factory = new RecoveryCondition.Factory(executor, 1); + + assertEquals(Condition.ConditionType.RECOVERY, factory.getType()); + } + + @Test + public void executeReturnsTheSameFutureOnMultipleCalls() { + executor = Executors.newScheduledThreadPool(1); + RecoveryCondition condition = new RecoveryCondition(executor, 120); + + CompletableFuture first = condition.execute(); + CompletableFuture second = condition.execute(); + + assertSame(first, second); + } + + @Test + public void timerStartsImmediatelyOnConstruction() throws Exception { + executor = Executors.newScheduledThreadPool(1); + + // Create condition with very short timeout + RecoveryCondition condition = new RecoveryCondition(executor, 1); + + // Get the future + CompletableFuture resultFuture = condition.execute(); + + // Verify it's not done yet + assertFalse(resultFuture.isDone()); + + // Wait for it to complete + Condition result = resultFuture.get(2, TimeUnit.SECONDS); + + // Verify it completed with the condition + assertSame(condition, result); + } + + @Test + public void closeBeforeExecuteDoesNotPreventFutureAccess() throws Exception { + executor = Executors.newScheduledThreadPool(1); + RecoveryCondition condition = new RecoveryCondition(executor, 1); + + // Close immediately + condition.close(); + + // Should still be able to get the future + CompletableFuture resultFuture = condition.execute(); + + // Wait to ensure timer doesn't fire + Thread.sleep(1500); + + // Future should not be complete (timer was cancelled) + assertFalse(resultFuture.isDone()); + } +} diff --git a/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/FDv2DataSourceTest.java b/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/FDv2DataSourceTest.java new file mode 100644 index 0000000..dc354ac --- /dev/null +++ b/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/FDv2DataSourceTest.java @@ -0,0 +1,2175 @@ +package com.launchdarkly.sdk.server; + +import com.google.common.collect.ImmutableList; +import com.launchdarkly.logging.LDLogger; +import com.launchdarkly.logging.Logs; +import com.launchdarkly.sdk.internal.fdv2.sources.Selector; +import com.launchdarkly.sdk.server.datasources.FDv2SourceResult; +import com.launchdarkly.sdk.server.datasources.Initializer; +import com.launchdarkly.sdk.server.datasources.Synchronizer; +import com.launchdarkly.sdk.server.interfaces.DataSourceStatusProvider; +import com.launchdarkly.sdk.server.interfaces.DataStoreStatusProvider; +import com.launchdarkly.sdk.server.subsystems.DataStoreTypes; +import com.launchdarkly.sdk.server.subsystems.DataSourceUpdateSinkV2; + +import org.junit.After; +import org.junit.Test; + +import java.time.Instant; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.Assert.*; + +@SuppressWarnings("javadoc") +public class FDv2DataSourceTest extends BaseTest { + + private ScheduledExecutorService executor; + private final LDLogger logger = LDLogger.withAdapter(Logs.none(), ""); + private final List resourcesToClose = new ArrayList<>(); + + @After + public void tearDown() { + if (executor != null && !executor.isShutdown()) { + executor.shutdownNow(); + } + for (AutoCloseable resource : resourcesToClose) { + try { + resource.close(); + } catch (Exception e) { + // Ignore cleanup exceptions + } + } + resourcesToClose.clear(); + } + + private DataStoreTypes.ChangeSet makeChangeSet(boolean withSelector) { + Selector selector = withSelector ? Selector.make(1, "test-state") : Selector.EMPTY; + return new DataStoreTypes.ChangeSet<>( + DataStoreTypes.ChangeSetType.None, + selector, + null, + null + ); + } + + private FDv2SourceResult makeInterruptedResult() { + return FDv2SourceResult.interrupted( + new DataSourceStatusProvider.ErrorInfo( + DataSourceStatusProvider.ErrorKind.NETWORK_ERROR, + 500, + null, + Instant.now() + ), + false + ); + } + + private FDv2SourceResult makeTerminalErrorResult() { + return FDv2SourceResult.terminalError( + new DataSourceStatusProvider.ErrorInfo( + DataSourceStatusProvider.ErrorKind.ERROR_RESPONSE, + 401, + null, + Instant.now() + ), + false + ); + } + + // ============================================================================ + // Initializer Scenarios + // ============================================================================ + + @Test + public void firstInitializerFailsSecondInitializerSucceedsWithSelector() throws Exception { + executor = Executors.newScheduledThreadPool(2); + MockDataSourceUpdateSink sink = new MockDataSourceUpdateSink(); + + CompletableFuture firstInitializerFuture = new CompletableFuture<>(); + firstInitializerFuture.completeExceptionally(new RuntimeException("First initializer failed")); + + CompletableFuture secondInitializerFuture = CompletableFuture.completedFuture( + FDv2SourceResult.changeSet(makeChangeSet(true), false) + ); + + ImmutableList> initializers = ImmutableList.of( + () -> new MockInitializer(firstInitializerFuture), + () -> new MockInitializer(secondInitializerFuture) + ); + + ImmutableList> synchronizers = ImmutableList.of(); + + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); + resourcesToClose.add(dataSource); + + Future startFuture = dataSource.start(); + startFuture.get(2, TimeUnit.SECONDS); + + assertTrue(dataSource.isInitialized()); + assertEquals(1, sink.getApplyCount()); + // TODO: Verify status updated to VALID when data source status is implemented + } + + @Test + public void firstInitializerFailsSecondInitializerSucceedsWithoutSelector() throws Exception { + executor = Executors.newScheduledThreadPool(2); + MockDataSourceUpdateSink sink = new MockDataSourceUpdateSink(); + + CompletableFuture firstInitializerFuture = new CompletableFuture<>(); + firstInitializerFuture.completeExceptionally(new RuntimeException("First initializer failed")); + + CompletableFuture secondInitializerFuture = CompletableFuture.completedFuture( + FDv2SourceResult.changeSet(makeChangeSet(false), false) + ); + + BlockingQueue synchronizerCalledQueue = new LinkedBlockingQueue<>(); + ImmutableList> initializers = ImmutableList.of( + () -> new MockInitializer(firstInitializerFuture), + () -> new MockInitializer(secondInitializerFuture) + ); + + CompletableFuture synchronizerFuture = CompletableFuture.completedFuture( + FDv2SourceResult.changeSet(makeChangeSet(false), false) + ); + + ImmutableList> synchronizers = ImmutableList.of( + () -> { + synchronizerCalledQueue.offer(true); + return new MockSynchronizer(synchronizerFuture); + } + ); + + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); + resourcesToClose.add(dataSource); + + Future startFuture = dataSource.start(); + startFuture.get(2, TimeUnit.SECONDS); + + assertTrue(dataSource.isInitialized()); + + // Wait for synchronizer to be called + Boolean synchronizerCalled = synchronizerCalledQueue.poll(2, TimeUnit.SECONDS); + assertNotNull("Synchronizer should be called", synchronizerCalled); + + // Wait for apply to be processed + sink.awaitApplyCount(2, 2, TimeUnit.SECONDS); + assertEquals(2, sink.getApplyCount()); // One from initializer, one from synchronizer + // TODO: Verify status updated to VALID when data source status is implemented + } + + @Test + public void firstInitializerSucceedsWithSelectorSecondInitializerNotInvoked() throws Exception { + executor = Executors.newScheduledThreadPool(2); + MockDataSourceUpdateSink sink = new MockDataSourceUpdateSink(); + + CompletableFuture firstInitializerFuture = CompletableFuture.completedFuture( + FDv2SourceResult.changeSet(makeChangeSet(true), false) + ); + + AtomicBoolean secondInitializerCalled = new AtomicBoolean(false); + + ImmutableList> initializers = ImmutableList.of( + () -> new MockInitializer(firstInitializerFuture), + () -> { + secondInitializerCalled.set(true); + return new MockInitializer(CompletableFuture.completedFuture( + FDv2SourceResult.changeSet(makeChangeSet(true), false) + )); + } + ); + + ImmutableList> synchronizers = ImmutableList.of(); + + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); + resourcesToClose.add(dataSource); + + Future startFuture = dataSource.start(); + startFuture.get(2, TimeUnit.SECONDS); + + assertTrue(dataSource.isInitialized()); + assertFalse(secondInitializerCalled.get()); + assertEquals(1, sink.getApplyCount()); + // TODO: Verify status updated to VALID when data source status is implemented + } + + @Test + public void allInitializersFailSwitchesToSynchronizers() throws Exception { + executor = Executors.newScheduledThreadPool(2); + MockDataSourceUpdateSink sink = new MockDataSourceUpdateSink(); + + CompletableFuture firstInitializerFuture = new CompletableFuture<>(); + firstInitializerFuture.completeExceptionally(new RuntimeException("First failed")); + + CompletableFuture secondInitializerFuture = new CompletableFuture<>(); + secondInitializerFuture.completeExceptionally(new RuntimeException("Second failed")); + + ImmutableList> initializers = ImmutableList.of( + () -> new MockInitializer(firstInitializerFuture), + () -> new MockInitializer(secondInitializerFuture) + ); + + BlockingQueue synchronizerCalledQueue = new LinkedBlockingQueue<>(); + CompletableFuture synchronizerFuture = CompletableFuture.completedFuture( + FDv2SourceResult.changeSet(makeChangeSet(false), false) + ); + + ImmutableList> synchronizers = ImmutableList.of( + () -> { + synchronizerCalledQueue.offer(true); + return new MockSynchronizer(synchronizerFuture); + } + ); + + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); + resourcesToClose.add(dataSource); + + Future startFuture = dataSource.start(); + startFuture.get(2, TimeUnit.SECONDS); + + assertTrue(dataSource.isInitialized()); + + // Wait for synchronizer to be called + Boolean synchronizerCalled = synchronizerCalledQueue.poll(2, TimeUnit.SECONDS); + assertNotNull("Synchronizer should be called", synchronizerCalled); + + // Wait for apply to be processed + sink.awaitApplyCount(1, 2, TimeUnit.SECONDS); + assertEquals(1, sink.getApplyCount()); + } + + @Test + public void allThreeInitializersFailWithNoSynchronizers() throws Exception { + executor = Executors.newScheduledThreadPool(2); + MockDataSourceUpdateSink sink = new MockDataSourceUpdateSink(); + + ImmutableList> initializers = ImmutableList.of( + () -> { + CompletableFuture future = new CompletableFuture<>(); + future.completeExceptionally(new RuntimeException("Failed")); + return new MockInitializer(future); + }, + () -> { + CompletableFuture future = new CompletableFuture<>(); + future.completeExceptionally(new RuntimeException("Failed")); + return new MockInitializer(future); + }, + () -> { + CompletableFuture future = new CompletableFuture<>(); + future.completeExceptionally(new RuntimeException("Failed")); + return new MockInitializer(future); + } + ); + + ImmutableList> synchronizers = ImmutableList.of(); + + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); + resourcesToClose.add(dataSource); + + Future startFuture = dataSource.start(); + startFuture.get(2, TimeUnit.SECONDS); + + assertFalse(dataSource.isInitialized()); + assertEquals(0, sink.getApplyCount()); + // TODO: Verify status reflects exhausted sources when data source status is implemented + } + + @Test + public void oneInitializerNoSynchronizerIsWellBehaved() throws Exception { + executor = Executors.newScheduledThreadPool(2); + MockDataSourceUpdateSink sink = new MockDataSourceUpdateSink(); + + CompletableFuture initializerFuture = CompletableFuture.completedFuture( + FDv2SourceResult.changeSet(makeChangeSet(true), false) + ); + + ImmutableList> initializers = ImmutableList.of( + () -> new MockInitializer(initializerFuture) + ); + + ImmutableList> synchronizers = ImmutableList.of(); + + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); + resourcesToClose.add(dataSource); + + Future startFuture = dataSource.start(); + startFuture.get(2, TimeUnit.SECONDS); + + assertTrue(dataSource.isInitialized()); + assertEquals(1, sink.getApplyCount()); + // TODO: Verify status updated to VALID when data source status is implemented + } + + // ============================================================================ + // Synchronizer Scenarios + // ============================================================================ + + @Test + public void noInitializersOneSynchronizerIsWellBehaved() throws Exception { + executor = Executors.newScheduledThreadPool(2); + MockDataSourceUpdateSink sink = new MockDataSourceUpdateSink(); + + ImmutableList> initializers = ImmutableList.of(); + + CompletableFuture synchronizerFuture = CompletableFuture.completedFuture( + FDv2SourceResult.changeSet(makeChangeSet(false), false) + ); + + ImmutableList> synchronizers = ImmutableList.of( + () -> new MockSynchronizer(synchronizerFuture) + ); + + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); + resourcesToClose.add(dataSource); + + Future startFuture = dataSource.start(); + startFuture.get(2, TimeUnit.SECONDS); + + assertTrue(dataSource.isInitialized()); + + // Wait for apply to be processed + sink.awaitApplyCount(1, 2, TimeUnit.SECONDS); + assertEquals(1, sink.getApplyCount()); + } + + @Test + public void oneInitializerOneSynchronizerIsWellBehaved() throws Exception { + executor = Executors.newScheduledThreadPool(2); + MockDataSourceUpdateSink sink = new MockDataSourceUpdateSink(); + + CompletableFuture initializerFuture = CompletableFuture.completedFuture( + FDv2SourceResult.changeSet(makeChangeSet(false), false) + ); + + ImmutableList> initializers = ImmutableList.of( + () -> new MockInitializer(initializerFuture) + ); + + CompletableFuture synchronizerFuture = CompletableFuture.completedFuture( + FDv2SourceResult.changeSet(makeChangeSet(false), false) + ); + + ImmutableList> synchronizers = ImmutableList.of( + () -> new MockSynchronizer(synchronizerFuture) + ); + + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); + resourcesToClose.add(dataSource); + + Future startFuture = dataSource.start(); + startFuture.get(2, TimeUnit.SECONDS); + + assertTrue(dataSource.isInitialized()); + + // Wait for both applies to be processed + sink.awaitApplyCount(2, 2, TimeUnit.SECONDS); + assertEquals(2, sink.getApplyCount()); + } + + @Test + public void noInitializersAndNoSynchronizersIsWellBehaved() throws Exception { + executor = Executors.newScheduledThreadPool(2); + MockDataSourceUpdateSink sink = new MockDataSourceUpdateSink(); + + ImmutableList> initializers = ImmutableList.of(); + ImmutableList> synchronizers = ImmutableList.of(); + + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); + resourcesToClose.add(dataSource); + + Future startFuture = dataSource.start(); + startFuture.get(2, TimeUnit.SECONDS); + + assertFalse(dataSource.isInitialized()); + assertEquals(0, sink.getApplyCount()); + // TODO: Verify status reflects exhausted sources when data source status is implemented + } + + // ============================================================================ + // Fallback and Recovery + // ============================================================================ + + @Test + public void errorWithFDv1FallbackTriggersFallback() throws Exception { + executor = Executors.newScheduledThreadPool(2); + MockDataSourceUpdateSink sink = new MockDataSourceUpdateSink(); + + ImmutableList> initializers = ImmutableList.of(); + + CompletableFuture synchronizerFuture = CompletableFuture.completedFuture( + FDv2SourceResult.changeSet(makeChangeSet(false), true) // FDv1 fallback flag + ); + + AtomicBoolean synchronizerCalled = new AtomicBoolean(false); + ImmutableList> synchronizers = ImmutableList.of( + () -> { + synchronizerCalled.set(true); + return new MockSynchronizer(synchronizerFuture); + } + ); + + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); + resourcesToClose.add(dataSource); + + Future startFuture = dataSource.start(); + startFuture.get(2, TimeUnit.SECONDS); + + assertTrue(synchronizerCalled.get()); + assertEquals(1, sink.getApplyCount()); + // TODO: Verify FDv1 fallback behavior when implemented + } + + @Test + public void fallbackAndRecoveryTasksWellBehaved() throws Exception { + executor = Executors.newScheduledThreadPool(2); + MockDataSourceUpdateSink sink = new MockDataSourceUpdateSink(); + + ImmutableList> initializers = ImmutableList.of(); + + // First synchronizer sends INTERRUPTED, triggering fallback after timeout + BlockingQueue firstSyncResults = new LinkedBlockingQueue<>(); + firstSyncResults.add(FDv2SourceResult.changeSet(makeChangeSet(false), false)); + firstSyncResults.add(makeInterruptedResult()); + // Keep it alive so fallback timeout triggers + + // The second synchronizer works fine, sends data + BlockingQueue secondSyncResults = new LinkedBlockingQueue<>(); + secondSyncResults.add(FDv2SourceResult.changeSet(makeChangeSet(false), false)); + // Keep alive for recovery + + AtomicInteger firstSyncCallCount = new AtomicInteger(0); + AtomicInteger secondSyncCallCount = new AtomicInteger(0); + + ImmutableList> synchronizers = ImmutableList.of( + () -> { + firstSyncCallCount.incrementAndGet(); + return new MockQueuedSynchronizer(firstSyncResults); + }, + () -> { + secondSyncCallCount.incrementAndGet(); + return new MockQueuedSynchronizer(secondSyncResults); + } + ); + + // Use short timeouts for testing + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 1, 2); + resourcesToClose.add(dataSource); + + Future startFuture = dataSource.start(); + startFuture.get(2, TimeUnit.SECONDS); + + assertTrue(dataSource.isInitialized()); + + // Expected sequence: + // 1. First sync sends apply (1) + // 2. First sync sends INTERRUPTED, fallback timer starts (1 second) + // 3. After fallback, second sync sends apply (2) + // 4. Recovery timer starts (2 seconds) + // 5. After recovery, first sync sends apply again (3) + // Total time: ~3-4 seconds (1s fallback + 2s recovery + processing) + + // Wait for 3 applies with enough time for fallback (1s) + recovery (2s) + overhead + sink.awaitApplyCount(3, 5, TimeUnit.SECONDS); + + // Both synchronizers should have been called due to fallback and recovery + assertTrue(firstSyncCallCount.get() >= 2); // Called initially and after recovery + assertTrue(secondSyncCallCount.get() >= 1); // Called after fallback + // TODO: Verify status transitions when data source status is implemented + } + + @Test + public void canDisposeWhenSynchronizersFallingBack() throws Exception { + executor = Executors.newScheduledThreadPool(2); + MockDataSourceUpdateSink sink = new MockDataSourceUpdateSink(); + + ImmutableList> initializers = ImmutableList.of(); + + // Synchronizer that sends INTERRUPTED to trigger fallback + BlockingQueue syncResults = new LinkedBlockingQueue<>(); + syncResults.add(FDv2SourceResult.changeSet(makeChangeSet(false), false)); + syncResults.add(makeInterruptedResult()); + + ImmutableList> synchronizers = ImmutableList.of( + () -> new MockQueuedSynchronizer(syncResults) + ); + + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 1, 2); + resourcesToClose.add(dataSource); + + Future startFuture = dataSource.start(); + startFuture.get(2, TimeUnit.SECONDS); + + assertTrue(dataSource.isInitialized()); + + // Close while the fallback condition is active + dataSource.close(); + + // Test passes if we reach here without hanging + } + + // ============================================================================ + // Source Blocking + // ============================================================================ + + @Test + public void terminalErrorBlocksSynchronizer() throws Exception { + executor = Executors.newScheduledThreadPool(2); + MockDataSourceUpdateSink sink = new MockDataSourceUpdateSink(); + + ImmutableList> initializers = ImmutableList.of(); + + // First synchronizer sends terminal error + BlockingQueue firstSyncResults = new LinkedBlockingQueue<>(); + firstSyncResults.add(FDv2SourceResult.changeSet(makeChangeSet(false), false)); + firstSyncResults.add(makeTerminalErrorResult()); + + // The second synchronizer works fine + BlockingQueue secondSyncResults = new LinkedBlockingQueue<>(); + secondSyncResults.add(FDv2SourceResult.changeSet(makeChangeSet(false), false)); + + BlockingQueue synchronizerCallQueue = new LinkedBlockingQueue<>(); + + ImmutableList> synchronizers = ImmutableList.of( + () -> { + synchronizerCallQueue.offer(1); + return new MockQueuedSynchronizer(firstSyncResults); + }, + () -> { + synchronizerCallQueue.offer(2); + return new MockQueuedSynchronizer(secondSyncResults); + } + ); + + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); + resourcesToClose.add(dataSource); + + Future startFuture = dataSource.start(); + startFuture.get(2, TimeUnit.SECONDS); + + assertTrue(dataSource.isInitialized()); + + // Wait for both synchronizers to be called + Integer firstCall = synchronizerCallQueue.poll(2, TimeUnit.SECONDS); + Integer secondCall = synchronizerCallQueue.poll(2, TimeUnit.SECONDS); + + assertNotNull("First synchronizer should be called", firstCall); + assertNotNull("Second synchronizer should be called after first is blocked", secondCall); + assertEquals(Integer.valueOf(1), firstCall); + assertEquals(Integer.valueOf(2), secondCall); + + // Wait for applies from both + sink.awaitApplyCount(2, 2, TimeUnit.SECONDS); + // TODO: Verify status transitions when data source status is implemented + } + + @Test + public void allThreeSynchronizersFailReportsExhaustion() throws Exception { + executor = Executors.newScheduledThreadPool(2); + MockDataSourceUpdateSink sink = new MockDataSourceUpdateSink(); + + ImmutableList> initializers = ImmutableList.of(); + + // All synchronizers send terminal errors + ImmutableList> synchronizers = ImmutableList.of( + () -> { + BlockingQueue results = new LinkedBlockingQueue<>(); + results.add(makeTerminalErrorResult()); + return new MockQueuedSynchronizer(results); + }, + () -> { + BlockingQueue results = new LinkedBlockingQueue<>(); + results.add(makeTerminalErrorResult()); + return new MockQueuedSynchronizer(results); + }, + () -> { + BlockingQueue results = new LinkedBlockingQueue<>(); + results.add(makeTerminalErrorResult()); + return new MockQueuedSynchronizer(results); + } + ); + + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); + resourcesToClose.add(dataSource); + + Future startFuture = dataSource.start(); + startFuture.get(2, TimeUnit.SECONDS); + + assertFalse(dataSource.isInitialized()); + // TODO: Verify status reflects exhausted sources when data source status is implemented + } + + // ============================================================================ + // Disabled Source Prevention + // ============================================================================ + + @Test + public void disabledDataSourceCannotTriggerActions() throws Exception { + executor = Executors.newScheduledThreadPool(2); + MockDataSourceUpdateSink sink = new MockDataSourceUpdateSink(); + + ImmutableList> initializers = ImmutableList.of(); + + // First synchronizer that we'll close and try to trigger + AtomicReference firstSyncRef = new AtomicReference<>(); + BlockingQueue firstSyncResults = new LinkedBlockingQueue<>(); + firstSyncResults.add(FDv2SourceResult.changeSet(makeChangeSet(false), false)); + firstSyncResults.add(makeTerminalErrorResult()); + + // Second synchronizer + BlockingQueue secondSyncResults = new LinkedBlockingQueue<>(); + secondSyncResults.add(FDv2SourceResult.changeSet(makeChangeSet(false), false)); + + ImmutableList> synchronizers = ImmutableList.of( + () -> { + MockQueuedSynchronizer sync = new MockQueuedSynchronizer(firstSyncResults); + firstSyncRef.set(sync); + return sync; + }, + () -> new MockQueuedSynchronizer(secondSyncResults) + ); + + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); + resourcesToClose.add(dataSource); + + Future startFuture = dataSource.start(); + startFuture.get(2, TimeUnit.SECONDS); + + // Wait for synchronizers to run and switch + sink.awaitApplyCount(2, 2, TimeUnit.SECONDS); + int applyCountAfterSwitch = sink.getApplyCount(); + + // Try to send more data from the first (now closed) synchronizer + MockQueuedSynchronizer firstSync = firstSyncRef.get(); + if (firstSync != null) { + firstSync.addResult(FDv2SourceResult.changeSet(makeChangeSet(false), false)); + } + + // Wait to ensure closed synchronizer's results aren't processed + try { + sink.awaitApplyCount(applyCountAfterSwitch + 1, 500, TimeUnit.MILLISECONDS); + } catch (Exception e) { + // Timeout expected + } + + // Apply count should not have increased from the closed synchronizer + assertEquals(applyCountAfterSwitch, sink.getApplyCount()); + } + + // ============================================================================ + // Disposal and Cleanup + // ============================================================================ + + @Test + public void disposeCompletesStartFuture() throws Exception { + executor = Executors.newScheduledThreadPool(2); + MockDataSourceUpdateSink sink = new MockDataSourceUpdateSink(); + + ImmutableList> initializers = ImmutableList.of(); + + // Synchronizer that never completes + BlockingQueue syncResults = new LinkedBlockingQueue<>(); + syncResults.add(FDv2SourceResult.changeSet(makeChangeSet(false), false)); + + ImmutableList> synchronizers = ImmutableList.of( + () -> new MockQueuedSynchronizer(syncResults) + ); + + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); + + Future startFuture = dataSource.start(); + startFuture.get(2, TimeUnit.SECONDS); + + dataSource.close(); + + assertTrue(startFuture.isDone()); + // TODO: Verify status updated to OFF when data source status is implemented + } + + @Test + public void noSourcesProvidedCompletesImmediately() throws Exception { + executor = Executors.newScheduledThreadPool(2); + MockDataSourceUpdateSink sink = new MockDataSourceUpdateSink(); + + ImmutableList> initializers = ImmutableList.of(); + ImmutableList> synchronizers = ImmutableList.of(); + + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); + resourcesToClose.add(dataSource); + + Future startFuture = dataSource.start(); + startFuture.get(2, TimeUnit.SECONDS); + + assertFalse(dataSource.isInitialized()); + // TODO: Verify status reflects exhausted sources when data source status is implemented + } + + // ============================================================================ + // Thread Safety and Concurrency + // ============================================================================ + + @Test + public void startFutureCompletesExactlyOnce() throws Exception { + executor = Executors.newScheduledThreadPool(2); + MockDataSourceUpdateSink sink = new MockDataSourceUpdateSink(); + + CompletableFuture initializerFuture = CompletableFuture.completedFuture( + FDv2SourceResult.changeSet(makeChangeSet(true), false) + ); + + CompletableFuture synchronizerFuture = CompletableFuture.completedFuture( + FDv2SourceResult.changeSet(makeChangeSet(false), false) + ); + + ImmutableList> initializers = ImmutableList.of( + () -> new MockInitializer(initializerFuture) + ); + + ImmutableList> synchronizers = ImmutableList.of( + () -> new MockSynchronizer(synchronizerFuture) + ); + + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); + resourcesToClose.add(dataSource); + + Future startFuture = dataSource.start(); + startFuture.get(2, TimeUnit.SECONDS); + + assertTrue(dataSource.isInitialized()); + // Multiple completions would throw, so if we get here, it's working correctly + } + + @Test + public void concurrentCloseAndStartHandledSafely() throws Exception { + executor = Executors.newScheduledThreadPool(2); + MockDataSourceUpdateSink sink = new MockDataSourceUpdateSink(); + + BlockingQueue syncResults = new LinkedBlockingQueue<>(); + syncResults.add(FDv2SourceResult.changeSet(makeChangeSet(false), false)); + + ImmutableList> initializers = ImmutableList.of(); + ImmutableList> synchronizers = ImmutableList.of( + () -> new MockQueuedSynchronizer(syncResults) + ); + + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); + + Future startFuture = dataSource.start(); + + // Close immediately after starting + dataSource.close(); + + // Should not throw or hang + startFuture.get(2, TimeUnit.SECONDS); + } + + @Test + public void multipleStartCallsEventuallyComplete() throws Exception { + executor = Executors.newScheduledThreadPool(2); + MockDataSourceUpdateSink sink = new MockDataSourceUpdateSink(); + + BlockingQueue syncResults = new LinkedBlockingQueue<>(); + syncResults.add(FDv2SourceResult.changeSet(makeChangeSet(false), false)); + + ImmutableList> initializers = ImmutableList.of(); + ImmutableList> synchronizers = ImmutableList.of( + () -> new MockQueuedSynchronizer(syncResults) + ); + + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); + resourcesToClose.add(dataSource); + + Future startFuture1 = dataSource.start(); + Future startFuture2 = dataSource.start(); + Future startFuture3 = dataSource.start(); + + // All calls should complete successfully (even if they return different Future wrappers) + startFuture1.get(2, TimeUnit.SECONDS); + startFuture2.get(2, TimeUnit.SECONDS); + startFuture3.get(2, TimeUnit.SECONDS); + + assertTrue(dataSource.isInitialized()); + } + + @Test + public void isInitializedThreadSafe() throws Exception { + executor = Executors.newScheduledThreadPool(2); + MockDataSourceUpdateSink sink = new MockDataSourceUpdateSink(); + + BlockingQueue syncResults = new LinkedBlockingQueue<>(); + syncResults.add(FDv2SourceResult.changeSet(makeChangeSet(false), false)); + + ImmutableList> initializers = ImmutableList.of(); + ImmutableList> synchronizers = ImmutableList.of( + () -> new MockQueuedSynchronizer(syncResults) + ); + + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); + resourcesToClose.add(dataSource); + + dataSource.start(); + + // Call isInitialized from multiple threads + CountDownLatch latch = new CountDownLatch(10); + for (int i = 0; i < 10; i++) { + new Thread(() -> { + try { + dataSource.isInitialized(); + } finally { + latch.countDown(); + } + }).start(); + } + + assertTrue(latch.await(2, TimeUnit.SECONDS)); + } + + @Test + public void dataSourceUpdatesApplyThreadSafe() throws Exception { + executor = Executors.newScheduledThreadPool(2); + MockDataSourceUpdateSink sink = new MockDataSourceUpdateSink(); + + BlockingQueue syncResults = new LinkedBlockingQueue<>(); + for (int i = 0; i < 10; i++) { + syncResults.add(FDv2SourceResult.changeSet(makeChangeSet(false), false)); + } + + ImmutableList> initializers = ImmutableList.of(); + ImmutableList> synchronizers = ImmutableList.of( + () -> new MockQueuedSynchronizer(syncResults) + ); + + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); + resourcesToClose.add(dataSource); + + Future startFuture = dataSource.start(); + startFuture.get(2, TimeUnit.SECONDS); + + // Wait for all applies to process + sink.awaitApplyCount(10, 2, TimeUnit.SECONDS); + + // Should have received multiple applies without error + assertTrue(sink.getApplyCount() >= 10); + } + + // ============================================================================ + // Exception Handling + // ============================================================================ + + @Test + public void initializerThrowsExecutionException() throws Exception { + executor = Executors.newScheduledThreadPool(2); + MockDataSourceUpdateSink sink = new MockDataSourceUpdateSink(); + + CompletableFuture badFuture = new CompletableFuture<>(); + badFuture.completeExceptionally(new RuntimeException("Execution exception")); + + CompletableFuture goodFuture = CompletableFuture.completedFuture( + FDv2SourceResult.changeSet(makeChangeSet(true), false) + ); + + ImmutableList> initializers = ImmutableList.of( + () -> new MockInitializer(badFuture), + () -> new MockInitializer(goodFuture) + ); + + ImmutableList> synchronizers = ImmutableList.of(); + + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); + resourcesToClose.add(dataSource); + + Future startFuture = dataSource.start(); + startFuture.get(2, TimeUnit.SECONDS); + + assertTrue(dataSource.isInitialized()); + assertEquals(1, sink.getApplyCount()); + } + + @Test + public void initializerThrowsInterruptedException() throws Exception { + executor = Executors.newScheduledThreadPool(2); + MockDataSourceUpdateSink sink = new MockDataSourceUpdateSink(); + + AtomicBoolean firstCalled = new AtomicBoolean(false); + CompletableFuture goodFuture = CompletableFuture.completedFuture( + FDv2SourceResult.changeSet(makeChangeSet(true), false) + ); + + ImmutableList> initializers = ImmutableList.of( + () -> { + firstCalled.set(true); + return new MockInitializer(() -> { + throw new InterruptedException("Interrupted"); + }); + }, + () -> new MockInitializer(goodFuture) + ); + + ImmutableList> synchronizers = ImmutableList.of(); + + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); + resourcesToClose.add(dataSource); + + Future startFuture = dataSource.start(); + startFuture.get(2, TimeUnit.SECONDS); + + assertTrue(firstCalled.get()); + assertTrue(dataSource.isInitialized()); + assertEquals(1, sink.getApplyCount()); + } + + @Test + public void initializerThrowsCancellationException() throws Exception { + executor = Executors.newScheduledThreadPool(2); + MockDataSourceUpdateSink sink = new MockDataSourceUpdateSink(); + + CompletableFuture cancelledFuture = new CompletableFuture<>(); + cancelledFuture.cancel(true); + + CompletableFuture goodFuture = CompletableFuture.completedFuture( + FDv2SourceResult.changeSet(makeChangeSet(true), false) + ); + + ImmutableList> initializers = ImmutableList.of( + () -> new MockInitializer(cancelledFuture), + () -> new MockInitializer(goodFuture) + ); + + ImmutableList> synchronizers = ImmutableList.of(); + + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); + resourcesToClose.add(dataSource); + + Future startFuture = dataSource.start(); + startFuture.get(2, TimeUnit.SECONDS); + + assertTrue(dataSource.isInitialized()); + assertEquals(1, sink.getApplyCount()); + } + + @Test + public void synchronizerNextThrowsExecutionException() throws Exception { + executor = Executors.newScheduledThreadPool(2); + MockDataSourceUpdateSink sink = new MockDataSourceUpdateSink(); + + ImmutableList> initializers = ImmutableList.of(); + + CompletableFuture badFuture = new CompletableFuture<>(); + badFuture.completeExceptionally(new RuntimeException("Execution exception")); + + CompletableFuture goodFuture = CompletableFuture.completedFuture( + FDv2SourceResult.changeSet(makeChangeSet(false), false) + ); + + ImmutableList> synchronizers = ImmutableList.of( + () -> new MockSynchronizer(badFuture), + () -> new MockSynchronizer(goodFuture) + ); + + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); + resourcesToClose.add(dataSource); + + Future startFuture = dataSource.start(); + startFuture.get(2, TimeUnit.SECONDS); + + assertTrue(dataSource.isInitialized()); + assertEquals(1, sink.getApplyCount()); + } + + @Test + public void synchronizerNextThrowsInterruptedException() throws Exception { + executor = Executors.newScheduledThreadPool(2); + MockDataSourceUpdateSink sink = new MockDataSourceUpdateSink(); + + ImmutableList> initializers = ImmutableList.of(); + + AtomicBoolean firstCalled = new AtomicBoolean(false); + CompletableFuture goodFuture = CompletableFuture.completedFuture( + FDv2SourceResult.changeSet(makeChangeSet(false), false) + ); + + ImmutableList> synchronizers = ImmutableList.of( + () -> { + firstCalled.set(true); + return new MockSynchronizer(() -> { + throw new InterruptedException("Interrupted"); + }); + }, + () -> new MockSynchronizer(goodFuture) + ); + + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); + resourcesToClose.add(dataSource); + + Future startFuture = dataSource.start(); + startFuture.get(2, TimeUnit.SECONDS); + + assertTrue(firstCalled.get()); + assertTrue(dataSource.isInitialized()); + assertEquals(1, sink.getApplyCount()); + } + + @Test + public void synchronizerNextThrowsCancellationException() throws Exception { + executor = Executors.newScheduledThreadPool(2); + MockDataSourceUpdateSink sink = new MockDataSourceUpdateSink(); + + ImmutableList> initializers = ImmutableList.of(); + + CompletableFuture cancelledFuture = new CompletableFuture<>(); + cancelledFuture.cancel(true); + + CompletableFuture goodFuture = CompletableFuture.completedFuture( + FDv2SourceResult.changeSet(makeChangeSet(false), false) + ); + + ImmutableList> synchronizers = ImmutableList.of( + () -> new MockSynchronizer(cancelledFuture), + () -> new MockSynchronizer(goodFuture) + ); + + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); + resourcesToClose.add(dataSource); + + Future startFuture = dataSource.start(); + startFuture.get(2, TimeUnit.SECONDS); + + assertTrue(dataSource.isInitialized()); + assertEquals(1, sink.getApplyCount()); + } + + // ============================================================================ + // Resource Management + // ============================================================================ + + @Test + public void closeWithoutStartDoesNotHang() { + executor = Executors.newScheduledThreadPool(2); + MockDataSourceUpdateSink sink = new MockDataSourceUpdateSink(); + + ImmutableList> initializers = ImmutableList.of(); + ImmutableList> synchronizers = ImmutableList.of(); + + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); + + dataSource.close(); + + // Test passes if we reach here without hanging + } + + @Test + public void closeAfterInitializersCompletesImmediately() throws Exception { + executor = Executors.newScheduledThreadPool(2); + MockDataSourceUpdateSink sink = new MockDataSourceUpdateSink(); + + CompletableFuture initializerFuture = CompletableFuture.completedFuture( + FDv2SourceResult.changeSet(makeChangeSet(true), false) + ); + + ImmutableList> initializers = ImmutableList.of( + () -> new MockInitializer(initializerFuture) + ); + + ImmutableList> synchronizers = ImmutableList.of(); + + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); + + Future startFuture = dataSource.start(); + startFuture.get(2, TimeUnit.SECONDS); + + dataSource.close(); + + } + + @Test + public void closeWhileSynchronizerRunningShutdownsSource() throws Exception { + executor = Executors.newScheduledThreadPool(2); + MockDataSourceUpdateSink sink = new MockDataSourceUpdateSink(); + + ImmutableList> initializers = ImmutableList.of(); + + BlockingQueue syncResults = new LinkedBlockingQueue<>(); + syncResults.add(FDv2SourceResult.changeSet(makeChangeSet(false), false)); + + AtomicBoolean synchronizerClosed = new AtomicBoolean(false); + + ImmutableList> synchronizers = ImmutableList.of( + () -> new MockQueuedSynchronizer(syncResults) { + @Override + public void close() { + synchronizerClosed.set(true); + super.close(); + } + } + ); + + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); + + Future startFuture = dataSource.start(); + startFuture.get(2, TimeUnit.SECONDS); + + dataSource.close(); + + assertTrue(synchronizerClosed.get()); + } + + @Test + public void multipleCloseCallsAreIdempotent() throws Exception { + executor = Executors.newScheduledThreadPool(2); + MockDataSourceUpdateSink sink = new MockDataSourceUpdateSink(); + + CompletableFuture initializerFuture = CompletableFuture.completedFuture( + FDv2SourceResult.changeSet(makeChangeSet(true), false) + ); + + ImmutableList> initializers = ImmutableList.of( + () -> new MockInitializer(initializerFuture) + ); + + ImmutableList> synchronizers = ImmutableList.of(); + + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); + + Future startFuture = dataSource.start(); + startFuture.get(2, TimeUnit.SECONDS); + + dataSource.close(); + dataSource.close(); + dataSource.close(); + + // Test passes if we reach here without throwing + } + + @Test + public void closeInterruptsConditionWaiting() throws Exception { + executor = Executors.newScheduledThreadPool(2); + MockDataSourceUpdateSink sink = new MockDataSourceUpdateSink(); + + ImmutableList> initializers = ImmutableList.of(); + + BlockingQueue syncResults = new LinkedBlockingQueue<>(); + syncResults.add(FDv2SourceResult.changeSet(makeChangeSet(false), false)); + syncResults.add(makeInterruptedResult()); + // Don't add more, so it waits on condition + + ImmutableList> synchronizers = ImmutableList.of( + () -> new MockQueuedSynchronizer(syncResults) + ); + + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); + + Future startFuture = dataSource.start(); + startFuture.get(2, TimeUnit.SECONDS); + + // Close while condition is waiting + dataSource.close(); + + // Test passes if we reach here without hanging + } + + // ============================================================================ + // Active Source Management + // ============================================================================ + + @Test + public void setActiveSourceReturnsShutdownStatus() throws Exception { + executor = Executors.newScheduledThreadPool(2); + MockDataSourceUpdateSink sink = new MockDataSourceUpdateSink(); + + CompletableFuture initializerFuture = CompletableFuture.completedFuture( + FDv2SourceResult.changeSet(makeChangeSet(true), false) + ); + + AtomicBoolean shutdownDetected = new AtomicBoolean(false); + + ImmutableList> initializers = ImmutableList.of( + () -> new MockInitializer(initializerFuture) { + @Override + public CompletableFuture run() { + // This won't be called because close() is called first + return super.run(); + } + } + ); + + ImmutableList> synchronizers = ImmutableList.of(); + + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); + + dataSource.close(); + Future startFuture = dataSource.start(); + + // Should complete without hanging since shutdown was already called + startFuture.get(2, TimeUnit.SECONDS); + // Test passes if we reach here - shutdown was handled + } + + @Test + public void activeSourceClosedWhenSwitchingSynchronizers() throws Exception { + executor = Executors.newScheduledThreadPool(2); + MockDataSourceUpdateSink sink = new MockDataSourceUpdateSink(); + + ImmutableList> initializers = ImmutableList.of(); + + BlockingQueue firstSyncResults = new LinkedBlockingQueue<>(); + firstSyncResults.add(FDv2SourceResult.changeSet(makeChangeSet(false), false)); + firstSyncResults.add(makeTerminalErrorResult()); + + BlockingQueue secondSyncResults = new LinkedBlockingQueue<>(); + secondSyncResults.add(FDv2SourceResult.changeSet(makeChangeSet(false), false)); + + AtomicBoolean firstSyncClosed = new AtomicBoolean(false); + + ImmutableList> synchronizers = ImmutableList.of( + () -> new MockQueuedSynchronizer(firstSyncResults) { + @Override + public void close() { + firstSyncClosed.set(true); + super.close(); + } + }, + () -> new MockQueuedSynchronizer(secondSyncResults) + ); + + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); + resourcesToClose.add(dataSource); + + Future startFuture = dataSource.start(); + startFuture.get(2, TimeUnit.SECONDS); + + // Wait for both synchronizers to run (switch happens after the first sends terminal error) + sink.awaitApplyCount(2, 2, TimeUnit.SECONDS); + + assertTrue(firstSyncClosed.get()); + } + + @Test + public void activeSourceClosedOnShutdown() throws Exception { + executor = Executors.newScheduledThreadPool(2); + MockDataSourceUpdateSink sink = new MockDataSourceUpdateSink(); + + ImmutableList> initializers = ImmutableList.of(); + + BlockingQueue syncResults = new LinkedBlockingQueue<>(); + syncResults.add(FDv2SourceResult.changeSet(makeChangeSet(false), false)); + + AtomicBoolean syncClosed = new AtomicBoolean(false); + + ImmutableList> synchronizers = ImmutableList.of( + () -> new MockQueuedSynchronizer(syncResults) { + @Override + public void close() { + syncClosed.set(true); + super.close(); + } + } + ); + + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); + + Future startFuture = dataSource.start(); + startFuture.get(2, TimeUnit.SECONDS); + + dataSource.close(); + + assertTrue(syncClosed.get()); + } + + @Test + public void setActiveSourceOnInitializerChecksShutdown() throws Exception { + executor = Executors.newScheduledThreadPool(2); + MockDataSourceUpdateSink sink = new MockDataSourceUpdateSink(); + + CountDownLatch initializerStarted = new CountDownLatch(1); + CompletableFuture slowResult = new CompletableFuture<>(); + + ImmutableList> initializers = ImmutableList.of( + () -> new MockInitializer(() -> { + initializerStarted.countDown(); + // Wait for the future to complete (will be completed by shutdown check) + try { + return slowResult.get(2, TimeUnit.SECONDS); + } catch (Exception e) { + return FDv2SourceResult.changeSet(makeChangeSet(true), false); + } + }) + ); + + ImmutableList> synchronizers = ImmutableList.of(); + + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); + + Future startFuture = dataSource.start(); + + // Wait for initializer to start + assertTrue(initializerStarted.await(2, TimeUnit.SECONDS)); + + // Close while the initializer is running + dataSource.close(); + + // Complete the future so initializer can finish + slowResult.complete(FDv2SourceResult.changeSet(makeChangeSet(true), false)); + + // Wait for the start method to complete + startFuture.get(2, TimeUnit.SECONDS); + + // Test passes if we reach here - shutdown handled gracefully + } + + // ============================================================================ + // Synchronizer State Transitions + // ============================================================================ + + @Test + public void blockedSynchronizerSkippedInRotation() throws Exception { + executor = Executors.newScheduledThreadPool(2); + MockDataSourceUpdateSink sink = new MockDataSourceUpdateSink(); + + ImmutableList> initializers = ImmutableList.of(); + + // First: terminal error (blocked) + BlockingQueue firstSyncResults = new LinkedBlockingQueue<>(); + firstSyncResults.add(makeTerminalErrorResult()); + + // Second: works + BlockingQueue secondSyncResults = new LinkedBlockingQueue<>(); + secondSyncResults.add(FDv2SourceResult.changeSet(makeChangeSet(false), false)); + + // Third: works + BlockingQueue thirdSyncResults = new LinkedBlockingQueue<>(); + thirdSyncResults.add(FDv2SourceResult.changeSet(makeChangeSet(false), false)); + + AtomicInteger firstCallCount = new AtomicInteger(0); + AtomicInteger secondCallCount = new AtomicInteger(0); + AtomicInteger thirdCallCount = new AtomicInteger(0); + + ImmutableList> synchronizers = ImmutableList.of( + () -> { + firstCallCount.incrementAndGet(); + return new MockQueuedSynchronizer(firstSyncResults); + }, + () -> { + secondCallCount.incrementAndGet(); + return new MockQueuedSynchronizer(secondSyncResults); + }, + () -> { + thirdCallCount.incrementAndGet(); + return new MockQueuedSynchronizer(thirdSyncResults); + } + ); + + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); + resourcesToClose.add(dataSource); + + Future startFuture = dataSource.start(); + startFuture.get(2, TimeUnit.SECONDS); + + assertEquals(1, firstCallCount.get()); // Called once, then blocked + assertTrue(secondCallCount.get() >= 1); // Called + } + + @Test + public void allSynchronizersBlockedReturnsNullAndExits() throws Exception { + executor = Executors.newScheduledThreadPool(2); + MockDataSourceUpdateSink sink = new MockDataSourceUpdateSink(); + + ImmutableList> initializers = ImmutableList.of(); + + ImmutableList> synchronizers = ImmutableList.of( + () -> { + BlockingQueue results = new LinkedBlockingQueue<>(); + results.add(makeTerminalErrorResult()); + return new MockQueuedSynchronizer(results); + }, + () -> { + BlockingQueue results = new LinkedBlockingQueue<>(); + results.add(makeTerminalErrorResult()); + return new MockQueuedSynchronizer(results); + } + ); + + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); + resourcesToClose.add(dataSource); + + Future startFuture = dataSource.start(); + startFuture.get(2, TimeUnit.SECONDS); + + assertFalse(dataSource.isInitialized()); + } + + @Test + public void recoveryResetsToFirstAvailableSynchronizer() throws Exception { + executor = Executors.newScheduledThreadPool(2); + MockDataSourceUpdateSink sink = new MockDataSourceUpdateSink(); + + ImmutableList> initializers = ImmutableList.of(); + + // First synchronizer: send data, then INTERRUPTED + BlockingQueue firstSyncResults = new LinkedBlockingQueue<>(); + firstSyncResults.add(FDv2SourceResult.changeSet(makeChangeSet(false), false)); + firstSyncResults.add(makeInterruptedResult()); + + // Second synchronizer: send data + BlockingQueue secondSyncResults = new LinkedBlockingQueue<>(); + secondSyncResults.add(FDv2SourceResult.changeSet(makeChangeSet(false), false)); + + AtomicInteger firstCallCount = new AtomicInteger(0); + AtomicInteger secondCallCount = new AtomicInteger(0); + + ImmutableList> synchronizers = ImmutableList.of( + () -> { + firstCallCount.incrementAndGet(); + return new MockQueuedSynchronizer(firstSyncResults); + }, + () -> { + secondCallCount.incrementAndGet(); + return new MockQueuedSynchronizer(secondSyncResults); + } + ); + + // Short recovery timeout + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 1, 2); + resourcesToClose.add(dataSource); + + Future startFuture = dataSource.start(); + startFuture.get(2, TimeUnit.SECONDS); + + // Wait for 3 applies with enough time for recovery (2s) + overhead + sink.awaitApplyCount(3, 5, TimeUnit.SECONDS); + + // Should have called first synchronizer again after recovery + assertTrue(firstCallCount.get() >= 2 || secondCallCount.get() >= 1); + } + + @Test + public void fallbackMovesToNextSynchronizer() throws Exception { + executor = Executors.newScheduledThreadPool(2); + MockDataSourceUpdateSink sink = new MockDataSourceUpdateSink(); + + ImmutableList> initializers = ImmutableList.of(); + + // First: send INTERRUPTED to trigger fallback + BlockingQueue firstSyncResults = new LinkedBlockingQueue<>(); + firstSyncResults.add(FDv2SourceResult.changeSet(makeChangeSet(false), false)); + firstSyncResults.add(makeInterruptedResult()); + + // Second: works + BlockingQueue secondSyncResults = new LinkedBlockingQueue<>(); + secondSyncResults.add(FDv2SourceResult.changeSet(makeChangeSet(false), false)); + + BlockingQueue secondCalledQueue = new LinkedBlockingQueue<>(); + + ImmutableList> synchronizers = ImmutableList.of( + () -> new MockQueuedSynchronizer(firstSyncResults), + () -> { + secondCalledQueue.offer(true); + return new MockQueuedSynchronizer(secondSyncResults); + } + ); + + // Short fallback timeout + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 1, 300); + resourcesToClose.add(dataSource); + + Future startFuture = dataSource.start(); + startFuture.get(3, TimeUnit.SECONDS); + + // Wait for the second synchronizer to be called after fallback timeout + Boolean secondCalled = secondCalledQueue.poll(3, TimeUnit.SECONDS); + assertNotNull("Second synchronizer should be called after fallback", secondCalled); + } + + // ============================================================================ + // Condition Lifecycle + // ============================================================================ + + @Test + public void conditionsClosedAfterSynchronizerLoop() throws Exception { + executor = Executors.newScheduledThreadPool(2); + MockDataSourceUpdateSink sink = new MockDataSourceUpdateSink(); + + ImmutableList> initializers = ImmutableList.of(); + + BlockingQueue syncResults = new LinkedBlockingQueue<>(); + syncResults.add(FDv2SourceResult.changeSet(makeChangeSet(false), false)); + syncResults.add(makeTerminalErrorResult()); + + ImmutableList> synchronizers = ImmutableList.of( + () -> new MockQueuedSynchronizer(syncResults) + ); + + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 1, 2); + resourcesToClose.add(dataSource); + + Future startFuture = dataSource.start(); + startFuture.get(2, TimeUnit.SECONDS); + + dataSource.close(); + + // If conditions weren't closed properly, we might see issues + } + + @Test + public void conditionsInformedOfAllResults() throws Exception { + executor = Executors.newScheduledThreadPool(2); + MockDataSourceUpdateSink sink = new MockDataSourceUpdateSink(); + + ImmutableList> initializers = ImmutableList.of(); + + BlockingQueue syncResults = new LinkedBlockingQueue<>(); + syncResults.add(FDv2SourceResult.changeSet(makeChangeSet(false), false)); + syncResults.add(makeInterruptedResult()); + syncResults.add(FDv2SourceResult.changeSet(makeChangeSet(false), false)); + + ImmutableList> synchronizers = ImmutableList.of( + () -> new MockQueuedSynchronizer(syncResults) + ); + + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 10, 20); + resourcesToClose.add(dataSource); + + Future startFuture = dataSource.start(); + startFuture.get(2, TimeUnit.SECONDS); + + // All results should be processed + sink.awaitApplyCount(2, 2, TimeUnit.SECONDS); + assertTrue(sink.getApplyCount() >= 2); + } + + @Test + public void conditionsClosedOnException() throws Exception { + executor = Executors.newScheduledThreadPool(2); + MockDataSourceUpdateSink sink = new MockDataSourceUpdateSink(); + + ImmutableList> initializers = ImmutableList.of(); + + CompletableFuture exceptionFuture = new CompletableFuture<>(); + exceptionFuture.completeExceptionally(new RuntimeException("Error")); + + ImmutableList> synchronizers = ImmutableList.of( + () -> new MockSynchronizer(exceptionFuture), + () -> new MockSynchronizer(CompletableFuture.completedFuture( + FDv2SourceResult.changeSet(makeChangeSet(false), false) + )) + ); + + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 1, 2); + resourcesToClose.add(dataSource); + + Future startFuture = dataSource.start(); + startFuture.get(2, TimeUnit.SECONDS); + + // Conditions should be closed despite exception + } + + @Test + public void primeSynchronizerHasNoRecoveryCondition() throws Exception { + executor = Executors.newScheduledThreadPool(2); + MockDataSourceUpdateSink sink = new MockDataSourceUpdateSink(); + + ImmutableList> initializers = ImmutableList.of(); + + BlockingQueue syncResults = new LinkedBlockingQueue<>(); + syncResults.add(FDv2SourceResult.changeSet(makeChangeSet(false), false)); + // Keep alive + + ImmutableList> synchronizers = ImmutableList.of( + () -> new MockQueuedSynchronizer(syncResults), + () -> new MockQueuedSynchronizer(new LinkedBlockingQueue<>()) + ); + + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 1, 2); + resourcesToClose.add(dataSource); + + Future startFuture = dataSource.start(); + startFuture.get(2, TimeUnit.SECONDS); + + // Prime synchronizer should not have a recovery condition + // This is tested implicitly by the implementation + } + + @Test + public void nonPrimeSynchronizerHasBothConditions() throws Exception { + executor = Executors.newScheduledThreadPool(2); + MockDataSourceUpdateSink sink = new MockDataSourceUpdateSink(); + + ImmutableList> initializers = ImmutableList.of(); + + // First: send INTERRUPTED to trigger fallback + BlockingQueue firstSyncResults = new LinkedBlockingQueue<>(); + firstSyncResults.add(FDv2SourceResult.changeSet(makeChangeSet(false), false)); + firstSyncResults.add(makeInterruptedResult()); + + // Second: will have both conditions + BlockingQueue secondSyncResults = new LinkedBlockingQueue<>(); + secondSyncResults.add(FDv2SourceResult.changeSet(makeChangeSet(false), false)); + + ImmutableList> synchronizers = ImmutableList.of( + () -> new MockQueuedSynchronizer(firstSyncResults), + () -> new MockQueuedSynchronizer(secondSyncResults) + ); + + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 1, 2); + resourcesToClose.add(dataSource); + + Future startFuture = dataSource.start(); + startFuture.get(2, TimeUnit.SECONDS); + + // Non-prime synchronizer should have both fallback and recovery + // This is tested implicitly by the implementation + } + + @Test + public void singleSynchronizerHasNoConditions() throws Exception { + executor = Executors.newScheduledThreadPool(2); + MockDataSourceUpdateSink sink = new MockDataSourceUpdateSink(); + + ImmutableList> initializers = ImmutableList.of(); + + BlockingQueue syncResults = new LinkedBlockingQueue<>(); + syncResults.add(FDv2SourceResult.changeSet(makeChangeSet(false), false)); + + ImmutableList> synchronizers = ImmutableList.of( + () -> new MockQueuedSynchronizer(syncResults) + ); + + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 1, 2); + resourcesToClose.add(dataSource); + + Future startFuture = dataSource.start(); + startFuture.get(2, TimeUnit.SECONDS); + + // Single synchronizer should have no conditions + // This is tested implicitly by the implementation + } + + @Test + public void conditionFutureNeverCompletesWhenNoConditions() throws Exception { + executor = Executors.newScheduledThreadPool(2); + MockDataSourceUpdateSink sink = new MockDataSourceUpdateSink(); + + ImmutableList> initializers = ImmutableList.of(); + + BlockingQueue syncResults = new LinkedBlockingQueue<>(); + syncResults.add(FDv2SourceResult.changeSet(makeChangeSet(false), false)); + syncResults.add(FDv2SourceResult.changeSet(makeChangeSet(false), false)); + + ImmutableList> synchronizers = ImmutableList.of( + () -> new MockQueuedSynchronizer(syncResults) + ); + + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 1, 2); + resourcesToClose.add(dataSource); + + Future startFuture = dataSource.start(); + startFuture.get(2, TimeUnit.SECONDS); + + // Should process both ChangeSet results without condition interruption + sink.awaitApplyCount(2, 2, TimeUnit.SECONDS); + assertTrue(sink.getApplyCount() >= 2); + } + + // ============================================================================ + // Data Flow Verification + // ============================================================================ + + @Test + public void changeSetAppliedToDataSourceUpdates() throws Exception { + executor = Executors.newScheduledThreadPool(2); + MockDataSourceUpdateSink sink = new MockDataSourceUpdateSink(); + + CompletableFuture initializerFuture = CompletableFuture.completedFuture( + FDv2SourceResult.changeSet(makeChangeSet(true), false) + ); + + ImmutableList> initializers = ImmutableList.of( + () -> new MockInitializer(initializerFuture) + ); + + ImmutableList> synchronizers = ImmutableList.of(); + + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); + resourcesToClose.add(dataSource); + + Future startFuture = dataSource.start(); + startFuture.get(2, TimeUnit.SECONDS); + + assertEquals(1, sink.getApplyCount()); + assertNotNull(sink.getLastChangeSet()); + } + + @Test + public void multipleChangeSetsAppliedInOrder() throws Exception { + executor = Executors.newScheduledThreadPool(2); + MockDataSourceUpdateSink sink = new MockDataSourceUpdateSink(); + + ImmutableList> initializers = ImmutableList.of(); + + BlockingQueue syncResults = new LinkedBlockingQueue<>(); + syncResults.add(FDv2SourceResult.changeSet(makeChangeSet(false), false)); + syncResults.add(FDv2SourceResult.changeSet(makeChangeSet(false), false)); + syncResults.add(FDv2SourceResult.changeSet(makeChangeSet(false), false)); + + ImmutableList> synchronizers = ImmutableList.of( + () -> new MockQueuedSynchronizer(syncResults) + ); + + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); + resourcesToClose.add(dataSource); + + Future startFuture = dataSource.start(); + startFuture.get(2, TimeUnit.SECONDS); + + // Wait for all 3 ChangeSets to be applied + sink.awaitApplyCount(3, 2, TimeUnit.SECONDS); + + assertEquals(3, sink.getApplyCount()); + } + + @Test + public void selectorNonEmptyCompletesInitialization() throws Exception { + executor = Executors.newScheduledThreadPool(2); + MockDataSourceUpdateSink sink = new MockDataSourceUpdateSink(); + + CompletableFuture firstInitializerFuture = CompletableFuture.completedFuture( + FDv2SourceResult.changeSet(makeChangeSet(true), false) + ); + + BlockingQueue secondInitializerCalledQueue = new LinkedBlockingQueue<>(); + + ImmutableList> initializers = ImmutableList.of( + () -> new MockInitializer(firstInitializerFuture), + () -> { + secondInitializerCalledQueue.offer(true); + return new MockInitializer(CompletableFuture.completedFuture( + FDv2SourceResult.changeSet(makeChangeSet(false), false) + )); + } + ); + + ImmutableList> synchronizers = ImmutableList.of( + () -> new MockSynchronizer(CompletableFuture.completedFuture( + FDv2SourceResult.changeSet(makeChangeSet(false), false) + )) + ); + + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); + resourcesToClose.add(dataSource); + + Future startFuture = dataSource.start(); + startFuture.get(2, TimeUnit.SECONDS); + + assertTrue(dataSource.isInitialized()); + assertEquals(1, sink.getApplyCount()); + + // Second initializer should not be called since first had non-empty selector + Boolean secondInitializerCalled = secondInitializerCalledQueue.poll(500, TimeUnit.MILLISECONDS); + assertNull("Second initializer should not be called when first returns non-empty selector", secondInitializerCalled); + } + + @Test + public void initializerChangeSetWithoutSelectorCompletesIfLastInitializer() throws Exception { + executor = Executors.newScheduledThreadPool(2); + MockDataSourceUpdateSink sink = new MockDataSourceUpdateSink(); + + CompletableFuture initializerFuture = CompletableFuture.completedFuture( + FDv2SourceResult.changeSet(makeChangeSet(false), false) + ); + + ImmutableList> initializers = ImmutableList.of( + () -> new MockInitializer(initializerFuture) + ); + + ImmutableList> synchronizers = ImmutableList.of(); + + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); + resourcesToClose.add(dataSource); + + Future startFuture = dataSource.start(); + startFuture.get(2, TimeUnit.SECONDS); + + assertTrue(dataSource.isInitialized()); + assertEquals(1, sink.getApplyCount()); + // TODO: Verify status updated to VALID when data source status is implemented + } + + @Test + public void synchronizerChangeSetAlwaysCompletesStartFuture() throws Exception { + executor = Executors.newScheduledThreadPool(2); + MockDataSourceUpdateSink sink = new MockDataSourceUpdateSink(); + + ImmutableList> initializers = ImmutableList.of(); + + CompletableFuture synchronizerFuture = CompletableFuture.completedFuture( + FDv2SourceResult.changeSet(makeChangeSet(false), false) + ); + + ImmutableList> synchronizers = ImmutableList.of( + () -> new MockSynchronizer(synchronizerFuture) + ); + + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); + resourcesToClose.add(dataSource); + + Future startFuture = dataSource.start(); + startFuture.get(2, TimeUnit.SECONDS); + + assertTrue(dataSource.isInitialized()); + } + + // ============================================================================ + // Status Result Handling + // ============================================================================ + + @Test + public void goodbyeStatusHandledGracefully() throws Exception { + executor = Executors.newScheduledThreadPool(2); + MockDataSourceUpdateSink sink = new MockDataSourceUpdateSink(); + + ImmutableList> initializers = ImmutableList.of(); + + BlockingQueue syncResults = new LinkedBlockingQueue<>(); + syncResults.add(FDv2SourceResult.changeSet(makeChangeSet(false), false)); + syncResults.add(FDv2SourceResult.goodbye("server-requested", false)); + syncResults.add(FDv2SourceResult.changeSet(makeChangeSet(false), false)); + + ImmutableList> synchronizers = ImmutableList.of( + () -> new MockQueuedSynchronizer(syncResults) + ); + + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); + resourcesToClose.add(dataSource); + + Future startFuture = dataSource.start(); + startFuture.get(2, TimeUnit.SECONDS); + + // Wait for applies to be processed + sink.awaitApplyCount(2, 2, TimeUnit.SECONDS); + + assertTrue(dataSource.isInitialized()); + assertTrue(sink.getApplyCount() >= 2); + } + + @Test + public void shutdownStatusExitsImmediately() throws Exception { + executor = Executors.newScheduledThreadPool(2); + MockDataSourceUpdateSink sink = new MockDataSourceUpdateSink(); + + ImmutableList> initializers = ImmutableList.of(); + + BlockingQueue syncResults = new LinkedBlockingQueue<>(); + syncResults.add(FDv2SourceResult.changeSet(makeChangeSet(false), false)); + syncResults.add(FDv2SourceResult.shutdown()); + + AtomicBoolean secondSynchronizerCalled = new AtomicBoolean(false); + + ImmutableList> synchronizers = ImmutableList.of( + () -> new MockQueuedSynchronizer(syncResults), + () -> { + secondSynchronizerCalled.set(true); + return new MockQueuedSynchronizer(new LinkedBlockingQueue<>()); + } + ); + + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); + resourcesToClose.add(dataSource); + + Future startFuture = dataSource.start(); + startFuture.get(2, TimeUnit.SECONDS); + + // Wait for first synchronizer's apply + sink.awaitApplyCount(1, 2, TimeUnit.SECONDS); + + // Verify the second synchronizer was not called (SHUTDOWN exits immediately) + assertFalse(secondSynchronizerCalled.get()); + } + + @Test + public void fdv1FallbackFlagHonored() throws Exception { + executor = Executors.newScheduledThreadPool(2); + MockDataSourceUpdateSink sink = new MockDataSourceUpdateSink(); + + ImmutableList> initializers = ImmutableList.of(); + + BlockingQueue syncResults = new LinkedBlockingQueue<>(); + syncResults.add(FDv2SourceResult.changeSet(makeChangeSet(false), true)); // FDv1 fallback + + ImmutableList> synchronizers = ImmutableList.of( + () -> new MockQueuedSynchronizer(syncResults) + ); + + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); + resourcesToClose.add(dataSource); + + Future startFuture = dataSource.start(); + startFuture.get(2, TimeUnit.SECONDS); + + assertTrue(dataSource.isInitialized()); + // TODO: Verify FDv1 fallback behavior when implemented + } + + // ============================================================================ + // Edge Cases and Initialization + // ============================================================================ + + @Test + public void emptyInitializerListSkipsToSynchronizers() throws Exception { + executor = Executors.newScheduledThreadPool(2); + MockDataSourceUpdateSink sink = new MockDataSourceUpdateSink(); + + ImmutableList> initializers = ImmutableList.of(); + + AtomicBoolean synchronizerCalled = new AtomicBoolean(false); + CompletableFuture synchronizerFuture = CompletableFuture.completedFuture( + FDv2SourceResult.changeSet(makeChangeSet(false), false) + ); + + ImmutableList> synchronizers = ImmutableList.of( + () -> { + synchronizerCalled.set(true); + return new MockSynchronizer(synchronizerFuture); + } + ); + + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); + resourcesToClose.add(dataSource); + + Future startFuture = dataSource.start(); + startFuture.get(2, TimeUnit.SECONDS); + + assertTrue(synchronizerCalled.get()); + assertTrue(dataSource.isInitialized()); + } + + @Test + public void startedFlagPreventsMultipleRuns() throws Exception { + executor = Executors.newScheduledThreadPool(2); + MockDataSourceUpdateSink sink = new MockDataSourceUpdateSink(); + + AtomicInteger runCount = new AtomicInteger(0); + + ImmutableList> initializers = ImmutableList.of( + () -> { + runCount.incrementAndGet(); + return new MockInitializer(CompletableFuture.completedFuture( + FDv2SourceResult.changeSet(makeChangeSet(true), false) + )); + } + ); + + ImmutableList> synchronizers = ImmutableList.of(); + + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); + resourcesToClose.add(dataSource); + + Future startFuture1 = dataSource.start(); + Future startFuture2 = dataSource.start(); + Future startFuture3 = dataSource.start(); + + // Wait for all start futures to complete + // The data sources use Future instead of CompletableFuture, so we cannot use CompletableFuture.allOf. + startFuture1.get(2, TimeUnit.SECONDS); + startFuture2.get(2, TimeUnit.SECONDS); + startFuture3.get(2, TimeUnit.SECONDS); + + // Verify initializer was only called once despite multiple start() calls + assertEquals(1, runCount.get()); + } + + @Test + public void startBeforeRunCompletesAllComplete() throws Exception { + executor = Executors.newScheduledThreadPool(2); + MockDataSourceUpdateSink sink = new MockDataSourceUpdateSink(); + + BlockingQueue syncResults = new LinkedBlockingQueue<>(); + syncResults.add(FDv2SourceResult.changeSet(makeChangeSet(false), false)); + + ImmutableList> initializers = ImmutableList.of(); + ImmutableList> synchronizers = ImmutableList.of( + () -> new MockQueuedSynchronizer(syncResults) + ); + + FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); + resourcesToClose.add(dataSource); + + // Call start multiple times before completion + Future future1 = dataSource.start(); + Future future2 = dataSource.start(); + + // Both should complete successfully + future1.get(2, TimeUnit.SECONDS); + future2.get(2, TimeUnit.SECONDS); + + assertTrue(dataSource.isInitialized()); + } + + // ============================================================================ + // Mock Implementations + // ============================================================================ + + private static class MockDataSourceUpdateSink implements DataSourceUpdateSinkV2 { + private final AtomicInteger applyCount = new AtomicInteger(0); + private final AtomicReference> lastChangeSet = new AtomicReference<>(); + private final BlockingQueue applySignals = new LinkedBlockingQueue<>(); + + @Override + public boolean apply(DataStoreTypes.ChangeSet changeSet) { + applyCount.incrementAndGet(); + lastChangeSet.set(changeSet); + applySignals.offer(true); + return true; + } + + @Override + public void updateStatus(DataSourceStatusProvider.State newState, DataSourceStatusProvider.ErrorInfo errorInfo) { + // TODO: Track status updates when data source status is fully implemented + } + + @Override + public DataStoreStatusProvider getDataStoreStatusProvider() { + return null; // Not needed for these tests + } + + public int getApplyCount() { + return applyCount.get(); + } + + public DataStoreTypes.ChangeSet getLastChangeSet() { + return lastChangeSet.get(); + } + + public void awaitApplyCount(int expectedCount, long timeout, TimeUnit unit) throws InterruptedException { + long deadline = System.currentTimeMillis() + unit.toMillis(timeout); + while (applyCount.get() < expectedCount && System.currentTimeMillis() < deadline) { + long remaining = deadline - System.currentTimeMillis(); + if (remaining > 0) { + applySignals.poll(remaining, TimeUnit.MILLISECONDS); + } + } + } + } + + private static class MockInitializer implements Initializer { + private final CompletableFuture result; + private final ThrowingSupplier supplier; + + public MockInitializer(CompletableFuture result) { + this.result = result; + this.supplier = null; + } + + public MockInitializer(ThrowingSupplier supplier) { + this.result = null; + this.supplier = supplier; + } + + @Override + public CompletableFuture run() { + if (supplier != null) { + CompletableFuture future = new CompletableFuture<>(); + try { + future.complete(supplier.get()); + } catch (Exception e) { + future.completeExceptionally(e); + } + return future; + } + return result; + } + + @Override + public void close() { + // Nothing to close + } + } + + private static class MockSynchronizer implements Synchronizer { + private final CompletableFuture result; + private final ThrowingSupplier supplier; + private volatile boolean closed = false; + private volatile boolean resultReturned = false; + + public MockSynchronizer(CompletableFuture result) { + this.result = result; + this.supplier = null; + } + + public MockSynchronizer(ThrowingSupplier supplier) { + this.result = null; + this.supplier = supplier; + } + + @Override + public CompletableFuture next() { + if (closed) { + return CompletableFuture.completedFuture(FDv2SourceResult.shutdown()); + } + if (supplier != null) { + CompletableFuture future = new CompletableFuture<>(); + try { + future.complete(supplier.get()); + } catch (Exception e) { + future.completeExceptionally(e); + } + return future; + } + // Only return the result once, then return a never-completing future + if (!resultReturned) { + resultReturned = true; + return result; + } else { + return new CompletableFuture<>(); // Never completes + } + } + + @Override + public void close() { + closed = true; + } + } + + private static class MockQueuedSynchronizer implements Synchronizer { + private final BlockingQueue results; + private volatile boolean closed = false; + + public MockQueuedSynchronizer(BlockingQueue results) { + this.results = results; + } + + public void addResult(FDv2SourceResult result) { + if (!closed) { + results.add(result); + } + } + + @Override + public CompletableFuture next() { + if (closed) { + return CompletableFuture.completedFuture(FDv2SourceResult.shutdown()); + } + + // Try to get immediately, don't wait + FDv2SourceResult result = results.poll(); + if (result != null) { + return CompletableFuture.completedFuture(result); + } else { + // Queue is empty - return a never-completing future to simulate waiting for more data + return new CompletableFuture<>(); + } + } + + @Override + public void close() { + closed = true; + } + } + + @FunctionalInterface + private interface ThrowingSupplier { + T get() throws Exception; + } +} \ No newline at end of file diff --git a/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/SynchronizerStateManagerTest.java b/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/SynchronizerStateManagerTest.java new file mode 100644 index 0000000..ca19d35 --- /dev/null +++ b/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/SynchronizerStateManagerTest.java @@ -0,0 +1,425 @@ +package com.launchdarkly.sdk.server; + +import com.launchdarkly.sdk.server.datasources.Synchronizer; + +import org.junit.Test; + +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@SuppressWarnings("javadoc") +public class SynchronizerStateManagerTest extends BaseTest { + + private SynchronizerFactoryWithState createMockFactory() { + FDv2DataSource.DataSourceFactory factory = mock(FDv2DataSource.DataSourceFactory.class); + when(factory.build()).thenReturn(mock(Synchronizer.class)); + return new SynchronizerFactoryWithState(factory); + } + + @Test + public void getNextAvailableSynchronizerReturnsNullWhenEmpty() { + List synchronizers = new ArrayList<>(); + SynchronizerStateManager manager = new SynchronizerStateManager(synchronizers); + + SynchronizerFactoryWithState result = manager.getNextAvailableSynchronizer(); + + assertNull(result); + } + + @Test + public void getNextAvailableSynchronizerReturnsFirstOnFirstCall() { + List synchronizers = new ArrayList<>(); + SynchronizerFactoryWithState sync1 = createMockFactory(); + synchronizers.add(sync1); + + SynchronizerStateManager manager = new SynchronizerStateManager(synchronizers); + + SynchronizerFactoryWithState result = manager.getNextAvailableSynchronizer(); + + assertSame(sync1, result); + } + + @Test + public void getNextAvailableSynchronizerLoopsThroughAvailable() { + List synchronizers = new ArrayList<>(); + SynchronizerFactoryWithState sync1 = createMockFactory(); + SynchronizerFactoryWithState sync2 = createMockFactory(); + SynchronizerFactoryWithState sync3 = createMockFactory(); + synchronizers.add(sync1); + synchronizers.add(sync2); + synchronizers.add(sync3); + + SynchronizerStateManager manager = new SynchronizerStateManager(synchronizers); + + // First call returns sync1 + assertSame(sync1, manager.getNextAvailableSynchronizer()); + // Second call returns sync2 + assertSame(sync2, manager.getNextAvailableSynchronizer()); + // Third call returns sync3 + assertSame(sync3, manager.getNextAvailableSynchronizer()); + } + + @Test + public void getNextAvailableSynchronizerWrapsAroundToBeginning() { + List synchronizers = new ArrayList<>(); + SynchronizerFactoryWithState sync1 = createMockFactory(); + SynchronizerFactoryWithState sync2 = createMockFactory(); + synchronizers.add(sync1); + synchronizers.add(sync2); + + SynchronizerStateManager manager = new SynchronizerStateManager(synchronizers); + + // Get all synchronizers + manager.getNextAvailableSynchronizer(); // sync1 + manager.getNextAvailableSynchronizer(); // sync2 + + // Should wrap around to sync1 + assertSame(sync1, manager.getNextAvailableSynchronizer()); + } + + @Test + public void getNextAvailableSynchronizerSkipsBlockedSynchronizers() { + List synchronizers = new ArrayList<>(); + SynchronizerFactoryWithState sync1 = createMockFactory(); + SynchronizerFactoryWithState sync2 = createMockFactory(); + SynchronizerFactoryWithState sync3 = createMockFactory(); + synchronizers.add(sync1); + synchronizers.add(sync2); + synchronizers.add(sync3); + + SynchronizerStateManager manager = new SynchronizerStateManager(synchronizers); + + // Block sync2 + sync2.block(); + + // First call returns sync1 + assertSame(sync1, manager.getNextAvailableSynchronizer()); + // Second call skips sync2 and returns sync3 + assertSame(sync3, manager.getNextAvailableSynchronizer()); + // Third call wraps and returns sync1 (skips sync2) + assertSame(sync1, manager.getNextAvailableSynchronizer()); + } + + @Test + public void getNextAvailableSynchronizerReturnsNullWhenAllBlocked() { + List synchronizers = new ArrayList<>(); + SynchronizerFactoryWithState sync1 = createMockFactory(); + SynchronizerFactoryWithState sync2 = createMockFactory(); + synchronizers.add(sync1); + synchronizers.add(sync2); + + SynchronizerStateManager manager = new SynchronizerStateManager(synchronizers); + + // Block all synchronizers + sync1.block(); + sync2.block(); + + SynchronizerFactoryWithState result = manager.getNextAvailableSynchronizer(); + + assertNull(result); + } + + @Test + public void resetSourceIndexResetsToFirstSynchronizer() { + List synchronizers = new ArrayList<>(); + SynchronizerFactoryWithState sync1 = createMockFactory(); + SynchronizerFactoryWithState sync2 = createMockFactory(); + SynchronizerFactoryWithState sync3 = createMockFactory(); + synchronizers.add(sync1); + synchronizers.add(sync2); + synchronizers.add(sync3); + + SynchronizerStateManager manager = new SynchronizerStateManager(synchronizers); + + // Advance to sync3 + manager.getNextAvailableSynchronizer(); // sync1 + manager.getNextAvailableSynchronizer(); // sync2 + manager.getNextAvailableSynchronizer(); // sync3 + + // Reset + manager.resetSourceIndex(); + + // Next call should return sync1 again + assertSame(sync1, manager.getNextAvailableSynchronizer()); + } + + @Test + public void isPrimeSynchronizerReturnsTrueForFirst() { + List synchronizers = new ArrayList<>(); + SynchronizerFactoryWithState sync1 = createMockFactory(); + SynchronizerFactoryWithState sync2 = createMockFactory(); + synchronizers.add(sync1); + synchronizers.add(sync2); + + SynchronizerStateManager manager = new SynchronizerStateManager(synchronizers); + + // Get first synchronizer + manager.getNextAvailableSynchronizer(); + + assertTrue(manager.isPrimeSynchronizer()); + } + + @Test + public void isPrimeSynchronizerReturnsFalseForNonFirst() { + List synchronizers = new ArrayList<>(); + SynchronizerFactoryWithState sync1 = createMockFactory(); + SynchronizerFactoryWithState sync2 = createMockFactory(); + synchronizers.add(sync1); + synchronizers.add(sync2); + + SynchronizerStateManager manager = new SynchronizerStateManager(synchronizers); + + // Get first then second synchronizer + manager.getNextAvailableSynchronizer(); + manager.getNextAvailableSynchronizer(); + + assertFalse(manager.isPrimeSynchronizer()); + } + + @Test + public void isPrimeSynchronizerReturnsFalseWhenNoSynchronizerSelected() { + List synchronizers = new ArrayList<>(); + SynchronizerFactoryWithState sync1 = createMockFactory(); + synchronizers.add(sync1); + + SynchronizerStateManager manager = new SynchronizerStateManager(synchronizers); + + // Haven't called getNext yet + assertFalse(manager.isPrimeSynchronizer()); + } + + @Test + public void isPrimeSynchronizerHandlesBlockedFirstSynchronizer() { + List synchronizers = new ArrayList<>(); + SynchronizerFactoryWithState sync1 = createMockFactory(); + SynchronizerFactoryWithState sync2 = createMockFactory(); + SynchronizerFactoryWithState sync3 = createMockFactory(); + synchronizers.add(sync1); + synchronizers.add(sync2); + synchronizers.add(sync3); + + SynchronizerStateManager manager = new SynchronizerStateManager(synchronizers); + + // Block first synchronizer + sync1.block(); + + // Get second synchronizer (which is now the prime) + manager.getNextAvailableSynchronizer(); + + assertTrue(manager.isPrimeSynchronizer()); + } + + @Test + public void getAvailableSynchronizerCountReturnsCorrectCount() { + List synchronizers = new ArrayList<>(); + SynchronizerFactoryWithState sync1 = createMockFactory(); + SynchronizerFactoryWithState sync2 = createMockFactory(); + SynchronizerFactoryWithState sync3 = createMockFactory(); + synchronizers.add(sync1); + synchronizers.add(sync2); + synchronizers.add(sync3); + + SynchronizerStateManager manager = new SynchronizerStateManager(synchronizers); + + assertEquals(3, manager.getAvailableSynchronizerCount()); + } + + @Test + public void getAvailableSynchronizerCountUpdatesWhenBlocked() { + List synchronizers = new ArrayList<>(); + SynchronizerFactoryWithState sync1 = createMockFactory(); + SynchronizerFactoryWithState sync2 = createMockFactory(); + SynchronizerFactoryWithState sync3 = createMockFactory(); + synchronizers.add(sync1); + synchronizers.add(sync2); + synchronizers.add(sync3); + + SynchronizerStateManager manager = new SynchronizerStateManager(synchronizers); + + assertEquals(3, manager.getAvailableSynchronizerCount()); + + sync2.block(); + assertEquals(2, manager.getAvailableSynchronizerCount()); + + sync1.block(); + assertEquals(1, manager.getAvailableSynchronizerCount()); + + sync3.block(); + assertEquals(0, manager.getAvailableSynchronizerCount()); + } + + @Test + public void setActiveSourceSetsNewSource() throws IOException { + List synchronizers = new ArrayList<>(); + SynchronizerStateManager manager = new SynchronizerStateManager(synchronizers); + + Closeable source = mock(Closeable.class); + boolean shutdown = manager.setActiveSource(source); + + assertFalse(shutdown); + } + + @Test + public void setActiveSourceClosesPreviousSource() throws IOException { + List synchronizers = new ArrayList<>(); + SynchronizerStateManager manager = new SynchronizerStateManager(synchronizers); + + Closeable firstSource = mock(Closeable.class); + Closeable secondSource = mock(Closeable.class); + + manager.setActiveSource(firstSource); + manager.setActiveSource(secondSource); + + verify(firstSource, times(1)).close(); + } + + @Test + public void setActiveSourceReturnsTrueAfterShutdown() throws IOException { + List synchronizers = new ArrayList<>(); + SynchronizerStateManager manager = new SynchronizerStateManager(synchronizers); + + manager.close(); + + Closeable source = mock(Closeable.class); + boolean shutdown = manager.setActiveSource(source); + + assertTrue(shutdown); + verify(source, times(1)).close(); + } + + @Test + public void setActiveSourceIgnoresCloseExceptionFromPreviousSource() throws IOException { + List synchronizers = new ArrayList<>(); + SynchronizerStateManager manager = new SynchronizerStateManager(synchronizers); + + Closeable firstSource = mock(Closeable.class); + doThrow(new IOException("test")).when(firstSource).close(); + + Closeable secondSource = mock(Closeable.class); + + manager.setActiveSource(firstSource); + // Should not throw + manager.setActiveSource(secondSource); + } + + @Test + public void shutdownClosesActiveSource() throws IOException { + List synchronizers = new ArrayList<>(); + SynchronizerStateManager manager = new SynchronizerStateManager(synchronizers); + + Closeable source = mock(Closeable.class); + manager.setActiveSource(source); + + manager.close(); + + verify(source, times(1)).close(); + } + + @Test + public void shutdownCanBeCalledMultipleTimes() throws IOException { + List synchronizers = new ArrayList<>(); + SynchronizerStateManager manager = new SynchronizerStateManager(synchronizers); + + Closeable source = mock(Closeable.class); + manager.setActiveSource(source); + + manager.close(); + manager.close(); + manager.close(); + + // Should only close once + verify(source, times(1)).close(); + } + + @Test + public void shutdownIgnoresCloseException() throws IOException { + List synchronizers = new ArrayList<>(); + SynchronizerStateManager manager = new SynchronizerStateManager(synchronizers); + + Closeable source = mock(Closeable.class); + doThrow(new IOException("test")).when(source).close(); + + manager.setActiveSource(source); + + // Should not throw + manager.close(); + } + + @Test + public void shutdownWithoutActiveSourceDoesNotFail() { + List synchronizers = new ArrayList<>(); + SynchronizerStateManager manager = new SynchronizerStateManager(synchronizers); + + // Should not throw + manager.close(); + } + + @Test + public void integrationTestFullCycle() throws IOException { + List synchronizers = new ArrayList<>(); + SynchronizerFactoryWithState sync1 = createMockFactory(); + SynchronizerFactoryWithState sync2 = createMockFactory(); + SynchronizerFactoryWithState sync3 = createMockFactory(); + synchronizers.add(sync1); + synchronizers.add(sync2); + synchronizers.add(sync3); + + SynchronizerStateManager manager = new SynchronizerStateManager(synchronizers); + + // Initial state + assertEquals(3, manager.getAvailableSynchronizerCount()); + assertFalse(manager.isPrimeSynchronizer()); + + // Get first synchronizer + SynchronizerFactoryWithState first = manager.getNextAvailableSynchronizer(); + assertSame(sync1, first); + assertTrue(manager.isPrimeSynchronizer()); + + // Get second synchronizer + SynchronizerFactoryWithState second = manager.getNextAvailableSynchronizer(); + assertSame(sync2, second); + assertFalse(manager.isPrimeSynchronizer()); + + // Block second + sync2.block(); + assertEquals(2, manager.getAvailableSynchronizerCount()); + + // Get third synchronizer + SynchronizerFactoryWithState third = manager.getNextAvailableSynchronizer(); + assertSame(sync3, third); + assertFalse(manager.isPrimeSynchronizer()); + + // Reset and get first again + manager.resetSourceIndex(); + SynchronizerFactoryWithState firstAgain = manager.getNextAvailableSynchronizer(); + assertSame(sync1, firstAgain); + assertTrue(manager.isPrimeSynchronizer()); + + // Set active source + Closeable source = mock(Closeable.class); + assertFalse(manager.setActiveSource(source)); + + // Shutdown + manager.close(); + verify(source, times(1)).close(); + + // After shutdown, new sources are immediately closed + Closeable newSource = mock(Closeable.class); + assertTrue(manager.setActiveSource(newSource)); + verify(newSource, times(1)).close(); + } +}