Skip to content
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
19d9c02
chore: adds fdv2 payload parsing and protocol handling
tanderson-ld Jan 12, 2026
fbea872
adding package info files and fixing package name issue
tanderson-ld Jan 12, 2026
adcaa0e
more checkstyle fixes
tanderson-ld Jan 12, 2026
f2b209d
chore: Add interfaces for synchronizer/initializer.
kinyoklion Jan 13, 2026
8c115cc
Merge branch 'main' into rlamb/add-fdv2-data-source-interfaces
kinyoklion Jan 13, 2026
de2fded
Revert version change
kinyoklion Jan 13, 2026
98d3b39
feat: Add FDv2 polling support.
kinyoklion Jan 13, 2026
da3c639
Merge remote-tracking branch 'origin' into rlamb/add-fdv2-data-source…
kinyoklion Jan 13, 2026
8fb88ed
WIP: Polling initializer/synchronizer.
kinyoklion Jan 14, 2026
da27015
Use updated internal lib.
kinyoklion Jan 14, 2026
aba46ef
Update comment
kinyoklion Jan 14, 2026
7401331
Add termination.
kinyoklion Jan 14, 2026
bba0cdc
Remove test file that isn't ready.
kinyoklion Jan 14, 2026
89bd017
Polling tests and some fixes.
kinyoklion Jan 14, 2026
228f3e6
Try pre block.
kinyoklion Jan 14, 2026
9469b23
Add streaming path.
kinyoklion Jan 14, 2026
9a450e6
Merge branch 'main' of github.com:launchdarkly/java-core into rlamb/a…
kinyoklion Jan 14, 2026
4b8313b
Use the DataStoreTypes.ChangeSet type for data source results.
kinyoklion Jan 14, 2026
31eb13e
Make iterable async queue package private.
kinyoklion Jan 14, 2026
4a2fe3b
Revert Version.java
kinyoklion Jan 14, 2026
3428591
Add comments to SelectorSource.
kinyoklion Jan 14, 2026
ff60216
Revert build.gradle.
kinyoklion Jan 14, 2026
e985f80
Update launchdarklyJavaSdkInternal version to 1.6.1
kinyoklion Jan 14, 2026
a956484
Move mermaid out of doc comment.
kinyoklion Jan 14, 2026
ff2376e
Merge branch 'rlamb/add-fdv2-data-source-interfaces' of github.com:la…
kinyoklion Jan 14, 2026
194c30c
PR feedback.
kinyoklion Jan 14, 2026
707fe0e
Implement more shutdown logic.
kinyoklion Jan 14, 2026
cb79f5e
Change null check.
kinyoklion Jan 14, 2026
b8c5389
PR feedback.
kinyoklion Jan 15, 2026
a90b536
Merge branch 'main' into rlamb/add-fdv2-data-source-interfaces
kinyoklion Jan 15, 2026
8d3e869
PR feedback
kinyoklion Jan 15, 2026
295138a
Merge branch 'rlamb/add-fdv2-data-source-interfaces' of github.com:la…
kinyoklion Jan 15, 2026
ff02c1e
Don't double queue async items
kinyoklion Jan 15, 2026
f805c7e
More comments
kinyoklion Jan 15, 2026
71130ac
PR feedback
kinyoklion Jan 15, 2026
bd8e72b
Fix polling recoverable logic error.
kinyoklion Jan 15, 2026
ff95a85
Better testing
kinyoklion Jan 15, 2026
fd6c3af
Guard source shutdown
kinyoklion Jan 15, 2026
8243b82
More safe closing.
kinyoklion Jan 15, 2026
962afb7
Correct serialization error type.
kinyoklion Jan 15, 2026
fc24f63
Polling better goodbye handling.
kinyoklion Jan 15, 2026
e6edc88
Fix exception type.
kinyoklion Jan 15, 2026
1821373
Fix early return
kinyoklion Jan 15, 2026
bc10b18
EMPTY selector.
kinyoklion Jan 15, 2026
47d55c3
More PR feedback.
kinyoklion Jan 15, 2026
201a92c
Remove unsafe cast from throwable to exception.
kinyoklion Jan 15, 2026
117b85c
Async iterable tests.
kinyoklion Jan 15, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions lib/sdk/server/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import java.nio.file.StandardCopyOption

