Skip to content
Merged
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
08abc7e
chore: Pipe headers through data sources.
kinyoklion Jan 21, 2026
bd15015
Merge branch 'main' into rlamb/connect-headers-to-data-sources
kinyoklion Jan 21, 2026
b20c621
Cleanup
kinyoklion Jan 21, 2026
b46ac5e
chore: Add fallback and recovery support for FDv2.
kinyoklion Jan 23, 2026
fba5aa2
Extract conditions from FDv2DataSource.
kinyoklion Jan 23, 2026
472a1d2
Extract synchronizer state management from the FDv2DataSource.
kinyoklion Jan 23, 2026
0c150a1
Add recovery condition tests.
kinyoklion Jan 23, 2026
5888761
Merge branch 'main' into rlamb/fallback-and-recovery
kinyoklion Jan 26, 2026
9e79c00
Merge branch 'rlamb/fallback-and-recovery' of github.com:launchdarkly…
kinyoklion Jan 26, 2026
33fa6c5
Closeable synchronizer state manager.
kinyoklion Jan 26, 2026
ec0d0ec
More clean shutdown model.
kinyoklion Jan 26, 2026
5b62238
SynchronizerStateManager tests.
kinyoklion Jan 26, 2026
53ea319
FDv2DataSource tests.
kinyoklion Jan 26, 2026
147556d
Add documentation to long-running test.
kinyoklion Jan 26, 2026
321fb4f
Fix test expectations.
kinyoklion Jan 26, 2026
1234bc9
Remove un-needed synchronizer.
kinyoklion Jan 27, 2026
4fa930a
chore: adds conditional persistence propagation
tanderson-ld Jan 27, 2026
28bb3ca
Merge remote-tracking branch 'origin/main' into ta/SDK-1622/persisten…
tanderson-ld Jan 27, 2026
10cab33
Correct lock on getNextAvailableSynchronizer.
kinyoklion Jan 27, 2026
382246d
PR Feedback.
kinyoklion Jan 27, 2026
e6b032e
Correct prime synchronizer logic.
kinyoklion Jan 27, 2026
6e14b7f
fixing volatile issue
tanderson-ld Jan 27, 2026
e507aeb
Merge remote-tracking branch 'origin/rlamb/fallback-and-recovery' int…
tanderson-ld Jan 27, 2026
996036d
chore: handles offline mode, no initializers nor synchronizers
tanderson-ld Jan 28, 2026
8dfeadb
Merge remote-tracking branch 'origin' into ta/SDK-1611/daemon-mode-of…
tanderson-ld Jan 29, 2026
14e4f25
Merge remote-tracking branch 'origin/main' into ta/SDK-1611/daemon-mo…
tanderson-ld Jan 30, 2026
f65ddd4
bot review
tanderson-ld Jan 30, 2026
6ae877b
bot review
tanderson-ld Jan 30, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ public static FullDataSet<ItemDescriptor> sortAllCollections(FullDataSet<ItemDes
DataKind kind = entry.getKey();
builder.put(kind, sortCollection(kind, entry.getValue()));
}
return new FullDataSet<>(builder.build().entrySet());
return new FullDataSet<>(builder.build().entrySet(), allData.shouldPersist());
}

