Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/sdk/server/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ ext.versions = [
"launchdarklyJavaSdkInternal": "1.6.1",
"launchdarklyLogging": "1.1.0",
"okhttp": "4.12.0", // specify this for the SDK build instead of relying on the transitive dependency from okhttp-eventsource
"okhttpEventsource": "4.1.0",
"okhttpEventsource": "4.2.0",
"reactorCore":"3.3.22.RELEASE",
"slf4j": "1.7.36",
"snakeyaml": "2.4",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,8 @@
import com.launchdarkly.logging.LDLogger;
import com.launchdarkly.sdk.internal.fdv2.payloads.FDv2Event;
import com.launchdarkly.sdk.internal.fdv2.sources.Selector;
import com.launchdarkly.sdk.internal.http.HttpErrors;
import com.launchdarkly.sdk.internal.http.HttpHelpers;
import com.launchdarkly.sdk.internal.http.HttpProperties;
import com.launchdarkly.sdk.json.SerializationException;

import okhttp3.Call;
import okhttp3.Callback;
Expand Down Expand Up @@ -116,14 +114,12 @@ public void onResponse(@Nonnull Call call, @Nonnull Response response) {
// Handle 304 Not Modified - no new data
if (response.code() == 304) {
logger.debug("FDv2 polling request returned 304: not modified");
future.complete(null);
future.complete(FDv2PayloadResponse.none(response.code()));
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of having null indicate no transfer I made an explicit none.

return;
}

if (!response.isSuccessful()) {
future.completeExceptionally(
new HttpErrors.HttpErrorException(response.code())
);
future.complete(FDv2PayloadResponse.failure(response.code(), response.headers()));
return;
}

Expand All @@ -145,7 +141,7 @@ public void onResponse(@Nonnull Call call, @Nonnull Response response) {
List<FDv2Event> events = FDv2Event.parseEventsArray(responseBody);

// Create and return the response
FDv2PayloadResponse pollingResponse = new FDv2PayloadResponse(events, response.headers());
FDv2PayloadResponse pollingResponse = FDv2PayloadResponse.success(events, response.headers(), response.code());
future.complete(pollingResponse);

} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,31 @@ interface FDv2Requestor {
* to get from one payload version to another.
* This isn't intended for use for implementations which may require multiple executions to get an entire payload.
*/
public static class FDv2PayloadResponse {
public static class FDv2PayloadResponse {
private final List<FDv2Event> events;
private final Headers headers;

public FDv2PayloadResponse(List<FDv2Event> events, Headers headers) {
private final boolean successful;

private final int statusCode;

private FDv2PayloadResponse(List<FDv2Event> events, Headers headers, boolean success, int statusCode) {
this.events = events;
this.headers = headers;
this.successful = success;
this.statusCode = statusCode;
}

public static FDv2PayloadResponse failure(int statusCode, Headers headers) {
return new FDv2PayloadResponse(null, headers, false, statusCode);
}

public static FDv2PayloadResponse success(List<FDv2Event> events, Headers headers, int statusCode) {
return new FDv2PayloadResponse(events, headers, true, statusCode);
}

public static FDv2PayloadResponse none(int statusCode) {
return new FDv2PayloadResponse(null, null, true, statusCode);
}

public List<FDv2Event> getEvents() {
Expand All @@ -34,6 +52,14 @@ public List<FDv2Event> getEvents() {
public Headers getHeaders() {
return headers;
}

public boolean isSuccess() {
return successful;
}

public int getStatusCode() {
return statusCode;
}
}
CompletableFuture<FDv2PayloadResponse> Poll(Selector selector);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.launchdarkly.sdk.server;

enum HeaderConstants {
ENVIRONMENT_ID("x-ld-envid"),
FDV1_FALLBACK("x-ld-fd-fallback");

private final String headerName;

HeaderConstants(String headerName) {
this.headerName = headerName;
}

public String getHeaderName() {
return headerName;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import com.launchdarkly.sdk.internal.fdv2.payloads.FDv2Event;
import com.launchdarkly.sdk.internal.fdv2.sources.FDv2ProtocolHandler;
import com.launchdarkly.sdk.internal.fdv2.sources.Selector;
import com.launchdarkly.sdk.internal.http.HttpErrors;
import com.launchdarkly.sdk.server.datasources.FDv2SourceResult;
import com.launchdarkly.sdk.server.interfaces.DataSourceStatusProvider;
import com.launchdarkly.sdk.server.subsystems.DataStoreTypes;
Expand All @@ -29,23 +28,28 @@ protected void internalShutdown() {
requestor.close();
}

private static boolean getFallback(FDv2Requestor.FDv2PayloadResponse response) {
if (response != null && response.getHeaders() != null) {
String headerValue = response.getHeaders().get(HeaderConstants.FDV1_FALLBACK.getHeaderName());
return headerValue != null && headerValue.equalsIgnoreCase("true");
}

return false;
}

private static String getEnvironmentId(FDv2Requestor.FDv2PayloadResponse response) {
if (response != null && response.getHeaders() != null) {
return response.getHeaders().get(HeaderConstants.ENVIRONMENT_ID.getHeaderName());
}
return null;
}

protected CompletableFuture<FDv2SourceResult> poll(Selector selector, boolean oneShot) {
return requestor.Poll(selector).handle(((pollingResponse, ex) -> {
boolean fdv1Fallback = getFallback(pollingResponse);
String environmentId = getEnvironmentId(pollingResponse);
if (ex != null) {
if (ex instanceof HttpErrors.HttpErrorException) {
HttpErrors.HttpErrorException e = (HttpErrors.HttpErrorException) ex;
DataSourceStatusProvider.ErrorInfo errorInfo = DataSourceStatusProvider.ErrorInfo.fromHttpError(e.getStatus());
// Errors without an HTTP status are recoverable. If there is a status, then we check if the error
// is recoverable.
boolean recoverable = e.getStatus() <= 0 || isHttpErrorRecoverable(e.getStatus());
logger.error("Polling request failed with HTTP error: {}", e.getStatus());
// For a one-shot request all errors are terminal.
if (oneShot) {
return FDv2SourceResult.terminalError(errorInfo);
} else {
return recoverable ? FDv2SourceResult.interrupted(errorInfo) : FDv2SourceResult.terminalError(errorInfo);
}
} else if (ex instanceof IOException) {
if (ex instanceof IOException) {
IOException e = (IOException) ex;
logger.error("Polling request failed with network error: {}", e.toString());
DataSourceStatusProvider.ErrorInfo info = new DataSourceStatusProvider.ErrorInfo(
Expand All @@ -54,7 +58,7 @@ protected CompletableFuture<FDv2SourceResult> poll(Selector selector, boolean on
e.toString(),
new Date().toInstant()
);
return oneShot ? FDv2SourceResult.terminalError(info) : FDv2SourceResult.interrupted(info);
return oneShot ? FDv2SourceResult.terminalError(info, fdv1Fallback) : FDv2SourceResult.interrupted(info, fdv1Fallback);
} else if (ex instanceof SerializationException) {
SerializationException e = (SerializationException) ex;
logger.error("Polling request received malformed data: {}", e.toString());
Expand All @@ -64,7 +68,7 @@ protected CompletableFuture<FDv2SourceResult> poll(Selector selector, boolean on
e.toString(),
new Date().toInstant()
);
return oneShot ? FDv2SourceResult.terminalError(info) : FDv2SourceResult.interrupted(info);
return oneShot ? FDv2SourceResult.terminalError(info, fdv1Fallback) : FDv2SourceResult.interrupted(info, fdv1Fallback);
}
String msg = ex.toString();
logger.error("Polling request failed with an unknown error: {}", msg);
Expand All @@ -74,17 +78,30 @@ protected CompletableFuture<FDv2SourceResult> poll(Selector selector, boolean on
msg,
new Date().toInstant()
);
return oneShot ? FDv2SourceResult.terminalError(info) : FDv2SourceResult.interrupted(info);
return oneShot ? FDv2SourceResult.terminalError(info, fdv1Fallback) : FDv2SourceResult.interrupted(info, fdv1Fallback);
}
// A null polling response indicates that we received a 304, which means nothing has changed.
if (pollingResponse == null) {
// If we get a 304, then that means nothing has changed.
if (pollingResponse.getStatusCode() == 304) {
return FDv2SourceResult.changeSet(
new DataStoreTypes.ChangeSet<>(DataStoreTypes.ChangeSetType.None,
Selector.EMPTY,
null,
// TODO: Implement environment ID support.
null
));
null // Header derived values will have been handled on initial response.
),
// Headers would have been processed from the initial response.
false);
}
if(!pollingResponse.isSuccess()) {
int statusCode = pollingResponse.getStatusCode();
boolean recoverable = statusCode <= 0 || isHttpErrorRecoverable(statusCode);
DataSourceStatusProvider.ErrorInfo errorInfo = DataSourceStatusProvider.ErrorInfo.fromHttpError(statusCode);
logger.error("Polling request failed with HTTP error: {}", statusCode);
// For a one-shot request all errors are terminal.
if (oneShot) {
return FDv2SourceResult.terminalError(errorInfo, fdv1Fallback);
} else {
return recoverable ? FDv2SourceResult.interrupted(errorInfo, fdv1Fallback) : FDv2SourceResult.terminalError(errorInfo, fdv1Fallback);
}
}
FDv2ProtocolHandler handler = new FDv2ProtocolHandler();
for (FDv2Event event : pollingResponse.getEvents()) {
Expand All @@ -96,10 +113,9 @@ protected CompletableFuture<FDv2SourceResult> poll(Selector selector, boolean on
DataStoreTypes.ChangeSet<DataStoreTypes.ItemDescriptor> converted = FDv2ChangeSetTranslator.toChangeSet(
((FDv2ProtocolHandler.FDv2ActionChangeset) res).getChangeset(),
logger,
// TODO: Implement environment ID support.
null
environmentId
);
return FDv2SourceResult.changeSet(converted);
return FDv2SourceResult.changeSet(converted, fdv1Fallback);
} catch (Exception e) {
// TODO: Do we need to be more specific about the exception type here?
DataSourceStatusProvider.ErrorInfo info = new DataSourceStatusProvider.ErrorInfo(
Expand All @@ -108,7 +124,7 @@ protected CompletableFuture<FDv2SourceResult> poll(Selector selector, boolean on
e.toString(),
new Date().toInstant()
);
return oneShot ? FDv2SourceResult.terminalError(info) : FDv2SourceResult.interrupted(info);
return oneShot ? FDv2SourceResult.terminalError(info, fdv1Fallback) : FDv2SourceResult.interrupted(info, fdv1Fallback);
}
case ERROR: {
FDv2ProtocolHandler.FDv2ActionError error = ((FDv2ProtocolHandler.FDv2ActionError) res);
Expand All @@ -117,10 +133,10 @@ protected CompletableFuture<FDv2SourceResult> poll(Selector selector, boolean on
0,
error.getReason(),
new Date().toInstant());
return oneShot ? FDv2SourceResult.terminalError(info) : FDv2SourceResult.interrupted(info);
return oneShot ? FDv2SourceResult.terminalError(info, fdv1Fallback) : FDv2SourceResult.interrupted(info, fdv1Fallback);
}
case GOODBYE:
return FDv2SourceResult.goodbye(((FDv2ProtocolHandler.FDv2ActionGoodbye) res).getReason());
return FDv2SourceResult.goodbye(((FDv2ProtocolHandler.FDv2ActionGoodbye) res).getReason(), fdv1Fallback);
case NONE:
break;
case INTERNAL_ERROR: {
Expand All @@ -141,7 +157,7 @@ protected CompletableFuture<FDv2SourceResult> poll(Selector selector, boolean on
0,
"Internal error occurred during polling",
new Date().toInstant());
return oneShot ? FDv2SourceResult.terminalError(info) : FDv2SourceResult.interrupted(info);
return oneShot ? FDv2SourceResult.terminalError(info, fdv1Fallback) : FDv2SourceResult.interrupted(info, fdv1Fallback);
}
}
}
Expand All @@ -152,7 +168,7 @@ protected CompletableFuture<FDv2SourceResult> poll(Selector selector, boolean on
"Unexpected end of polling response",
new Date().toInstant()
);
return oneShot ? FDv2SourceResult.terminalError(info) : FDv2SourceResult.interrupted(info);
return oneShot ? FDv2SourceResult.terminalError(info, fdv1Fallback) : FDv2SourceResult.interrupted(info, fdv1Fallback);
}));
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package com.launchdarkly.sdk.server;

import com.launchdarkly.logging.LDLogger;
import com.launchdarkly.sdk.internal.fdv2.sources.Selector;
import com.launchdarkly.sdk.server.datasources.FDv2SourceResult;
import com.launchdarkly.sdk.server.datasources.Initializer;
import com.launchdarkly.sdk.server.datasources.SelectorSource;
Expand Down
Loading