diff --git a/lib/sdk/server/Makefile b/lib/sdk/server/Makefile index 91c66dda..af74b32f 100644 --- a/lib/sdk/server/Makefile +++ b/lib/sdk/server/Makefile @@ -33,8 +33,8 @@ run-contract-tests: @curl -s https://raw.githubusercontent.com/launchdarkly/sdk-test-harness/v2/downloader/run.sh \ | VERSION=v2 PARAMS="-url http://localhost:$(TEST_SERVICE_PORT) -debug -skip-from=$(SUPPRESSION_FILE) $(TEST_HARNESS_PARAMS_V2)" sh @echo "Running SDK contract test v3..." - @curl -s https://raw.githubusercontent.com/launchdarkly/sdk-test-harness/v3.0.0-alpha.3/downloader/run.sh \ - | VERSION=v3.0.0-alpha.3 PARAMS="-url http://localhost:$(TEST_SERVICE_PORT) -debug -stop-service-at-end -skip-from=$(SUPPRESSION_FILE_FDV2) $(TEST_HARNESS_PARAMS_V3)" sh + @curl -s https://raw.githubusercontent.com/launchdarkly/sdk-test-harness/v3.0.0-alpha.6/downloader/run.sh \ + | VERSION=v3.0.0-alpha.6 PARAMS="-url http://localhost:$(TEST_SERVICE_PORT) -debug -stop-service-at-end -skip-from=$(SUPPRESSION_FILE_FDV2) $(TEST_HARNESS_PARAMS_V3)" sh contract-tests: build-contract-tests start-contract-test-service-bg run-contract-tests diff --git a/lib/sdk/server/contract-tests/service/src/main/java/sdktest/Representations.java b/lib/sdk/server/contract-tests/service/src/main/java/sdktest/Representations.java index 10bb9741..99112355 100644 --- a/lib/sdk/server/contract-tests/service/src/main/java/sdktest/Representations.java +++ b/lib/sdk/server/contract-tests/service/src/main/java/sdktest/Representations.java @@ -136,6 +136,12 @@ public static class SdkConfigDataSystemParams { SdkConfigDataInitializerParams[] initializers; /** List of synchronizers (matches servicedef DataSystem.Synchronizers). */ SdkConfigSynchronizerParams[] synchronizers; + /** + * Configuration for the FDv1 fallback synchronizer engaged in response to a + * server-directed FDv1 Fallback Directive. Distinct from the FDv2 synchronizer chain + * above; matches servicedef DataSystem.FDv1Fallback. + */ + SdkConfigPollingParams fdv1Fallback; String payloadFilter; } diff --git a/lib/sdk/server/contract-tests/service/src/main/java/sdktest/SdkClientEntity.java b/lib/sdk/server/contract-tests/service/src/main/java/sdktest/SdkClientEntity.java index 70280ed2..f6bb9d96 100644 --- a/lib/sdk/server/contract-tests/service/src/main/java/sdktest/SdkClientEntity.java +++ b/lib/sdk/server/contract-tests/service/src/main/java/sdktest/SdkClientEntity.java @@ -33,8 +33,6 @@ import com.launchdarkly.sdk.server.integrations.FDv2PollingSynchronizerBuilder; import com.launchdarkly.sdk.server.integrations.FDv2StreamingSynchronizerBuilder; import com.launchdarkly.sdk.server.interfaces.BigSegmentStoreStatusProvider; -import com.launchdarkly.sdk.server.subsystems.ComponentConfigurer; -import com.launchdarkly.sdk.server.subsystems.DataSource; import com.launchdarkly.sdk.server.subsystems.DataSourceBuilder; import com.launchdarkly.sdk.server.datasources.Initializer; import com.launchdarkly.sdk.server.datasources.Synchronizer; @@ -570,20 +568,23 @@ private LDConfig buildSdkConfig(SdkConfigParams params, String tag) { } } - // Configure FDv1 fallback synchronizer (pick first polling, else first synchronizer) - SdkConfigSynchronizerParams fallbackSynchronizer = - selectFallbackSynchronizer(params.dataSystem.synchronizers); - if (fallbackSynchronizer != null) { - // Set global polling endpoints if the fallback synchronizer has polling with custom base URI - if (fallbackSynchronizer.polling != null && - fallbackSynchronizer.polling.baseUri != null) { - endpoints.polling(fallbackSynchronizer.polling.baseUri); + // Configure the FDv1 fallback synchronizer. This is engaged only when the server returns + // the X-LD-FD-Fallback directive; it lives outside the FDv2 Primary/Fallback synchronizer + // chain configured above. The test harness sends this as a dedicated top-level field -- + // do not infer it from the FDv2 synchronizer list. + if (params.dataSystem.fdv1Fallback != null) { + SdkConfigPollingParams fdv1Params = params.dataSystem.fdv1Fallback; + if (fdv1Params.baseUri != null) { + endpoints.polling(fdv1Params.baseUri); } - - // Create and configure FDv1 fallback - ComponentConfigurer fdv1Fallback = - createFDv1FallbackSynchronizer(fallbackSynchronizer); - dataSystemBuilder.fDv1FallbackSynchronizer(fdv1Fallback); + PollingDataSourceBuilder fdv1Polling = Components.pollingDataSource(); + if (fdv1Params.pollIntervalMs != null) { + fdv1Polling.pollInterval(Duration.ofMillis(fdv1Params.pollIntervalMs)); + } + if (params.dataSystem.payloadFilter != null && !params.dataSystem.payloadFilter.isEmpty()) { + fdv1Polling.payloadFilter(params.dataSystem.payloadFilter); + } + dataSystemBuilder.fDv1FallbackSynchronizer(fdv1Polling); } builder.dataSystem(dataSystemBuilder); @@ -625,47 +626,4 @@ private DataSourceBuilder createSynchronizer( return null; } - /** - * Selects the best synchronizer configuration to use for FDv1 fallback. - * Prefers the first polling synchronizer in the list, otherwise the first synchronizer. - */ - private static SdkConfigSynchronizerParams selectFallbackSynchronizer( - SdkConfigSynchronizerParams[] synchronizers) { - if (synchronizers == null || synchronizers.length == 0) { - return null; - } - // Prefer first polling synchronizer (FDv1 fallback is polling-based) - for (SdkConfigSynchronizerParams sync : synchronizers) { - if (sync.polling != null) { - return sync; - } - } - // Otherwise use first synchronizer (streaming; FDv1 will use default polling config) - return synchronizers[0]; - } - - /** - * Creates the FDv1 fallback synchronizer based on the selected synchronizer config. - * FDv1 fallback is always polling-based and uses the global service endpoints configuration. - */ - private static ComponentConfigurer createFDv1FallbackSynchronizer( - SdkConfigSynchronizerParams synchronizer) { - - // FDv1 fallback is always polling-based - PollingDataSourceBuilder fdv1Polling = Components.pollingDataSource(); - - // Configure polling interval if the synchronizer has polling configuration - if (synchronizer.polling != null) { - if (synchronizer.polling.pollIntervalMs != null) { - fdv1Polling.pollInterval(Duration.ofMillis(synchronizer.polling.pollIntervalMs)); - } - // Note: FDv1 polling doesn't support per-source service endpoints override, - // so it will use the global service endpoints configuration (which is set - // by the caller before this method is invoked) - } - // If streaming synchronizer, use default polling interval - // (no additional configuration needed) - - return fdv1Polling; - } } diff --git a/lib/sdk/server/contract-tests/service/src/main/java/sdktest/TestService.java b/lib/sdk/server/contract-tests/service/src/main/java/sdktest/TestService.java index 9f16f387..1355fdd4 100644 --- a/lib/sdk/server/contract-tests/service/src/main/java/sdktest/TestService.java +++ b/lib/sdk/server/contract-tests/service/src/main/java/sdktest/TestService.java @@ -41,7 +41,8 @@ public class TestService { "service-endpoints", "strongly-typed", "tags", - "server-side-polling" + "server-side-polling", + "fdv1-fallback" }; static final Gson gson = new GsonBuilder().serializeNulls().create(); diff --git a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/FDv2DataSource.java b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/FDv2DataSource.java index a59f726b..40831147 100644 --- a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/FDv2DataSource.java +++ b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/FDv2DataSource.java @@ -133,8 +133,26 @@ private void run() { return; } + InitializerOutcome initializerOutcome = InitializerOutcome.completed(); if (sourceManager.hasInitializers()) { - runInitializers(); + initializerOutcome = runInitializers(); + } + + // If an initializer signalled FDv1 fallback, switch to the FDv1 synchronizer + // (if configured) or transition to OFF. This takes precedence over the standard + // synchronizer chain -- the FDv2 synchronizers are not given a chance to run. + if (initializerOutcome.fallbackToFDv1) { + if (sourceManager.hasFDv1Fallback()) { + logger.warn("Initializer requested fallback to FDv1; switching to FDv1 fallback synchronizer."); + sourceManager.fdv1Fallback(); + } else { + logger.warn("Initializer requested fallback to FDv1, but no FDv1 fallback synchronizer is configured."); + dataSourceUpdates.updateStatus( + DataSourceStatusProvider.State.OFF, + initializerOutcome.errorInfo); + startFuture.complete(false); + return; + } } if(!sourceManager.hasAvailableSynchronizers()) { @@ -149,11 +167,13 @@ private void run() { return; } - runSynchronizers(); + boolean haltedByDirective = runSynchronizers(); - // If we had synchronizers, and we ran out of them, and we aren't shutting down, then that was unexpected, - // and we will report it. - maybeReportUnexpectedExhaustion("All data source acquisition methods have been exhausted."); + if (!haltedByDirective) { + // If we had synchronizers, and we ran out of them, and we aren't shutting down, then that was unexpected, + // and we will report it. + maybeReportUnexpectedExhaustion("All data source acquisition methods have been exhausted."); + } // If we had initialized at some point, then the future will already be complete and this will be ignored. startFuture.complete(false); @@ -164,7 +184,18 @@ private void run() { runThread.start(); } - private void runInitializers() { + /** + * Runs the configured initializers in order until one succeeds, the list is exhausted, + * or one signals an FDv1 fallback directive. Returns an {@link InitializerOutcome} + * describing whether the caller should switch to the FDv1 fallback synchronizer. + *