/**
Expand All @@ -148,7 +148,7 @@ public static ChangeSet<ItemDescriptor> sortChangeset(ChangeSet<ItemDescriptor>
DataKind kind = entry.getKey();
builder.put(kind, sortCollection(kind, entry.getValue()));
}
return new ChangeSet<>(inSet.getType(), inSet.getSelector(), builder.build().entrySet(), inSet.getEnvironmentId());
return new ChangeSet<>(inSet.getType(), inSet.getSelector(), builder.build().entrySet(), inSet.getEnvironmentId(), inSet.shouldPersist());
}

private static KeyedItems<ItemDescriptor> sortCollection(DataKind kind, KeyedItems<ItemDescriptor> input) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import com.launchdarkly.sdk.server.DataModel.VersionedData;
import com.launchdarkly.sdk.server.DataModel.WeightedVariation;
import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.DataKind;
import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.FullDataSet;
import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.ItemDescriptor;
import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.KeyedItems;
import com.launchdarkly.sdk.server.subsystems.SerializationException;
Expand Down Expand Up @@ -106,7 +105,7 @@ static VersionedData deserializeFromJsonReader(DataKind kind, JsonReader jr) thr
* @param jr the JSON reader
* @return the deserialized data
*/
static FullDataSet<ItemDescriptor> parseFullDataSet(JsonReader jr) throws SerializationException {
static Iterable<Map.Entry<DataKind, KeyedItems<ItemDescriptor>>> parseFullDataSet(JsonReader jr) throws SerializationException {
ImmutableList.Builder<Map.Entry<String, ItemDescriptor>> flags = ImmutableList.builder();
ImmutableList.Builder<Map.Entry<String, ItemDescriptor>> segments = ImmutableList.builder();

Expand Down Expand Up @@ -141,10 +140,10 @@ static FullDataSet<ItemDescriptor> parseFullDataSet(JsonReader jr) throws Serial
}
jr.endObject();

return new FullDataSet<ItemDescriptor>(ImmutableMap.of(
return ImmutableMap.of(
FEATURES, new KeyedItems<>(flags.build()),
SEGMENTS, new KeyedItems<>(segments.build())
).entrySet());
).entrySet();
} catch (IOException e) {
throw new SerializationException(e);
} catch (RuntimeException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -428,8 +428,8 @@ private boolean applyToLegacyStore(ChangeSet<ItemDescriptor> sortedChangeSet) {
}

private boolean applyFullChangeSetToLegacyStore(ChangeSet<ItemDescriptor> unsortedChangeset) {
// Convert ChangeSet to FullDataSet for legacy init path
return init(new FullDataSet<>(unsortedChangeset.getData()));
// Convert ChangeSet to FullDataSet for legacy init path, preserving shouldPersist flag
return init(new FullDataSet<>(unsortedChangeset.getData(), unsortedChangeset.shouldPersist()));
}

private boolean applyPartialChangeSetToLegacyStore(ChangeSet<ItemDescriptor> changeSet) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,8 @@ public FullDataSet<ItemDescriptor> getAllData(boolean returnDataEvenIfCached)
}

