diff --git a/.changeset/fix-sse-cleanup-on-retries-exhausted.md b/.changeset/fix-sse-cleanup-on-retries-exhausted.md new file mode 100644 index 000000000..d6f8bb86c --- /dev/null +++ b/.changeset/fix-sse-cleanup-on-retries-exhausted.md @@ -0,0 +1,5 @@ +--- +"@solid-primitives/sse": patch +--- + +Fix memory leak when app-level retries are exhausted in `createSSE`. Previously, when all reconnect attempts were used up and the `EventSource` was permanently closed, `currentCleanup` was never called — leaving the `EventSource` instance and its event listeners alive in memory, and the `source` signal pointing to a stale handle. Now an `else if` branch explicitly calls `currentCleanup()`, clears the reference, and sets the `source` signal to `undefined`. diff --git a/packages/sse/README.md b/packages/sse/README.md index 88fc99cc3..a534bf9fc 100644 --- a/packages/sse/README.md +++ b/packages/sse/README.md @@ -154,6 +154,58 @@ SSEReadyState.CLOSED; // 2 `EventSource` has native browser-level reconnection built in. For transient network drops the browser automatically retries. The `reconnect` option in `createSSE` is for _application-level_ reconnection — it fires only when `readyState` becomes `SSEReadyState.CLOSED`, meaning the browser has given up entirely. You generally do not need `reconnect: true` for normal usage. +### A note on server disconnection detection + +`EventSource` **does not reliably detect when a server silently stops responding**. If the server process crashes or the network path is severed without a proper TCP close handshake, the browser never fires an `error` event and `readyState` stays `OPEN` indefinitely — the connection looks healthy even though no messages will ever arrive. + +The only robust workaround is **application-level heartbeats**: the server sends a lightweight event at a fixed interval, and the client starts a timer that triggers a reconnect if no heartbeat is received within the expected window. + +```ts +import { createSSE } from "@solid-primitives/sse"; +import { onCleanup } from "solid-js"; + +const HEARTBEAT_TIMEOUT_MS = 15_000; // reconnect if silent for 15 s + +function createSSEWithHeartbeat(url: string) { + let timer: ReturnType | undefined; + + const { reconnect, ...rest } = createSSE(url, { + // The server emits `event: heartbeat\ndata: \n\n` every ~10 s. + // Any regular message also resets the timer. + events: { heartbeat: resetTimer }, + onMessage: resetTimer, + reconnect: true, + }); + + function resetTimer() { + clearTimeout(timer); + timer = setTimeout(() => { + // No heartbeat received — assume the server is gone. + reconnect(); + }, HEARTBEAT_TIMEOUT_MS); + } + + onCleanup(() => { + clearTimeout(timer); + timer = undefined; + }); + resetTimer(); // arm the first timeout immediately + + return { reconnect, ...rest }; +} +``` + +On the server, emit a periodic heartbeat event well within the client timeout: + +```js +// Express / Node.js example +setInterval(() => { + res.write("event: heartbeat\ndata: \n\n"); +}, 10_000); // every 10 s, safely below the 15 s client timeout +``` + +> **Why SSE comment lines are not enough** — SSE comment lines (e.g. `: keep-alive`) reset the browser's internal TCP idle timer but are _not_ exposed to JavaScript listeners. Use a named `event: heartbeat` or a plain `data:` event if you need the client to observe the heartbeat. + ## Integration with `@solid-primitives/event-bus` Because `bus.emit` matches the `(event: MessageEvent) => void` shape of `onMessage`, you can wire them directly: diff --git a/packages/sse/src/sse.ts b/packages/sse/src/sse.ts index 12a626d63..cdad6c034 100644 --- a/packages/sse/src/sse.ts +++ b/packages/sse/src/sse.ts @@ -83,8 +83,10 @@ export type CreateSSEOptions = SSEOptions & { * - `true`: reconnect with defaults (Infinity retries, 3000ms delay) * - object: custom `{ retries?, delay? }` * - * Note: `EventSource` already reconnects natively for transient network - * drops. This option handles cases where the browser gives up entirely. + * The `retries` budget is shared across both browser-level retries + * (readyState stays CONNECTING) and app-level reconnects (readyState → + * CLOSED). Once the budget is exhausted the connection is fully torn down, + * stopping any further browser-driven retry loops. */ reconnect?: boolean | SSEReconnectOptions; /** @@ -227,6 +229,13 @@ export const createSSE = ( // ── Connection management ───────────────────────────────────────────────── let currentCleanup: VoidFunction | undefined; + /** Tears down the current source without scheduling a reconnect. */ + const teardown = () => { + currentCleanup?.(); + currentCleanup = undefined; + setSource(undefined); + }; + /** Open a fresh connection, resetting the retry counter. */ const connect = (resolvedUrl: string) => { retriesLeft = reconnectConfig.retries ?? 0; @@ -255,11 +264,26 @@ export const createSSE = ( setError(() => e); options.onError?.(e); - // Only app-level reconnect when the browser has given up (CLOSED). - // When readyState is still CONNECTING the browser is handling retries. + // When the browser has given up (CLOSED), perform app-level reconnects + // against the configured budget. + // When the browser is still retrying (CONNECTING) and a reconnect budget + // is configured, count those attempts too so the config is always honoured + // and the browser can never loop infinitely beyond the configured limit. if (es.readyState === SSEReadyState.CLOSED && retriesLeft > 0) { retriesLeft--; reconnectTimer = setTimeout(() => _open(resolvedUrl), reconnectConfig.delay ?? 3000); + } else if (es.readyState === SSEReadyState.CLOSED) { + // Retries exhausted — clean up fully to avoid memory/listener leaks. + teardown(); + } else if (es.readyState === SSEReadyState.CONNECTING && options.reconnect) { + // Browser is retrying. Consume the budget; when it's gone, abort so + // we don't loop forever against the user's configured retry limit. + if (retriesLeft > 0) { + retriesLeft--; + } else { + teardown(); + setReadyState(SSEReadyState.CLOSED); + } } }; @@ -280,9 +304,7 @@ export const createSSE = ( const disconnect = () => { clearReconnectTimer(); retriesLeft = 0; - currentCleanup?.(); - currentCleanup = undefined; - setSource(undefined); + teardown(); setReadyState(SSEReadyState.CLOSED); }; @@ -309,10 +331,7 @@ export const createSSE = ( const resolvedUrl = url(); if (resolvedUrl !== prevUrl) { prevUrl = resolvedUrl; - untrack(() => { - currentCleanup?.(); - currentCleanup = undefined; - }); + untrack(() => teardown()); connect(resolvedUrl); } }); @@ -321,8 +340,8 @@ export const createSSE = ( // ── Lifecycle cleanup ───────────────────────────────────────────────────── onCleanup(() => { clearReconnectTimer(); - currentCleanup?.(); - currentCleanup = undefined; + teardown(); + setReadyState(SSEReadyState.CLOSED); }); return { source, data, error, readyState, close: disconnect, reconnect: manualReconnect }; diff --git a/packages/sse/test/index.test.ts b/packages/sse/test/index.test.ts index 7cd09d3bb..b5765184a 100644 --- a/packages/sse/test/index.test.ts +++ b/packages/sse/test/index.test.ts @@ -144,7 +144,7 @@ describe("createSSE", () => { dispose(); })); - it("does not app-reconnect on transient errors (browser handles those)", () => + it("does not open a new connection on transient errors (browser retries natively)", () => createRoot(dispose => { const initialCount = SSEInstances.length; const { source } = createSSE("https://example.com/events", { @@ -153,7 +153,41 @@ describe("createSSE", () => { vi.advanceTimersByTime(20); (source() as unknown as MockEventSource).simulateTransientError(); vi.advanceTimersByTime(300); - // readyState stayed CONNECTING → no new EventSource was created + // readyState stayed CONNECTING → no new EventSource was created, but + // the retry budget was decremented by 1 (from 5 to 4). + expect(SSEInstances.length).toBe(initialCount + 1); + dispose(); + })); + + it("stops browser retry loop when reconnect budget is exhausted via transient errors", () => + createRoot(dispose => { + const { source, readyState } = createSSE("https://example.com/events", { + reconnect: { retries: 2, delay: 50 }, + }); + vi.advanceTimersByTime(20); + const es = source() as unknown as MockEventSource; + const closeSpy = vi.spyOn(es, "close"); + // Two transient errors consume the full budget (2→1→0). + es.simulateTransientError(); // retries: 2→1 + es.simulateTransientError(); // retries: 1→0 + // A third transient error exhausts the budget → connection must be stopped. + es.simulateTransientError(); + expect(closeSpy).toHaveBeenCalledOnce(); + expect(source()).toBeUndefined(); + expect(readyState()).toBe(SSEReadyState.CLOSED); + dispose(); + })); + + it("does not affect transient errors when reconnect is not configured", () => + createRoot(dispose => { + const initialCount = SSEInstances.length; + const { source } = createSSE("https://example.com/events"); + vi.advanceTimersByTime(20); + const es = source() as unknown as MockEventSource; + // Transient errors with no reconnect config should not kill the connection. + es.simulateTransientError(); + es.simulateTransientError(); + expect(source()).toBe(es); expect(SSEInstances.length).toBe(initialCount + 1); dispose(); })); @@ -181,12 +215,31 @@ describe("createSSE", () => { const first = source(); (first as unknown as MockEventSource).simulateError(); vi.advanceTimersByTime(100); // first retry - const second = source(); + const second = source() as unknown as MockEventSource; expect(second).not.toBe(first); vi.advanceTimersByTime(20); // second opens - (second as unknown as MockEventSource).simulateError(); + const closeSpy = vi.spyOn(second, "close"); + second.simulateError(); vi.advanceTimersByTime(200); // no more retries - expect(source()).toBe(second); // still the same source + // retries exhausted: close() was called and source signal is cleared + expect(closeSpy).toHaveBeenCalledOnce(); + expect(source()).toBeUndefined(); + dispose(); + })); + + it("cleans up source and listeners when retries are exhausted", () => + createRoot(dispose => { + const { source, readyState } = createSSE("https://example.com/events", { + reconnect: { retries: 0, delay: 50 }, + }); + vi.advanceTimersByTime(20); + const es = source() as unknown as MockEventSource; + const closeSpy = vi.spyOn(es, "close"); + es.simulateError(); + // retries exhausted immediately — cleanup must have run + expect(closeSpy).toHaveBeenCalledOnce(); + expect(source()).toBeUndefined(); + expect(readyState()).toBe(SSEReadyState.CLOSED); dispose(); })); @@ -233,4 +286,23 @@ describe("createSSE", () => { dispose(); }), )); + + it("readyState is CLOSED after owner disposal", () => + createRoot(dispose => { + const { readyState } = createSSE("https://example.com/events"); + vi.advanceTimersByTime(20); + expect(readyState()).toBe(SSEReadyState.OPEN); + dispose(); + expect(readyState()).toBe(SSEReadyState.CLOSED); + })); + + it("readyState updates to CONNECTING when server drops connection", () => + createRoot(dispose => { + const { readyState, source } = createSSE("https://example.com/events"); + vi.advanceTimersByTime(20); + expect(readyState()).toBe(SSEReadyState.OPEN); + (source() as unknown as MockEventSource).simulateTransientError(); + expect(readyState()).toBe(SSEReadyState.CONNECTING); + dispose(); + })); });