Skip to content

Commit 4bb7b35

Browse files
authored
chore: adds transactional persistent store and recovery (#112)
**Requirements** - [x] I have added test coverage for new or changed functionality - [x] I have followed the repository's [pull request submission guidelines](../blob/main/CONTRIBUTING.md#submitting-pull-requests) - [x] I have validated my changes against all supported platform versions **Related issues** SDK-1610 **Describe the solution you've provided** Ports Dotnet impl to Java <!-- CURSOR_SUMMARY --> --- > [!NOTE] > Implements a write-through architecture and improves persistent store recovery and serialization. > > - Adds `WriteThroughStore` to read from persistence during init, then switch to memory; writes go to both in `READ_WRITE` mode; supports transactional `apply` > - Introduces `PersistentDataStoreConverter` to serialize/deserialize between in-memory `ItemDescriptor` and persistent `SerializedItemDescriptor` > - Extends `PersistentDataStoreWrapper` to use the converter and accept an external `CacheExporter` (via `SettableCache`) to repopulate persistence after outages; retains cache-based fallback > - Updates `FDv2DataSystem` to compose `WriteThroughStore`, build optional persistent store, and set memory store as recovery exporter > - Adds comprehensive tests for converter, wrapper recovery behavior, and write-through store semantics > > <sup>Written by [Cursor Bugbot](https://cursor.com/dashboard?tab=bugbot) for commit b9f2df3. This will update automatically on new commits. Configure [here](https://cursor.com/dashboard?tab=bugbot).</sup> <!-- /CURSOR_SUMMARY -->
1 parent 342fc0a commit 4bb7b35

8 files changed

Lines changed: 2441 additions & 34 deletions

File tree

lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/FDv2DataSystem.java

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import com.launchdarkly.sdk.server.subsystems.DataStore;
1616
import com.launchdarkly.sdk.server.subsystems.LoggingConfiguration;
1717
import com.launchdarkly.sdk.server.subsystems.DataSystemConfiguration;
18+
import com.launchdarkly.sdk.server.subsystems.ComponentConfigurer;
1819

1920
import java.io.Closeable;
2021
import java.io.IOException;
@@ -91,7 +92,26 @@ static FDv2DataSystem create(
9192
DataStoreUpdatesImpl dataStoreUpdates = new DataStoreUpdatesImpl(
9293
EventBroadcasterImpl.forDataStoreStatus(clientContext.sharedExecutor, logger));
9394

94-
InMemoryDataStore store = new InMemoryDataStore();
95+
DataSystemConfiguration dataSystemConfiguration = config.dataSystem.build();
96+
97+
InMemoryDataStore memoryStore = new InMemoryDataStore();
98+
99+
DataStore persistentStore = null;
100+
if (dataSystemConfiguration.getPersistentStore() != null) {
101+
persistentStore = dataSystemConfiguration.getPersistentStore().build(clientContext.withDataStoreUpdateSink(dataStoreUpdates));
102+
103+
// Configure persistent store to sync from memory store during recovery (ReadWrite mode only)
104+
if (persistentStore != null && dataSystemConfiguration.getPersistentDataStoreMode() == DataSystemConfiguration.DataStoreMode.READ_WRITE) {
105+
if (persistentStore instanceof SettableCache) {
106+
((SettableCache) persistentStore).setCacheExporter(memoryStore);
107+
}
108+
}
109+
}
110+
111+
WriteThroughStore store = new WriteThroughStore(
112+
memoryStore,
113+
persistentStore,
114+
dataSystemConfiguration.getPersistentDataStoreMode());
95115

96116
DataStoreStatusProvider dataStoreStatusProvider = new DataStoreStatusProviderImpl(store, dataStoreUpdates);
97117

@@ -113,7 +133,6 @@ static FDv2DataSystem create(
113133
logger
114134
);
115135

116-
DataSystemConfiguration dataSystemConfiguration = config.dataSystem.build();
117136
SelectorSource selectorSource = new SelectorSourceFacade(store);
118137

119138
DataSourceBuildInputs builderContext = new DataSourceBuildInputs(
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
package com.launchdarkly.sdk.server;
2+
3+
import com.google.common.collect.ImmutableList;
4+
import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.DataKind;
5+
import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.FullDataSet;
6+
import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.ItemDescriptor;
7+
import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.KeyedItems;
8+
import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.SerializedItemDescriptor;
9+
10+
import java.util.AbstractMap;
11+
import java.util.Map;
12+
13+
/**
14+
* Utility for converting between in-memory and serialized persistent data store formats.
15+
*/
16+
final class PersistentDataStoreConverter {
17+
private PersistentDataStoreConverter() {
18+
// Utility class - prevent instantiation
19+
}
20+
21+
/**
22+
* Converts a FullDataSet of ItemDescriptor to SerializedItemDescriptor format.
23+
*
24+
* @param inMemoryData the in-memory data to convert
25+
* @return a FullDataSet in serialized format suitable for persistent stores
26+
*/
27+
static FullDataSet<SerializedItemDescriptor> toSerializedFormat(
28+
FullDataSet<ItemDescriptor> inMemoryData) {
29+
ImmutableList.Builder<Map.Entry<DataKind, KeyedItems<SerializedItemDescriptor>>> builder =
30+
ImmutableList.builder();
31+
32+
for (Map.Entry<DataKind, KeyedItems<ItemDescriptor>> kindEntry : inMemoryData.getData()) {
33+
DataKind kind = kindEntry.getKey();
34+
KeyedItems<ItemDescriptor> items = kindEntry.getValue();
35+
36+
builder.add(new AbstractMap.SimpleEntry<>(
37+
kind,
38+
serializeAll(kind, items)
39+
));
40+
}
41+
42+
return new FullDataSet<>(builder.build());
43+
}
44+
45+
/**
46+
* Serializes a single item descriptor.
47+
*
48+
* @param kind the data kind
49+
* @param itemDesc the item descriptor to serialize
50+
* @return a serialized item descriptor
51+
*/
52+
static SerializedItemDescriptor serialize(DataKind kind, ItemDescriptor itemDesc) {
53+
boolean isDeleted = itemDesc.getItem() == null;
54+
return new SerializedItemDescriptor(itemDesc.getVersion(), isDeleted, kind.serialize(itemDesc));
55+
}
56+
57+
/**
58+
* Serializes all items of a given DataKind from a KeyedItems collection.
59+
*
60+
* @param kind the data kind
61+
* @param items the items to serialize
62+
* @return keyed items in serialized format
63+
*/
64+
static KeyedItems<SerializedItemDescriptor> serializeAll(
65+
DataKind kind,
66+
KeyedItems<ItemDescriptor> items) {
67+
ImmutableList.Builder<Map.Entry<String, SerializedItemDescriptor>> itemsBuilder =
68+
ImmutableList.builder();
69+
for (Map.Entry<String, ItemDescriptor> e : items.getItems()) {
70+
itemsBuilder.add(new AbstractMap.SimpleEntry<>(e.getKey(), serialize(kind, e.getValue())));
71+
}
72+
return new KeyedItems<>(itemsBuilder.build());
73+
}
74+
75+
/**
76+
* Deserializes a single item descriptor.
77+
*
78+
* @param kind the data kind
79+
* @param serializedItemDesc the serialized item descriptor
80+
* @return a deserialized item descriptor
81+
*/
82+
static ItemDescriptor deserialize(DataKind kind, SerializedItemDescriptor serializedItemDesc) {
83+
if (serializedItemDesc.isDeleted() || serializedItemDesc.getSerializedItem() == null) {
84+
return ItemDescriptor.deletedItem(serializedItemDesc.getVersion());
85+
}
86+
ItemDescriptor deserializedItem = kind.deserialize(serializedItemDesc.getSerializedItem());
87+
if (serializedItemDesc.getVersion() == 0 ||
88+
serializedItemDesc.getVersion() == deserializedItem.getVersion() ||
89+
deserializedItem.getItem() == null) {
90+
return deserializedItem;
91+
}
92+
// If the store gave us a version number that isn't what was encoded in the object, trust it
93+
return new ItemDescriptor(serializedItemDesc.getVersion(), deserializedItem.getItem());
94+
}
95+
}

lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/PersistentDataStoreWrapper.java

Lines changed: 68 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@
4343
* <p>
4444
* This class is only constructed by {@link PersistentDataStoreBuilder}.
4545
*/
46-
final class PersistentDataStoreWrapper implements DataStore {
46+
final class PersistentDataStoreWrapper implements DataStore, SettableCache {
4747
private final PersistentDataStore core;
4848
private final LoadingCache<CacheKey, Optional<ItemDescriptor>> itemCache;
4949
private final LoadingCache<DataKind, KeyedItems<ItemDescriptor>> allCache;
@@ -55,6 +55,9 @@ final class PersistentDataStoreWrapper implements DataStore {
5555
private final ListeningExecutorService cacheExecutor;
5656
private final LDLogger logger;
5757

58+
private final Object externalStoreLock = new Object();
59+
private volatile CacheExporter externalCache;
60+
5861
PersistentDataStoreWrapper(
5962
final PersistentDataStore core,
6063
Duration cacheTtl,
@@ -180,7 +183,7 @@ public void init(FullDataSet<ItemDescriptor> allData) {
180183
ImmutableList.Builder<Map.Entry<DataKind, KeyedItems<SerializedItemDescriptor>>> allBuilder = ImmutableList.builder();
181184
for (Map.Entry<DataKind, KeyedItems<ItemDescriptor>> e0: allData.getData()) {
182185
DataKind kind = e0.getKey();
183-
KeyedItems<SerializedItemDescriptor> items = serializeAll(kind, e0.getValue());
186+
KeyedItems<SerializedItemDescriptor> items = PersistentDataStoreConverter.serializeAll(kind, e0.getValue());
184187
allBuilder.add(new AbstractMap.SimpleEntry<>(kind, items));
185188
}
186189
RuntimeException failure = initCore(new FullDataSet<>(allBuilder.build()));
@@ -260,7 +263,7 @@ public boolean upsert(DataKind kind, String key, ItemDescriptor item) {
260263
synchronized (cachedDataKinds) {
261264
cachedDataKinds.add(kind);
262265
}
263-
SerializedItemDescriptor serializedItem = serialize(kind, item);
266+
SerializedItemDescriptor serializedItem = PersistentDataStoreConverter.serialize(kind, item);
264267
boolean updated = false;
265268
RuntimeException failure = null;
266269
try {
@@ -317,6 +320,24 @@ public boolean isStatusMonitoringEnabled() {
317320
return true;
318321
}
319322

323+
/**
324+
* Sets an external data source for recovery synchronization.
325+
* <p>
326+
* This should be called during initialization if the wrapper is being used
327+
* in a write-through architecture where an external store maintains authoritative data.
328+
* <p>
329+
* When we remove FDv1 support, we should remove this functionality and instead handle it at a higher
330+
* layer.
331+
*
332+
* @param externalDataSource The external data source to sync from during recovery
333+
*/
334+
@Override
335+
public void setCacheExporter(CacheExporter externalDataSource) {
336+
synchronized (externalStoreLock) {
337+
externalCache = externalDataSource;
338+
}
339+
}
340+
320341
@Override
321342
public CacheStats getCacheStats() {
322343
if (itemCache == null || allCache == null) {
@@ -335,7 +356,7 @@ public CacheStats getCacheStats() {
335356

336357
private ItemDescriptor getAndDeserializeItem(DataKind kind, String key) {
337358
SerializedItemDescriptor maybeSerializedItem = core.get(kind, key);
338-
return maybeSerializedItem == null ? null : deserialize(kind, maybeSerializedItem);
359+
return maybeSerializedItem == null ? null : PersistentDataStoreConverter.deserialize(kind, maybeSerializedItem);
339360
}
340361

341362
private KeyedItems<ItemDescriptor> getAllAndDeserialize(DataKind kind) {
@@ -345,36 +366,11 @@ private KeyedItems<ItemDescriptor> getAllAndDeserialize(DataKind kind) {
345366
}
346367
ImmutableList.Builder<Map.Entry<String, ItemDescriptor>> b = ImmutableList.builder();
347368
for (Map.Entry<String, SerializedItemDescriptor> e: allItems.getItems()) {
348-
b.add(new AbstractMap.SimpleEntry<>(e.getKey(), deserialize(kind, e.getValue())));
369+
b.add(new AbstractMap.SimpleEntry<>(e.getKey(), PersistentDataStoreConverter.deserialize(kind, e.getValue())));
349370
}
350371
return new KeyedItems<>(b.build());
351372
}
352373

353-
private SerializedItemDescriptor serialize(DataKind kind, ItemDescriptor itemDesc) {
354-
boolean isDeleted = itemDesc.getItem() == null;
355-
return new SerializedItemDescriptor(itemDesc.getVersion(), isDeleted, kind.serialize(itemDesc));
356-
}
357-
358-
private KeyedItems<SerializedItemDescriptor> serializeAll(DataKind kind, KeyedItems<ItemDescriptor> items) {
359-
ImmutableList.Builder<Map.Entry<String, SerializedItemDescriptor>> itemsBuilder = ImmutableList.builder();
360-
for (Map.Entry<String, ItemDescriptor> e: items.getItems()) {
361-
itemsBuilder.add(new AbstractMap.SimpleEntry<>(e.getKey(), serialize(kind, e.getValue())));
362-
}
363-
return new KeyedItems<>(itemsBuilder.build());
364-
}
365-
366-
private ItemDescriptor deserialize(DataKind kind, SerializedItemDescriptor serializedItemDesc) {
367-
if (serializedItemDesc.isDeleted() || serializedItemDesc.getSerializedItem() == null) {
368-
return ItemDescriptor.deletedItem(serializedItemDesc.getVersion());
369-
}
370-
ItemDescriptor deserializedItem = kind.deserialize(serializedItemDesc.getSerializedItem());
371-
if (serializedItemDesc.getVersion() == 0 || serializedItemDesc.getVersion() == deserializedItem.getVersion()
372-
|| deserializedItem.getItem() == null) {
373-
return deserializedItem;
374-
}
375-
// If the store gave us a version number that isn't what was encoded in the object, trust it
376-
return new ItemDescriptor(serializedItemDesc.getVersion(), deserializedItem.getItem());
377-
}
378374

379375
private KeyedItems<ItemDescriptor> updateSingleItem(KeyedItems<ItemDescriptor> items, String key, ItemDescriptor item) {
380376
// This is somewhat inefficient but it's preferable to use immutable data structures in the cache.
@@ -401,7 +397,47 @@ private boolean pollAvailabilityAfterOutage() {
401397
if (!core.isStoreAvailable()) {
402398
return false;
403399
}
404-
400+
401+
CacheExporter externalCacheSnapshot;
402+
synchronized (externalStoreLock) {
403+
externalCacheSnapshot = externalCache;
404+
}
405+
406+
// If we have an external data source (e.g., WriteThroughStore's memory store) that is initialized,
407+
// use that as the authoritative source. Otherwise, fall back to our internal cache if it's configured
408+
// to cache indefinitely.
409+
if (externalCacheSnapshot != null) {
410+
if (externalCacheSnapshot.isInitialized()) {
411+
try {
412+
FullDataSet<ItemDescriptor> externalData = externalCacheSnapshot.exportAll();
413+
FullDataSet<SerializedItemDescriptor> serializedData =
414+
PersistentDataStoreConverter.toSerializedFormat(externalData);
415+
RuntimeException e = initCore(serializedData);
416+
417+
if (e == null) {
418+
logger.warn("Successfully updated persistent store from external data source");
419+
} else {
420+
// We failed to write the data to the underlying store. In this case, we should not
421+
// return to a recovered state, but just try this all again next time the poll task runs.
422+
logger.error("Tried to write external data to persistent store after outage, but failed: {}",
423+
LogValues.exceptionSummary(e));
424+
logger.debug(LogValues.exceptionTrace(e));
425+
return false;
426+
}
427+
} catch (Exception ex) {
428+
// If we can't export from the external source, don't recover yet
429+
logger.error("Failed to export data from external source during persistent store recovery: {}",
430+
LogValues.exceptionSummary(ex));
431+
logger.debug(LogValues.exceptionTrace(ex));
432+
return false;
433+
}
434+
435+
return true;
436+
}
437+
}
438+
439+
// Fall back to cache-based recovery if external store is not available/initialized
440+
// and we're in infinite cache mode
405441
if (cacheIndefinitely && allCache != null) {
406442
// If we're in infinite cache mode, then we can assume the cache has a full set of current
407443
// flag data (since presumably the data source has still been running) and we can just
@@ -414,7 +450,7 @@ private boolean pollAvailabilityAfterOutage() {
414450
for (DataKind kind: allKinds) {
415451
KeyedItems<ItemDescriptor> items = allCache.getIfPresent(kind);
416452
if (items != null) {
417-
builder.add(new AbstractMap.SimpleEntry<>(kind, serializeAll(kind, items)));
453+
builder.add(new AbstractMap.SimpleEntry<>(kind, PersistentDataStoreConverter.serializeAll(kind, items)));
418454
}
419455
}
420456
RuntimeException e = initCore(new FullDataSet<>(builder.build()));
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package com.launchdarkly.sdk.server;
2+
3+
/**
4+
* Optional interface for data stores that can accept a cache exporter
5+
* for recovery synchronization.
6+
* <p>
7+
* This interface is used in write-through architectures where a persistent store
8+
* may fail temporarily. When the persistent store recovers, it can sync data from
9+
* an external authoritative source (like an in-memory store) rather than relying
10+
* solely on its internal cache.
11+
* <p>
12+
* In the long-term, internal caching should be removed from store implementations and managed centrally.
13+
* <p>
14+
* This is currently for internal implementations only.
15+
*/
16+
interface SettableCache {
17+
/**
18+
* Sets an external cache exporter for recovery synchronization.
19+
* <p>
20+
* This should be called during initialization if the data store is being used
21+
* in a write-through architecture where an external store maintains authoritative data.
22+
* When the persistent store recovers from an outage, it will export data from this
23+
* external source and write it to the underlying persistent storage.
24+
*
25+
* @param externalDataSource an external cache to sync from during recovery
26+
*/
27+
void setCacheExporter(CacheExporter externalDataSource);
28+
}

0 commit comments

Comments
 (0)