JsonReader jr = new JsonReader(response.body().charStream());
return parseFullDataSet(jr);
// Polling data from LaunchDarkly should be persisted
return new FullDataSet<>(parseFullDataSet(jr), true);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,15 @@ private FDv2ChangeSetTranslator() {
* @param changeset the FDv2 changeset to convert
* @param logger logger for diagnostic messages
* @param environmentId the environment ID to include in the changeset (may be null)
* @param shouldPersist true if the data should be persisted to persistent stores, false otherwise
* @return a DataStoreTypes.ChangeSet containing the converted data
* @throws IllegalArgumentException if the changeset type is unknown
*/
public static DataStoreTypes.ChangeSet<ItemDescriptor> toChangeSet(
FDv2ChangeSet changeset,
LDLogger logger,
String environmentId) {
String environmentId,
boolean shouldPersist) {
ChangeSetType changeSetType;
switch (changeset.getType()) {
case FULL:
Expand Down Expand Up @@ -103,7 +105,8 @@ public static DataStoreTypes.ChangeSet<ItemDescriptor> toChangeSet(
changeSetType,
changeset.getSelector(),
dataBuilder.build(),
environmentId);
environmentId,
shouldPersist);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,13 @@ public FDv2DataSource(
}

private void run() {
if (initializers.isEmpty() && synchronizerStateManager.getAvailableSynchronizerCount() == 0) {
logger.info("LaunchDarkly client will not connect to Launchdarkly for feature flag data due to no initializers or synchronizers");
dataSourceUpdates.updateStatus(DataSourceStatusProvider.State.VALID, null);
startFuture.complete(true);
return;
}

Thread runThread = new Thread(() -> {
if (!initializers.isEmpty()) {
runInitializers();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import com.launchdarkly.sdk.server.subsystems.DataStore;
import com.launchdarkly.sdk.server.subsystems.LoggingConfiguration;
import com.launchdarkly.sdk.server.subsystems.DataSystemConfiguration;
import com.launchdarkly.sdk.server.subsystems.ComponentConfigurer;

import java.io.Closeable;
import java.io.IOException;
Expand Down Expand Up @@ -154,14 +153,19 @@ static FDv2DataSystem create(
.map(synchronizer -> new FactoryWrapper<>(synchronizer, builderContext))
.collect(ImmutableList.toImmutableList());

DataSource dataSource = new FDv2DataSource(
initializerFactories,
synchronizerFactories,
dataSourceUpdates,
config.threadPriority,
clientContext.getBaseLogger().subLogger(Loggers.DATA_SOURCE_LOGGER_NAME),
clientContext.sharedExecutor
);
final DataSource dataSource;
Comment thread
cursor[bot] marked this conversation as resolved.
if (config.offline) {
dataSource = Components.externalUpdatesOnly().build(clientContext.withDataSourceUpdateSink(dataSourceUpdates));
} else {
dataSource = new FDv2DataSource(
initializerFactories,
synchronizerFactories,
dataSourceUpdates,
config.threadPriority,
clientContext.getBaseLogger().subLogger(Loggers.DATA_SOURCE_LOGGER_NAME),
clientContext.sharedExecutor
);
}
DataSourceStatusProvider dataSourceStatusProvider = new DataSourceStatusProviderImpl(
dataSourceStatusBroadcaster,
dataSourceUpdates);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,16 @@ class InMemoryDataStore implements DataStore, TransactionalDataStore, CacheExpor
private Object writeLock = new Object();
private final Object selectorLock = new Object();
private volatile Selector selector = Selector.EMPTY;
private volatile boolean shouldPersist = false;

@Override
public void init(FullDataSet<ItemDescriptor> allData) {
applyFullPayload(allData.getData(), null, Selector.EMPTY);
applyFullPayload(allData.getData(), null, Selector.EMPTY, allData.shouldPersist());
}

@Override
public ItemDescriptor get(DataKind kind, String key) {
Map<String, ItemDescriptor> items = allData.get(kind);
Map<String, ItemDescriptor> items = this.allData.get(kind);
if (items == null) {
return null;
}
Expand All @@ -49,7 +50,7 @@ public ItemDescriptor get(DataKind kind, String key) {

@Override
public KeyedItems<ItemDescriptor> getAll(DataKind kind) {
Map<String, ItemDescriptor> items = allData.get(kind);
Map<String, ItemDescriptor> items = this.allData.get(kind);
if (items == null) {
return new KeyedItems<>(null);
}
Expand All @@ -58,7 +59,7 @@ public KeyedItems<ItemDescriptor> getAll(DataKind kind) {

@Override
public boolean upsert(DataKind kind, String key, ItemDescriptor item) {
synchronized (writeLock) {
synchronized (this.writeLock) {
Map<String, ItemDescriptor> existingItems = this.allData.get(kind);
ItemDescriptor oldItem = null;
if (existingItems != null) {
Expand Down Expand Up @@ -97,7 +98,7 @@ public boolean upsert(DataKind kind, String key, ItemDescriptor item) {

@Override
public boolean isInitialized() {
return initialized;
return this.initialized;
}

@Override
Expand All @@ -124,10 +125,10 @@ public void close() throws IOException {
public void apply(ChangeSet<ItemDescriptor> changeSet) {
switch (changeSet.getType()) {
case Full:
applyFullPayload(changeSet.getData(), changeSet.getEnvironmentId(), changeSet.getSelector());
applyFullPayload(changeSet.getData(), changeSet.getEnvironmentId(), changeSet.getSelector(), changeSet.shouldPersist());
break;
case Partial:
applyPartialData(changeSet.getData(), changeSet.getSelector());
applyPartialData(changeSet.getData(), changeSet.getSelector(), changeSet.shouldPersist());
break;
case None:
break;
Expand All @@ -140,20 +141,20 @@ public void apply(ChangeSet<ItemDescriptor> changeSet) {

@Override
public Selector getSelector() {
synchronized (selectorLock) {
return selector;
synchronized (this.selectorLock) {
return this.selector;
}
}

private void setSelector(Selector newSelector) {
synchronized (selectorLock) {
selector = newSelector;
synchronized (this.selectorLock) {
this.selector = newSelector;
}
}

private void applyPartialData(Iterable<Map.Entry<DataKind, KeyedItems<ItemDescriptor>>> data,
Selector selector) {
synchronized (writeLock) {
Selector selector, boolean shouldPersist) {
synchronized (this.writeLock) {
// Build the complete updated dictionary before assigning to Items for transactional update
ImmutableMap.Builder<DataKind, Map<String, ItemDescriptor>> itemsBuilder = ImmutableMap.builder();

Expand All @@ -164,7 +165,7 @@ private void applyPartialData(Iterable<Map.Entry<DataKind, KeyedItems<ItemDescri
}

// Add all existing kinds that are NOT being updated
for (Map.Entry<DataKind, Map<String, ItemDescriptor>> existingEntry : allData.entrySet()) {
for (Map.Entry<DataKind, Map<String, ItemDescriptor>> existingEntry : this.allData.entrySet()) {
if (!updatedKinds.contains(existingEntry.getKey())) {
itemsBuilder.put(existingEntry.getKey(), existingEntry.getValue());
}
Expand All @@ -176,7 +177,7 @@ private void applyPartialData(Iterable<Map.Entry<DataKind, KeyedItems<ItemDescri
// Use HashMap to allow overwriting, then convert to ImmutableMap
Map<String, ItemDescriptor> kindMap = new HashMap<>();

Map<String, ItemDescriptor> itemsOfKind = allData.get(kind);
Map<String, ItemDescriptor> itemsOfKind = this.allData.get(kind);
if (itemsOfKind != null) {
kindMap.putAll(itemsOfKind);
}
Expand All @@ -189,13 +190,14 @@ private void applyPartialData(Iterable<Map.Entry<DataKind, KeyedItems<ItemDescri
itemsBuilder.put(kind, ImmutableMap.copyOf(kindMap));
}

allData = itemsBuilder.build();
this.allData = itemsBuilder.build();
this.shouldPersist = shouldPersist;
setSelector(selector);
}
}

private void applyFullPayload(Iterable<Map.Entry<DataKind, KeyedItems<ItemDescriptor>>> data,
String environmentId, Selector selector) {
String environmentId, Selector selector, boolean shouldPersist) {
ImmutableMap.Builder<DataKind, Map<String, ItemDescriptor>> itemsBuilder = ImmutableMap.builder();

for (Map.Entry<DataKind, KeyedItems<ItemDescriptor>> kindEntry : data) {
Expand All @@ -208,26 +210,28 @@ private void applyFullPayload(Iterable<Map.Entry<DataKind, KeyedItems<ItemDescri

ImmutableMap<DataKind, Map<String, ItemDescriptor>> newItems = itemsBuilder.build();

synchronized (writeLock) {
allData = newItems;
initialized = true;
synchronized (this.writeLock) {
this.allData = newItems;
this.initialized = true;
this.shouldPersist = shouldPersist;
setSelector(selector);
}
}

@Override
public FullDataSet<ItemDescriptor> exportAll() {
synchronized (writeLock) {
synchronized (this.writeLock) {
ImmutableList.Builder<Map.Entry<DataKind, KeyedItems<ItemDescriptor>>> builder = ImmutableList.builder();

for (Map.Entry<DataKind, Map<String, ItemDescriptor>> kindEntry : allData.entrySet()) {
for (Map.Entry<DataKind, Map<String, ItemDescriptor>> kindEntry : this.allData.entrySet()) {
builder.add(new AbstractMap.SimpleEntry<>(
kindEntry.getKey(),
new KeyedItems<>(ImmutableList.copyOf(kindEntry.getValue().entrySet()))
));
}

return new FullDataSet<>(builder.build());
// Preserve the shouldPersist value that was set when data was provided to this store
return new FullDataSet<>(builder.build(), this.shouldPersist);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ static FullDataSet<SerializedItemDescriptor> toSerializedFormat(
));
}

return new FullDataSet<>(builder.build());
// Preserve shouldPersist flag when converting formats
return new FullDataSet<>(builder.build(), inMemoryData.shouldPersist());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ public void init(FullDataSet<ItemDescriptor> allData) {
KeyedItems<SerializedItemDescriptor> items = PersistentDataStoreConverter.serializeAll(kind, e0.getValue());
allBuilder.add(new AbstractMap.SimpleEntry<>(kind, items));
}
RuntimeException failure = initCore(new FullDataSet<>(allBuilder.build()));
RuntimeException failure = initCore(new FullDataSet<>(allBuilder.build(), allData.shouldPersist()));
if (itemCache != null && allCache != null) {
itemCache.invalidateAll();
allCache.invalidateAll();
Expand Down Expand Up @@ -410,6 +410,12 @@ private boolean pollAvailabilityAfterOutage() {
if (externalCacheSnapshot.isInitialized()) {
try {
FullDataSet<ItemDescriptor> externalData = externalCacheSnapshot.exportAll();

if (!externalData.shouldPersist()) {
logger.debug("Skipping persistence of non-authoritative data (shouldPersist=false) during recovery");
return true; // Recovery succeeded, but we didn't persist
}

FullDataSet<SerializedItemDescriptor> serializedData =
PersistentDataStoreConverter.toSerializedFormat(externalData);
RuntimeException e = initCore(serializedData);
Expand Down Expand Up @@ -453,7 +459,8 @@ private boolean pollAvailabilityAfterOutage() {
builder.add(new AbstractMap.SimpleEntry<>(kind, PersistentDataStoreConverter.serializeAll(kind, items)));
}
}
RuntimeException e = initCore(new FullDataSet<>(builder.build()));
// any data that this PersistentDataStoreWrapper contains has already passed the shouldPersist check
RuntimeException e = initCore(new FullDataSet<>(builder.build(), true));
if (e == null) {
logger.warn("Successfully updated persistent store from cached data");
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ protected CompletableFuture<FDv2SourceResult> poll(Selector selector, boolean on
new DataStoreTypes.ChangeSet<>(DataStoreTypes.ChangeSetType.None,
Selector.EMPTY,
null,
null // Header derived values will have been handled on initial response.
null, // Header derived values will have been handled on initial response.
true // Polling data from LaunchDarkly should be persisted
),
// Headers would have been processed from the initial response.
false);
Expand All @@ -113,7 +114,8 @@ protected CompletableFuture<FDv2SourceResult> poll(Selector selector, boolean on
DataStoreTypes.ChangeSet<DataStoreTypes.ItemDescriptor> converted = FDv2ChangeSetTranslator.toChangeSet(
((FDv2ProtocolHandler.FDv2ActionChangeset) res).getChangeset(),
logger,
environmentId
environmentId,
true // Polling data from LaunchDarkly should be persisted
);
return FDv2SourceResult.changeSet(converted, fdv1Fallback);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,8 @@ static PutData parsePutData(JsonReader jr) {
path = jr.nextString();
break;
case "data":
data = parseFullDataSet(jr);
// Streaming data from LaunchDarkly should be persisted
data = new FullDataSet<>(parseFullDataSet(jr), true);
break;
default:
jr.skipValue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,8 @@ private void handleMessage(MessageEvent event) {
FDv2ChangeSetTranslator.toChangeSet(
changeset.getChangeset(),
logger,
event.getHeaders().value(HeaderConstants.ENVIRONMENT_ID.getHeaderName()));
event.getHeaders().value(HeaderConstants.ENVIRONMENT_ID.getHeaderName()),
true);
result = FDv2SourceResult.changeSet(converted, getFallback(event));
} catch (Exception e) {
logger.error("Failed to convert FDv2 changeset: {}", LogValues.exceptionSummary(e));
Expand Down
Loading