Skip to content
Merged
Show file tree
Hide file tree
Changes from 41 commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
19d9c02
chore: adds fdv2 payload parsing and protocol handling
tanderson-ld Jan 12, 2026
fbea872
adding package info files and fixing package name issue
tanderson-ld Jan 12, 2026
adcaa0e
more checkstyle fixes
tanderson-ld Jan 12, 2026
f2b209d
chore: Add interfaces for synchronizer/initializer.
kinyoklion Jan 13, 2026
8c115cc
Merge branch 'main' into rlamb/add-fdv2-data-source-interfaces
kinyoklion Jan 13, 2026
de2fded
Revert version change
kinyoklion Jan 13, 2026
98d3b39
feat: Add FDv2 polling support.
kinyoklion Jan 13, 2026
da3c639
Merge remote-tracking branch 'origin' into rlamb/add-fdv2-data-source…
kinyoklion Jan 13, 2026
8fb88ed
WIP: Polling initializer/synchronizer.
kinyoklion Jan 14, 2026
da27015
Use updated internal lib.
kinyoklion Jan 14, 2026
aba46ef
Update comment
kinyoklion Jan 14, 2026
7401331
Add termination.
kinyoklion Jan 14, 2026
bba0cdc
Remove test file that isn't ready.
kinyoklion Jan 14, 2026
89bd017
Polling tests and some fixes.
kinyoklion Jan 14, 2026
228f3e6
Try pre block.
kinyoklion Jan 14, 2026
9469b23
Add streaming path.
kinyoklion Jan 14, 2026
9a450e6
Merge branch 'main' of github.com:launchdarkly/java-core into rlamb/a…
kinyoklion Jan 14, 2026
4b8313b
Use the DataStoreTypes.ChangeSet type for data source results.
kinyoklion Jan 14, 2026
31eb13e
Make iterable async queue package private.
kinyoklion Jan 14, 2026
4a2fe3b
Revert Version.java
kinyoklion Jan 14, 2026
3428591
Add comments to SelectorSource.
kinyoklion Jan 14, 2026
ff60216
Revert build.gradle.
kinyoklion Jan 14, 2026
e985f80
Update launchdarklyJavaSdkInternal version to 1.6.1
kinyoklion Jan 14, 2026
a956484
Move mermaid out of doc comment.
kinyoklion Jan 14, 2026
ff2376e
Merge branch 'rlamb/add-fdv2-data-source-interfaces' of github.com:la…
kinyoklion Jan 14, 2026
376bb1f
chore: Add streaming synchronizer.
kinyoklion Jan 14, 2026
194c30c
PR feedback.
kinyoklion Jan 14, 2026
707fe0e
Implement more shutdown logic.
kinyoklion Jan 14, 2026
cb79f5e
Change null check.
kinyoklion Jan 14, 2026
6702239
Merge branch 'rlamb/add-fdv2-data-source-interfaces' into rlamb/strea…
kinyoklion Jan 14, 2026
0aba424
chore: Implement streaming synchronizer.
kinyoklion Jan 14, 2026
49c6008
Merge remote-tracking branch 'origin' into rlamb/streaming-synchronizer
kinyoklion Jan 15, 2026
b429eba
Basic streaming synchronizer.
kinyoklion Jan 15, 2026
278f670
Extend test coverage
kinyoklion Jan 15, 2026
84be62d
Add payload filter and more testing.
kinyoklion Jan 16, 2026
ec609a5
Add comments to FDv2 data source interfaces.
kinyoklion Jan 16, 2026
3461149
Remove extra blank lines
kinyoklion Jan 16, 2026
b32d3ca
Revert requestor change
kinyoklion Jan 16, 2026
97ac7c4
Remove leftover file.
kinyoklion Jan 16, 2026
7c84a68
Extend polling tests for INTERNAL_ERROR
kinyoklion Jan 16, 2026
9c43cbb
Handle close before start.
kinyoklion Jan 16, 2026
a383fc9
Threading and tests.
kinyoklion Jan 16, 2026
9c1c4c4
Update documentation.
kinyoklion Jan 16, 2026
d4dff45
Merge branch 'main' into rlamb/streaming-synchronizer
kinyoklion Jan 20, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,20 @@ protected CompletableFuture<FDv2SourceResult> poll(Selector selector, boolean on
case NONE:
break;
case INTERNAL_ERROR: {
FDv2ProtocolHandler.FDv2ActionInternalError internalErrorAction = (FDv2ProtocolHandler.FDv2ActionInternalError) res;
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Gap identified working on streaming.

DataSourceStatusProvider.ErrorKind kind = DataSourceStatusProvider.ErrorKind.UNKNOWN;
switch (internalErrorAction.getErrorType()) {
case MISSING_PAYLOAD:
case JSON_ERROR:
kind = DataSourceStatusProvider.ErrorKind.INVALID_DATA;
break;
case UNKNOWN_EVENT:
case IMPLEMENTATION_ERROR:
case PROTOCOL_ERROR:
break;
}
DataSourceStatusProvider.ErrorInfo info = new DataSourceStatusProvider.ErrorInfo(
DataSourceStatusProvider.ErrorKind.UNKNOWN,
kind,
0,
"Internal error occurred during polling",
new Date().toInstant());
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
public interface Initializer extends Closeable {
/**
* Run the initializer to completion.
* <p>
* This method is intended to be called only single time for an instance.
* @return The result of the initializer.
*/
CompletableFuture<FDv2SourceResult> run();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ public interface Synchronizer extends Closeable {
* <p>
* This method is intended to be driven by a single thread, and for there to be a single outstanding call
* at any given time.
* <p>
* Once SHUTDOWN, TERMINAL_ERROR, or GOODBYE has been produced, then no further calls to next() should be made.
* @return a future that will complete when the next result is available
*/
CompletableFuture<FDv2SourceResult> next();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,98 @@ public void emptyEventsArray() throws Exception {
assertEquals(FDv2SourceResult.ResultType.STATUS, result.getResultType());
assertEquals(FDv2SourceResult.State.TERMINAL_ERROR, result.getStatus().getState());



}

@Test
public void internalErrorWithInvalidDataKind() throws Exception {
FDv2Requestor requestor = mockRequestor();
SelectorSource selectorSource = mockSelectorSource();

// Create a response with malformed payload-transferred event. `state->states`.
// This will trigger JSON_ERROR internal error which maps to INVALID_DATA
String malformedPutObjectJson = "{\n" +
" \"events\": [\n" +
" {\n" +
" \"event\": \"server-intent\",\n" +
" \"data\": {\n" +
" \"payloads\": [{\n" +
" \"id\": \"payload-1\",\n" +
" \"target\": 100,\n" +
" \"intentCode\": \"xfer-full\",\n" +
" \"reason\": \"payload-missing\"\n" +
" }]\n" +
" }\n" +
" },\n" +
" {\n" +
" \"event\": \"payload-transferred\",\n" +
" \"data\": {\n" +
" \"states\": \"(p:payload-1:100)\",\n" +
" \"version\": 100\n" +
" }\n" +
" },\n" +
" {\n" +
" \"event\": \"put-object\",\n" +
" \"data\": {}\n" +
" }\n" +
" ]\n" +
"}";

FDv2Requestor.FDv2PayloadResponse response = new FDv2Requestor.FDv2PayloadResponse(
com.launchdarkly.sdk.internal.fdv2.payloads.FDv2Event.parseEventsArray(malformedPutObjectJson),
okhttp3.Headers.of()
);

when(requestor.Poll(any(Selector.class)))
.thenReturn(CompletableFuture.completedFuture(response));

PollingInitializerImpl initializer = new PollingInitializerImpl(requestor, testLogger, selectorSource);

CompletableFuture<FDv2SourceResult> resultFuture = initializer.run();
FDv2SourceResult result = resultFuture.get(5, TimeUnit.SECONDS);

assertNotNull(result);
assertEquals(FDv2SourceResult.ResultType.STATUS, result.getResultType());
assertEquals(FDv2SourceResult.State.TERMINAL_ERROR, result.getStatus().getState());
assertEquals(DataSourceStatusProvider.ErrorKind.INVALID_DATA, result.getStatus().getErrorInfo().getKind());


}

@Test
public void internalErrorWithUnknownKind() throws Exception {
FDv2Requestor requestor = mockRequestor();
SelectorSource selectorSource = mockSelectorSource();

// Create a response with an unrecognized event type
// This will trigger UNKNOWN_EVENT internal error which maps to UNKNOWN error kind
String unknownEventJson = "{\n" +
" \"events\": [\n" +
" {\n" +
" \"event\": \"unrecognized-event-type\",\n" +
" \"data\": {}\n" +
" }\n" +
" ]\n" +
"}";

FDv2Requestor.FDv2PayloadResponse response = new FDv2Requestor.FDv2PayloadResponse(
com.launchdarkly.sdk.internal.fdv2.payloads.FDv2Event.parseEventsArray(unknownEventJson),
okhttp3.Headers.of()
);

when(requestor.Poll(any(Selector.class)))
.thenReturn(CompletableFuture.completedFuture(response));

PollingInitializerImpl initializer = new PollingInitializerImpl(requestor, testLogger, selectorSource);

CompletableFuture<FDv2SourceResult> resultFuture = initializer.run();
FDv2SourceResult result = resultFuture.get(5, TimeUnit.SECONDS);

assertNotNull(result);
assertEquals(FDv2SourceResult.ResultType.STATUS, result.getResultType());
assertEquals(FDv2SourceResult.State.TERMINAL_ERROR, result.getStatus().getState());
assertEquals(DataSourceStatusProvider.ErrorKind.UNKNOWN, result.getStatus().getErrorInfo().getKind());


}
}
Original file line number Diff line number Diff line change
Expand Up @@ -722,4 +722,148 @@ public void nonRecoverableThenRecoverableErrorStopsPolling() throws Exception {
executor.shutdown();
}
}

@Test
public void internalErrorWithInvalidDataKindContinuesPolling() throws Exception {
FDv2Requestor requestor = mockRequestor();
SelectorSource selectorSource = mockSelectorSource();
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);

AtomicInteger callCount = new AtomicInteger(0);
when(requestor.Poll(any(Selector.class))).thenAnswer(invocation -> {
int count = callCount.incrementAndGet();
if (count == 1) {
// First call returns response with malformed put-object which triggers INTERNAL_ERROR (INVALID_DATA)
String malformedPutObjectJson = "{\n" +
" \"events\": [\n" +
" {\n" +
" \"event\": \"server-intent\",\n" +
" \"data\": {\n" +
" \"payloads\": [{\n" +
" \"id\": \"payload-1\",\n" +
" \"target\": 100,\n" +
" \"intentCode\": \"xfer-full\",\n" +
" \"reason\": \"payload-missing\"\n" +
" }]\n" +
" }\n" +
" },\n" +
" {\n" +
" \"event\": \"payload-transferred\",\n" +
" \"data\": {\n" +
" \"state\": \"(p:payload-1:100)\",\n" +
" \"version\": 100\n" +
" }\n" +
" },\n" +
" {\n" +
" \"event\": \"put-object\",\n" +
" \"data\": {}\n" +
" }\n" +
" ]\n" +
"}";

return CompletableFuture.completedFuture(new FDv2Requestor.FDv2PayloadResponse(
com.launchdarkly.sdk.internal.fdv2.payloads.FDv2Event.parseEventsArray(malformedPutObjectJson),
okhttp3.Headers.of()
));
} else {
// Subsequent calls succeed
return CompletableFuture.completedFuture(makeSuccessResponse());
}
});

try {
PollingSynchronizerImpl synchronizer = new PollingSynchronizerImpl(
requestor,
testLogger,
selectorSource,
executor,
Duration.ofMillis(50)
);

// Wait for multiple polls
Thread.sleep(250);

// First result should be interrupted with INVALID_DATA error kind
FDv2SourceResult result1 = synchronizer.next().get(1, TimeUnit.SECONDS);
assertNotNull(result1);
assertEquals(FDv2SourceResult.ResultType.STATUS, result1.getResultType());
assertEquals(FDv2SourceResult.State.INTERRUPTED, result1.getStatus().getState());
assertEquals(DataSourceStatusProvider.ErrorKind.INVALID_DATA, result1.getStatus().getErrorInfo().getKind());

// Second result should be success (polling continued)
FDv2SourceResult result2 = synchronizer.next().get(1, TimeUnit.SECONDS);
assertNotNull(result2);
assertEquals(FDv2SourceResult.ResultType.CHANGE_SET, result2.getResultType());

// Verify polling continued after internal error
assertTrue("Should have made at least 2 calls", callCount.get() >= 2);

synchronizer.close();
} finally {
executor.shutdown();
}
}

@Test
public void internalErrorWithUnknownKindContinuesPolling() throws Exception {
FDv2Requestor requestor = mockRequestor();
SelectorSource selectorSource = mockSelectorSource();
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);

AtomicInteger callCount = new AtomicInteger(0);
when(requestor.Poll(any(Selector.class))).thenAnswer(invocation -> {
int count = callCount.incrementAndGet();
if (count == 1) {
// First call returns response with unknown event which triggers INTERNAL_ERROR (UNKNOWN)
String unknownEventJson = "{\n" +
" \"events\": [\n" +
" {\n" +
" \"event\": \"unrecognized-event-type\",\n" +
" \"data\": {}\n" +
" }\n" +
" ]\n" +
"}";

return CompletableFuture.completedFuture(new FDv2Requestor.FDv2PayloadResponse(
com.launchdarkly.sdk.internal.fdv2.payloads.FDv2Event.parseEventsArray(unknownEventJson),
okhttp3.Headers.of()
));
} else {
// Subsequent calls succeed
return CompletableFuture.completedFuture(makeSuccessResponse());
}
});

try {
PollingSynchronizerImpl synchronizer = new PollingSynchronizerImpl(
requestor,
testLogger,
selectorSource,
executor,
Duration.ofMillis(50)
);

// Wait for multiple polls
Thread.sleep(250);

// First result should be interrupted with UNKNOWN error kind
FDv2SourceResult result1 = synchronizer.next().get(1, TimeUnit.SECONDS);
assertNotNull(result1);
assertEquals(FDv2SourceResult.ResultType.STATUS, result1.getResultType());
assertEquals(FDv2SourceResult.State.INTERRUPTED, result1.getStatus().getState());
assertEquals(DataSourceStatusProvider.ErrorKind.UNKNOWN, result1.getStatus().getErrorInfo().getKind());

// Second result should be success (polling continued)
FDv2SourceResult result2 = synchronizer.next().get(1, TimeUnit.SECONDS);
assertNotNull(result2);
assertEquals(FDv2SourceResult.ResultType.CHANGE_SET, result2.getResultType());

// Verify polling continued after internal error
assertTrue("Should have made at least 2 calls", callCount.get() >= 2);

synchronizer.close();
} finally {
executor.shutdown();
}
}
}
Loading
Loading