+ * If an initializer's result carries {@link FDv2SourceResult#isFdv1Fallback()}, any + * accompanying payload is applied first so evaluations can serve the server-provided + * data while the FDv1 synchronizer is brought up. When the directive accompanies an + * error result the underlying error is preserved on the returned outcome so the + * caller can surface it on a subsequent OFF status (when no fallback is configured). + */ + private InitializerOutcome runInitializers() { boolean anyDataReceived = false; Initializer initializer = sourceManager.getNextInitializerAndSetActive(); while (initializer != null) { @@ -172,16 +203,25 @@ private void runInitializers() { logger.info("Initializer '{}' is starting.", initializerName); try { try (FDv2SourceResult result = initializer.run().get()) { + DataSourceStatusProvider.ErrorInfo fallbackErrorInfo = null; switch (result.getResultType()) { case CHANGE_SET: dataSourceUpdates.apply(result.getChangeSet()); anyDataReceived = true; logger.info("Initialized via '{}'.", initializerName); if (!result.getChangeSet().getSelector().isEmpty()) { - // We received data with a selector, so we end the initialization process. + // A defined selector marks initialization complete -- match Go/Python/Ruby + // behavior. A selectorless basis is applied so evaluations can serve it, + // and once the initializer chain is fully exhausted that applied data is + // also enough to consider initialization complete (see the post-loop + // block below); but mid-chain we don't yet flip to VALID, so a later + // initializer can still produce a selectorful basis if one is available. dataSourceUpdates.updateStatus(DataSourceStatusProvider.State.VALID, null); startFuture.complete(true); - return; + if (result.isFdv1Fallback()) { + return InitializerOutcome.fallbackToFDv1(null); + } + return InitializerOutcome.completed(); } break; case STATUS: @@ -192,6 +232,7 @@ private void runInitializers() { logger.warn("Initializer '{}' failed: {}", initializerName, detailForError(status.getErrorInfo())); + fallbackErrorInfo = status.getErrorInfo(); // The data source updates handler will filter the state during initializing, but this // will make the error information available. dataSourceUpdates.updateStatus( @@ -208,6 +249,14 @@ private void runInitializers() { } break; } + // FDv1 fallback may ride along on either a successful CHANGE_SET (with no + // selector, so initialization is incomplete) or on a STATUS error result. + // In either case, the SDK must halt the FDv2 chain immediately and switch + // to the FDv1 fallback synchronizer; the eventual VALID status will come + // from the FDv1 synchronizer once it serves a selectorful payload. + if (result.isFdv1Fallback()) { + return InitializerOutcome.fallbackToFDv1(fallbackErrorInfo); + } } } catch (ExecutionException | InterruptedException | CancellationException e) { // The data source updates handler will filter the state during initializing, but this @@ -225,14 +274,39 @@ private void runInitializers() { } initializer = sourceManager.getNextInitializerAndSetActive(); } - // We received data without a selector, and we have exhausted initializers, so we are going to - // consider ourselves initialized. + // No initializer produced a selectorful basis, but at least one initializer applied + // a selectorless basis. Treat that as enough to consider the data system initialized + // now that the entire initializer chain is exhausted -- evaluations can serve the + // applied data, and the synchronizer phase (when configured) will continue from + // there. Without this, an SDK configured with only selectorless initializers and no + // synchronizer would never transition out of INITIALIZING. if (anyDataReceived) { dataSourceUpdates.updateStatus(DataSourceStatusProvider.State.VALID, null); startFuture.complete(true); } - // If no data was received, then it is possible initialization will complete from synchronizers, so we give - // them an opportunity to run before reporting any issues. + return InitializerOutcome.completed(); + } + + /** + * Outcome of {@link #runInitializers()} relaying whether the SDK should perform a + * server-directed FDv1 fallback before the synchronizer phase begins. + */ + private static final class InitializerOutcome { + final boolean fallbackToFDv1; + final DataSourceStatusProvider.ErrorInfo errorInfo; + + private InitializerOutcome(boolean fallbackToFDv1, DataSourceStatusProvider.ErrorInfo errorInfo) { + this.fallbackToFDv1 = fallbackToFDv1; + this.errorInfo = errorInfo; + } + + static InitializerOutcome completed() { + return new InitializerOutcome(false, null); + } + + static InitializerOutcome fallbackToFDv1(DataSourceStatusProvider.ErrorInfo errorInfo) { + return new InitializerOutcome(true, errorInfo); + } } /** @@ -259,7 +333,17 @@ private List getConditions() { return conditionFactories.stream().map(ConditionFactory::build).collect(Collectors.toList()); } - private void runSynchronizers() { + /** + * Runs the configured synchronizers, falling back / recovering as conditions allow, + * until the list is exhausted, the data source is closed, or a server-directed FDv1 + * fallback halts the data system. + * + * @return true when {@code runSynchronizers} halted in response to a server-directed + * FDv1 fallback directive that could not be satisfied (no FDv1 fallback + * synchronizer configured) -- the caller should NOT report exhaustion in + * that case because OFF has already been published with a specific error. + */ + private boolean runSynchronizers() { // When runSynchronizers exists, no matter how it exits, the synchronizerStateManager will be closed. try { Synchronizer synchronizer = sourceManager.getNextAvailableSynchronizerAndSetActive(); @@ -343,7 +427,7 @@ private void runSynchronizers() { ); // We should be overall shutting down. logger.debug("Synchronizer shutdown."); - return; + return false; case TERMINAL_ERROR: maybeLogSynchronizerStatusChange( synchronizer.name(), @@ -365,19 +449,35 @@ private void runSynchronizers() { } break; } - // We have been requested to fall back to FDv1. We handle whatever message was associated, - // close the synchronizer, and then fallback. - // Only trigger fallback if we're not already running the FDv1 fallback synchronizer. - if ( - result.isFdv1Fallback() && - sourceManager.hasFDv1Fallback() && - // This shouldn't happen in practice, an FDv1 source shouldn't request fallback - // to FDv1. But if it does, then we will discard its request. - !sourceManager.isCurrentSynchronizerFDv1Fallback() - ) { + // We have been requested to fall back to FDv1. Block every FDv2 + // synchronizer in one shot via fdv1Fallback() (which also unblocks the + // FDv1 fallback synchronizer, if one is configured). If FDv1 is + // configured we hand off to it; otherwise we halt the data system. + // An FDv1 fallback synchronizer asking to fall back again is ignored + // -- shouldn't happen in practice. + if (result.isFdv1Fallback() + && !sourceManager.isCurrentSynchronizerFDv1Fallback()) { sourceManager.fdv1Fallback(); - logger.info("Falling back to an FDv1 fallback synchronizer."); - running = false; + if (sourceManager.hasFDv1Fallback()) { + logger.info("Falling back to an FDv1 fallback synchronizer."); + running = false; + } else { + // When the directive is signalled but no FDv1 fallback synchronizer + // is configured, halt the data system entirely. Surface OFF with + // the most recent error info (if any) and exit the synchronizer + // loop terminally. + logger.warn( + "Synchronizer '{}' requested FDv1 fallback, but no FDv1 fallback synchronizer is configured; halting the data system.", + synchronizer.name() + ); + DataSourceStatusProvider.ErrorInfo offError = + result.getStatus() != null ? result.getStatus().getErrorInfo() : null; + dataSourceUpdates.updateStatus( + DataSourceStatusProvider.State.OFF, + offError); + startFuture.complete(false); + return true; + } } } } @@ -408,6 +508,7 @@ private void runSynchronizers() { } finally { sourceManager.close(); } + return false; } private static String detailForError(DataSourceStatusProvider.ErrorInfo errorInfo) { diff --git a/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/FDv2DataSourceTest.java b/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/FDv2DataSourceTest.java index 99b72c6c..d2015819 100644 --- a/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/FDv2DataSourceTest.java +++ b/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/FDv2DataSourceTest.java @@ -1959,7 +1959,9 @@ public void initializerChangeSetWithoutSelectorCompletesIfLastInitializer() thro Future startFuture = dataSource.start(); startFuture.get(2, TimeUnit.SECONDS); - // Expected status: VALID (single initializer without selector completes when it's the last initializer) + // A single initializer applied a selectorless basis and there are no synchronizers, + // so once the initializer chain exhausts the applied data is enough to mark the + // data source VALID and complete startFuture. List statuses = sink.awaitStatuses(1, 2, TimeUnit.SECONDS); assertEquals("Should receive 1 status update", 1, statuses.size()); assertEquals(DataSourceStatusProvider.State.VALID, statuses.get(0)); @@ -2212,8 +2214,6 @@ public void statusTransitionsToValidAfterInitialization() throws Exception { Future startFuture = dataSource.start(); startFuture.get(2, TimeUnit.SECONDS); - // After initializers complete with data (no selector), VALID status is emitted - // Since we initialized successfully and there are no synchronizers, we stay VALID DataSourceStatusProvider.State status = sink.awaitStatus(2, TimeUnit.SECONDS); assertNotNull("Should receive status update", status); assertEquals(DataSourceStatusProvider.State.VALID, status); @@ -2531,27 +2531,42 @@ public void fdv1FallbackWorksAfterInterruption() throws Exception { assertTrue("Should have at least 2 changesets", sink.getApplyCount() >= 2); } + // Synchronizer-phase analogue of the initializer halt path: when a synchronizer signals + // FDv1 fallback but no FDv1 fallback synchronizer is configured, the data system must halt + // (transition to OFF, stop building further synchronizers). @Test - public void fdv1FallbackWithoutConfiguredFallbackIgnoresFlag() throws Exception { + public void fdv1FallbackOnSynchronizerWithoutFDv1ConfiguredHaltsDataSystem() throws Exception { executor = Executors.newScheduledThreadPool(2); MockDataSourceUpdateSink sink = new MockDataSourceUpdateSink(); ImmutableList> initializers = ImmutableList.of(); - // Synchronizer sends result with FDv1 fallback flag - BlockingQueue fdv2SyncResults = new LinkedBlockingQueue<>(); - fdv2SyncResults.add(FDv2SourceResult.changeSet(makeChangeSet(false), false)); - fdv2SyncResults.add(FDv2SourceResult.changeSet(makeChangeSet(false), true)); // FDv1 fallback flag + // First synchronizer sends a normal payload, then a payload with the directive. + BlockingQueue firstSyncResults = new LinkedBlockingQueue<>(); + firstSyncResults.add(FDv2SourceResult.changeSet(makeChangeSet(false), false)); + firstSyncResults.add(FDv2SourceResult.changeSet(makeChangeSet(false), true)); + + AtomicInteger firstSyncBuildCount = new AtomicInteger(0); + AtomicBoolean secondSyncCalled = new AtomicBoolean(false); + // Two synchronizers configured; once the directive halts the data system, neither + // should be re-built. The second synchronizer in particular must never run. ImmutableList> synchronizers = ImmutableList.of( - () -> new MockQueuedSynchronizer(fdv2SyncResults) + () -> { + firstSyncBuildCount.incrementAndGet(); + return new MockQueuedSynchronizer(firstSyncResults); + }, + () -> { + secondSyncCalled.set(true); + return new MockQueuedSynchronizer(new LinkedBlockingQueue<>()); + } ); - // No FDv1 fallback configured (null) + // No FDv1 fallback configured. FDv2DataSource dataSource = new FDv2DataSource( initializers, synchronizers, - null, // No FDv1 fallback + null, sink, Thread.NORM_PRIORITY, logger, @@ -2561,12 +2576,34 @@ public void fdv1FallbackWithoutConfiguredFallbackIgnoresFlag() throws Exception ); resourcesToClose.add(dataSource); - Future startFuture = dataSource.start(); - startFuture.get(2, TimeUnit.SECONDS); + dataSource.start().get(2, TimeUnit.SECONDS); - // Should receive both changesets even though fallback flag was set + // First payload is applied; the directive-bearing payload is also applied (the SDK + // surfaces the data) but no further synchronizers are dialled. sink.awaitApplyCount(2, 2, TimeUnit.SECONDS); - assertEquals(2, sink.getApplyCount()); + + // Status must end up OFF because the directive could not be satisfied. + // Poll briefly for the OFF state in case the final transition is observed slightly + // after the apply count reaches 2; this keeps the assertion deterministic without + // relying on real-time delays. + DataSourceStatusProvider.State finalState = null; + for (int i = 0; i < 20 && finalState != DataSourceStatusProvider.State.OFF; i++) { + DataSourceStatusProvider.State next = sink.awaitStatus(100, TimeUnit.MILLISECONDS); + if (next != null) { + finalState = next; + } else if (sink.getLastState() == DataSourceStatusProvider.State.OFF) { + finalState = DataSourceStatusProvider.State.OFF; + } + } + assertEquals("Status should be OFF after directive halts the data system", + DataSourceStatusProvider.State.OFF, sink.getLastState()); + + // The second synchronizer must never be built. + assertFalse("Subsequent synchronizers must not run after directive-induced halt", + secondSyncCalled.get()); + // The first synchronizer should also not be re-built (we should stay halted). + assertEquals("First synchronizer should be built exactly once", + 1, firstSyncBuildCount.get()); } @Test @@ -2679,6 +2716,245 @@ public void fdv1FallbackOnlyCalledOncePerDataSource() throws Exception { assertNull("FDv1 fallback should only be called once", secondCall); } + // ============================================================================ + // FDv1 Fallback (Initializer Phase) Tests + // ============================================================================ + + // An initializer that returns a successful payload with the FDv1 fallback flag must + // (1) apply the payload, then (2) hand off directly to the FDv1 fallback synchronizer + // without giving any of the configured FDv2 synchronizers a chance to run. + @Test + public void fdv1FallbackOnInitializerSuccessAppliesPayloadAndSwitches() throws Exception { + executor = Executors.newScheduledThreadPool(2); + MockDataSourceUpdateSink sink = new MockDataSourceUpdateSink(); + + // Initializer returns a payload with a selector AND the FDv1 fallback signal. + CompletableFuture initFuture = CompletableFuture.completedFuture( + FDv2SourceResult.changeSet(makeChangeSet(true), true) + ); + ImmutableList> initializers = ImmutableList.of( + () -> new MockInitializer(initFuture) + ); + + AtomicBoolean fdv2SyncCalled = new AtomicBoolean(false); + ImmutableList> synchronizers = ImmutableList.of( + () -> { + fdv2SyncCalled.set(true); + return new MockQueuedSynchronizer(new LinkedBlockingQueue<>()); + } + ); + + BlockingQueue fdv1SyncResults = new LinkedBlockingQueue<>(); + fdv1SyncResults.add(FDv2SourceResult.changeSet(makeChangeSet(false), false)); + + BlockingQueue fdv1CalledQueue = new LinkedBlockingQueue<>(); + FDv2DataSource.DataSourceFactory fdv1Fallback = () -> { + fdv1CalledQueue.offer(true); + return new MockQueuedSynchronizer(fdv1SyncResults); + }; + + FDv2DataSource dataSource = new FDv2DataSource( + initializers, + synchronizers, + fdv1Fallback, + sink, + Thread.NORM_PRIORITY, + logger, + executor, + 120, + 300 + ); + resourcesToClose.add(dataSource); + + dataSource.start().get(2, TimeUnit.SECONDS); + + // Initializer payload was applied. + sink.awaitApplyCount(1, 2, TimeUnit.SECONDS); + assertTrue("Initializer payload should be applied before fallback", sink.getApplyCount() >= 1); + + // FDv1 fallback was activated. + Boolean fdv1Called = fdv1CalledQueue.poll(2, TimeUnit.SECONDS); + assertNotNull("FDv1 fallback should be activated by initializer-phase directive", fdv1Called); + + // FDv2 synchronizer was never asked to run. + assertFalse("FDv2 synchronizers must be skipped after initializer-phase fallback", + fdv2SyncCalled.get()); + } + + // An initializer that fails with the FDv1 fallback flag (e.g. 500 + header) must hand off + // to the FDv1 fallback synchronizer without trying FDv2 synchronizers. + @Test + public void fdv1FallbackOnInitializerErrorSwitchesToFDv1() throws Exception { + executor = Executors.newScheduledThreadPool(2); + MockDataSourceUpdateSink sink = new MockDataSourceUpdateSink(); + + CompletableFuture initFuture = CompletableFuture.completedFuture( + FDv2SourceResult.terminalError( + new DataSourceStatusProvider.ErrorInfo( + DataSourceStatusProvider.ErrorKind.ERROR_RESPONSE, + 500, + "fallback requested", + Instant.now() + ), + true + ) + ); + ImmutableList> initializers = ImmutableList.of( + () -> new MockInitializer(initFuture) + ); + + AtomicBoolean fdv2SyncCalled = new AtomicBoolean(false); + ImmutableList> synchronizers = ImmutableList.of( + () -> { + fdv2SyncCalled.set(true); + return new MockQueuedSynchronizer(new LinkedBlockingQueue<>()); + } + ); + + BlockingQueue fdv1SyncResults = new LinkedBlockingQueue<>(); + fdv1SyncResults.add(FDv2SourceResult.changeSet(makeChangeSet(false), false)); + + BlockingQueue fdv1CalledQueue = new LinkedBlockingQueue<>(); + FDv2DataSource.DataSourceFactory fdv1Fallback = () -> { + fdv1CalledQueue.offer(true); + return new MockQueuedSynchronizer(fdv1SyncResults); + }; + + FDv2DataSource dataSource = new FDv2DataSource( + initializers, + synchronizers, + fdv1Fallback, + sink, + Thread.NORM_PRIORITY, + logger, + executor, + 120, + 300 + ); + resourcesToClose.add(dataSource); + + dataSource.start().get(2, TimeUnit.SECONDS); + + Boolean fdv1Called = fdv1CalledQueue.poll(2, TimeUnit.SECONDS); + assertNotNull("FDv1 fallback should be activated even when initializer signals error", + fdv1Called); + assertFalse("FDv2 synchronizers must be skipped after initializer-phase fallback", + fdv2SyncCalled.get()); + } + + // When an initializer signals FDv1 fallback but no FDv1 synchronizer is configured, the + // SDK must transition the data source status to OFF -- not stay stuck at INITIALIZING -- + // and surface the underlying initializer error so monitors can see why. + @Test + public void fdv1FallbackOnInitializerWithoutFDv1ConfiguredTransitionsToOff() throws Exception { + executor = Executors.newScheduledThreadPool(1); + MockDataSourceUpdateSink sink = new MockDataSourceUpdateSink(); + + DataSourceStatusProvider.ErrorInfo initError = new DataSourceStatusProvider.ErrorInfo( + DataSourceStatusProvider.ErrorKind.ERROR_RESPONSE, + 500, + "fallback requested without fallback configured", + Instant.now() + ); + CompletableFuture initFuture = CompletableFuture.completedFuture( + FDv2SourceResult.terminalError(initError, true) + ); + ImmutableList> initializers = ImmutableList.of( + () -> new MockInitializer(initFuture) + ); + + AtomicBoolean fdv2SyncCalled = new AtomicBoolean(false); + ImmutableList> synchronizers = ImmutableList.of( + () -> { + fdv2SyncCalled.set(true); + return new MockQueuedSynchronizer(new LinkedBlockingQueue<>()); + } + ); + + // No FDv1 fallback configured. + FDv2DataSource dataSource = new FDv2DataSource( + initializers, + synchronizers, + null, + sink, + Thread.NORM_PRIORITY, + logger, + executor, + 120, + 300 + ); + resourcesToClose.add(dataSource); + + dataSource.start().get(2, TimeUnit.SECONDS); + + // Status must end up OFF (not INITIALIZING) so callers can observe the terminal state. + assertEquals(DataSourceStatusProvider.State.OFF, sink.getLastState()); + assertNotNull("Initializer error should be preserved on the OFF status", sink.getLastError()); + assertEquals(initError.getKind(), sink.getLastError().getKind()); + assertEquals(initError.getStatusCode(), sink.getLastError().getStatusCode()); + + // FDv2 synchronizers should not run. + assertFalse(fdv2SyncCalled.get()); + } + + // When an initializer returns a successful payload-without-selector AND the FDv1 fallback + // flag, the partial payload should still be applied (so evaluations can serve it) and the + // SDK should move on to the FDv1 synchronizer rather than scanning further FDv2 sources. + // Crucially, a selectorless basis must NOT mark the data source VALID -- that transition + // belongs to the FDv1 synchronizer once it serves a selectorful payload. Match Go/Python/ + // Ruby behavior on initialization gating. + @Test + public void fdv1FallbackOnInitializerSuccessNoSelectorAppliesPayloadAndSwitches() throws Exception { + executor = Executors.newScheduledThreadPool(2); + MockDataSourceUpdateSink sink = new MockDataSourceUpdateSink(); + + CompletableFuture initFuture = CompletableFuture.completedFuture( + FDv2SourceResult.changeSet(makeChangeSet(false), true) + ); + ImmutableList> initializers = ImmutableList.of( + () -> new MockInitializer(initFuture) + ); + + ImmutableList> synchronizers = ImmutableList.of(); + + // FDv1 sync must serve a selectorful basis to move the data source to VALID. + BlockingQueue fdv1SyncResults = new LinkedBlockingQueue<>(); + fdv1SyncResults.add(FDv2SourceResult.changeSet(makeChangeSet(true), false)); + + BlockingQueue fdv1CalledQueue = new LinkedBlockingQueue<>(); + FDv2DataSource.DataSourceFactory fdv1Fallback = () -> { + fdv1CalledQueue.offer(true); + return new MockQueuedSynchronizer(fdv1SyncResults); + }; + + FDv2DataSource dataSource = new FDv2DataSource( + initializers, + synchronizers, + fdv1Fallback, + sink, + Thread.NORM_PRIORITY, + logger, + executor, + 120, + 300 + ); + resourcesToClose.add(dataSource); + + dataSource.start().get(2, TimeUnit.SECONDS); + + // Both payloads applied: the selectorless initializer basis and the selectorful FDv1 basis. + sink.awaitApplyCount(2, 2, TimeUnit.SECONDS); + assertTrue("Both initializer and FDv1 payloads should have been applied", + sink.getApplyCount() >= 2); + assertNotNull(fdv1CalledQueue.poll(2, TimeUnit.SECONDS)); + + // VALID may only come from the FDv1 synchronizer's selectorful basis. The data source + // is initialized iff that transition has been observed. + assertEquals(DataSourceStatusProvider.State.VALID, sink.getLastState()); + assertTrue("isInitialized() must be true after FDv1 served selectorful basis", + dataSource.isInitialized()); + } + @Test public void orchestrationLogging_warnsWhenNoInitializersOrSynchronizersConfigured() throws Exception { executor = Executors.newScheduledThreadPool(1);