-
Notifications
You must be signed in to change notification settings - Fork 10
chore: Add FDv2 data source interfaces. #100
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 13 commits
19d9c02
fbea872
adcaa0e
f2b209d
8c115cc
de2fded
98d3b39
da3c639
8fb88ed
da27015
aba46ef
7401331
bba0cdc
89bd017
228f3e6
9469b23
9a450e6
4b8313b
31eb13e
4a2fe3b
3428591
ff60216
e985f80
a956484
ff2376e
194c30c
707fe0e
cb79f5e
b8c5389
a90b536
8d3e869
295138a
ff02c1e
f805c7e
71130ac
bd8e72b
ff95a85
fd6c3af
8243b82
962afb7
fc24f63
e6edc88
1821373
bc10b18
47d55c3
201a92c
117b85c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
|
tanderson-ld marked this conversation as resolved.
|
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,170 @@ | ||
| package com.launchdarkly.sdk.server; | ||
|
|
||
| import com.launchdarkly.logging.LDLogger; | ||
| import com.launchdarkly.sdk.internal.fdv2.payloads.FDv2Event; | ||
| import com.launchdarkly.sdk.internal.fdv2.sources.Selector; | ||
| import com.launchdarkly.sdk.internal.http.HttpHelpers; | ||
| import com.launchdarkly.sdk.internal.http.HttpProperties; | ||
| import com.launchdarkly.sdk.json.SerializationException; | ||
|
|
||
| import okhttp3.Call; | ||
| import okhttp3.Callback; | ||
| import okhttp3.Headers; | ||
| import okhttp3.OkHttpClient; | ||
| import okhttp3.Request; | ||
| import okhttp3.Response; | ||
|
|
||
| import javax.annotation.Nonnull; | ||
|
|
||
| import java.io.Closeable; | ||
| import java.io.IOException; | ||
| import java.net.SocketTimeoutException; | ||
| import java.net.URI; | ||
| import java.util.HashMap; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.concurrent.CompletableFuture; | ||
|
|
||
| /** | ||
| * Implementation of FDv2Requestor for polling feature flag data via FDv2 protocol. | ||
| */ | ||
| public class DefaultFDv2Requestor implements FDv2Requestor, Closeable { | ||
| private static final String VERSION_QUERY_PARAM = "version"; | ||
| private static final String STATE_QUERY_PARAM = "state"; | ||
|
|
||
| private final OkHttpClient httpClient; | ||
| private final URI pollingUri; | ||
| private final Headers headers; | ||
| private final LDLogger logger; | ||
| private final Map<URI, String> etags; | ||
|
|
||
| /** | ||
| * Creates a DefaultFDv2Requestor. | ||
| * | ||
| * @param httpProperties HTTP configuration properties | ||
| * @param baseUri base URI for the FDv2 polling endpoint | ||
| * @param requestPath the request path to append to the base URI (e.g., "/sdk/poll") | ||
| * @param logger logger for diagnostic output | ||
| */ | ||
| public DefaultFDv2Requestor(HttpProperties httpProperties, URI baseUri, String requestPath, LDLogger logger) { | ||
| this.logger = logger; | ||
| this.pollingUri = HttpHelpers.concatenateUriPath(baseUri, requestPath); | ||
| this.etags = new HashMap<>(); | ||
|
|
||
| OkHttpClient.Builder httpBuilder = httpProperties.toHttpClientBuilder(); | ||
| this.headers = httpProperties.toHeadersBuilder().build(); | ||
| this.httpClient = httpBuilder.build(); | ||
| } | ||
|
|
||
| @Override | ||
| public CompletableFuture<FDv2PollingResponse> Poll(Selector selector) { | ||
| CompletableFuture<FDv2PollingResponse> future = new CompletableFuture<>(); | ||
|
|
||
| try { | ||
| // Build the request URI with query parameters | ||
| URI requestUri = pollingUri; | ||
|
|
||
| if (selector.getVersion() > 0) { | ||
| requestUri = HttpHelpers.addQueryParam(requestUri, VERSION_QUERY_PARAM, String.valueOf(selector.getVersion())); | ||
| } | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I see this version constraint on the Dotnet impl. I'm wondering why this ever was imposed? What if 0 is valid?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think a couple reasons. 0 and not present would be the same, and 0 is the "zero" value for the go implementation. That said this should use |
||
|
|
||
| if (selector.getState() != null && !selector.getState().isEmpty()) { | ||
| requestUri = HttpHelpers.addQueryParam(requestUri, STATE_QUERY_PARAM, selector.getState()); | ||
| } | ||
|
|
||
| logger.debug("Making FDv2 polling request to: {}", requestUri); | ||
|
|
||
| // Build the HTTP request | ||
| Request.Builder requestBuilder = new Request.Builder() | ||
| .url(requestUri.toURL()) | ||
| .headers(headers) | ||
| .get(); | ||
|
|
||
| // Add ETag if we have one cached for this URI | ||
| synchronized (etags) { | ||
| String etag = etags.get(requestUri); | ||
| if (etag != null) { | ||
| requestBuilder.header("If-None-Match", etag); | ||
| } | ||
| } | ||
|
|
||
| Request request = requestBuilder.build(); | ||
| final URI finalRequestUri = requestUri; | ||
|
|
||
| // Make asynchronous HTTP call | ||
| httpClient.newCall(request).enqueue(new Callback() { | ||
| @Override | ||
| public void onFailure(@Nonnull Call call, @Nonnull IOException e) { | ||
| if (e instanceof SocketTimeoutException) { | ||
| future.completeExceptionally( | ||
| new IOException("FDv2 polling request timed out: " + finalRequestUri, e) | ||
| ); | ||
| } else { | ||
| future.completeExceptionally(e); | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public void onResponse(@Nonnull Call call, @Nonnull Response response) { | ||
| try { | ||
| // Handle 304 Not Modified - no new data | ||
| if (response.code() == 304) { | ||
| logger.debug("FDv2 polling request returned 304: not modified"); | ||
| future.complete(null); | ||
| return; | ||
| } | ||
|
|
||
| if (!response.isSuccessful()) { | ||
| future.completeExceptionally( | ||
| new IOException("FDv2 polling request failed with status code: " + response.code()) | ||
| ); | ||
| return; | ||
|
kinyoklion marked this conversation as resolved.
|
||
| } | ||
|
|
||
| // Update ETag cache | ||
| String newEtag = response.header("ETag"); | ||
| synchronized (etags) { | ||
| if (newEtag != null) { | ||
| etags.put(finalRequestUri, newEtag); | ||
| } else { | ||
| etags.remove(finalRequestUri); | ||
| } | ||
| } | ||
|
|
||
| // Parse the response body | ||
| if (response.body() == null) { | ||
| future.completeExceptionally(new IOException("Response body is null")); | ||
|
tanderson-ld marked this conversation as resolved.
Outdated
|
||
| return; | ||
| } | ||
|
|
||
| String responseBody = response.body().string(); | ||
| logger.debug("Received FDv2 polling response"); | ||
|
|
||
| List<FDv2Event> events = FDv2Event.parseEventsArray(responseBody); | ||
|
|
||
| // Create and return the response | ||
| FDv2PollingResponse pollingResponse = new FDv2PollingResponse(events, response.headers()); | ||
| future.complete(pollingResponse); | ||
|
|
||
| } catch (IOException | SerializationException e) { | ||
| future.completeExceptionally(e); | ||
|
kinyoklion marked this conversation as resolved.
|
||
| } finally { | ||
| response.close(); | ||
| } | ||
| } | ||
| }); | ||
|
|
||
| } catch (Exception e) { | ||
| future.completeExceptionally(e); | ||
| } | ||
|
|
||
| return future; | ||
| } | ||
|
|
||
| /** | ||
| * Closes the HTTP client and releases resources. | ||
| */ | ||
| public void close() { | ||
| HttpProperties.shutdownHttpClient(httpClient); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,36 @@ | ||
| package com.launchdarkly.sdk.server; | ||
|
|
||
| import com.launchdarkly.sdk.internal.fdv2.payloads.FDv2Event; | ||
| import com.launchdarkly.sdk.internal.fdv2.sources.Selector; | ||
| import okhttp3.Headers; | ||
|
|
||
| import java.util.List; | ||
| import java.util.concurrent.CompletableFuture; | ||
|
|
||
| /** | ||
| * This type is currently experimental and not subject to semantic versioning. | ||
| * <p> | ||
| * Interface for making FDv2 polling requests. | ||
| */ | ||
| interface FDv2Requestor { | ||
| public static class FDv2PollingResponse { | ||
| private final List<FDv2Event> events; | ||
| private final Headers headers; | ||
|
|
||
| public FDv2PollingResponse(List<FDv2Event> events, Headers headers) { | ||
| this.events = events; | ||
| this.headers = headers; | ||
| } | ||
|
|
||
| public List<FDv2Event> getEvents() { | ||
| return events; | ||
| } | ||
|
|
||
| public Headers getHeaders() { | ||
| return headers; | ||
| } | ||
| } | ||
| CompletableFuture<FDv2PollingResponse> Poll(Selector selector); | ||
|
|
||
| void close(); | ||
| } |
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Encapsulates all the poll-specific logic which is then used by the polling synchronizer/initializer. |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,112 @@ | ||
| package com.launchdarkly.sdk.server; | ||
|
|
||
| import com.launchdarkly.logging.LDLogger; | ||
| import com.launchdarkly.sdk.internal.fdv2.payloads.FDv2Event; | ||
| import com.launchdarkly.sdk.internal.fdv2.sources.FDv2ChangeSet; | ||
| import com.launchdarkly.sdk.internal.fdv2.sources.FDv2ProtocolHandler; | ||
| import com.launchdarkly.sdk.internal.fdv2.sources.Selector; | ||
| import com.launchdarkly.sdk.internal.http.HttpErrors; | ||
| import com.launchdarkly.sdk.server.datasources.FDv2SourceResult; | ||
| import com.launchdarkly.sdk.server.interfaces.DataSourceStatusProvider; | ||
| import com.launchdarkly.sdk.server.subsystems.SerializationException; | ||
|
|
||
| import java.io.IOException; | ||
| import java.util.Date; | ||
| import java.util.concurrent.CompletableFuture; | ||
|
|
||
| import static com.launchdarkly.sdk.internal.http.HttpErrors.*; | ||
|
|
||
| class PollingBase { | ||
| private final FDv2Requestor requestor; | ||
| private final LDLogger logger; | ||
|
|
||
| public PollingBase(FDv2Requestor requestor, LDLogger logger) { | ||
| this.requestor = requestor; | ||
| this.logger = logger; | ||
| } | ||
|
|
||
| protected void internalShutdown() { | ||
| requestor.close(); | ||
| } | ||
|
|
||
| protected CompletableFuture<FDv2SourceResult> poll(Selector selector, boolean oneShot) { | ||
| return requestor.Poll(selector).handle(((pollingResponse, ex) -> { | ||
| if (ex != null) { | ||
| if (ex instanceof HttpErrors.HttpErrorException) { | ||
| HttpErrors.HttpErrorException e = (HttpErrors.HttpErrorException) ex; | ||
| DataSourceStatusProvider.ErrorInfo errorInfo = DataSourceStatusProvider.ErrorInfo.fromHttpError(e.getStatus()); | ||
| boolean recoverable = e.getStatus() > 0 && !isHttpErrorRecoverable(e.getStatus()); | ||
| logger.error("Polling request failed with HTTP error: {}", e.getStatus()); | ||
| // For a one-shot request all errors are terminal. | ||
| if (oneShot) { | ||
| return FDv2SourceResult.terminalError(errorInfo); | ||
| } else { | ||
| return recoverable ? FDv2SourceResult.interrupted(errorInfo) : FDv2SourceResult.terminalError(errorInfo); | ||
|
kinyoklion marked this conversation as resolved.
|
||
| } | ||
| } else if (ex instanceof IOException) { | ||
| IOException e = (IOException) ex; | ||
| logger.error("Polling request failed with network error: {}", e.toString()); | ||
| DataSourceStatusProvider.ErrorInfo info = new DataSourceStatusProvider.ErrorInfo( | ||
| DataSourceStatusProvider.ErrorKind.NETWORK_ERROR, | ||
| 0, | ||
| e.toString(), | ||
| new Date().toInstant() | ||
| ); | ||
| return oneShot ? FDv2SourceResult.terminalError(info) : FDv2SourceResult.interrupted(info); | ||
| } else if (ex instanceof SerializationException) { | ||
| SerializationException e = (SerializationException) ex; | ||
| logger.error("Polling request received malformed data: {}", e.toString()); | ||
| DataSourceStatusProvider.ErrorInfo info = new DataSourceStatusProvider.ErrorInfo( | ||
| DataSourceStatusProvider.ErrorKind.INVALID_DATA, | ||
| 0, | ||
| e.toString(), | ||
| new Date().toInstant() | ||
| ); | ||
| return oneShot ? FDv2SourceResult.terminalError(info) : FDv2SourceResult.interrupted(info); | ||
|
kinyoklion marked this conversation as resolved.
|
||
| } | ||
| Exception e = (Exception) ex; | ||
|
kinyoklion marked this conversation as resolved.
Outdated
|
||
| logger.error("Polling request failed with an unknown error: {}", e.toString()); | ||
| DataSourceStatusProvider.ErrorInfo info = new DataSourceStatusProvider.ErrorInfo( | ||
| DataSourceStatusProvider.ErrorKind.UNKNOWN, | ||
| 0, | ||
| e.toString(), | ||
| new Date().toInstant() | ||
| ); | ||
| return oneShot ? FDv2SourceResult.terminalError(info) : FDv2SourceResult.interrupted(info); | ||
| } | ||
| FDv2ProtocolHandler handler = new FDv2ProtocolHandler(); | ||
| for (FDv2Event event : pollingResponse.getEvents()) { | ||
|
kinyoklion marked this conversation as resolved.
|
||
| FDv2ProtocolHandler.IFDv2ProtocolAction res = handler.handleEvent(event); | ||
| switch (res.getAction()) { | ||
| case CHANGESET: | ||
| return FDv2SourceResult.changeSet(((FDv2ProtocolHandler.FDv2ActionChangeset) res).getChangeset()); | ||
| case ERROR: | ||
| FDv2ProtocolHandler.FDv2ActionError error = ((FDv2ProtocolHandler.FDv2ActionError) res); | ||
| return FDv2SourceResult.terminalError( | ||
| new DataSourceStatusProvider.ErrorInfo( | ||
| DataSourceStatusProvider.ErrorKind.UNKNOWN, | ||
| 0, | ||
| error.getReason(), | ||
| new Date().toInstant())); | ||
| case GOODBYE: | ||
| return FDv2SourceResult.goodbye(((FDv2ProtocolHandler.FDv2ActionGoodbye) res).getReason()); | ||
| case NONE: | ||
| break; | ||
| case INTERNAL_ERROR: | ||
| return FDv2SourceResult.terminalError( | ||
| new DataSourceStatusProvider.ErrorInfo( | ||
| DataSourceStatusProvider.ErrorKind.UNKNOWN, | ||
| 0, | ||
| "Internal error occurred during polling", | ||
| new Date().toInstant())); | ||
| } | ||
| } | ||
| return FDv2SourceResult.terminalError(new DataSourceStatusProvider.ErrorInfo( | ||
| DataSourceStatusProvider.ErrorKind.UNKNOWN, | ||
| 0, | ||
| "Unexpected end of polling response", | ||
| new Date().toInstant() | ||
| )); | ||
|
kinyoklion marked this conversation as resolved.
Outdated
|
||
| })); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,32 @@ | ||
| package com.launchdarkly.sdk.server; | ||
|
|
||
| import com.launchdarkly.logging.LDLogger; | ||
| import com.launchdarkly.sdk.internal.fdv2.sources.Selector; | ||
| import com.launchdarkly.sdk.server.datasources.FDv2SourceResult; | ||
| import com.launchdarkly.sdk.server.datasources.Initializer; | ||
| import com.launchdarkly.sdk.server.datasources.SelectorSource; | ||
|
|
||
| import java.util.concurrent.CompletableFuture; | ||
|
|
||
| class PollingInitializerImpl extends PollingBase implements Initializer { | ||
| private final CompletableFuture<FDv2SourceResult> shutdownFuture = new CompletableFuture<>(); | ||
| private final SelectorSource selectorSource; | ||
|
|
||
| public PollingInitializerImpl(FDv2Requestor requestor, LDLogger logger, SelectorSource selectorSource) { | ||
| super(requestor, logger); | ||
| this.selectorSource = selectorSource; | ||
| } | ||
|
|
||
| @Override | ||
| public CompletableFuture<FDv2SourceResult> run() { | ||
| CompletableFuture<FDv2SourceResult> pollResult = poll(selectorSource.getSelector(), true); | ||
| return CompletableFuture.anyOf(shutdownFuture, pollResult) | ||
| .thenApply(result -> (FDv2SourceResult) result); | ||
| } | ||
|
|
||
| @Override | ||
| public void shutdown() { | ||
| shutdownFuture.complete(FDv2SourceResult.shutdown()); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If order matters here, consider adding comment to avoid future dev changing things and not realizing the future completing before internal shutdown is invoked is intentional.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Shouldn't matter. I may do it the other way. |
||
| internalShutdown(); | ||
| } | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.