Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,9 @@ High-level checklist for maintainers:

1. Bump the version in `src/main/resources/java-sdk-version/version.properties` and update `CHANGELOG.md`.
2. Merge the release branch into `main`.
3. Create a git tag for `main` that matches the new version
4. Publish a GitHub release (include the changelog). This triggers the release workflow.
3. Create a git tag for `main` that matches the new versions
4. Publish a GitHub release (include the changelog). This triggers the release workflow. (via the GitHub UI, `gh release creates v1.0.1 --title "v1.0.1" --notes-file <(cat CHANGELOG.md section)`)
5. Wait for Sonatype to sync the artifact (this can take a few hours).

The required signing keys and credentials are stored as GitHub secrets. If you need access or spot an issue with the release automation, please reach out to the Transloadit team via the issue tracker or support channels.

58 changes: 41 additions & 17 deletions src/main/java/com/transloadit/sdk/EventsourceRunnable.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.launchdarkly.eventsource.EventSource;
import com.launchdarkly.eventsource.FaultEvent;
import com.launchdarkly.eventsource.MessageEvent;
import com.launchdarkly.eventsource.ReadyState;
import com.launchdarkly.eventsource.RetryDelayStrategy;
import com.launchdarkly.eventsource.StartedEvent;
import com.launchdarkly.eventsource.StreamEvent;
Expand All @@ -28,6 +29,8 @@ class EventsourceRunnable implements Runnable {
protected EventSource eventSource;

protected Transloadit transloadit;
protected boolean stopRequested;
protected boolean assemblyFinishedNotified;

/**
* Constructor for {@link EventsourceRunnable}. It creates a new {@link EventSource} instance, wrapped in a
Expand Down Expand Up @@ -64,16 +67,21 @@ class EventsourceRunnable implements Runnable {
@Override
public void run() {
this.assemblyFinished = false;
this.stopRequested = false;
this.assemblyFinishedNotified = false;
try {
eventSource.start();
} catch (StreamException e) {
assemblyListener.onError(e);
stopRequested = true;
}

while (!assemblyFinished) {
while (!stopRequested) {
boolean processedEvent = false;
Iterable<StreamEvent> events = eventSource.anyEvents();
Iterator<StreamEvent> eventIterator = events.iterator();
if (eventIterator.hasNext()) {
while (eventIterator.hasNext()) {
processedEvent = true;
StreamEvent streamEvent = eventIterator.next();
if (streamEvent instanceof MessageEvent) {
handleMessageEvent((MessageEvent) streamEvent);
Expand All @@ -85,7 +93,24 @@ public void run() {
handleFaultEvent((FaultEvent) streamEvent);
}
}

if (!processedEvent) {
ReadyState state = eventSource.getState();
if (state == ReadyState.CLOSED || state == ReadyState.SHUTDOWN) {
stopRequested = true;
}
if (!stopRequested) {
try {
Thread.sleep(25);
} catch (InterruptedException interruptedException) {
Thread.currentThread().interrupt();
stopRequested = true;
}
}
}
}

shutdownEventSource();
}

/**
Expand All @@ -101,22 +126,22 @@ protected void handleMessageEvent(MessageEvent messageEvent) {
String eventName = messageEvent.getEventName();
String data = messageEvent.getData();

if (assemblyFinished) {
shutdownEventSource();
return;
}

// Check if the event is a message event without
if (eventName.equals("message")) {
switch (data) {
case "assembly_finished":
assemblyFinished = true;
try {
assemblyListener.onAssemblyFinished(transloadit.getAssemblyByUrl(response.getSslUrl()));
} catch (RequestException | LocalOperationException e) {
assemblyListener.onError(e);
} finally {
shutdownEventSource();
stopRequested = true;
if (!assemblyFinishedNotified) {
assemblyFinishedNotified = true;
try {
assemblyListener.onAssemblyFinished(transloadit.getAssemblyByUrl(response.getSslUrl()));
} catch (RequestException | LocalOperationException e) {
assemblyListener.onError(e);
}
}
if (eventSource != null) {
eventSource.stop();
}
break;
case "assembly_upload_meta_data_extracted":
Expand Down Expand Up @@ -146,10 +171,10 @@ protected void handleMessageEvent(MessageEvent messageEvent) {

case "assembly_error":
if (assemblyFinished) {
shutdownEventSource();
break;
}
assemblyListener.onError(new RequestException(data));
stopRequested = true;
shutdownEventSource();
break;

Expand Down Expand Up @@ -177,9 +202,8 @@ protected void handleStartedEvent(StartedEvent startedEvent) {
}

protected void handleFaultEvent(FaultEvent faultEvent) {
if (assemblyFinished) {
shutdownEventSource();
}
stopRequested = true;
shutdownEventSource();
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Abort SSE on any fault

The new handleFaultEvent now unconditionally sets stopRequested and shuts down the EventSource whenever a FaultEvent arrives. Before this change, faults were ignored unless the assembly was already finished, allowing the LaunchDarkly client to reconnect transparently. With the new logic, any transient network hiccup before assembly_finished will stop the SSE loop permanently without notifying the listener, so the assembly listener may never receive either onAssemblyFinished or onError. This prematurely terminates monitoring of long‑running assemblies in unreliable network conditions.

Useful? React with 👍 / 👎.

// Debug output, uncomment if needed
// String data = faultEvent.toString();
// System.out.printf("Fault: %s\n", data);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -51,6 +52,8 @@ void sseStreamShouldCloseWithoutErrorsAfterAssemblyFinished() throws Exception {
AtomicReference<AssemblyResponse> finishedResponse = new AtomicReference<>();
CompletableFuture<Void> finishedFuture = new CompletableFuture<>();
CompletableFuture<Exception> errorFuture = new CompletableFuture<>();
CountDownLatch resultLatch = new CountDownLatch(1);
AtomicReference<JSONArray> resultPayload = new AtomicReference<>();

AssemblyListener listener = new AssemblyListener() {
@Override
Expand Down Expand Up @@ -97,6 +100,14 @@ public void onAssemblyProgress(JSONObject progress) {

@Override
public void onAssemblyResultFinished(JSONArray result) {
System.out.println("[AssemblySseIntegrationTest] assembly_result_finished payload=" + result);
if (result != null && result.length() >= 2) {
String stepName = result.optString(0, null);
if ("resize".equals(stepName)) {
resultPayload.compareAndSet(null, cloneJsonArray(result));
resultLatch.countDown();
}
}
}
};

Expand All @@ -120,6 +131,12 @@ public void onAssemblyResultFinished(JSONArray result) {
assertTrue(completed.isFinished(), "Assembly should be finished");
assertEquals("ASSEMBLY_COMPLETED", completed.json().optString("ok"));

boolean resultSeen = resultLatch.await(2, TimeUnit.MINUTES);
assertTrue(resultSeen, "Timed out waiting for assembly_result_finished event");
JSONArray resizePayload = resultPayload.get();
assertNotNull(resizePayload, "Resize SSE payload missing");
assertEquals("resize", resizePayload.optString(0));

try {
Exception unexpected = errorFuture.get(30, TimeUnit.SECONDS);
fail("Unexpected SSE error after completion: " + unexpected);
Expand All @@ -134,6 +151,10 @@ public void onAssemblyResultFinished(JSONArray result) {
}
}

private static JSONArray cloneJsonArray(JSONArray array) {
return array == null ? null : new JSONArray(array.toString());
}

private static Path createTempUpload() throws IOException {
Path file = Files.createTempFile("transloadit-sse-test", ".jpg");
URL source = new URL("https://demos.transloadit.com/inputs/chameleon.jpg");
Expand Down