Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/fix-sse-cleanup-on-retries-exhausted.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@solid-primitives/sse": patch
---

Fix memory leak when app-level retries are exhausted in `createSSE`. Previously, when all reconnect attempts were used up and the `EventSource` was permanently closed, `currentCleanup` was never called — leaving the `EventSource` instance and its event listeners alive in memory, and the `source` signal pointing to a stale handle. Now an `else if` branch explicitly calls `currentCleanup()`, clears the reference, and sets the `source` signal to `undefined`.
52 changes: 52 additions & 0 deletions packages/sse/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,58 @@ SSEReadyState.CLOSED; // 2

`EventSource` has native browser-level reconnection built in. For transient network drops the browser automatically retries. The `reconnect` option in `createSSE` is for _application-level_ reconnection — it fires only when `readyState` becomes `SSEReadyState.CLOSED`, meaning the browser has given up entirely. You generally do not need `reconnect: true` for normal usage.

### A note on server disconnection detection

`EventSource` **does not reliably detect when a server silently stops responding**. If the server process crashes or the network path is severed without a proper TCP close handshake, the browser never fires an `error` event and `readyState` stays `OPEN` indefinitely — the connection looks healthy even though no messages will ever arrive.

The only robust workaround is **application-level heartbeats**: the server sends a lightweight event at a fixed interval, and the client starts a timer that triggers a reconnect if no heartbeat is received within the expected window.

```ts
import { createSSE } from "@solid-primitives/sse";
import { onCleanup } from "solid-js";

const HEARTBEAT_TIMEOUT_MS = 15_000; // reconnect if silent for 15 s

function createSSEWithHeartbeat(url: string) {
let timer: ReturnType<typeof setTimeout> | undefined;

const { reconnect, ...rest } = createSSE(url, {
// The server emits `event: heartbeat\ndata: \n\n` every ~10 s.
// Any regular message also resets the timer.
events: { heartbeat: resetTimer },
onMessage: resetTimer,
reconnect: true,
});

function resetTimer() {
clearTimeout(timer);
timer = setTimeout(() => {
// No heartbeat received — assume the server is gone.
reconnect();
}, HEARTBEAT_TIMEOUT_MS);
}

onCleanup(() => {
clearTimeout(timer);
timer = undefined;
});
resetTimer(); // arm the first timeout immediately

return { reconnect, ...rest };
}
```

On the server, emit a periodic heartbeat event well within the client timeout:

```js
// Express / Node.js example
setInterval(() => {
res.write("event: heartbeat\ndata: \n\n");
}, 10_000); // every 10 s, safely below the 15 s client timeout
```

> **Why SSE comment lines are not enough** — SSE comment lines (e.g. `: keep-alive`) reset the browser's internal TCP idle timer but are _not_ exposed to JavaScript listeners. Use a named `event: heartbeat` or a plain `data:` event if you need the client to observe the heartbeat.

## Integration with `@solid-primitives/event-bus`