buildscript {
repositories {
mavenCentral()
mavenLocal()
Comment thread
kinyoklion marked this conversation as resolved.
Outdated
mavenCentral()
}
dependencies {
classpath "org.eclipse.virgo.util:org.eclipse.virgo.util.osgi.manifest:3.5.0.RELEASE"
Expand Down Expand Up @@ -71,7 +71,7 @@ ext.versions = [
"guava": "32.0.1-jre",
"jackson": "2.11.2",
"launchdarklyJavaSdkCommon": "2.1.2",
"launchdarklyJavaSdkInternal": "1.5.1",
"launchdarklyJavaSdkInternal": "1.6.0",
Comment thread
kinyoklion marked this conversation as resolved.
Outdated
"launchdarklyLogging": "1.1.0",
"okhttp": "4.12.0", // specify this for the SDK build instead of relying on the transitive dependency from okhttp-eventsource
"okhttpEventsource": "4.1.0",
Expand Down
Comment thread
tanderson-ld marked this conversation as resolved.
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
package com.launchdarkly.sdk.server;

import com.launchdarkly.logging.LDLogger;
import com.launchdarkly.sdk.internal.fdv2.payloads.FDv2Event;
import com.launchdarkly.sdk.internal.fdv2.sources.Selector;
import com.launchdarkly.sdk.internal.http.HttpHelpers;
import com.launchdarkly.sdk.internal.http.HttpProperties;
import com.launchdarkly.sdk.json.SerializationException;

import okhttp3.Call;
import okhttp3.Callback;
import okhttp3.Headers;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;

import javax.annotation.Nonnull;

import java.io.Closeable;
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/**
* Implementation of FDv2Requestor for polling feature flag data via FDv2 protocol.
*/
public class DefaultFDv2Requestor implements FDv2Requestor, Closeable {
private static final String VERSION_QUERY_PARAM = "version";
private static final String STATE_QUERY_PARAM = "state";

private final OkHttpClient httpClient;
private final URI pollingUri;
private final Headers headers;
private final LDLogger logger;
private final Map<URI, String> etags;

/**
* Creates a DefaultFDv2Requestor.
*
* @param httpProperties HTTP configuration properties
* @param baseUri base URI for the FDv2 polling endpoint
* @param requestPath the request path to append to the base URI (e.g., "/sdk/poll")
* @param logger logger for diagnostic output
*/
public DefaultFDv2Requestor(HttpProperties httpProperties, URI baseUri, String requestPath, LDLogger logger) {
this.logger = logger;
this.pollingUri = HttpHelpers.concatenateUriPath(baseUri, requestPath);
this.etags = new HashMap<>();

OkHttpClient.Builder httpBuilder = httpProperties.toHttpClientBuilder();
this.headers = httpProperties.toHeadersBuilder().build();
this.httpClient = httpBuilder.build();
}

@Override
public CompletableFuture<FDv2PollingResponse> Poll(Selector selector) {
CompletableFuture<FDv2PollingResponse> future = new CompletableFuture<>();

try {
// Build the request URI with query parameters
URI requestUri = pollingUri;

if (selector.getVersion() > 0) {
requestUri = HttpHelpers.addQueryParam(requestUri, VERSION_QUERY_PARAM, String.valueOf(selector.getVersion()));
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see this version constraint on the Dotnet impl. I'm wondering why this ever was imposed? What if 0 is valid?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think a couple reasons. 0 and not present would be the same, and 0 is the "zero" value for the go implementation.

That said this should use !selector.isEmpty() and I will change it.


if (selector.getState() != null && !selector.getState().isEmpty()) {
requestUri = HttpHelpers.addQueryParam(requestUri, STATE_QUERY_PARAM, selector.getState());
}

logger.debug("Making FDv2 polling request to: {}", requestUri);

// Build the HTTP request
Request.Builder requestBuilder = new Request.Builder()
.url(requestUri.toURL())
.headers(headers)
.get();

// Add ETag if we have one cached for this URI
synchronized (etags) {
String etag = etags.get(requestUri);
if (etag != null) {
requestBuilder.header("If-None-Match", etag);
}
}

Request request = requestBuilder.build();
final URI finalRequestUri = requestUri;

// Make asynchronous HTTP call
httpClient.newCall(request).enqueue(new Callback() {
@Override
public void onFailure(@Nonnull Call call, @Nonnull IOException e) {
if (e instanceof SocketTimeoutException) {
future.completeExceptionally(
new IOException("FDv2 polling request timed out: " + finalRequestUri, e)
);
} else {
future.completeExceptionally(e);
}
}

@Override
public void onResponse(@Nonnull Call call, @Nonnull Response response) {
try {
// Handle 304 Not Modified - no new data
if (response.code() == 304) {
logger.debug("FDv2 polling request returned 304: not modified");
future.complete(null);
return;
}

if (!response.isSuccessful()) {
future.completeExceptionally(
new IOException("FDv2 polling request failed with status code: " + response.code())
);
return;
Comment thread
kinyoklion marked this conversation as resolved.
}

// Update ETag cache
String newEtag = response.header("ETag");
synchronized (etags) {
if (newEtag != null) {
etags.put(finalRequestUri, newEtag);
} else {
etags.remove(finalRequestUri);
}
}

// Parse the response body
if (response.body() == null) {
future.completeExceptionally(new IOException("Response body is null"));
Comment thread
tanderson-ld marked this conversation as resolved.
Outdated
return;
}

String responseBody = response.body().string();
logger.debug("Received FDv2 polling response");

List<FDv2Event> events = FDv2Event.parseEventsArray(responseBody);

// Create and return the response
FDv2PollingResponse pollingResponse = new FDv2PollingResponse(events, response.headers());
future.complete(pollingResponse);

} catch (IOException | SerializationException e) {
future.completeExceptionally(e);
Comment thread
kinyoklion marked this conversation as resolved.
} finally {
response.close();
}
}
});

} catch (Exception e) {
future.completeExceptionally(e);
}

return future;
}

/**
* Closes the HTTP client and releases resources.
*/
public void close() {
HttpProperties.shutdownHttpClient(httpClient);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package com.launchdarkly.sdk.server;

import com.launchdarkly.sdk.internal.fdv2.payloads.FDv2Event;
import com.launchdarkly.sdk.internal.fdv2.sources.Selector;
import okhttp3.Headers;

import java.util.List;
import java.util.concurrent.CompletableFuture;

/**
* This type is currently experimental and not subject to semantic versioning.
* <p>
* Interface for making FDv2 polling requests.
*/
interface FDv2Requestor {
public static class FDv2PollingResponse {
private final List<FDv2Event> events;
private final Headers headers;

public FDv2PollingResponse(List<FDv2Event> events, Headers headers) {
this.events = events;
this.headers = headers;
}

public List<FDv2Event> getEvents() {
return events;
}

public Headers getHeaders() {
return headers;
}
}
CompletableFuture<FDv2PollingResponse> Poll(Selector selector);

void close();
}
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Encapsulates all the poll-specific logic which is then used by the polling synchronizer/initializer.

Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package com.launchdarkly.sdk.server;

import com.launchdarkly.logging.LDLogger;
import com.launchdarkly.sdk.internal.fdv2.payloads.FDv2Event;
import com.launchdarkly.sdk.internal.fdv2.sources.FDv2ChangeSet;
import com.launchdarkly.sdk.internal.fdv2.sources.FDv2ProtocolHandler;
import com.launchdarkly.sdk.internal.fdv2.sources.Selector;
import com.launchdarkly.sdk.internal.http.HttpErrors;
import com.launchdarkly.sdk.server.datasources.FDv2SourceResult;
import com.launchdarkly.sdk.server.interfaces.DataSourceStatusProvider;
import com.launchdarkly.sdk.server.subsystems.SerializationException;

import java.io.IOException;
import java.util.Date;
import java.util.concurrent.CompletableFuture;

import static com.launchdarkly.sdk.internal.http.HttpErrors.*;

class PollingBase {
private final FDv2Requestor requestor;
private final LDLogger logger;

public PollingBase(FDv2Requestor requestor, LDLogger logger) {
this.requestor = requestor;
this.logger = logger;
}

protected void internalShutdown() {
requestor.close();
}

protected CompletableFuture<FDv2SourceResult> poll(Selector selector, boolean oneShot) {
return requestor.Poll(selector).handle(((pollingResponse, ex) -> {
if (ex != null) {
if (ex instanceof HttpErrors.HttpErrorException) {
HttpErrors.HttpErrorException e = (HttpErrors.HttpErrorException) ex;
DataSourceStatusProvider.ErrorInfo errorInfo = DataSourceStatusProvider.ErrorInfo.fromHttpError(e.getStatus());
boolean recoverable = e.getStatus() > 0 && !isHttpErrorRecoverable(e.getStatus());
logger.error("Polling request failed with HTTP error: {}", e.getStatus());
// For a one-shot request all errors are terminal.
if (oneShot) {
return FDv2SourceResult.terminalError(errorInfo);
} else {
return recoverable ? FDv2SourceResult.interrupted(errorInfo) : FDv2SourceResult.terminalError(errorInfo);
Comment thread
kinyoklion marked this conversation as resolved.
}
} else if (ex instanceof IOException) {
IOException e = (IOException) ex;
logger.error("Polling request failed with network error: {}", e.toString());
DataSourceStatusProvider.ErrorInfo info = new DataSourceStatusProvider.ErrorInfo(
DataSourceStatusProvider.ErrorKind.NETWORK_ERROR,
0,
e.toString(),
new Date().toInstant()
);
return oneShot ? FDv2SourceResult.terminalError(info) : FDv2SourceResult.interrupted(info);
} else if (ex instanceof SerializationException) {
SerializationException e = (SerializationException) ex;
logger.error("Polling request received malformed data: {}", e.toString());
DataSourceStatusProvider.ErrorInfo info = new DataSourceStatusProvider.ErrorInfo(
DataSourceStatusProvider.ErrorKind.INVALID_DATA,
0,
e.toString(),
new Date().toInstant()
);
return oneShot ? FDv2SourceResult.terminalError(info) : FDv2SourceResult.interrupted(info);
Comment thread
kinyoklion marked this conversation as resolved.
}
Exception e = (Exception) ex;
Comment thread
kinyoklion marked this conversation as resolved.
Outdated
logger.error("Polling request failed with an unknown error: {}", e.toString());
DataSourceStatusProvider.ErrorInfo info = new DataSourceStatusProvider.ErrorInfo(
DataSourceStatusProvider.ErrorKind.UNKNOWN,
0,
e.toString(),
new Date().toInstant()
);
return oneShot ? FDv2SourceResult.terminalError(info) : FDv2SourceResult.interrupted(info);
}
FDv2ProtocolHandler handler = new FDv2ProtocolHandler();
for (FDv2Event event : pollingResponse.getEvents()) {
Comment thread
kinyoklion marked this conversation as resolved.
FDv2ProtocolHandler.IFDv2ProtocolAction res = handler.handleEvent(event);
switch (res.getAction()) {
case CHANGESET:
return FDv2SourceResult.changeSet(((FDv2ProtocolHandler.FDv2ActionChangeset) res).getChangeset());
case ERROR:
FDv2ProtocolHandler.FDv2ActionError error = ((FDv2ProtocolHandler.FDv2ActionError) res);
return FDv2SourceResult.terminalError(
new DataSourceStatusProvider.ErrorInfo(
DataSourceStatusProvider.ErrorKind.UNKNOWN,
0,
error.getReason(),
new Date().toInstant()));
case GOODBYE:
return FDv2SourceResult.goodbye(((FDv2ProtocolHandler.FDv2ActionGoodbye) res).getReason());
case NONE:
break;
case INTERNAL_ERROR:
return FDv2SourceResult.terminalError(
new DataSourceStatusProvider.ErrorInfo(
DataSourceStatusProvider.ErrorKind.UNKNOWN,
0,
"Internal error occurred during polling",
new Date().toInstant()));
}
}
return FDv2SourceResult.terminalError(new DataSourceStatusProvider.ErrorInfo(
DataSourceStatusProvider.ErrorKind.UNKNOWN,
0,
"Unexpected end of polling response",
new Date().toInstant()
));
Comment thread
kinyoklion marked this conversation as resolved.
Outdated
}));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package com.launchdarkly.sdk.server;

import com.launchdarkly.logging.LDLogger;
import com.launchdarkly.sdk.internal.fdv2.sources.Selector;
import com.launchdarkly.sdk.server.datasources.FDv2SourceResult;
import com.launchdarkly.sdk.server.datasources.Initializer;
import com.launchdarkly.sdk.server.datasources.SelectorSource;

import java.util.concurrent.CompletableFuture;

class PollingInitializerImpl extends PollingBase implements Initializer {
private final CompletableFuture<FDv2SourceResult> shutdownFuture = new CompletableFuture<>();
private final SelectorSource selectorSource;

public PollingInitializerImpl(FDv2Requestor requestor, LDLogger logger, SelectorSource selectorSource) {
super(requestor, logger);
this.selectorSource = selectorSource;
}

@Override
public CompletableFuture<FDv2SourceResult> run() {
CompletableFuture<FDv2SourceResult> pollResult = poll(selectorSource.getSelector(), true);
return CompletableFuture.anyOf(shutdownFuture, pollResult)
.thenApply(result -> (FDv2SourceResult) result);
}

@Override
public void shutdown() {
shutdownFuture.complete(FDv2SourceResult.shutdown());
Copy link
Copy Markdown
Contributor

@tanderson-ld tanderson-ld Jan 15, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If order matters here, consider adding comment to avoid future dev changing things and not realizing the future completing before internal shutdown is invoked is intentional.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't matter. I may do it the other way.

internalShutdown();
}
}
Loading
Loading