Skip to content

Commit 3042ad4

Browse files
chore: Connect FDv2 data system. (#108)
- Connects the FDv2 Data System configuration to the implementation. - Moves away from IComponentConfigurer to a specific builder interface for FDv2 Data Sources. - Updates initialization to support both initializers/synchronizers instead of data sources. <!-- CURSOR_SUMMARY --> --- > [!NOTE] > Integrates the FDv2 data system end-to-end and replaces generic configurers with specific builders. > > - New `DataSystemComponents` exposes builders for `pollingInitializer`, `pollingSynchronizer`, and `streamingSynchronizer`; retains FDv1 polling fallback > - Implemented `FDv2DataSystem` to construct store, broadcasters, `DataSourceUpdatesImpl`, and build/run FDv2 initializers/synchronizers via `DataSourceBuildInputs` > - Refactored `FDv2DataSource` to use `DataSourceUpdateSinkV2`, apply `CHANGE_SET`s, and manage initializer/synchronizer lifecycles > - Added `SelectorSourceFacade` to fetch selectors from the transactional store > - Protocol change: `DefaultFDv2Requestor` and `StreamingSynchronizerImpl` now send selector via `basis` query param (replacing version/state) > - Introduced `DataSourceBuilder` and `DataSourceBuildInputs`; updated `DataSystemBuilder`, `DataSystemConfiguration`, and `DataSystemModes` to use typed builders; removed old `integrations.DataSystemComponents` > - Tests updated to new APIs and `basis` semantics > > <sup>Written by [Cursor Bugbot](https://cursor.com/dashboard?tab=bugbot) for commit 83b4217. This will update automatically on new commits. Configure [here](https://cursor.com/dashboard?tab=bugbot).</sup> <!-- /CURSOR_SUMMARY --> --------- Co-authored-by: Todd Anderson <tanderson@launchdarkly.com>
1 parent 211783a commit 3042ad4

18 files changed

Lines changed: 674 additions & 335 deletions
Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
package com.launchdarkly.sdk.server;
2+
3+
import com.launchdarkly.sdk.server.datasources.Initializer;
4+
import com.launchdarkly.sdk.server.datasources.Synchronizer;
5+
import com.launchdarkly.sdk.server.integrations.FDv2PollingInitializerBuilder;
6+
import com.launchdarkly.sdk.server.integrations.FDv2PollingSynchronizerBuilder;
7+
import com.launchdarkly.sdk.server.integrations.FDv2StreamingSynchronizerBuilder;
8+
import com.launchdarkly.sdk.server.integrations.PollingDataSourceBuilder;
9+
import com.launchdarkly.sdk.server.interfaces.ServiceEndpoints;
10+
import com.launchdarkly.sdk.server.subsystems.DataSourceBuildInputs;
11+
12+
import java.net.URI;
13+
14+
import static com.launchdarkly.sdk.server.ComponentsImpl.toHttpProperties;
15+
16+
/**
17+
* Components for use with the data system.
18+
* <p>
19+
* This class is not stable, and not subject to any backwards compatibility guarantees or semantic versioning.
20+
* It is in early access. If you want access to this feature please join the EAP. https://launchdarkly.com/docs/sdk/features/data-saving-mode
21+
* </p>
22+
*/
23+
public final class DataSystemComponents {
24+
25+
static class FDv2PollingInitializerBuilderImpl extends FDv2PollingInitializerBuilder {
26+
@Override
27+
public Initializer build(DataSourceBuildInputs context) {
28+
ServiceEndpoints endpoints = serviceEndpointsOverride != null
29+
? serviceEndpointsOverride
30+
: context.getServiceEndpoints();
31+
URI configuredBaseUri = StandardEndpoints.selectBaseUri(
32+
endpoints.getPollingBaseUri(),
33+
StandardEndpoints.DEFAULT_POLLING_BASE_URI,
34+
"Polling",
35+
context.getBaseLogger());
36+
37+
DefaultFDv2Requestor requestor = new DefaultFDv2Requestor(
38+
toHttpProperties(context.getHttp()),
39+
configuredBaseUri,
40+
StandardEndpoints.FDV2_POLLING_REQUEST_PATH,
41+
context.getBaseLogger());
42+
43+
return new PollingInitializerImpl(
44+
requestor,
45+
context.getBaseLogger(),
46+
context.getSelectorSource()
47+
);
48+
}
49+
}
50+
51+
static class FDv2PollingSynchronizerBuilderImpl extends FDv2PollingSynchronizerBuilder {
52+
@Override
53+
public Synchronizer build(DataSourceBuildInputs context) {
54+
ServiceEndpoints endpoints = serviceEndpointsOverride != null
55+
? serviceEndpointsOverride
56+
: context.getServiceEndpoints();
57+
URI configuredBaseUri = StandardEndpoints.selectBaseUri(
58+
endpoints.getPollingBaseUri(),
59+
StandardEndpoints.DEFAULT_POLLING_BASE_URI,
60+
"Polling",
61+
context.getBaseLogger());
62+
63+
DefaultFDv2Requestor requestor = new DefaultFDv2Requestor(
64+
toHttpProperties(context.getHttp()),
65+
configuredBaseUri,
66+
StandardEndpoints.FDV2_POLLING_REQUEST_PATH,
67+
context.getBaseLogger());
68+
69+
return new PollingSynchronizerImpl(
70+
requestor,
71+
context.getBaseLogger(),
72+
context.getSelectorSource(),
73+
context.getSharedExecutor(),
74+
pollInterval
75+
);
76+
}
77+
}
78+
79+
static class FDv2StreamingSynchronizerBuilderImpl extends FDv2StreamingSynchronizerBuilder {
80+
@Override
81+
public Synchronizer build(DataSourceBuildInputs context) {
82+
ServiceEndpoints endpoints = serviceEndpointsOverride != null
83+
? serviceEndpointsOverride
84+
: context.getServiceEndpoints();
85+
URI configuredBaseUri = StandardEndpoints.selectBaseUri(
86+
endpoints.getStreamingBaseUri(),
87+
StandardEndpoints.DEFAULT_STREAMING_BASE_URI,
88+
"Streaming",
89+
context.getBaseLogger());
90+
91+
return new StreamingSynchronizerImpl(
92+
toHttpProperties(context.getHttp()),
93+
configuredBaseUri,
94+
StandardEndpoints.FDV2_STREAMING_REQUEST_PATH,
95+
context.getBaseLogger(),
96+
context.getSelectorSource(),
97+
null,
98+
initialReconnectDelay
99+
);
100+
}
101+
}
102+
103+
private DataSystemComponents() {}
104+
105+
/**
106+
* Get a builder for a polling initializer.
107+
*
108+
* @return the polling initializer builder
109+
*/
110+
public static FDv2PollingInitializerBuilder pollingInitializer() {
111+
return new FDv2PollingInitializerBuilderImpl();
112+
}
113+
114+
/**
115+
* Get a builder for a polling synchronizer.
116+
*
117+
* @return the polling synchronizer builder
118+
*/
119+
public static FDv2PollingSynchronizerBuilder pollingSynchronizer() {
120+
return new FDv2PollingSynchronizerBuilderImpl();
121+
}
122+
123+
/**
124+
* Get a builder for a streaming synchronizer.
125+
*
126+
* @return the streaming synchronizer builder
127+
*/
128+
public static FDv2StreamingSynchronizerBuilder streamingSynchronizer() {
129+
return new FDv2StreamingSynchronizerBuilderImpl();
130+
}
131+
132+
/**
133+
* Get a builder for a FDv1 compatible polling data source.
134+
* <p>
135+
* This is intended for use as a fallback.
136+
* </p>
137+
*
138+
* @return the FDv1 compatible polling data source builder
139+
*/
140+
public static PollingDataSourceBuilder fDv1Polling() {
141+
return Components.pollingDataSource();
142+
}
143+
}
144+

lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/DefaultFDv2Requestor.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,7 @@
3131
* Implementation of FDv2Requestor for polling feature flag data via FDv2 protocol.
3232
*/
3333
public class DefaultFDv2Requestor implements FDv2Requestor, Closeable {
34-
private static final String VERSION_QUERY_PARAM = "version";
35-
private static final String STATE_QUERY_PARAM = "state";
34+
private static final String BASIS_QUERY_PARAM = "basis";
3635

3736
private final OkHttpClient httpClient;
3837
private final URI pollingUri;
@@ -67,11 +66,7 @@ public CompletableFuture<FDv2PayloadResponse> Poll(Selector selector) {
6766
URI requestUri = pollingUri;
6867

6968
if (!selector.isEmpty()) {
70-
requestUri = HttpHelpers.addQueryParam(requestUri, VERSION_QUERY_PARAM, String.valueOf(selector.getVersion()));
71-
}
72-
73-
if (selector.getState() != null && !selector.getState().isEmpty()) {
74-
requestUri = HttpHelpers.addQueryParam(requestUri, STATE_QUERY_PARAM, selector.getState());
69+
requestUri = HttpHelpers.addQueryParam(requestUri, BASIS_QUERY_PARAM, selector.getState());
7570
}
7671

7772
logger.debug("Making FDv2 polling request to: {}", requestUri);

lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/FDv2DataSource.java

Lines changed: 46 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
package com.launchdarkly.sdk.server;
22

3+
import com.google.common.collect.ImmutableList;
34
import com.launchdarkly.sdk.server.datasources.FDv2SourceResult;
45
import com.launchdarkly.sdk.server.datasources.Initializer;
56
import com.launchdarkly.sdk.server.datasources.Synchronizer;
67
import com.launchdarkly.sdk.server.interfaces.DataSourceStatusProvider;
78
import com.launchdarkly.sdk.server.subsystems.DataSource;
8-
import com.launchdarkly.sdk.server.subsystems.DataSourceUpdateSink;
9+
import com.launchdarkly.sdk.server.subsystems.DataSourceUpdateSinkV2;
910

1011
import java.io.Closeable;
1112
import java.io.IOException;
@@ -18,10 +19,10 @@
1819
import java.util.stream.Collectors;
1920

2021
class FDv2DataSource implements DataSource {
21-
private final List<InitializerFactory> initializers;
22+
private final List<DataSourceFactory<Initializer>> initializers;
2223
private final List<SynchronizerFactoryWithState> synchronizers;
2324

24-
private final DataSourceUpdateSink dataSourceUpdates;
25+
private final DataSourceUpdateSinkV2 dataSourceUpdates;
2526

2627
private final CompletableFuture<Boolean> startFuture = new CompletableFuture<>();
2728
private final AtomicBoolean started = new AtomicBoolean(false);
@@ -46,12 +47,12 @@ public enum State {
4647
Blocked
4748
}
4849

49-
private final SynchronizerFactory factory;
50+
private final DataSourceFactory<Synchronizer> factory;
5051

5152
private State state = State.Available;
5253

5354

54-
public SynchronizerFactoryWithState(SynchronizerFactory factory) {
55+
public SynchronizerFactoryWithState(DataSourceFactory<Synchronizer> factory) {
5556
this.factory = factory;
5657
}
5758

@@ -68,19 +69,15 @@ public Synchronizer build() {
6869
}
6970
}
7071

71-
public interface InitializerFactory {
72-
Initializer build();
73-
}
74-
75-
public interface SynchronizerFactory {
76-
Synchronizer build();
72+
public interface DataSourceFactory<T> {
73+
T build();
7774
}
7875

7976

8077
public FDv2DataSource(
81-
List<InitializerFactory> initializers,
82-
List<SynchronizerFactory> synchronizers,
83-
DataSourceUpdateSink dataSourceUpdates
78+
ImmutableList<DataSourceFactory<Initializer>> initializers,
79+
ImmutableList<DataSourceFactory<Synchronizer>> synchronizers,
80+
DataSourceUpdateSinkV2 dataSourceUpdates
8481
) {
8582
this.initializers = initializers;
8683
this.synchronizers = synchronizers
@@ -116,6 +113,40 @@ private SynchronizerFactoryWithState getFirstAvailableSynchronizer() {
116113
}
117114
}
118115

116+
private void runInitializers() {
117+
boolean anyDataReceived = false;
118+
for (DataSourceFactory<Initializer> factory : initializers) {
119+
try {
120+
Initializer initializer = factory.build();
121+
if (setActiveSource(initializer)) return;
122+
FDv2SourceResult result = initializer.run().get();
123+
switch (result.getResultType()) {
124+
case CHANGE_SET:
125+
dataSourceUpdates.apply(result.getChangeSet());
126+
anyDataReceived = true;
127+
if (!result.getChangeSet().getSelector().isEmpty()) {
128+
// We received data with a selector, so we end the initialization process.
129+
dataSourceUpdates.updateStatus(DataSourceStatusProvider.State.VALID, null);
130+
startFuture.complete(true);
131+
return;
132+
}
133+
break;
134+
case STATUS:
135+
// TODO: Implement.
136+
break;
137+
}
138+
} catch (ExecutionException | InterruptedException | CancellationException e) {
139+
// TODO: Log.
140+
}
141+
}
142+
// We received data without a selector, and we have exhausted initializers, so we are going to
143+
// consider ourselves initialized.
144+
if (anyDataReceived) {
145+
dataSourceUpdates.updateStatus(DataSourceStatusProvider.State.VALID, null);
146+
startFuture.complete(true);
147+
}
148+
}
149+
119150
private void runSynchronizers() {
120151
SynchronizerFactoryWithState availableSynchronizer = getFirstAvailableSynchronizer();
121152
// TODO: Add recovery handling. If there are no available synchronizers, but there are
@@ -130,7 +161,7 @@ private void runSynchronizers() {
130161
FDv2SourceResult result = synchronizer.next().get();
131162
switch (result.getResultType()) {
132163
case CHANGE_SET:
133-
// TODO: Apply to the store.
164+
dataSourceUpdates.apply(result.getChangeSet());
134165
// This could have been completed by any data source. But if it has not been completed before
135166
// now, then we complete it.
136167
startFuture.complete(true);
@@ -186,40 +217,6 @@ private boolean setActiveSource(Closeable synchronizer) {
186217
return false;
187218
}
188219

189-
private void runInitializers() {
190-
boolean anyDataReceived = false;
191-
for (InitializerFactory factory : initializers) {
192-
try {
193-
Initializer initializer = factory.build();
194-
if (setActiveSource(initializer)) return;
195-
FDv2SourceResult res = initializer.run().get();
196-
switch (res.getResultType()) {
197-
case CHANGE_SET:
198-
// TODO: Apply to the store.
199-
anyDataReceived = true;
200-
if (!res.getChangeSet().getSelector().isEmpty()) {
201-
// We received data with a selector, so we end the initialization process.
202-
dataSourceUpdates.updateStatus(DataSourceStatusProvider.State.VALID, null);
203-
startFuture.complete(true);
204-
return;
205-
}
206-
break;
207-
case STATUS:
208-
// TODO: Implement.
209-
break;
210-
}
211-
} catch (ExecutionException | InterruptedException | CancellationException e) {
212-
// TODO: Log.
213-
}
214-
}
215-
// We received data without a selector, and we have exhausted initializers, so we are going to
216-
// consider ourselves initialized.
217-
if (anyDataReceived) {
218-
dataSourceUpdates.updateStatus(DataSourceStatusProvider.State.VALID, null);
219-
startFuture.complete(true);
220-
}
221-
}
222-
223220
@Override
224221
public Future<Void> start() {
225222
if (!started.getAndSet(true)) {

0 commit comments

Comments
 (0)