Because `bus.emit` matches the `(event: MessageEvent) => void` shape of `onMessage`, you can wire them directly:
Expand Down
45 changes: 32 additions & 13 deletions packages/sse/src/sse.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,10 @@ export type CreateSSEOptions<T> = SSEOptions & {
* - `true`: reconnect with defaults (Infinity retries, 3000ms delay)
* - object: custom `{ retries?, delay? }`
*
* Note: `EventSource` already reconnects natively for transient network
* drops. This option handles cases where the browser gives up entirely.
* The `retries` budget is shared across both browser-level retries
* (readyState stays CONNECTING) and app-level reconnects (readyState →
* CLOSED). Once the budget is exhausted the connection is fully torn down,
* stopping any further browser-driven retry loops.
*/
reconnect?: boolean | SSEReconnectOptions;
/**
Expand Down Expand Up @@ -227,6 +229,13 @@ export const createSSE = <T = string>(
// ── Connection management ─────────────────────────────────────────────────
let currentCleanup: VoidFunction | undefined;

/** Tears down the current source without scheduling a reconnect. */
const teardown = () => {
currentCleanup?.();
currentCleanup = undefined;
setSource(undefined);
};

/** Open a fresh connection, resetting the retry counter. */
const connect = (resolvedUrl: string) => {
retriesLeft = reconnectConfig.retries ?? 0;
Expand Down Expand Up @@ -255,11 +264,26 @@ export const createSSE = <T = string>(
setError(() => e);
options.onError?.(e);

// Only app-level reconnect when the browser has given up (CLOSED).
// When readyState is still CONNECTING the browser is handling retries.
// When the browser has given up (CLOSED), perform app-level reconnects
// against the configured budget.
// When the browser is still retrying (CONNECTING) and a reconnect budget
// is configured, count those attempts too so the config is always honoured
// and the browser can never loop infinitely beyond the configured limit.
if (es.readyState === SSEReadyState.CLOSED && retriesLeft > 0) {
retriesLeft--;
reconnectTimer = setTimeout(() => _open(resolvedUrl), reconnectConfig.delay ?? 3000);
} else if (es.readyState === SSEReadyState.CLOSED) {
// Retries exhausted — clean up fully to avoid memory/listener leaks.
teardown();
} else if (es.readyState === SSEReadyState.CONNECTING && options.reconnect) {
// Browser is retrying. Consume the budget; when it's gone, abort so
// we don't loop forever against the user's configured retry limit.
if (retriesLeft > 0) {
retriesLeft--;
} else {
teardown();
setReadyState(SSEReadyState.CLOSED);
}
}
};

Expand All @@ -280,9 +304,7 @@ export const createSSE = <T = string>(
const disconnect = () => {
clearReconnectTimer();
retriesLeft = 0;
currentCleanup?.();
currentCleanup = undefined;
setSource(undefined);
teardown();
setReadyState(SSEReadyState.CLOSED);
};

Expand All @@ -309,10 +331,7 @@ export const createSSE = <T = string>(
const resolvedUrl = url();
if (resolvedUrl !== prevUrl) {
prevUrl = resolvedUrl;
untrack(() => {
currentCleanup?.();
currentCleanup = undefined;
});
untrack(() => teardown());
connect(resolvedUrl);
}
});
Expand All @@ -321,8 +340,8 @@ export const createSSE = <T = string>(
// ── Lifecycle cleanup ─────────────────────────────────────────────────────
onCleanup(() => {
clearReconnectTimer();
currentCleanup?.();
currentCleanup = undefined;
teardown();
setReadyState(SSEReadyState.CLOSED);
});

return { source, data, error, readyState, close: disconnect, reconnect: manualReconnect };
Expand Down
82 changes: 77 additions & 5 deletions packages/sse/test/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ describe("createSSE", () => {
dispose();
}));

it("does not app-reconnect on transient errors (browser handles those)", () =>
it("does not open a new connection on transient errors (browser retries natively)", () =>
createRoot(dispose => {
const initialCount = SSEInstances.length;
const { source } = createSSE("https://example.com/events", {
Expand All @@ -153,7 +153,41 @@ describe("createSSE", () => {
vi.advanceTimersByTime(20);
(source() as unknown as MockEventSource).simulateTransientError();
vi.advanceTimersByTime(300);
// readyState stayed CONNECTING → no new EventSource was created
// readyState stayed CONNECTING → no new EventSource was created, but
// the retry budget was decremented by 1 (from 5 to 4).
expect(SSEInstances.length).toBe(initialCount + 1);
dispose();
}));

it("stops browser retry loop when reconnect budget is exhausted via transient errors", () =>
createRoot(dispose => {
const { source, readyState } = createSSE("https://example.com/events", {
reconnect: { retries: 2, delay: 50 },
});
vi.advanceTimersByTime(20);
const es = source() as unknown as MockEventSource;
const closeSpy = vi.spyOn(es, "close");
// Two transient errors consume the full budget (2→1→0).
es.simulateTransientError(); // retries: 2→1
es.simulateTransientError(); // retries: 1→0
// A third transient error exhausts the budget → connection must be stopped.
es.simulateTransientError();
expect(closeSpy).toHaveBeenCalledOnce();
expect(source()).toBeUndefined();
expect(readyState()).toBe(SSEReadyState.CLOSED);
dispose();
}));

it("does not affect transient errors when reconnect is not configured", () =>
createRoot(dispose => {
const initialCount = SSEInstances.length;
const { source } = createSSE("https://example.com/events");
vi.advanceTimersByTime(20);
const es = source() as unknown as MockEventSource;
// Transient errors with no reconnect config should not kill the connection.
es.simulateTransientError();
es.simulateTransientError();
expect(source()).toBe(es);
expect(SSEInstances.length).toBe(initialCount + 1);
dispose();
}));
Expand Down Expand Up @@ -181,12 +215,31 @@ describe("createSSE", () => {
const first = source();
(first as unknown as MockEventSource).simulateError();
vi.advanceTimersByTime(100); // first retry
const second = source();
const second = source() as unknown as MockEventSource;
expect(second).not.toBe(first);
vi.advanceTimersByTime(20); // second opens
(second as unknown as MockEventSource).simulateError();
const closeSpy = vi.spyOn(second, "close");
second.simulateError();
vi.advanceTimersByTime(200); // no more retries
expect(source()).toBe(second); // still the same source
// retries exhausted: close() was called and source signal is cleared
expect(closeSpy).toHaveBeenCalledOnce();
expect(source()).toBeUndefined();
dispose();
}));

it("cleans up source and listeners when retries are exhausted", () =>
createRoot(dispose => {
const { source, readyState } = createSSE("https://example.com/events", {
reconnect: { retries: 0, delay: 50 },
});
vi.advanceTimersByTime(20);
const es = source() as unknown as MockEventSource;
const closeSpy = vi.spyOn(es, "close");
es.simulateError();
// retries exhausted immediately — cleanup must have run
expect(closeSpy).toHaveBeenCalledOnce();
expect(source()).toBeUndefined();
expect(readyState()).toBe(SSEReadyState.CLOSED);
dispose();
}));

Expand Down Expand Up @@ -233,4 +286,23 @@ describe("createSSE", () => {
dispose();
}),
));

it("readyState is CLOSED after owner disposal", () =>
createRoot(dispose => {
const { readyState } = createSSE("https://example.com/events");
vi.advanceTimersByTime(20);
expect(readyState()).toBe(SSEReadyState.OPEN);
dispose();
expect(readyState()).toBe(SSEReadyState.CLOSED);
}));

it("readyState updates to CONNECTING when server drops connection", () =>
createRoot(dispose => {
const { readyState, source } = createSSE("https://example.com/events");
vi.advanceTimersByTime(20);
expect(readyState()).toBe(SSEReadyState.OPEN);
(source() as unknown as MockEventSource).simulateTransientError();
expect(readyState()).toBe(SSEReadyState.CONNECTING);
dispose();
}));
});
Loading