diff --git a/packages/extension/test/e2e/remote-comms.test.ts b/packages/extension/test/e2e/remote-comms.test.ts index 23bb442f7a..1adac48f40 100644 --- a/packages/extension/test/e2e/remote-comms.test.ts +++ b/packages/extension/test/e2e/remote-comms.test.ts @@ -5,7 +5,7 @@ import { rm } from 'node:fs/promises'; import { loadExtension, sessionPath } from '../helpers.ts'; -test.describe.configure({ mode: 'serial', timeout: 60_000 }); +test.describe.configure({ mode: 'serial', timeout: 90_000 }); /** * End-to-end tests for remote communications functionality. @@ -140,7 +140,11 @@ test.describe('Remote Communications', () => { const messageResponse = popupPage1.locator( '[data-testid="message-response"]', ); - await expect(messageResponse).toBeVisible({ timeout: 30_000 }); + // Budget: redemption timeout is `ackTimeoutMs * (MAX_RETRIES + 1)` = + // 40s with prod defaults; the response (success or rejection) is + // guaranteed to render before then. 50s gives 10s headroom for the + // post-redemption render path under CI load. + await expect(messageResponse).toBeVisible({ timeout: 50_000 }); await expect(messageResponse).toContainText( // eslint-disable-next-line no-useless-escape `Response:{\"body\":\"#\\\"vat Bob got \\\\\\\"hello\\\\\\\" from remote Alice\\\"\",\"slots\":[]}`, diff --git a/packages/kernel-browser-runtime/CHANGELOG.md b/packages/kernel-browser-runtime/CHANGELOG.md index 1f6f23b38d..b63734215d 100644 --- a/packages/kernel-browser-runtime/CHANGELOG.md +++ b/packages/kernel-browser-runtime/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Fixed + +- Process platform-services RPC request handlers in the background so a request handler that fires a reentrant outbound RPC (e.g. 
transport handshake calling back into the kernel) cannot deadlock waiting for its response ([#948](https://github.com/MetaMask/ocap-kernel/pull/948)) + ## [0.6.0] ### Added diff --git a/packages/kernel-browser-runtime/src/PlatformServicesClient.test.ts b/packages/kernel-browser-runtime/src/PlatformServicesClient.test.ts index 5673a8ccd1..aba8f598ef 100644 --- a/packages/kernel-browser-runtime/src/PlatformServicesClient.test.ts +++ b/packages/kernel-browser-runtime/src/PlatformServicesClient.test.ts @@ -513,6 +513,197 @@ describe('PlatformServicesClient', () => { expect(successResponse).toBeDefined(); }); }); + + describe('remoteIncarnationChange', () => { + it('forwards args to handler and returns its boolean verdict', async () => { + const outputs: MessageEventWithPayload[] = []; + const testStream = await TestDuplexStream.make((message) => { + outputs.push(message as unknown as MessageEventWithPayload); + }); + const testClient = new PlatformServicesClient( + testStream as unknown as PlatformServicesClientStream, + clientLogger, + ); + await delay(10); + + const remoteHandler = vi.fn(async () => 'response'); + const giveUpHandler = vi.fn(); + const incarnationHandler = vi.fn(async () => true); + const initP = testClient.initializeRemoteComms( + '0xabcd', + {}, + remoteHandler, + giveUpHandler, + 'local-incarnation', + incarnationHandler, + ); + await testStream.receiveInput(makeNullReply('m1')); + await initP; + + await testStream.receiveInput( + new MessageEvent('message', { + data: { + id: 'm2', + jsonrpc: '2.0', + method: 'remoteIncarnationChange', + params: { + peerId: 'peer-456', + observedIncarnation: 'incarnation-X', + }, + }, + }), + ); + await delay(50); + + expect(incarnationHandler).toHaveBeenCalledExactlyOnceWith( + 'peer-456', + 'incarnation-X', + ); + const successResponse = outputs.find( + (message) => + message.payload?.id === 'm2' && + 'result' in message.payload && + message.payload.result === true, + ); + expect(successResponse).toBeDefined(); 
+ }); + + it('returns false when no handler is registered', async () => { + const outputs: MessageEventWithPayload[] = []; + const newStream = await TestDuplexStream.make((message) => { + outputs.push(message as unknown as MessageEventWithPayload); + }); + // eslint-disable-next-line no-new -- test setup + new PlatformServicesClient( + newStream as unknown as PlatformServicesClientStream, + ); + + await newStream.receiveInput( + new MessageEvent('message', { + data: { + id: 'm1', + jsonrpc: '2.0', + method: 'remoteIncarnationChange', + params: { + peerId: 'peer-789', + observedIncarnation: 'incarnation-Y', + }, + }, + }), + ); + await delay(10); + + const response = outputs.find( + (message) => + message.payload?.id === 'm1' && + 'result' in message.payload && + message.payload.result === false, + ); + expect(response).toBeDefined(); + }); + + it('coerces non-boolean handler return to true (fail closed)', async () => { + const outputs: MessageEventWithPayload[] = []; + const testStream = await TestDuplexStream.make((message) => { + outputs.push(message as unknown as MessageEventWithPayload); + }); + const testClient = new PlatformServicesClient( + testStream as unknown as PlatformServicesClientStream, + clientLogger, + ); + await delay(10); + + const remoteHandler = vi.fn(async () => 'response'); + // Handler returns a non-boolean truthy value (a buggy caller). 
+ const incarnationHandler = vi.fn( + async () => 'oops' as unknown as boolean, + ); + const initP = testClient.initializeRemoteComms( + '0xabcd', + {}, + remoteHandler, + undefined, + 'local-incarnation', + incarnationHandler, + ); + await testStream.receiveInput(makeNullReply('m1')); + await initP; + + await testStream.receiveInput( + new MessageEvent('message', { + data: { + id: 'm2', + jsonrpc: '2.0', + method: 'remoteIncarnationChange', + params: { + peerId: 'peer-789', + observedIncarnation: 'incarnation-Z', + }, + }, + }), + ); + await delay(50); + + // Fail closed → resolve to true so the transport drops the outbound. + const response = outputs.find( + (message) => + message.payload?.id === 'm2' && + 'result' in message.payload && + message.payload.result === true, + ); + expect(response).toBeDefined(); + }); + + it('coerces a throwing handler to true (fail closed)', async () => { + const outputs: MessageEventWithPayload[] = []; + const testStream = await TestDuplexStream.make((message) => { + outputs.push(message as unknown as MessageEventWithPayload); + }); + const testClient = new PlatformServicesClient( + testStream as unknown as PlatformServicesClientStream, + clientLogger, + ); + await delay(10); + + const remoteHandler = vi.fn(async () => 'response'); + const incarnationHandler = vi.fn(async () => { + throw new Error('handler exploded'); + }); + const initP = testClient.initializeRemoteComms( + '0xabcd', + {}, + remoteHandler, + undefined, + 'local-incarnation', + incarnationHandler, + ); + await testStream.receiveInput(makeNullReply('m1')); + await initP; + + await testStream.receiveInput( + new MessageEvent('message', { + data: { + id: 'm2', + jsonrpc: '2.0', + method: 'remoteIncarnationChange', + params: { + peerId: 'peer-789', + observedIncarnation: 'incarnation-Z', + }, + }, + }), + ); + await delay(50); + + const response = outputs.find( + (message) => + message.payload?.id === 'm2' && + 'result' in message.payload && + message.payload.result 
=== true, + ); + expect(response).toBeDefined(); + }); + }); }); }); }); diff --git a/packages/kernel-browser-runtime/src/PlatformServicesClient.ts b/packages/kernel-browser-runtime/src/PlatformServicesClient.ts index c56dee697c..41461b8825 100644 --- a/packages/kernel-browser-runtime/src/PlatformServicesClient.ts +++ b/packages/kernel-browser-runtime/src/PlatformServicesClient.ts @@ -25,6 +25,7 @@ import type { PostMessageTarget, } from '@metamask/streams/browser'; import { isJsonRpcResponse, isJsonRpcRequest } from '@metamask/utils'; +import type { JsonRpcRequest } from '@metamask/utils'; import type { JsonRpcId } from '@metamask/utils'; // Appears in the docs. @@ -329,16 +330,48 @@ export class PlatformServicesClient implements PlatformServices { } /** - * Handle a remote incarnation change notification from the server. + * Forward an incarnationId observed during a peer handshake to the kernel + * layer. Fires on every successful handshake; the kernel decides whether + * it represents a peer restart and returns the verdict so the transport + * can suppress stale outbound messages. * - * @param peerId - The peer ID of the remote that restarted. - * @returns A promise that resolves when handling is complete. + * Fails closed: handler exceptions and non-boolean returns coerce to + * `true` so the transport drops the outbound rather than letting a + * potentially stale message through. Returns `false` only when no handler + * is registered (transport has no kernel to consult, so the verdict is + * effectively unknown — the receiver-side persisted check still gates + * correctness). + * + * @param peerId - The peer that completed the handshake. + * @param observedIncarnation - The incarnationId reported by the peer. + * @returns Whether the kernel detected a peer restart. 
*/ - async #remoteIncarnationChange(peerId: string): Promise { - if (this.#remoteIncarnationChangeHandler) { - this.#remoteIncarnationChangeHandler(peerId); + async #remoteIncarnationChange( + peerId: string, + observedIncarnation: string, + ): Promise { + if (!this.#remoteIncarnationChangeHandler) { + return false; + } + try { + const verdict = await this.#remoteIncarnationChangeHandler( + peerId, + observedIncarnation, + ); + if (typeof verdict !== 'boolean') { + this.#logger.error( + `incarnation handler returned non-boolean ${typeof verdict} for ${peerId.slice(0, 8)}; treating as restart`, + ); + return true; + } + return verdict; + } catch (error) { + this.#logger.error( + `incarnation handler threw for ${peerId.slice(0, 8)}; treating as restart:`, + error, + ); + return true; } - return null; } /** @@ -383,22 +416,34 @@ export class PlatformServicesClient implements PlatformServices { this.#rpcClient.handleResponse(id, event.data); } else if (isJsonRpcRequest(event.data)) { - const { id, method, params } = event.data; - try { - this.#rpcServer.assertHasMethod(method); - const result = await this.#rpcServer.execute(method, params); - await this.#sendMessage({ - id, - result, - jsonrpc: '2.0', - }); - } catch (error) { - await this.#sendMessage({ - id, - error: serializeError(error), - jsonrpc: '2.0', - }); - } + // Run the request handler in the background instead of awaiting it + // inside the drain. The drain processes responses too, and a request + // handler that fires an outbound RPC back to the other side would + // deadlock waiting for its response — the drain can't get to that + // response until the request handler returns. + this.#executeRequest(event.data).catch(() => undefined); + } + } + + /** + * Execute a JSON-RPC request and write the response back. Errors during + * execution are serialized into a JSON-RPC error response; errors during + * response delivery are swallowed. + * + * @param request - The JSON-RPC request to execute. 
+ */ + async #executeRequest(request: JsonRpcRequest): Promise { + const { id, method, params } = request; + try { + this.#rpcServer.assertHasMethod(method); + const result = await this.#rpcServer.execute(method, params); + await this.#sendMessage({ id, result, jsonrpc: '2.0' }); + } catch (error) { + await this.#sendMessage({ + id, + error: serializeError(error), + jsonrpc: '2.0', + }).catch(() => undefined); } } } diff --git a/packages/kernel-browser-runtime/src/PlatformServicesServer.test.ts b/packages/kernel-browser-runtime/src/PlatformServicesServer.test.ts index abe83cf051..9a4fc7c0cc 100644 --- a/packages/kernel-browser-runtime/src/PlatformServicesServer.test.ts +++ b/packages/kernel-browser-runtime/src/PlatformServicesServer.test.ts @@ -28,6 +28,9 @@ let capturedRemoteMessageHandler: | ((from: string, message: string) => Promise) | undefined; let capturedRemoteGiveUpHandler: ((peerId: string) => void) | undefined; +let capturedOnIncarnationChange: + | ((peerId: string, observedIncarnation: string) => Promise) + | undefined; vi.mock('@metamask/ocap-kernel', () => ({ PlatformServicesCommandMethod: { @@ -41,9 +44,14 @@ vi.mock('@metamask/ocap-kernel', () => ({ _options: unknown, remoteMessageHandler: (from: string, message: string) => Promise, remoteGiveUpHandler: (peerId: string) => void, + _localIncarnationId: string | undefined, + onIncarnationChange: + | ((peerId: string, observedIncarnation: string) => Promise) + | undefined, ) => { capturedRemoteMessageHandler = remoteMessageHandler; capturedRemoteGiveUpHandler = remoteGiveUpHandler; + capturedOnIncarnationChange = onIncarnationChange; return { sendRemoteMessage: mockSendRemoteMessage, stop: mockStop, @@ -389,6 +397,7 @@ describe('PlatformServicesServer', () => { // Reset mocks before each test capturedRemoteMessageHandler = undefined; capturedRemoteGiveUpHandler = undefined; + capturedOnIncarnationChange = undefined; }); describe('initializeRemoteComms', () => { @@ -533,6 +542,103 @@ 
describe('PlatformServicesServer', () => { }); }); + describe('handleRemoteIncarnationChange', () => { + it('forwards observed incarnation to RPC and resolves to the kernel verdict', async () => { + const keySeed = '0xabcd'; + const relays = ['/dns4/relay.example/tcp/443/wss/p2p/relayPeer']; + + const outputs: unknown[] = []; + const testStream = await TestDuplexStream.make((message) => { + outputs.push(message); + }); + await testStream.synchronize(); + // eslint-disable-next-line no-new + new PlatformServicesServer( + testStream as unknown as PlatformServicesStream, + makeMockVatWorker, + logger, + ); + await testStream.receiveInput( + makeInitializeRemoteCommsMessageEvent('m0', keySeed, { relays }), + ); + await delay(10); + + expect(capturedOnIncarnationChange).toBeDefined(); + + // Fire the handler and have the "kernel" respond with `true`. + const verdict = capturedOnIncarnationChange?.( + 'peer-789', + 'incarnation-X', + ); + await delay(10); + + // Find the outgoing RPC and respond. + const rpcCall = outputs.find((message: unknown) => { + const parsed = message as { payload?: { method?: string } }; + return parsed.payload?.method === 'remoteIncarnationChange'; + }) as { payload: { method: string; id: string; params: unknown } }; + expect(rpcCall).toBeDefined(); + expect(rpcCall.payload.params).toStrictEqual({ + peerId: 'peer-789', + observedIncarnation: 'incarnation-X', + }); + + // Stub the RPC response with verdict=true. 
+ await testStream.receiveInput( + new MessageEvent('message', { + data: { id: rpcCall.payload.id, result: true, jsonrpc: '2.0' }, + }), + ); + expect(await verdict).toBe(true); + }); + + it('returns true (fail closed) when the RPC call rejects', async () => { + const keySeed = '0xabcd'; + const relays = ['/dns4/relay.example/tcp/443/wss/p2p/relayPeer']; + + const outputs: unknown[] = []; + const testStream = await TestDuplexStream.make((message) => { + outputs.push(message); + }); + await testStream.synchronize(); + // eslint-disable-next-line no-new + new PlatformServicesServer( + testStream as unknown as PlatformServicesStream, + makeMockVatWorker, + logger, + ); + await testStream.receiveInput( + makeInitializeRemoteCommsMessageEvent('m0', keySeed, { relays }), + ); + await delay(10); + + const verdict = capturedOnIncarnationChange?.( + 'peer-789', + 'incarnation-Y', + ); + await delay(10); + + // Reject the RPC. + const rpcCall = outputs.find((message: unknown) => { + const parsed = message as { payload?: { method?: string } }; + return parsed.payload?.method === 'remoteIncarnationChange'; + }) as { payload: { method: string; id: string } }; + expect(rpcCall).toBeDefined(); + await testStream.receiveInput( + new MessageEvent('message', { + data: { + id: rpcCall.payload.id, + error: { code: -32000, message: 'kernel unreachable' }, + jsonrpc: '2.0', + }, + }), + ); + + // Fail closed → resolve to true so transport drops the outbound. 
+ expect(await verdict).toBe(true); + }); + }); + describe('handleRemoteGiveUp', () => { it('captures handler from initTransport', async () => { const keySeed = '0xabcd'; diff --git a/packages/kernel-browser-runtime/src/PlatformServicesServer.ts b/packages/kernel-browser-runtime/src/PlatformServicesServer.ts index 40b2c3a7a4..d176cc7e14 100644 --- a/packages/kernel-browser-runtime/src/PlatformServicesServer.ts +++ b/packages/kernel-browser-runtime/src/PlatformServicesServer.ts @@ -25,6 +25,7 @@ import type { PostMessageTarget, } from '@metamask/streams/browser'; import { isJsonRpcRequest, isJsonRpcResponse } from '@metamask/utils'; +import type { JsonRpcRequest } from '@metamask/utils'; // Appears in the docs. // eslint-disable-next-line @typescript-eslint/no-unused-vars @@ -179,23 +180,41 @@ export class PlatformServicesServer { const message = event.data; this.#rpcClient.handleResponse(message.id as string, message); } else if (isJsonRpcRequest(event.data)) { - const { id, method, params } = event.data; - try { - this.#rpcServer.assertHasMethod(method); - // Ridiculous cast to bypass TypeScript vs. JsonRpc tug-o-war - const port: MessagePort | undefined = (await this.#rpcServer.execute( - method, - params, - )) as unknown as MessagePort | undefined; - await this.#sendMessage({ id, result: null, jsonrpc: '2.0' }, port); - } catch (error) { - this.#logger.error(`Error handling "${method}" request:`, error); - this.#sendMessage({ - id, - error: serializeError(error), - jsonrpc: '2.0', - }).catch(() => undefined); - } + // Run the request handler in the background instead of awaiting it + // inside the drain. The drain processes responses too, and a request + // handler that fires an outbound RPC back to the other side (e.g. + // transport.sendRemoteMessage's handshake calling onIncarnationChange) + // would deadlock waiting for its response — the drain can't get to + // that response until the request handler returns. 
+ this.#executeRequest(event.data).catch(() => undefined); + } + } + + /** + * Execute a JSON-RPC request and write the response back. Errors during + * execution are serialized into a JSON-RPC error response; errors during + * response delivery are logged and swallowed (the caller has nowhere to + * surface them). + * + * @param request - The JSON-RPC request to execute. + */ + async #executeRequest(request: JsonRpcRequest): Promise { + const { id, method, params } = request; + try { + this.#rpcServer.assertHasMethod(method); + // Ridiculous cast to bypass TypeScript vs. JsonRpc tug-o-war + const port: MessagePort | undefined = (await this.#rpcServer.execute( + method, + params, + )) as unknown as MessagePort | undefined; + await this.#sendMessage({ id, result: null, jsonrpc: '2.0' }, port); + } catch (error) { + this.#logger.error(`Error handling "${method}" request:`, error); + this.#sendMessage({ + id, + error: serializeError(error), + jsonrpc: '2.0', + }).catch(() => undefined); } } @@ -429,20 +448,40 @@ export class PlatformServicesServer { } /** - * Handle when a remote peer's incarnation changes (peer restarted). - * Notifies the kernel worker via RPC to reset the RemoteHandle state. + * Forward the incarnationId observed during a peer handshake to the kernel + * worker, and return its determination of whether the peer truly restarted. + * The transport awaits this so it can suppress stale outbound messages on + * the same connection. * - * @param peerId - The peer ID of the remote that restarted. + * On RPC failure (kernel worker unreachable, channel torn down, validation + * error) we fail *closed* — return `true` to make the transport drop the + * outbound and re-dial. The opposite default (`false` = "no restart") would + * silently let stale writes through exactly when the RPC is most likely to + * fail (kernel-side restart), which is the bug class this whole change is + * defending against. + * + * @param peerId - The peer that completed the handshake. 
+ * @param observedIncarnation - The incarnationId reported by the peer. + * @returns Whether the kernel detected a peer restart, or `true` when the + * kernel-side check could not be performed. */ - #handleRemoteIncarnationChange(peerId: string): void { - this.#rpcClient - .call('remoteIncarnationChange', { peerId }) - .catch((error) => { - this.#logger.error( - 'Error notifying kernel of remote incarnation change:', - error, - ); + async #handleRemoteIncarnationChange( + peerId: string, + observedIncarnation: string, + ): Promise { + try { + return await this.#rpcClient.call('remoteIncarnationChange', { + peerId, + observedIncarnation, }); + } catch (error) { + this.#logger.error( + `Cannot reach kernel for incarnation handshake with ${peerId.slice(0, 8)}; ` + + 'treating as restart to avoid stale delivery:', + error, + ); + return true; + } } } harden(PlatformServicesServer); diff --git a/packages/kernel-errors/CHANGELOG.md b/packages/kernel-errors/CHANGELOG.md index c4dd8f1d6c..0aadd36520 100644 --- a/packages/kernel-errors/CHANGELOG.md +++ b/packages/kernel-errors/CHANGELOG.md @@ -7,6 +7,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + +- Add `PeerRestartedError`, `IntentionalCloseError`, and `NetworkStoppedError` sentinel errors for the remote-comms transport ([#948](https://github.com/MetaMask/ocap-kernel/pull/948)) +- Add `isTerminalSendError` utility to discriminate retry-worthy from terminal `sendRemoteMessage` errors ([#948](https://github.com/MetaMask/ocap-kernel/pull/948)) + ## [0.6.0] ### Added diff --git a/packages/kernel-errors/src/constants.ts b/packages/kernel-errors/src/constants.ts index fc5c55f8d9..e611f814f5 100644 --- a/packages/kernel-errors/src/constants.ts +++ b/packages/kernel-errors/src/constants.ts @@ -34,6 +34,9 @@ export const ErrorCode = { SampleGenerationError: 'SAMPLE_GENERATION_ERROR', InternalError: 'INTERNAL_ERROR', ResourceLimitError: 'RESOURCE_LIMIT_ERROR', + 
PeerRestartedError: 'PEER_RESTARTED_ERROR', + IntentionalCloseError: 'INTENTIONAL_CLOSE_ERROR', + NetworkStoppedError: 'NETWORK_STOPPED_ERROR', } as const; export type ErrorCode = (typeof ErrorCode)[keyof typeof ErrorCode]; diff --git a/packages/kernel-errors/src/errors/IntentionalCloseError.test.ts b/packages/kernel-errors/src/errors/IntentionalCloseError.test.ts new file mode 100644 index 0000000000..51ace99970 --- /dev/null +++ b/packages/kernel-errors/src/errors/IntentionalCloseError.test.ts @@ -0,0 +1,94 @@ +import { describe, it, expect } from 'vitest'; + +import { IntentionalCloseError } from './IntentionalCloseError.ts'; +import { ErrorCode, ErrorSentinel } from '../constants.ts'; +import { unmarshalErrorOptions } from '../marshal/unmarshalError.ts'; +import type { MarshaledOcapError } from '../types.ts'; + +describe('IntentionalCloseError', () => { + const expectedMessage = 'Message delivery failed after intentional close'; + + it('creates an IntentionalCloseError with the canonical message and code', () => { + const error = new IntentionalCloseError(); + expect(error).toBeInstanceOf(IntentionalCloseError); + expect(error.code).toBe(ErrorCode.IntentionalCloseError); + expect(error.message).toBe(expectedMessage); + expect(error.data).toBeUndefined(); + }); + + it('exposes the canonical name across the RPC boundary', () => { + expect(new IntentionalCloseError().name).toBe('IntentionalCloseError'); + }); + + it('accepts a cause', () => { + const cause = new Error('User explicitly disconnected'); + const error = new IntentionalCloseError({ cause }); + expect(error.cause).toBe(cause); + }); + + it('accepts a custom stack', () => { + const customStack = 'custom stack trace'; + const error = new IntentionalCloseError({ stack: customStack }); + expect(error.stack).toBe(customStack); + }); + + it('unmarshals a valid marshaled IntentionalCloseError', () => { + const marshaledError = { + [ErrorSentinel]: true, + message: expectedMessage, + stack: 'customStack', + 
code: ErrorCode.IntentionalCloseError, + } as unknown as MarshaledOcapError; + + const unmarshaled = IntentionalCloseError.unmarshal( + marshaledError, + unmarshalErrorOptions, + ); + expect(unmarshaled).toBeInstanceOf(IntentionalCloseError); + expect(unmarshaled.code).toBe(ErrorCode.IntentionalCloseError); + expect(unmarshaled.message).toBe(expectedMessage); + expect(unmarshaled.stack).toBe('customStack'); + }); + + it.each([ + { + name: 'invalid data field', + marshaledError: { + [ErrorSentinel]: true, + message: expectedMessage, + code: ErrorCode.IntentionalCloseError, + data: { unexpected: 'field' }, + stack: 'stack trace', + } as unknown as MarshaledOcapError, + expectedError: + 'At path: data -- Expected a value of type `never`, but received: `[object Object]`', + }, + { + name: 'wrong error code', + marshaledError: { + [ErrorSentinel]: true, + message: expectedMessage, + code: 'WRONG_ERROR_CODE' as ErrorCode, + stack: 'stack trace', + } as unknown as MarshaledOcapError, + expectedError: + 'At path: code -- Expected the literal `"INTENTIONAL_CLOSE_ERROR"`, but received: "WRONG_ERROR_CODE"', + }, + { + name: 'missing code', + marshaledError: { + [ErrorSentinel]: true, + message: expectedMessage, + } as unknown as MarshaledOcapError, + expectedError: + 'At path: code -- Expected the literal `"INTENTIONAL_CLOSE_ERROR"`, but received: undefined', + }, + ])( + 'throws when unmarshaling with $name', + ({ marshaledError, expectedError }) => { + expect(() => + IntentionalCloseError.unmarshal(marshaledError, unmarshalErrorOptions), + ).toThrow(expectedError); + }, + ); +}); diff --git a/packages/kernel-errors/src/errors/IntentionalCloseError.ts b/packages/kernel-errors/src/errors/IntentionalCloseError.ts new file mode 100644 index 0000000000..0b3a7ddbed --- /dev/null +++ b/packages/kernel-errors/src/errors/IntentionalCloseError.ts @@ -0,0 +1,61 @@ +import { + assert, + literal, + never, + object, + optional, +} from '@metamask/superstruct'; + +import { BaseError } 
from '../BaseError.ts'; +import { marshaledErrorSchema, ErrorCode } from '../constants.ts'; +import type { ErrorOptionsWithStack, MarshaledOcapError } from '../types.ts'; + +/** + * Sentinel error thrown by `sendRemoteMessage` when the local side has + * intentionally closed the connection to a peer. Further messages on this + * peer must not be retried until reconnectPeer is called. + */ +export class IntentionalCloseError extends BaseError { + /** + * Creates a new IntentionalCloseError. + * + * @param options - Additional error options including cause and stack. + * @param options.cause - The underlying error that caused the close. + * @param options.stack - The stack trace of the error. + */ + constructor(options?: ErrorOptionsWithStack) { + super( + ErrorCode.IntentionalCloseError, + 'Message delivery failed after intentional close', + { ...options }, + ); + harden(this); + } + + /** + * A superstruct struct for validating marshaled {@link IntentionalCloseError} instances. + */ + public static struct = object({ + ...marshaledErrorSchema, + code: literal(ErrorCode.IntentionalCloseError), + data: optional(never()), + }); + + /** + * Unmarshals a {@link MarshaledError} into a {@link IntentionalCloseError}. + * + * @param marshaledError - The marshaled error to unmarshal. + * @param unmarshalErrorOptions - The function to unmarshal the error options. + * @returns The unmarshaled error. 
+ */ + public static unmarshal( + marshaledError: MarshaledOcapError, + unmarshalErrorOptions: ( + marshaledError: MarshaledOcapError, + ) => ErrorOptionsWithStack, + ): IntentionalCloseError { + assert(marshaledError, this.struct); + return new IntentionalCloseError(unmarshalErrorOptions(marshaledError)); + } +} +harden(IntentionalCloseError); diff --git a/packages/kernel-errors/src/errors/NetworkStoppedError.test.ts b/packages/kernel-errors/src/errors/NetworkStoppedError.test.ts new file mode 100644 index 0000000000..43a2792659 --- /dev/null +++ b/packages/kernel-errors/src/errors/NetworkStoppedError.test.ts @@ -0,0 +1,94 @@ +import { describe, it, expect } from 'vitest'; + +import { NetworkStoppedError } from './NetworkStoppedError.ts'; +import { ErrorCode, ErrorSentinel } from '../constants.ts'; +import { unmarshalErrorOptions } from '../marshal/unmarshalError.ts'; +import type { MarshaledOcapError } from '../types.ts'; + +describe('NetworkStoppedError', () => { + const expectedMessage = 'Network stopped'; + + it('creates a NetworkStoppedError with the canonical message and code', () => { + const error = new NetworkStoppedError(); + expect(error).toBeInstanceOf(NetworkStoppedError); + expect(error.code).toBe(ErrorCode.NetworkStoppedError); + expect(error.message).toBe(expectedMessage); + expect(error.data).toBeUndefined(); + }); + + it('exposes the canonical name across the RPC boundary', () => { + expect(new NetworkStoppedError().name).toBe('NetworkStoppedError'); + }); + + it('accepts a cause', () => { + const cause = new Error('AbortController fired during shutdown'); + const error = new NetworkStoppedError({ cause }); + expect(error.cause).toBe(cause); + }); + + it('accepts a custom stack', () => { + const customStack = 'custom stack trace'; + const error = new NetworkStoppedError({ stack: customStack }); + expect(error.stack).toBe(customStack); + }); + + it('unmarshals a valid marshaled NetworkStoppedError', () => { + const marshaledError = { + 
[ErrorSentinel]: true, + message: expectedMessage, + stack: 'customStack', + code: ErrorCode.NetworkStoppedError, + } as unknown as MarshaledOcapError; + + const unmarshaled = NetworkStoppedError.unmarshal( + marshaledError, + unmarshalErrorOptions, + ); + expect(unmarshaled).toBeInstanceOf(NetworkStoppedError); + expect(unmarshaled.code).toBe(ErrorCode.NetworkStoppedError); + expect(unmarshaled.message).toBe(expectedMessage); + expect(unmarshaled.stack).toBe('customStack'); + }); + + it.each([ + { + name: 'invalid data field', + marshaledError: { + [ErrorSentinel]: true, + message: expectedMessage, + code: ErrorCode.NetworkStoppedError, + data: { unexpected: 'field' }, + stack: 'stack trace', + } as unknown as MarshaledOcapError, + expectedError: + 'At path: data -- Expected a value of type `never`, but received: `[object Object]`', + }, + { + name: 'wrong error code', + marshaledError: { + [ErrorSentinel]: true, + message: expectedMessage, + code: 'WRONG_ERROR_CODE' as ErrorCode, + stack: 'stack trace', + } as unknown as MarshaledOcapError, + expectedError: + 'At path: code -- Expected the literal `"NETWORK_STOPPED_ERROR"`, but received: "WRONG_ERROR_CODE"', + }, + { + name: 'missing code', + marshaledError: { + [ErrorSentinel]: true, + message: expectedMessage, + } as unknown as MarshaledOcapError, + expectedError: + 'At path: code -- Expected the literal `"NETWORK_STOPPED_ERROR"`, but received: undefined', + }, + ])( + 'throws when unmarshaling with $name', + ({ marshaledError, expectedError }) => { + expect(() => + NetworkStoppedError.unmarshal(marshaledError, unmarshalErrorOptions), + ).toThrow(expectedError); + }, + ); +}); diff --git a/packages/kernel-errors/src/errors/NetworkStoppedError.ts b/packages/kernel-errors/src/errors/NetworkStoppedError.ts new file mode 100644 index 0000000000..51c9b395cd --- /dev/null +++ b/packages/kernel-errors/src/errors/NetworkStoppedError.ts @@ -0,0 +1,57 @@ +import { + assert, + literal, + never, + object, + optional, +} 
from '@metamask/superstruct'; + +import { BaseError } from '../BaseError.ts'; +import { marshaledErrorSchema, ErrorCode } from '../constants.ts'; +import type { ErrorOptionsWithStack, MarshaledOcapError } from '../types.ts'; + +/** + * Sentinel error thrown by `sendRemoteMessage` when the transport has been + * stopped (kernel shutdown). No further sends will succeed; recipients + * should drain pending state instead of retrying. + */ +export class NetworkStoppedError extends BaseError { + /** + * Creates a new NetworkStoppedError. + * + * @param options - Additional error options including cause and stack. + * @param options.cause - The underlying error that caused the network to stop. + * @param options.stack - The stack trace of the error. + */ + constructor(options?: ErrorOptionsWithStack) { + super(ErrorCode.NetworkStoppedError, 'Network stopped', { ...options }); + harden(this); + } + + /** + * A superstruct struct for validating marshaled {@link NetworkStoppedError} instances. + */ + public static struct = object({ + ...marshaledErrorSchema, + code: literal(ErrorCode.NetworkStoppedError), + data: optional(never()), + }); + + /** + * Unmarshals a {@link MarshaledError} into a {@link NetworkStoppedError}. + * + * @param marshaledError - The marshaled error to unmarshal. + * @param unmarshalErrorOptions - The function to unmarshal the error options. + * @returns The unmarshaled error. 
+ */ + public static unmarshal( + marshaledError: MarshaledOcapError, + unmarshalErrorOptions: ( + marshaledError: MarshaledOcapError, + ) => ErrorOptionsWithStack, + ): NetworkStoppedError { + assert(marshaledError, this.struct); + return new NetworkStoppedError(unmarshalErrorOptions(marshaledError)); + } +} +harden(NetworkStoppedError); diff --git a/packages/kernel-errors/src/errors/PeerRestartedError.test.ts b/packages/kernel-errors/src/errors/PeerRestartedError.test.ts new file mode 100644 index 0000000000..c01e391d18 --- /dev/null +++ b/packages/kernel-errors/src/errors/PeerRestartedError.test.ts @@ -0,0 +1,97 @@ +import { describe, it, expect } from 'vitest'; + +import { PeerRestartedError } from './PeerRestartedError.ts'; +import { ErrorCode, ErrorSentinel } from '../constants.ts'; +import { unmarshalErrorOptions } from '../marshal/unmarshalError.ts'; +import type { MarshaledOcapError } from '../types.ts'; + +describe('PeerRestartedError', () => { + const expectedMessage = + 'Remote peer restarted: message not sent to avoid stale delivery'; + + it('creates a PeerRestartedError with the canonical message and code', () => { + const error = new PeerRestartedError(); + expect(error).toBeInstanceOf(PeerRestartedError); + expect(error.code).toBe(ErrorCode.PeerRestartedError); + expect(error.message).toBe(expectedMessage); + expect(error.data).toBeUndefined(); + }); + + it('exposes the canonical name across the RPC boundary', () => { + // The transport-side predicate (`isTerminalSendError`) matches by `name` + // because errors lose class identity when serialized via JSON-RPC. 
+ expect(new PeerRestartedError().name).toBe('PeerRestartedError'); + }); + + it('accepts a cause', () => { + const cause = new Error('Underlying handshake mismatch'); + const error = new PeerRestartedError({ cause }); + expect(error.cause).toBe(cause); + }); + + it('accepts a custom stack', () => { + const customStack = 'custom stack trace'; + const error = new PeerRestartedError({ stack: customStack }); + expect(error.stack).toBe(customStack); + }); + + it('unmarshals a valid marshaled PeerRestartedError', () => { + const marshaledError = { + [ErrorSentinel]: true, + message: expectedMessage, + stack: 'customStack', + code: ErrorCode.PeerRestartedError, + } as unknown as MarshaledOcapError; + + const unmarshaled = PeerRestartedError.unmarshal( + marshaledError, + unmarshalErrorOptions, + ); + expect(unmarshaled).toBeInstanceOf(PeerRestartedError); + expect(unmarshaled.code).toBe(ErrorCode.PeerRestartedError); + expect(unmarshaled.message).toBe(expectedMessage); + expect(unmarshaled.stack).toBe('customStack'); + }); + + it.each([ + { + name: 'invalid data field', + marshaledError: { + [ErrorSentinel]: true, + message: expectedMessage, + code: ErrorCode.PeerRestartedError, + data: { unexpected: 'field' }, + stack: 'stack trace', + } as unknown as MarshaledOcapError, + expectedError: + 'At path: data -- Expected a value of type `never`, but received: `[object Object]`', + }, + { + name: 'wrong error code', + marshaledError: { + [ErrorSentinel]: true, + message: expectedMessage, + code: 'WRONG_ERROR_CODE' as ErrorCode, + stack: 'stack trace', + } as unknown as MarshaledOcapError, + expectedError: + 'At path: code -- Expected the literal `"PEER_RESTARTED_ERROR"`, but received: "WRONG_ERROR_CODE"', + }, + { + name: 'missing code', + marshaledError: { + [ErrorSentinel]: true, + message: expectedMessage, + } as unknown as MarshaledOcapError, + expectedError: + 'At path: code -- Expected the literal `"PEER_RESTARTED_ERROR"`, but received: undefined', + }, + ])( + 'throws 
when unmarshaling with $name', + ({ marshaledError, expectedError }) => { + expect(() => + PeerRestartedError.unmarshal(marshaledError, unmarshalErrorOptions), + ).toThrow(expectedError); + }, + ); +}); diff --git a/packages/kernel-errors/src/errors/PeerRestartedError.ts b/packages/kernel-errors/src/errors/PeerRestartedError.ts new file mode 100644 index 0000000000..f8863c62f7 --- /dev/null +++ b/packages/kernel-errors/src/errors/PeerRestartedError.ts @@ -0,0 +1,63 @@ +import { + assert, + literal, + never, + object, + optional, +} from '@metamask/superstruct'; + +import { BaseError } from '../BaseError.ts'; +import { marshaledErrorSchema, ErrorCode } from '../constants.ts'; +import type { ErrorOptionsWithStack, MarshaledOcapError } from '../types.ts'; + +/** + * Sentinel error thrown by `sendRemoteMessage` when the outbound handshake + * detects the peer has restarted. The peer is reachable but its incarnation + * changed; the freshly dialed channel is closed without registration to + * keep stale payloads off the wire. Recipients use this to abort retransmit + * and reject pending traffic generated against the now-dead session. + */ +export class PeerRestartedError extends BaseError { + /** + * Creates a new PeerRestartedError. + * + * @param options - Additional error options including cause and stack. + * @param options.cause - The underlying error that caused the peer restart. + * @param options.stack - The stack trace of the error. + */ + constructor(options?: ErrorOptionsWithStack) { + super( + ErrorCode.PeerRestartedError, + 'Remote peer restarted: message not sent to avoid stale delivery', + { ...options }, + ); + harden(this); + } + + /** + * A superstruct struct for validating marshaled {@link PeerRestartedError} instances. + */ + public static struct = object({ + ...marshaledErrorSchema, + code: literal(ErrorCode.PeerRestartedError), + data: optional(never()), + }); + + /** + * Unmarshals a {@link MarshaledError} into a {@link PeerRestartedError}. 
+ * + * @param marshaledError - The marshaled error to unmarshal. + * @param unmarshalErrorOptions - The function to unmarshal the error options. + * @returns The unmarshaled error. + */ + public static unmarshal( + marshaledError: MarshaledOcapError, + unmarshalErrorOptions: ( + marshaledError: MarshaledOcapError, + ) => ErrorOptionsWithStack, + ): PeerRestartedError { + assert(marshaledError, this.struct); + return new PeerRestartedError(unmarshalErrorOptions(marshaledError)); + } +} +harden(PeerRestartedError); diff --git a/packages/kernel-errors/src/errors/index.ts b/packages/kernel-errors/src/errors/index.ts index 235abe9c4b..7f12984744 100644 --- a/packages/kernel-errors/src/errors/index.ts +++ b/packages/kernel-errors/src/errors/index.ts @@ -1,6 +1,9 @@ import { AbortError } from './AbortError.ts'; import { DuplicateEndowmentError } from './DuplicateEndowmentError.ts'; import { EvaluatorError } from './EvaluatorError.ts'; +import { IntentionalCloseError } from './IntentionalCloseError.ts'; +import { NetworkStoppedError } from './NetworkStoppedError.ts'; +import { PeerRestartedError } from './PeerRestartedError.ts'; import { ResourceLimitError } from './ResourceLimitError.ts'; import { SampleGenerationError } from './SampleGenerationError.ts'; import { StreamReadError } from './StreamReadError.ts'; @@ -21,4 +24,7 @@ export const errorClasses = { [ErrorCode.SampleGenerationError]: SampleGenerationError, [ErrorCode.InternalError]: EvaluatorError, [ErrorCode.ResourceLimitError]: ResourceLimitError, + [ErrorCode.PeerRestartedError]: PeerRestartedError, + [ErrorCode.IntentionalCloseError]: IntentionalCloseError, + [ErrorCode.NetworkStoppedError]: NetworkStoppedError, } as const; diff --git a/packages/kernel-errors/src/index.test.ts b/packages/kernel-errors/src/index.test.ts index 6ffe117a78..3be9829cb4 100644 --- a/packages/kernel-errors/src/index.test.ts +++ b/packages/kernel-errors/src/index.test.ts @@ -11,9 +11,12 @@ describe('index', () => { 'ErrorSentinel', 
'ErrorStruct', 'EvaluatorError', + 'IntentionalCloseError', 'KERNEL_ERROR_PATTERN', 'MarshaledErrorStruct', 'MarshaledOcapErrorStruct', + 'NetworkStoppedError', + 'PeerRestartedError', 'ResourceLimitError', 'SampleGenerationError', 'StreamReadError', @@ -30,6 +33,7 @@ describe('index', () => { 'isOcapError', 'isResourceLimitError', 'isRetryableNetworkError', + 'isTerminalSendError', 'marshalError', 'toError', 'unmarshalError', diff --git a/packages/kernel-errors/src/index.ts b/packages/kernel-errors/src/index.ts index c0011e17da..efee68052c 100644 --- a/packages/kernel-errors/src/index.ts +++ b/packages/kernel-errors/src/index.ts @@ -8,6 +8,9 @@ export { VatNotFoundError } from './errors/VatNotFoundError.ts'; export { StreamReadError } from './errors/StreamReadError.ts'; export { SubclusterNotFoundError } from './errors/SubclusterNotFoundError.ts'; export { AbortError } from './errors/AbortError.ts'; +export { IntentionalCloseError } from './errors/IntentionalCloseError.ts'; +export { NetworkStoppedError } from './errors/NetworkStoppedError.ts'; +export { PeerRestartedError } from './errors/PeerRestartedError.ts'; export { ResourceLimitError, type ResourceLimitType, @@ -27,6 +30,7 @@ export { unmarshalError } from './marshal/unmarshalError.ts'; export { isMarshaledError } from './marshal/isMarshaledError.ts'; export { isMarshaledOcapError } from './marshal/isMarshaledOcapError.ts'; export { isRetryableNetworkError } from './utils/isRetryableNetworkError.ts'; +export { isTerminalSendError } from './utils/isTerminalSendError.ts'; export { getNetworkErrorCode } from './utils/getNetworkErrorCode.ts'; export { isResourceLimitError } from './utils/isResourceLimitError.ts'; export type { diff --git a/packages/kernel-errors/src/utils/isTerminalSendError.test.ts b/packages/kernel-errors/src/utils/isTerminalSendError.test.ts new file mode 100644 index 0000000000..bf317e0c78 --- /dev/null +++ b/packages/kernel-errors/src/utils/isTerminalSendError.test.ts @@ -0,0 +1,59 @@ 
+import { describe, it, expect } from 'vitest'; + +import { isTerminalSendError } from './isTerminalSendError.ts'; +import { AbortError } from '../errors/AbortError.ts'; +import { IntentionalCloseError } from '../errors/IntentionalCloseError.ts'; +import { NetworkStoppedError } from '../errors/NetworkStoppedError.ts'; +import { PeerRestartedError } from '../errors/PeerRestartedError.ts'; + +describe('isTerminalSendError', () => { + describe('returns true for terminal sentinel errors', () => { + it.each([ + ['PeerRestartedError instance', new PeerRestartedError()], + ['IntentionalCloseError instance', new IntentionalCloseError()], + ['NetworkStoppedError instance', new NetworkStoppedError()], + ])('matches a fresh %s', (_label, error) => { + expect(isTerminalSendError(error)).toBe(true); + }); + + it.each([ + ['PeerRestartedError', 'PeerRestartedError'], + ['IntentionalCloseError', 'IntentionalCloseError'], + ['NetworkStoppedError', 'NetworkStoppedError'], + ])( + 'matches a plain Error whose name was renamed to %s (RPC-boundary shape)', + (_label, name) => { + // Errors that cross the platform-services RPC boundary lose class + // identity but preserve the `name` field; the predicate must still + // match in that case. 
+ const reconstituted = Object.assign(new Error('reconstituted'), { + name, + }); + expect(isTerminalSendError(reconstituted)).toBe(true); + }, + ); + }); + + describe('returns false for non-terminal values', () => { + it.each([ + ['plain Error', new Error('transient network glitch')], + ['AbortError (not a send-side terminal)', new AbortError()], + [ + 'Error with unrelated name', + Object.assign(new Error('unrelated'), { name: 'CustomError' }), + ], + ])('rejects %s', (_label, error) => { + expect(isTerminalSendError(error)).toBe(false); + }); + + it.each([ + ['undefined', undefined], + ['null', null], + ['string rejection', 'PeerRestartedError'], + ['plain object with matching name', { name: 'PeerRestartedError' }], + ['number', 42], + ])('rejects non-Error %s', (_label, value) => { + expect(isTerminalSendError(value)).toBe(false); + }); + }); +}); diff --git a/packages/kernel-errors/src/utils/isTerminalSendError.ts b/packages/kernel-errors/src/utils/isTerminalSendError.ts new file mode 100644 index 0000000000..cab4c4bf6c --- /dev/null +++ b/packages/kernel-errors/src/utils/isTerminalSendError.ts @@ -0,0 +1,30 @@ +import { IntentionalCloseError } from '../errors/IntentionalCloseError.ts'; +import { NetworkStoppedError } from '../errors/NetworkStoppedError.ts'; +import { PeerRestartedError } from '../errors/PeerRestartedError.ts'; + +/** + * Names of the sentinel errors that mean retransmit/retry should abort — + * derived from the classes themselves so adding a new terminal class without + * registering it would be an obvious omission rather than a silent drift. + * + * Detection uses `name` (not `instanceof`) because errors cross the + * platform-services RPC boundary as serialized JSON-RPC error envelopes + * that don't preserve class identity. The `name` field is preserved. 
+ */ +const TERMINAL_NAMES: ReadonlySet<string> = new Set([ + PeerRestartedError.name, + IntentionalCloseError.name, + NetworkStoppedError.name, +]); + +/** + * Whether a thrown send-side error is a terminal verdict from the transport + * (peer restart, intentional close, network stopped). Recipients should + * abort retransmit and reject pending state instead of retrying. + * + * @param error - The error thrown by `sendRemoteMessage`. + * @returns True if the error is a terminal verdict. + */ +export function isTerminalSendError(error: unknown): boolean { + return error instanceof Error && TERMINAL_NAMES.has(error.name); +} diff --git a/packages/kernel-node-runtime/test/e2e/remote-comms.test.ts b/packages/kernel-node-runtime/test/e2e/remote-comms.test.ts index ab19d89942..3a7b81ed5d 100644 --- a/packages/kernel-node-runtime/test/e2e/remote-comms.test.ts +++ b/packages/kernel-node-runtime/test/e2e/remote-comms.test.ts @@ -26,6 +26,7 @@ import { makeMaasClientConfig, makeMaasServerConfig, makeRemoteVatConfig, + restartKernel, restartKernelAndReloadVat, sendRemoteMessage, setupAliceAndBob, @@ -1016,6 +1017,107 @@ describe.sequential('Remote Communications E2E', () => { }, NETWORK_TIMEOUT * 3, ); + + it( + 'preserves seq dedup when receiver loses peer-incarnation memory', + async () => { + // Reproduces issue #944. A peer's incarnationId is held only in the + // receiver's in-memory PeerStateManager, but RemoteHandle's + // #highestReceivedSeq is persisted in KV. When PSM loses the entry + // (e.g. receiver restart, or stale-peer cleanup) and the sender + // comes back with the same peerId but a fresh incarnation, the + // handshake mis-classifies the reconnect as a "first connection" + // (previousIncarnation undefined → setRemoteIncarnation returns + // false → handlePeerRestart is never called) and the sender's + // fresh seq=1 messages are silently dropped against the stale + // persisted dedup state. 
+ // + // Fix the sender's libp2p key with a mnemonic so that wiping its + // storage gives a fresh incarnation and seq counter without also + // changing the peerId — that's the precondition that exposes the + // dedup-vs-incarnation lifetime mismatch. + const k2Mnemonic = + 'legal winner thank year wave sausage worth useful legal winner thank yellow'; + // ackTimeoutMs gates the URL redemption timeout + // (`ackTimeoutMs * (MAX_RETRIES + 1)`). Keep it generous enough to + // absorb the cost of fresh K2's libp2p dial + handshake under CI + // load, otherwise this test becomes flaky on slower runners. + const opts = { + relays: testRelays, + reconnectionBaseDelayMs: 100, + reconnectionMaxDelayMs: 500, + handshakeTimeoutMs: 5_000, + writeTimeoutMs: 5_000, + ackTimeoutMs: 2_000, + maxRetryAttempts: 4, + }; + + await kernel1.initRemoteComms({ ...opts }); + await kernel2.initRemoteComms({ ...opts, mnemonic: k2Mnemonic }); + const aliceURL = await launchVatAndGetURL( + kernel1, + makeRemoteVatConfig('Alice'), + ); + await launchVatAndGetURL(kernel2, makeRemoteVatConfig('Bob')); + const bobRef = getVatRootRef(kernel2, kernelStore2, 'Bob'); + + // Bob (K2) sends to Alice (K1) once, advancing K1's persisted + // #highestReceivedSeq for K2's peer past 0. + const phase1 = await sendRemoteMessage( + kernel2, + bobRef, + aliceURL, + 'hello', + ['Bob-before'], + ); + expect(phase1).toContain('vat Alice got "hello" from Bob-before'); + + // Allow K2's standalone ACK to settle before stopping K1, otherwise + // K1 would persist a stale pending notify and retransmit it on + // restart — a separate failure mode unrelated to seq dedup. The + // delayed-ACK timer fires at 50ms. + await delay(150); + + // Restart the receiver keeping its DB. PeerStateManager rebuilds + // empty; RemoteHandles are restored from KV with their seq state. 
+ await stopWithTimeout(async () => kernel1.stop(), 3000, 'kernel1.stop'); + // eslint-disable-next-line require-atomic-updates + kernel1 = await restartKernel(dbFilename1, false, testRelays, opts); + + // Restart the sender with a FRESH DB but the same mnemonic — same + // peerId, but fresh incarnationId and a seq counter starting from 0. + await stopWithTimeout(async () => kernel2.stop(), 3000, 'kernel2.stop'); + const freshDb2 = await makeSQLKernelDatabase({ + dbFilename: join(tempDir, 'kernel2-fresh.db'), + }); + // eslint-disable-next-line require-atomic-updates + kernelStore2 = makeKernelStore(freshDb2); + // eslint-disable-next-line require-atomic-updates + kernel2 = await makeTestKernel(freshDb2); + await kernel2.initRemoteComms({ ...opts, mnemonic: k2Mnemonic }); + await launchVatAndGetURL(kernel2, makeRemoteVatConfig('Bob')); + const newBobRef = getVatRootRef(kernel2, kernelStore2, 'Bob'); + + // Fresh K2 sends seq=1 to the same alice URL. K1's PSM has no entry + // for K2's peer, so setRemoteIncarnation returns false, no reset + // happens, and K1's persisted #highestReceivedSeq >= 1 silently + // drops the message. Without the fix this send fails after the URL + // redemption budget (ackTimeoutMs × (MAX_RETRIES + 1), where + // MAX_RETRIES is hardcoded to 3 in RemoteHandle.ts) — unrelated to + // the libp2p `maxRetryAttempts` setting. 
+ const phase4 = await sendRemoteMessage( + kernel2, + newBobRef, + aliceURL, + 'hello', + ['Bob-after-restart'], + ); + expect(phase4).toContain( + 'vat Alice got "hello" from Bob-after-restart', + ); + }, + NETWORK_TIMEOUT * 3, + ); }); describe('Promise Rejection on Remote Give-Up', () => { diff --git a/packages/ocap-kernel/CHANGELOG.md b/packages/ocap-kernel/CHANGELOG.md index b9bbe4d428..c1e67d864b 100644 --- a/packages/ocap-kernel/CHANGELOG.md +++ b/packages/ocap-kernel/CHANGELOG.md @@ -33,6 +33,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Fixed - Deserialize CapData rejections in `Kernel.queueMessage` so vat errors surface as plain `Error` objects to all callers ([#928](https://github.com/MetaMask/ocap-kernel/pull/928)) +- Detect peer restart across receiver state loss so the receiving kernel no longer silently drops a restarted peer's `seq=1` messages ([#948](https://github.com/MetaMask/ocap-kernel/pull/948)) + - Persist the peer's last-observed incarnation and compare it on every successful handshake; on a detected restart, clear the peer's c-list contributions and reject the promises it was deciding before the new incarnation reuses any erefs ## [0.7.0] diff --git a/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.test.ts b/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.test.ts index d815edfe58..e114834029 100644 --- a/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.test.ts +++ b/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.test.ts @@ -1508,4 +1508,245 @@ describe('RemoteHandle', () => { ); }); }); + + describe('ack timeout retransmit', () => { + // SES lockdown freezes Date, preventing vi.useFakeTimers(); spy on + // setTimeout instead and invoke captured callbacks directly. 
+ type PendingTimer = { + callback: () => void; + delay: number; + }; + let pendingTimers: PendingTimer[]; + let setTimeoutSpy: ReturnType<typeof vi.spyOn>; + let clearTimeoutSpy: ReturnType<typeof vi.spyOn>; + + beforeEach(() => { + pendingTimers = []; + setTimeoutSpy = vi.spyOn(globalThis, 'setTimeout').mockImplementation((( + callback: () => void, + delay: number, + ) => { + pendingTimers.push({ callback, delay }); + return pendingTimers.length as unknown as ReturnType<typeof setTimeout>; + }) as typeof setTimeout); + clearTimeoutSpy = vi + .spyOn(globalThis, 'clearTimeout') + .mockImplementation((handle: unknown) => { + if (typeof handle === 'number') { + pendingTimers[handle - 1] = { + callback: () => undefined, + delay: 0, + }; + } + }); + }); + + afterEach(() => { + setTimeoutSpy.mockRestore(); + clearTimeoutSpy.mockRestore(); + }); + + /** + * Trigger the most recent ACK timer (skipping any cleared/no-op slots). + */ + function fireLastAckTimer(): void { + for (let i = pendingTimers.length - 1; i >= 0; i -= 1) { + const timer = pendingTimers[i]; + if (timer && timer.delay > 0) { + timer.callback(); + return; + } + } + } + + it('sends pending messages sequentially, awaiting each before the next', async () => { + const remote = RemoteHandle.make({ + remoteId: mockRemoteId, + peerId: mockRemotePeerId, + kernelStore: mockKernelStore, + kernelQueue: mockKernelQueue, + remoteComms: mockRemoteComms, + ackTimeoutMs: 100, + }); + + await remote.deliverNotify([ + ['rp+1', false, { body: '"first"', slots: [] }], + ]); + await remote.deliverNotify([ + ['rp+2', false, { body: '"second"', slots: [] }], + ]); + + // send 1 stays pending until we resolve it. 
+ const sendCalls: string[] = []; + let resolveFirst!: () => void; + const firstPromise = new Promise<void>((resolve) => { + resolveFirst = resolve; + }); + vi.mocked(mockRemoteComms.sendRemoteMessage).mockReset(); + vi.mocked(mockRemoteComms.sendRemoteMessage).mockImplementation( + async (_peer, payload) => { + sendCalls.push(payload); + if (sendCalls.length === 1) { + await firstPromise; + } + return undefined; + }, + ); + + fireLastAckTimer(); + // Yield to microtasks so the first iteration's await sees the pending + // promise but hasn't yet moved on. + await Promise.resolve(); + await Promise.resolve(); + expect(sendCalls).toHaveLength(1); + + resolveFirst(); + // Drain microtasks so the second iteration completes. + for (let i = 0; i < 5; i += 1) { + await Promise.resolve(); + } + expect(sendCalls).toHaveLength(2); + }); + + it('aborts retransmit and fires give-up when sendRemoteMessage signals peer restart', async () => { + const onGiveUp = vi.fn(); + const remote = RemoteHandle.make({ + remoteId: mockRemoteId, + peerId: mockRemotePeerId, + kernelStore: mockKernelStore, + kernelQueue: mockKernelQueue, + remoteComms: mockRemoteComms, + ackTimeoutMs: 100, + onGiveUp, + }); + + await remote.deliverNotify([['rp+1', false, { body: '"a"', slots: [] }]]); + await remote.deliverNotify([['rp+2', false, { body: '"b"', slots: [] }]]); + + // Synthesize a PeerRestartedError-shaped rejection (the real class is + // transport-internal; isTerminalSendError matches by `error.name`). + const peerRestarted = Object.assign( + new Error('Remote peer restarted: message not sent'), + { name: 'PeerRestartedError' }, + ); + vi.mocked(mockRemoteComms.sendRemoteMessage).mockReset(); + vi.mocked(mockRemoteComms.sendRemoteMessage).mockRejectedValue( + peerRestarted, + ); + + fireLastAckTimer(); + for (let i = 0; i < 5; i += 1) { + await Promise.resolve(); + } + + // Only iteration 1 runs before the terminal error short-circuits. 
+ expect(mockRemoteComms.sendRemoteMessage).toHaveBeenCalledTimes(1); + expect(onGiveUp).toHaveBeenCalledWith(mockRemotePeerId); + }); + + it('logs and continues on transient send errors', async () => { + const onGiveUp = vi.fn(); + const remote = RemoteHandle.make({ + remoteId: mockRemoteId, + peerId: mockRemotePeerId, + kernelStore: mockKernelStore, + kernelQueue: mockKernelQueue, + remoteComms: mockRemoteComms, + ackTimeoutMs: 100, + onGiveUp, + }); + + await remote.deliverNotify([['rp+1', false, { body: '"a"', slots: [] }]]); + await remote.deliverNotify([['rp+2', false, { body: '"b"', slots: [] }]]); + + vi.mocked(mockRemoteComms.sendRemoteMessage).mockReset(); + vi.mocked(mockRemoteComms.sendRemoteMessage) + .mockRejectedValueOnce(new Error('temporary network glitch')) + .mockResolvedValueOnce(undefined); + + fireLastAckTimer(); + for (let i = 0; i < 5; i += 1) { + await Promise.resolve(); + } + + // Both iterations ran — the transient failure didn't abort. + expect(mockRemoteComms.sendRemoteMessage).toHaveBeenCalledTimes(2); + expect(onGiveUp).not.toHaveBeenCalled(); + }); + }); + + describe('first-send terminal errors', () => { + it.each([ + [ + 'PeerRestartedError', + Object.assign(new Error('peer restarted'), { + name: 'PeerRestartedError', + }), + ], + [ + 'IntentionalCloseError', + Object.assign(new Error('intentional close'), { + name: 'IntentionalCloseError', + }), + ], + [ + 'NetworkStoppedError', + Object.assign(new Error('Network stopped'), { + name: 'NetworkStoppedError', + }), + ], + ])( + 'rejects pending and fires onGiveUp when initial send rejects with %s', + async (_name, terminalError) => { + const onGiveUp = vi.fn(); + const remote = RemoteHandle.make({ + remoteId: mockRemoteId, + peerId: mockRemotePeerId, + kernelStore: mockKernelStore, + kernelQueue: mockKernelQueue, + remoteComms: mockRemoteComms, + ackTimeoutMs: 100, + onGiveUp, + }); + + vi.mocked(mockRemoteComms.sendRemoteMessage).mockRejectedValueOnce( + terminalError, + ); + + const 
redeem = remote.redeemOcapURL('ocap:something@peer,relay'); + // Drain microtasks so the catch handler runs. + for (let i = 0; i < 5; i += 1) { + await Promise.resolve(); + } + + // The redemption rejects with the giveUp/rejectAllPending reason, + // which is the terminal error's message string. + await expect(redeem).rejects.toThrow(terminalError.message); + expect(onGiveUp).toHaveBeenCalledWith(mockRemotePeerId); + }, + ); + }); + + describe('handlePeerRestart c-list teardown', () => { + it('clears the peer’s "+"-direction c-list entries via forgetEndpointImports', async () => { + const remote = makeRemote(); + + // Seed an object export from the peer (peer-allocated eref ro+5 + // mapped to a fresh kernel object). + const eref = 'ro+5'; + const kref = mockKernelStore.exportFromEndpoint(mockRemoteId, eref); + + // Sanity: c-list is populated in both directions before restart. + // Use the raw `krefToEref`/`erefToKref` lookups (not the translating + // wrappers, which flip RRef polarity for receiver-frame interpretation). + expect(mockKernelStore.erefToKref(mockRemoteId, eref)).toBe(kref); + expect(mockKernelStore.krefToEref(mockRemoteId, kref)).toBe(eref); + + remote.handlePeerRestart(); + + // Both halves of the c-list pair are gone after restart. 
+ expect(mockKernelStore.erefToKref(mockRemoteId, eref)).toBeUndefined(); + expect(mockKernelStore.krefToEref(mockRemoteId, kref)).toBeUndefined(); + }); + }); }); diff --git a/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.ts b/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.ts index 639c31a4b4..9a619789d6 100644 --- a/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.ts +++ b/packages/ocap-kernel/src/remotes/kernel/RemoteHandle.ts @@ -1,6 +1,7 @@ import type { VatOneResolution } from '@agoric/swingset-liveslots'; import type { CapData } from '@endo/marshal'; import { makePromiseKit } from '@endo/promise-kit'; +import { isTerminalSendError } from '@metamask/kernel-errors'; import { Logger } from '@metamask/logger'; import { @@ -405,36 +406,80 @@ export class RemoteHandle implements EndpointHandle { this.#logger.log( `${this.#peerId.slice(0, 8)}:: retransmitting ${this.#getPendingCount()} pending messages (attempt ${this.#retryCount + 1})`, ); - this.#retransmitPending(); + this.#retransmitPending().catch((error) => { + // Terminal errors propagate up to here; the loop already aborted, so + // we don't re-arm the timer (no point retrying against a permanently + // unreachable peer). Pending messages have been rejected and the + // give-up callback fired by the inner handler. + this.#logger.error( + `${this.#peerId.slice(0, 8)}:: retransmission aborted:`, + error, + ); + }); } /** * Retransmit all pending messages. + * + * Sends sequentially so a peer-restart detection during the first send can + * short-circuit the rest: clearRemoteSeqState (called by persistPeerRestart) + * deletes both the seq counters and the `remotePending.*` payloads, so + * `getPendingMessage` returns undefined for subsequent iterations and the + * loop bound (`seq <= this.#nextSendSeq`, now 0) terminates immediately. 
+ * + * Sending in parallel is unsafe: once the first send registers the channel + * post-restart, parallel iterations would see `state.channel` set and + * bypass the outbound handshake's stale-delivery guard, writing + * pre-restart payloads on the same channel. + * + * Terminal errors (intentional close, network stopped, peer-restart-detected + * throw) abort the loop instead of being logged-and-skipped: continuing + * would just rebuild the queue against a known-unreachable peer and + * produce identical failures on every retry until MAX_RETRIES is hit. */ - #retransmitPending(): void { + async #retransmitPending(): Promise<void> { for (let seq = this.#startSeq; seq <= this.#nextSendSeq; seq += 1) { const messageString = this.#kernelStore.getPendingMessage( this.remoteId, seq, ); - if (messageString) { - this.#remoteComms - .sendRemoteMessage(this.#peerId, messageString) - .catch((error) => { - this.#logger.error('Error retransmitting message:', error); - }); + if (!messageString) { + continue; + } + try { + await this.#remoteComms.sendRemoteMessage(this.#peerId, messageString); + } catch (error) { + if (isTerminalSendError(error)) { + this.#logger.log( + `${this.#peerId.slice(0, 8)}:: aborting retransmit (${(error as Error).message})`, + ); + this.#clearAckTimeout(); + this.#rejectAllPending((error as Error).message); + this.rejectPendingRedemptions((error as Error).message); + this.#onGiveUp?.(this.#peerId); + throw error; + } + this.#logger.error( + `${this.#peerId.slice(0, 8)}:: error retransmitting seq=${seq}:`, + error, + ); } } } this.#startAckTimeout(); } /** - * Discard all pending messages due to delivery failure. + * Discard all pending messages due to delivery failure. Safe no-op when + * the queue is already empty — guards against clobbering kv state that a + * prior cleanup (e.g. {@link persistPeerRestart}) already cleared. * * @param reason - The reason for failure. 
*/ #rejectAllPending(reason: string): void { const pendingCount = this.#getPendingCount(); + if (pendingCount === 0) { + return; + } for (let i = 0; i < pendingCount; i += 1) { this.#logger.warn( `Message ${this.#startSeq + i} delivery failed: ${reason}`, @@ -569,25 +614,34 @@ export class RemoteHandle implements EndpointHandle { this.#startAckTimeout(); } - // Send the message (non-blocking - don't wait for ACK) + // Send the message (non-blocking - don't wait for ACK). + // + // Terminal verdicts from the transport (peer restarted, intentional + // close, network stopped) mean the message we just persisted will + // never be delivered as-is: reject all pending now and signal give-up + // rather than letting the message linger until ACK timeout × MAX_RETRIES. + // + // For PeerRestartedError specifically, the kernel-side + // `onIncarnationChange` callback ran `persistPeerRestart` and + // `finalizePeerRestart` synchronously *before* the transport threw, so + // most of the cleanup below is already a no-op. The guards inside + // `#rejectAllPending` and `rejectPendingRedemptions` keep this safe; + // calling `#onGiveUp` again exercises an idempotent path. 
this.#remoteComms .sendRemoteMessage(this.#peerId, messageString) .catch((error) => { - // Handle intentional close errors specially - reject pending redemptions - if ( - error instanceof Error && - error.message.includes('intentional close') - ) { + if (isTerminalSendError(error)) { + const reason = (error as Error).message; this.#clearAckTimeout(); - this.#rejectAllPending('intentional close'); - this.rejectPendingRedemptions( - 'Message delivery failed after intentional close', - ); - // Notify RemoteManager to reject kernel promises for this remote + this.#rejectAllPending(reason); + this.rejectPendingRedemptions(reason); this.#onGiveUp?.(this.#peerId); return; } - this.#logger.error('Error sending remote message:', error); + this.#logger.error( + `${this.#peerId.slice(0, 8)}:: error sending remote message seq=${seq}:`, + error, + ); }); } @@ -1098,39 +1152,59 @@ export class RemoteHandle implements EndpointHandle { } /** - * Handle a peer restart (incarnation change). - * Resets all state for a fresh start: clears timers, rejects pending messages - * and redemptions, resets sequence numbers, and clears persisted seq state. - * Called when the handshake detects that the remote peer has restarted. + * Persist the peer-restart side effects (kv-only). Reversible by the + * caller's savepoint: if `forgetEndpointImports` throws (e.g. corrupt + * c-list entry, refcount underflow) and the caller rolls back, no + * in-memory state has been disturbed. + * + * Pairs with {@link finalizePeerRestart}, which the caller MUST invoke + * exactly once after `releaseSavepoint` succeeds. Splitting the work + * keeps non-reversible mutations (timers, rejected redemption promises, + * in-memory seq counters) out of the savepoint window — those would + * leave the in-memory view inconsistent with the persisted view if the + * kv layer rolled back. 
*/ - handlePeerRestart(): void { + persistPeerRestart(): void { this.#logger.log( `${this.#peerId.slice(0, 8)}:: handling peer restart, resetting state`, ); + this.#kernelStore.clearRemoteSeqState(this.remoteId); + this.#kernelStore.forgetEndpointImports(this.remoteId); + } - // Clear timers - this.#clearAckTimeout(); - this.#clearDelayedAck(); + /** + * Convenience wrapper that runs the persisted and in-memory phases + * back-to-back. Use only outside a savepoint window — transactional + * callers (e.g. {@link RemoteManager.handleIncarnationChange}) must call + * {@link persistPeerRestart} inside the savepoint and + * {@link finalizePeerRestart} after release, so a kv rollback can't + * leave the in-memory view inconsistent with the persisted view. + */ + handlePeerRestart(): void { + this.persistPeerRestart(); + this.finalizePeerRestart(); + } - // Reject all pending messages - they will never be ACKed by the restarted peer + /** + * Apply the in-memory side of a peer restart: cancel timers, reject + * in-flight URL redemption promises, and reset sequence counters. Must + * be called after {@link persistPeerRestart} and after the caller's + * savepoint has been released. 
+ */ + finalizePeerRestart(): void { + const pendingCount = this.#getPendingCount(); if (this.#hasPendingMessages()) { this.#logger.log( - `${this.#peerId.slice(0, 8)}:: rejecting ${this.#getPendingCount()} pending messages due to peer restart`, + `${this.#peerId.slice(0, 8)}:: discarding ${pendingCount} pending messages due to peer restart`, ); - this.#rejectAllPending('Remote peer restarted'); } - - // Reject pending URL redemptions - the remote won't have context for them + this.#clearAckTimeout(); + this.#clearDelayedAck(); this.rejectPendingRedemptions('Remote peer restarted'); - - // Reset sequence numbers and flags for fresh start this.#nextSendSeq = 0; this.#highestReceivedSeq = 0; this.#startSeq = 0; this.#retryCount = 0; this.#remoteGcRequested = false; - - // Clear persisted sequence state - this.#kernelStore.clearRemoteSeqState(this.remoteId); } } diff --git a/packages/ocap-kernel/src/remotes/kernel/RemoteManager.test.ts b/packages/ocap-kernel/src/remotes/kernel/RemoteManager.test.ts index 7eed72d908..34e9e91502 100644 --- a/packages/ocap-kernel/src/remotes/kernel/RemoteManager.test.ts +++ b/packages/ocap-kernel/src/remotes/kernel/RemoteManager.test.ts @@ -814,38 +814,81 @@ describe('RemoteManager', () => { await remoteManager.initRemoteComms(); }); - it('calls handlePeerRestart on remote when incarnation changes', () => { + /** + * Get the onIncarnationChange callback (9th argument, index 8) that + * RemoteManager passed when initializing remote comms. + * + * @returns The onIncarnationChange callback bound to the RemoteManager. 
+ */ + function getOnIncarnationChange(): ( + peerId: string, + observedIncarnation: string, + ) => Promise<boolean> { + const initCall = vi.mocked(remoteComms.initRemoteComms).mock.calls[0]; + return initCall?.[8] as ( + peerId: string, + observedIncarnation: string, + ) => Promise<boolean>; + } + + it('triggers persist + finalize peer restart when persisted incarnation differs from observed', async () => { const peerId = 'peer-that-restarted'; const remote = remoteManager.establishRemote(peerId); - const handlePeerRestartSpy = vi.spyOn(remote, 'handlePeerRestart'); - // Get the onIncarnationChange callback (9th argument, index 8) - const initCall = vi.mocked(remoteComms.initRemoteComms).mock.calls[0]; - const onIncarnationChange = initCall?.[8] as (peerId: string) => void; - onIncarnationChange(peerId); - expect(handlePeerRestartSpy).toHaveBeenCalled(); + const persistSpy = vi.spyOn(remote, 'persistPeerRestart'); + const finalizeSpy = vi.spyOn(remote, 'finalizePeerRestart'); + // Seed the persisted incarnation (as if a prior handshake recorded it). 
+ kernelStore.setPeerIncarnation(peerId, 'incarnation-A'); + + const verdict = await getOnIncarnationChange()(peerId, 'incarnation-B'); + + expect(verdict).toBe(true); + expect(persistSpy).toHaveBeenCalled(); + expect(finalizeSpy).toHaveBeenCalled(); + expect(kernelStore.getPeerIncarnation(peerId)).toBe('incarnation-B'); }); - it('rejects kernel promises where remote is decider', () => { + it('does not trigger restart on first observation of a peer', async () => { + const peerId = 'peer-first-contact'; + const remote = remoteManager.establishRemote(peerId); + const persistSpy = vi.spyOn(remote, 'persistPeerRestart'); + const finalizeSpy = vi.spyOn(remote, 'finalizePeerRestart'); + + const verdict = await getOnIncarnationChange()(peerId, 'incarnation-A'); + + expect(verdict).toBe(false); + expect(persistSpy).not.toHaveBeenCalled(); + expect(finalizeSpy).not.toHaveBeenCalled(); + expect(kernelStore.getPeerIncarnation(peerId)).toBe('incarnation-A'); + }); + + it('does nothing when observed incarnation matches persisted value', async () => { + const peerId = 'peer-stable'; + const remote = remoteManager.establishRemote(peerId); + const persistSpy = vi.spyOn(remote, 'persistPeerRestart'); + const finalizeSpy = vi.spyOn(remote, 'finalizePeerRestart'); + kernelStore.setPeerIncarnation(peerId, 'incarnation-A'); + + const verdict = await getOnIncarnationChange()(peerId, 'incarnation-A'); + + expect(verdict).toBe(false); + expect(persistSpy).not.toHaveBeenCalled(); + expect(finalizeSpy).not.toHaveBeenCalled(); + }); + + it('rejects kernel promises where the restarted remote is decider', async () => { const peerId = 'peer-with-promises'; const remote = remoteManager.establishRemote(peerId); const { remoteId } = remote; - // Set up a promise where the remote is the decider const [kpid] = kernelStore.initKernelPromise(); kernelStore.setPromiseDecider(kpid, remoteId); - - // Set up the cle. 
key that getPromisesByDecider looks for - // The key format is cle.{decider}.{eref} = kpid kernelKVStore.set(`cle.${remoteId}.p+1`, kpid); + kernelStore.setPeerIncarnation(peerId, 'incarnation-A'); const resolvePromisesSpy = vi.spyOn(mockKernelQueue, 'resolvePromises'); - // Trigger incarnation change - const initCall = vi.mocked(remoteComms.initRemoteComms).mock.calls[0]; - const onIncarnationChange = initCall?.[8] as (peerId: string) => void; - onIncarnationChange(peerId); + await getOnIncarnationChange()(peerId, 'incarnation-B'); - // Should reject the promise with PEER_RESTARTED error expect(resolvePromisesSpy).toHaveBeenCalledWith(remoteId, [ [ kpid, @@ -857,21 +900,56 @@ describe('RemoteManager', () => { ]); }); - it('does nothing when remote does not exist', () => { - const peerId = 'non-existent-peer'; - const initCall = vi.mocked(remoteComms.initRemoteComms).mock.calls[0]; - const onIncarnationChange = initCall?.[8] as (peerId: string) => void; - expect(() => onIncarnationChange(peerId)).not.toThrow(); + it('persists the incarnation and reports restart even when no remote handle exists', async () => { + const peerId = 'unknown-peer'; + const resolvePromisesSpy = vi.spyOn(mockKernelQueue, 'resolvePromises'); + kernelStore.setPeerIncarnation(peerId, 'incarnation-A'); + + // The verdict reflects the persisted-state comparison, not whether + // there's a RemoteHandle to reset. Transport callers use the verdict + // to suppress stale outbound messages; that decision is correct + // regardless of local handle presence. + const verdict = await getOnIncarnationChange()(peerId, 'incarnation-B'); + expect(verdict).toBe(true); + + expect(kernelStore.getPeerIncarnation(peerId)).toBe('incarnation-B'); + // No remote handle → no promises to reject. 
+ expect(resolvePromisesSpy).not.toHaveBeenCalled(); }); - it('does not reject promises when there are none', () => { + it('does not reject promises when there are none', async () => { const peerId = 'peer-without-promises'; remoteManager.establishRemote(peerId); + kernelStore.setPeerIncarnation(peerId, 'incarnation-A'); const resolvePromisesSpy = vi.spyOn(mockKernelQueue, 'resolvePromises'); - const initCall = vi.mocked(remoteComms.initRemoteComms).mock.calls[0]; - const onIncarnationChange = initCall?.[8] as (peerId: string) => void; - onIncarnationChange(peerId); + + await getOnIncarnationChange()(peerId, 'incarnation-B'); + expect(resolvePromisesSpy).not.toHaveBeenCalled(); }); + + it('rolls back the savepoint and preserves stored state when persistPeerRestart throws', async () => { + const peerId = 'peer-handler-throws'; + const remote = remoteManager.establishRemote(peerId); + kernelStore.setPeerIncarnation(peerId, 'incarnation-A'); + + const failure = new Error('synthetic persistPeerRestart failure'); + vi.spyOn(remote, 'persistPeerRestart').mockImplementation(() => { + throw failure; + }); + const finalizeSpy = vi.spyOn(remote, 'finalizePeerRestart'); + + await expect( + getOnIncarnationChange()(peerId, 'incarnation-B'), + ).rejects.toThrow(failure); + + // Persisted incarnation must NOT have advanced — the savepoint + // rollback should have reverted the would-be setPeerIncarnation that + // runs after persistPeerRestart in the wrapped block. + expect(kernelStore.getPeerIncarnation(peerId)).toBe('incarnation-A'); + // finalize must not run if the persisted phase failed: in-memory + // mutations would otherwise drift from the rolled-back kv view. 
+ expect(finalizeSpy).not.toHaveBeenCalled(); + }); }); }); diff --git a/packages/ocap-kernel/src/remotes/kernel/RemoteManager.ts b/packages/ocap-kernel/src/remotes/kernel/RemoteManager.ts index c7681b2df2..d9b7c3c946 100644 --- a/packages/ocap-kernel/src/remotes/kernel/RemoteManager.ts +++ b/packages/ocap-kernel/src/remotes/kernel/RemoteManager.ts @@ -190,38 +190,100 @@ export class RemoteManager { } /** - * Handle when a remote peer's incarnation changes (peer restarted). - * Resets the RemoteHandle state and rejects kernel promises for a fresh start. + * Handle a peer's reported incarnation after a successful handshake. * - * @param peerId - The peer ID of the remote that restarted. + * Compares the observed incarnation against the value persisted in the + * kernel store. When they differ AND a previous value was on file, the peer + * has truly restarted: persist the new incarnation and reset the + * RemoteHandle's seq dedup state, atomically with respect to the kv layer + * so a crash mid-reset can't leave us with the new dedup state under the + * old recorded incarnation. + * + * The savepoint guards only kv-layer writes (`setPeerIncarnation` plus + * everything `handlePeerRestart` persists via `clearRemoteSeqState` and + * `forgetEndpointImports`). Run-queue mutations (`resolvePromises`) and + * the in-memory counter resets inside `RemoteHandle.handlePeerRestart` + * are NOT reversible by a savepoint, so we collect the work to do inside + * the savepoint, commit, and then fan it out — mirroring the + * deferred-completion pattern in `RemoteHandle.handleRemoteMessage`. + * + * Fires on every handshake (not only on detected change) because the + * in-memory PeerStateManager is unreliable across receiver restart and + * stale-peer cleanup; the persisted value is the authoritative anchor for + * detecting peer restart. + * + * @param peerId - The peer that completed the handshake. + * @param observedIncarnation - The incarnationId the peer just reported. 
+ * @returns Whether the peer was determined to have restarted (a defined + * prior value differed from the observed one). The transport uses this + * to suppress stale outbound messages on the same connection. */ - #handleIncarnationChange(peerId: string): void { - const remote = this.#remotesByPeer.get(peerId); - if (!remote) { - // Remote not found - might not have been established yet - this.#logger?.log( - `Incarnation change for unknown peer ${peerId.slice(0, 8)}, ignoring`, - ); - return; + async #handleIncarnationChange( + peerId: string, + observedIncarnation: string, + ): Promise { + const stored = this.#kernelStore.getPeerIncarnation(peerId); + if (stored === observedIncarnation) { + return false; } - this.#logger?.log( - `Handling incarnation change for peer ${peerId.slice(0, 8)}`, - ); - - // Reset RemoteHandle state (pending messages, redemptions, seq numbers) - remote.handlePeerRestart(); + const isRestart = stored !== undefined; + const remote = isRestart ? this.#remotesByPeer.get(peerId) : undefined; + + // Snapshot the decider list BEFORE any kv mutation so the c-list lookup + // can still find the promises through the entries forgetEndpointImports + // is about to tear down. We materialize into an array because the + // generator iterates over kv state that we'll mutate. + const promisesToReject = remote + ? Array.from(this.#kernelStore.getPromisesByDecider(remote.remoteId)) + : []; + + const savepoint = `peerIncarnation_${peerId}`; + this.#kernelStore.createSavepoint(savepoint); + try { + if (isRestart) { + this.#logger?.log( + `Peer ${peerId.slice(0, 8)} restarted (incarnation ${stored.slice(0, 8)} → ${observedIncarnation.slice(0, 8)})`, + ); + if (remote) { + remote.persistPeerRestart(); + } else { + // No live RemoteHandle for the peer but a persisted incarnation + // exists — usually a transient race during kernel boot before + // initRemoteComms has finished restoring remotes. 
The persisted + // bookkeeping the missing handle would have cleaned up may leak. + // Surfacing as a warning so operators can correlate. + this.#logger?.warn( + `Peer ${peerId.slice(0, 8)} restart detected but no live RemoteHandle; advancing persisted incarnation without c-list cleanup`, + ); + } + } + this.#kernelStore.setPeerIncarnation(peerId, observedIncarnation); + this.#kernelStore.releaseSavepoint(savepoint); + } catch (error) { + this.#kernelStore.rollbackSavepoint(savepoint); + throw error; + } - // Reject all kernel promises where this remote is the decider - // The restarted peer has lost its state and won't resolve these promises - const { remoteId } = remote; - const failure = makeKernelError( - 'PEER_RESTARTED', - 'Remote peer restarted (incarnation changed)', - ); - for (const kpid of this.#kernelStore.getPromisesByDecider(remoteId)) { - this.#kernelQueue.resolvePromises(remoteId, [[kpid, true, failure]]); + // Post-commit fan-out: in-memory state changes and run-queue + // mutations are not reversible by a savepoint, so they wait until the + // kv layer is durable. 
+ if (isRestart && remote) { + remote.finalizePeerRestart(); + if (promisesToReject.length > 0) { + const failure = makeKernelError( + 'PEER_RESTARTED', + 'Remote peer restarted (incarnation changed)', + ); + for (const kpid of promisesToReject) { + this.#kernelQueue.resolvePromises(remote.remoteId, [ + [kpid, true, failure], + ]); + } + } } + + return isRestart; } /** diff --git a/packages/ocap-kernel/src/remotes/platform/transport.test.ts b/packages/ocap-kernel/src/remotes/platform/transport.test.ts index 8e3ffafc72..32b6ad7fa2 100644 --- a/packages/ocap-kernel/src/remotes/platform/transport.test.ts +++ b/packages/ocap-kernel/src/remotes/platform/transport.test.ts @@ -142,6 +142,24 @@ vi.mock('@metamask/kernel-errors', () => ({ this.name = 'ResourceLimitError'; } }, + PeerRestartedError: class MockPeerRestartedError extends Error { + constructor() { + super('Remote peer restarted: message not sent to avoid stale delivery'); + this.name = 'PeerRestartedError'; + } + }, + IntentionalCloseError: class MockIntentionalCloseError extends Error { + constructor() { + super('Message delivery failed after intentional close'); + this.name = 'IntentionalCloseError'; + } + }, + NetworkStoppedError: class MockNetworkStoppedError extends Error { + constructor() { + super('Network stopped'); + this.name = 'NetworkStoppedError'; + } + }, isRetryableNetworkError: vi.fn().mockImplementation((error: unknown) => { const errorWithCode = error as { code?: string }; return ( @@ -2827,23 +2845,17 @@ describe('transport.initTransport', () => { // Trigger inbound connection inboundHandler?.(mockInboundChannel); - // Wait for rejection to be logged + // Wait for the channel to be closed: handshake failure + rejected + // restart both close the channel without registering it. 
await vi.waitFor(() => { - expect(mockLogger.log).toHaveBeenCalledWith( - expect.stringContaining( - 'rejecting inbound connection due to handshake failure', - ), + expect(mockConnectionFactory.closeChannel).toHaveBeenCalledWith( + mockInboundChannel, + 'remote-peer', ); }); - - // Channel should be closed - expect(mockConnectionFactory.closeChannel).toHaveBeenCalledWith( - mockInboundChannel, - 'remote-peer', - ); }); - it('calls onIncarnationChange when incarnation changes', async () => { + it('reports the observed incarnation to onIncarnationChange after every handshake', async () => { let inboundHandler: ((channel: MockChannel) => void) | undefined; mockConnectionFactory.onInboundConnection.mockImplementation( (handler: (channel: MockChannel) => void) => { @@ -2878,15 +2890,15 @@ describe('transport.initTransport', () => { inboundHandler?.(mockInboundChannel1); - // Wait for first handshake to be processed + // Fires on every successful handshake — the kernel layer is the + // authoritative comparator against persisted state. 
await vi.waitFor(() => { - expect(mockLogger.log).toHaveBeenCalledWith( - expect.stringContaining('first incarnation ID received'), + expect(onIncarnationChange).toHaveBeenCalledWith( + 'remote-peer', + 'incarnation-1', ); }); - - // First incarnation should not trigger onIncarnationChange - expect(onIncarnationChange).not.toHaveBeenCalled(); + expect(onIncarnationChange).toHaveBeenCalledTimes(1); // Second handshake with different incarnation (simulating peer restart) const mockInboundChannel2 = createMockChannel('remote-peer'); @@ -2904,15 +2916,137 @@ describe('transport.initTransport', () => { inboundHandler?.(mockInboundChannel2); - // Wait for second handshake to be processed await vi.waitFor(() => { - expect(mockLogger.log).toHaveBeenCalledWith( - expect.stringContaining('incarnation changed'), + expect(onIncarnationChange).toHaveBeenCalledWith( + 'remote-peer', + 'incarnation-2', ); }); + expect(mockLogger.log).toHaveBeenCalledWith( + expect.stringContaining('incarnation changed'), + ); + }); + + it('closes the dialed channel and throws PeerRestartedError when outbound handshake reports kernel-detected restart', async () => { + const localIncarnationId = 'local-incarnation'; + // Kernel verdict says "yes, restart" — overrides PSM's first-contact + // verdict and must trigger the close-without-register path. 
+ const onIncarnationChange = vi.fn().mockResolvedValue(true); - // Changed incarnation should trigger onIncarnationChange - expect(onIncarnationChange).toHaveBeenCalledWith('remote-peer'); + const mockChannel = createMockChannel('remote-peer'); + mockConnectionFactory.dialIdempotent.mockResolvedValue(mockChannel); + const handshakeAck = JSON.stringify({ + method: 'handshakeAck', + params: { incarnationId: 'remote-incarnation' }, + }); + mockChannel.msgStream.read.mockResolvedValueOnce( + new TextEncoder().encode(handshakeAck), + ); + + const { sendRemoteMessage } = await initTransport( + '0x1234', + {}, + vi.fn().mockResolvedValue(''), + undefined, + localIncarnationId, + onIncarnationChange, + ); + + await expect( + sendRemoteMessage('remote-peer', makeTestMessage('hi')), + ).rejects.toThrow(/Remote peer restarted/u); + + // The throw must happen AFTER closeChannel and the channel must + // never be registered (no readChannel started). + expect(mockConnectionFactory.closeChannel).toHaveBeenCalledWith( + mockChannel, + 'remote-peer', + ); + // Without registration the readChannel side never starts; verify by + // confirming no further reads were attempted on the channel. + expect(mockChannel.msgStream.read).toHaveBeenCalledTimes(1); + }); + + it('does not trigger reconnection when PeerRestartedError aborts the outbound send', async () => { + // PeerRestartedError means the peer is reachable but its incarnation + // changed; handleConnectionLoss would clobber an inbound channel a + // concurrent handshake just registered, so the catch path must skip it. 
+ const localIncarnationId = 'local-incarnation'; + const onIncarnationChange = vi.fn().mockResolvedValue(true); + + const mockChannel = createMockChannel('remote-peer'); + mockConnectionFactory.dialIdempotent.mockResolvedValue(mockChannel); + const handshakeAck = JSON.stringify({ + method: 'handshakeAck', + params: { incarnationId: 'remote-incarnation' }, + }); + mockChannel.msgStream.read.mockResolvedValueOnce( + new TextEncoder().encode(handshakeAck), + ); + + const { sendRemoteMessage } = await initTransport( + '0x1234', + {}, + vi.fn().mockResolvedValue(''), + undefined, + localIncarnationId, + onIncarnationChange, + ); + + await expect( + sendRemoteMessage('remote-peer', makeTestMessage('hi')), + ).rejects.toThrow(/Remote peer restarted/u); + + // Drain microtasks so any reconnection scheduling would have fired. + for (let i = 0; i < 5; i += 1) { + await Promise.resolve(); + } + + // No reconnect attempt: dialIdempotent is only the original dial. + expect(mockConnectionFactory.dialIdempotent).toHaveBeenCalledTimes(1); + }); + + it('rejects the inbound channel when kernel detects a restart on inbound handshake', async () => { + // Symmetric with the outbound PeerRestartedError path: when the + // receiver's persisted incarnation differs from the observed one, + // the channel must NOT be registered — otherwise concurrent in-flight + // outbound sends could write pre-restart payloads on a fresh channel. 
+ let inboundHandler: ((channel: MockChannel) => void) | undefined; + mockConnectionFactory.onInboundConnection.mockImplementation( + (handler: (channel: MockChannel) => void) => { + inboundHandler = handler; + }, + ); + const onIncarnationChange = vi.fn().mockResolvedValue(true); + const localIncarnationId = 'local-incarnation'; + await initTransport( + '0x1234', + {}, + vi.fn().mockResolvedValue(''), + undefined, + localIncarnationId, + onIncarnationChange, + ); + + const inbound = createMockChannel('remote-peer'); + inbound.msgStream.read.mockResolvedValueOnce( + new TextEncoder().encode( + JSON.stringify({ + method: 'handshake', + params: { incarnationId: 'fresh-incarnation' }, + }), + ), + ); + inboundHandler?.(inbound); + + await vi.waitFor(() => { + expect(mockConnectionFactory.closeChannel).toHaveBeenCalledWith( + inbound, + 'remote-peer', + ); + }); + // No further reads on the channel — registerChannel never ran. + expect(inbound.msgStream.read).toHaveBeenCalledTimes(1); }); it('passes regular messages to remoteMessageHandler after handshake', async () => { diff --git a/packages/ocap-kernel/src/remotes/platform/transport.ts b/packages/ocap-kernel/src/remotes/platform/transport.ts index 34e9414557..ba39f0cf92 100644 --- a/packages/ocap-kernel/src/remotes/platform/transport.ts +++ b/packages/ocap-kernel/src/remotes/platform/transport.ts @@ -1,6 +1,12 @@ import { StreamResetError } from '@libp2p/interface'; import type { StreamCloseEvent } from '@libp2p/interface'; -import { AbortError, ResourceLimitError } from '@metamask/kernel-errors'; +import { + AbortError, + IntentionalCloseError, + NetworkStoppedError, + PeerRestartedError, + ResourceLimitError, +} from '@metamask/kernel-errors'; import { installWakeDetector } from '@metamask/kernel-utils'; import { Logger } from '@metamask/logger'; import { toString as bufToString, fromString } from 'uint8arrays'; @@ -202,28 +208,54 @@ export async function initTransport( outputError(channel.peerId, 'outbound 
handshake', problem); return { success: false, incarnationChanged: false }; } - // Handle incarnation change outside try-catch so callback errors - // don't incorrectly mark the handshake as failed - if (result.incarnationChanged) { + // The PSM-detected change is unreliable across receiver-side state loss; + // the kernel-side callback (backed by persistent storage) is the + // authoritative source. Take the OR of both so callers' stale-message + // guards trip whenever either layer detected a restart. + // + // Treat a callback throw as a handshake failure rather than letting it + // propagate: the caller relies on the {success} return to decide + // whether to close the channel, and an unhandled throw would skip + // that close path on the inbound side. + let kernelDetectedRestart: boolean; + try { + kernelDetectedRestart = + (await onIncarnationChange?.( + channel.peerId, + result.remoteIncarnationId, + )) ?? false; + } catch (problem) { + outputError(channel.peerId, 'outbound incarnation callback', problem); + return { success: false, incarnationChanged: false }; + } + const incarnationChanged = + result.incarnationChanged || kernelDetectedRestart; + if (incarnationChanged) { logger.log( `${channel.peerId.slice(0, 8)}:: incarnation changed during outbound handshake, resetting remote state`, ); - // Call incarnation change callback first to reset RemoteHandle state - onIncarnationChange?.(channel.peerId); } - return { success: true, incarnationChanged: result.incarnationChanged }; + return { success: true, incarnationChanged }; } /** * Perform inbound handshake and handle incarnation changes. - * Returns true if handshake succeeded (or was skipped), false if it failed. + * + * On detected restart we reject the inbound channel: the kernel just tore + * down its c-list and seq state for this peer, so accepting the channel + * here would let any concurrent in-flight outbound send write a + * pre-restart payload over a fresh-incarnation channel. 
The caller closes + * the channel; the peer will re-dial and the next handshake sees the + * now-persisted incarnation and proceeds normally — symmetric with the + * outbound `PeerRestartedError` path. * * @param channel - The channel to perform handshake on. - * @returns True if handshake succeeded or was skipped. + * @returns True if handshake succeeded and the channel should be registered; + * false if the handshake failed OR a restart was detected (caller closes). */ async function doInboundHandshake(channel: Channel): Promise { if (!handshakeDeps) { - return true; // No handshake configured, skip + return true; } let result; try { @@ -232,14 +264,26 @@ export async function initTransport( outputError(channel.peerId, 'inbound handshake', problem); return false; } - // Handle incarnation change outside try-catch so callback errors - // don't incorrectly mark the handshake as failed - if (result.incarnationChanged) { + // Treat a callback throw as a handshake failure: the caller closes + // the channel only when this returns false, so an unhandled throw + // would let the channel escape upstream and be closed (noisily) by + // the outer onInboundConnection catch. Better to fail closed here. + let kernelDetectedRestart: boolean; + try { + kernelDetectedRestart = + (await onIncarnationChange?.( + channel.peerId, + result.remoteIncarnationId, + )) ?? 
false; + } catch (problem) { + outputError(channel.peerId, 'inbound incarnation callback', problem); + return false; + } + if (result.incarnationChanged || kernelDetectedRestart) { logger.log( - `${channel.peerId.slice(0, 8)}:: incarnation changed during inbound handshake, resetting remote state`, + `${channel.peerId.slice(0, 8)}:: incarnation changed during inbound handshake, rejecting channel to force re-dial`, ); - // Call incarnation change callback first to reset RemoteHandle state - onIncarnationChange?.(channel.peerId); + return false; } return true; } @@ -498,12 +542,11 @@ export async function initTransport( message: string, ): Promise { if (signal.aborted) { - throw Error('Network stopped'); + throw new NetworkStoppedError(); } - // Check if peer is intentionally closed if (peerStateManager.isIntentionallyClosed(targetPeerId)) { - throw Error('Message delivery failed after intentional close'); + throw new IntentionalCloseError(); } // Validate message size before sending @@ -586,17 +629,39 @@ export async function initTransport( throw Error('Handshake failed'); } if (handshakeResult.incarnationChanged) { - // Peer restarted - don't send stale message, let caller retry with fresh state - registerChannel(targetPeerId, channel, 'reading channel to'); - throw Error( - 'Remote peer restarted: message not sent to avoid stale delivery', - ); + // Peer restarted: close this freshly-dialed channel without + // registering it. Subsequent sends re-dial; that next handshake + // sees the now-persisted incarnation and proceeds normally. + // Registering here would let concurrent in-flight sends bypass + // the handshake check and write pre-restart payloads on the + // same channel. 
+ try { + await connectionFactory.closeChannel(channel, targetPeerId); + } catch (closeError) { + outputError( + targetPeerId, + 'closing channel after peer restart', + closeError, + ); + } + throw new PeerRestartedError(); } registerChannel(targetPeerId, channel, 'reading channel to'); } } catch (problem) { - outputError(targetPeerId, `opening connection`, problem); - handleConnectionLoss(targetPeerId); + // PeerRestartedError means the handshake succeeded against a + // reachable peer that just changed incarnations — don't treat that + // as a connectivity failure. handleConnectionLoss would clear + // state.channel and could clobber an inbound channel a concurrent + // handshake just registered. + if (problem instanceof PeerRestartedError) { + logger.log( + `${targetPeerId.slice(0, 8)}:: aborting outbound send: peer restarted`, + ); + } else { + outputError(targetPeerId, `opening connection`, problem); + handleConnectionLoss(targetPeerId); + } throw problem; } } @@ -664,12 +729,11 @@ export async function initTransport( throw error; } - // Perform handshake before registering the channel + // Perform handshake before registering the channel. doInboundHandshake + // returns false on either handshake failure or a detected peer restart; + // both cases require closing the channel without registration. 
const handshakeOk = await doInboundHandshake(channel); if (!handshakeOk) { - logger.log( - `${channel.peerId}:: rejecting inbound connection due to handshake failure`, - ); await connectionFactory.closeChannel(channel, channel.peerId); return; } diff --git a/packages/ocap-kernel/src/remotes/types.ts b/packages/ocap-kernel/src/remotes/types.ts index 8ae083af5d..943be5d39f 100644 --- a/packages/ocap-kernel/src/remotes/types.ts +++ b/packages/ocap-kernel/src/remotes/types.ts @@ -40,11 +40,27 @@ export type RemoteComms = RemoteIdentity & { export type OnRemoteGiveUp = (peerId: string) => void; /** - * Callback invoked when a remote peer's incarnation ID changes (peer restarted). + * Callback invoked after every successful handshake with a remote peer, + * carrying the incarnationId the peer just reported. * - * @param peerId - The peer ID whose incarnation changed. + * Fires unconditionally (not only on detected change) so the kernel layer can + * compare the observed value against persisted state and detect a peer + * restart even when the in-memory PeerStateManager has been rebuilt empty + * (e.g. after a receiver restart or stale-peer cleanup). + * + * Resolves `true` if the kernel detected an actual restart (and reset its + * RemoteHandle state). The transport awaits this and uses the verdict to + * suppress stale outbound messages on the same connection — the in-memory + * PSM check is unreliable across receiver-side state loss. + * + * @param peerId - The peer ID that completed the handshake. + * @param observedIncarnation - The incarnationId the peer reported. + * @returns Whether the peer was determined to have restarted. */ -export type OnIncarnationChange = (peerId: string) => void; +export type OnIncarnationChange = ( + peerId: string, + observedIncarnation: string, +) => Promise<boolean>; /** * Options for initializing remote communications. 
diff --git a/packages/ocap-kernel/src/rpc/kernel-remote/remoteIncarnationChange.ts b/packages/ocap-kernel/src/rpc/kernel-remote/remoteIncarnationChange.ts index f2655c5567..5d59da82ec 100644 --- a/packages/ocap-kernel/src/rpc/kernel-remote/remoteIncarnationChange.ts +++ b/packages/ocap-kernel/src/rpc/kernel-remote/remoteIncarnationChange.ts @@ -1,26 +1,37 @@ import type { MethodSpec, Handler } from '@metamask/kernel-rpc-methods'; -import { object, string, literal } from '@metamask/superstruct'; +import { object, string, boolean } from '@metamask/superstruct'; import type { Infer } from '@metamask/superstruct'; const paramsStruct = object({ peerId: string(), + observedIncarnation: string(), }); type Params = Infer<typeof paramsStruct>; export type RemoteIncarnationChangeSpec = MethodSpec< 'remoteIncarnationChange', - { peerId: string }, - null + Params, + boolean >; export const remoteIncarnationChangeSpec: RemoteIncarnationChangeSpec = { method: 'remoteIncarnationChange', params: paramsStruct, - result: literal(null), -}; - -export type HandleRemoteIncarnationChange = (peerId: string) => Promise<null>; + // Using the boolean struct directly results in `Struct<true, null> | Struct<false, null>`, + // which doesn't unify with `Struct<boolean, null>`. Cast through the spec type. + result: boolean(), +} as RemoteIncarnationChangeSpec; + +/** + * Returns true if the kernel detected a peer restart (and reset its + * RemoteHandle state); the transport uses this to suppress stale outbound + * messages on the same connection. 
+ */ +export type HandleRemoteIncarnationChange = ( + peerId: string, + observedIncarnation: string, +) => Promise<boolean>; type RemoteIncarnationChangeHooks = { remoteIncarnationChange: HandleRemoteIncarnationChange; @@ -29,15 +40,13 @@ type RemoteIncarnationChangeHooks = { export type RemoteIncarnationChangeHandler = Handler< 'remoteIncarnationChange', Params, - Promise<null>, + Promise<boolean>, RemoteIncarnationChangeHooks >; -export const remoteIncarnationChangeHandler: RemoteIncarnationChangeHandler = { +export const remoteIncarnationChangeHandler = { ...remoteIncarnationChangeSpec, hooks: { remoteIncarnationChange: true }, - implementation: async ({ remoteIncarnationChange }, params) => { - await remoteIncarnationChange(params.peerId); - return null; - }, -}; + implementation: async ({ remoteIncarnationChange }, params) => + remoteIncarnationChange(params.peerId, params.observedIncarnation), +} as RemoteIncarnationChangeHandler; diff --git a/packages/ocap-kernel/src/store/index.test.ts b/packages/ocap-kernel/src/store/index.test.ts index ba7c068a9d..3db43c3e31 100644 --- a/packages/ocap-kernel/src/store/index.test.ts +++ b/packages/ocap-kernel/src/store/index.test.ts @@ -79,6 +79,7 @@ describe('kernel store', () => { 'erefToKref', 'exportFromEndpoint', 'flushCrankBuffer', + 'forgetEndpointImports', 'forgetEref', 'forgetKref', 'forgetTerminatedVat', @@ -98,6 +99,7 @@ describe('kernel store', () => { 'getNextVatId', 'getObjectRefCount', 'getOwner', + 'getPeerIncarnation', 'getPendingMessage', 'getPinnedObjects', 'getPromisesByDecider', @@ -159,6 +161,7 @@ describe('kernel store', () => { 'setGCActions', 'setKernelServiceKref', 'setObjectRefCount', + 'setPeerIncarnation', 'setPendingMessage', 'setPromiseDecider', 'setRelayEntries', diff --git a/packages/ocap-kernel/src/store/methods/remote.test.ts b/packages/ocap-kernel/src/store/methods/remote.test.ts index 86660cf79b..ccd4147a5b 100644 --- a/packages/ocap-kernel/src/store/methods/remote.test.ts +++ 
b/packages/ocap-kernel/src/store/methods/remote.test.ts @@ -2,7 +2,8 @@ import { describe, it, expect, beforeEach, vi } from 'vitest'; import { getBaseMethods } from './base.ts'; import { getRemoteMethods } from './remote.ts'; -import type { RemoteId, RemoteInfo } from '../../types.ts'; +import type { RemoteInfo } from '../../remotes/types.ts'; +import type { RemoteId } from '../../types.ts'; import type { StoreContext } from '../types.ts'; vi.mock('./base.ts', () => ({ @@ -137,6 +138,19 @@ describe('remote store methods', () => { expect(mockKV.has(`remotePending.${remoteId1}.2`)).toBe(false); expect(mockKV.has(`remotePending.${remoteId1}.3`)).toBe(false); }); + + it('clears persisted peer incarnation so a re-established remote starts fresh', () => { + mockKV.set(`remote.${remoteId1}`, JSON.stringify(remoteInfo1)); + mockKV.set(`peerIncarnation.${remoteInfo1.peerId}`, 'incarnation-A'); + mockGetPrefixedKeys.mockReturnValue([]); + + remoteMethods.deleteRemoteInfo(remoteId1); + + // Without this cleanup a re-established remote with the same peerId + // would mis-classify its first handshake as a restart against the + // leftover incarnation. + expect(mockKV.has(`peerIncarnation.${remoteInfo1.peerId}`)).toBe(false); + }); }); describe('getAllRemoteRecords', () => { diff --git a/packages/ocap-kernel/src/store/methods/remote.ts b/packages/ocap-kernel/src/store/methods/remote.ts index f1b5a31390..23ae73aec5 100644 --- a/packages/ocap-kernel/src/store/methods/remote.ts +++ b/packages/ocap-kernel/src/store/methods/remote.ts @@ -21,6 +21,7 @@ const REMOTE_INFO_BASE = 'remote.'; const REMOTE_INFO_BASE_LEN = REMOTE_INFO_BASE.length; const REMOTE_SEQ_BASE = 'remoteSeq.'; const REMOTE_PENDING_BASE = 'remotePending.'; +const PEER_INCARNATION_BASE = 'peerIncarnation.'; /** * Get a kernel store object that provides functionality for managing remote records. @@ -70,12 +71,34 @@ export function getRemoteMethods(ctx: StoreContext) { } /** - * Delete the info for a remote. 
+ * Delete the info for a remote, including its persisted peer-incarnation + * record. Read the info before deleting so we have the peerId to scope + * the peerIncarnation cleanup; otherwise stale `peerIncarnation.{peerId}` + * entries would survive remote teardown and a re-established remote with + * the same peerId would mis-classify its first handshake as a restart. + * + * Corrupt JSON in `remote.{remoteID}` is logged and swallowed so the + * remaining cleanup steps still run — losing the (already-untrustworthy) + * peerIncarnation row is preferable to leaving the corrupt entry stuck. * * @param remoteID - The remote whose info is to be removed. */ function deleteRemoteInfo(remoteID: RemoteId): void { - kv.delete(`${REMOTE_INFO_BASE}${remoteID}`); + const infoKey = `${REMOTE_INFO_BASE}${remoteID}`; + const rawInfo = kv.get(infoKey); + if (rawInfo !== undefined) { + try { + const { peerId } = JSON.parse(rawInfo) as RemoteInfo; + kv.delete(`${PEER_INCARNATION_BASE}${peerId}`); + } catch (parseError) { + ctx.logger?.error( + `deleteRemoteInfo: corrupt remote info for ${remoteID}, ` + + `proceeding without peerIncarnation cleanup`, + parseError, + ); + } + } + kv.delete(infoKey); deleteRemotePendingState(remoteID); } @@ -230,6 +253,29 @@ export function getRemoteMethods(ctx: StoreContext) { return deletedCount; } + /** + * Get the last observed incarnationId for a peer. This is the value most + * recently negotiated via handshake; it survives kernel restart so that the + * receiver can detect a peer restart even when its in-memory PeerStateManager + * has been rebuilt empty. + * + * @param peerId - The peer whose incarnation is sought. + * @returns The persisted incarnationId, or undefined if none recorded yet. + */ + function getPeerIncarnation(peerId: string): string | undefined { + return kv.get(`${PEER_INCARNATION_BASE}${peerId}`); + } + + /** + * Persist the most recently observed incarnationId for a peer. + * + * @param peerId - The peer to record. 
+ * @param incarnationId - The observed incarnationId. + */ + function setPeerIncarnation(peerId: string, incarnationId: string): void { + kv.set(`${PEER_INCARNATION_BASE}${peerId}`, incarnationId); + } + /** * Clear all sequence state for a remote (seq counters + all pending messages). * Called when a remote peer restarts (incarnation changes) to reset for fresh communication. @@ -267,5 +313,8 @@ export function getRemoteMethods(ctx: StoreContext) { deleteRemotePendingState, cleanupOrphanMessages, clearRemoteSeqState, + // Peer incarnation persistence + getPeerIncarnation, + setPeerIncarnation, }; } diff --git a/packages/ocap-kernel/src/store/methods/vat.test.ts b/packages/ocap-kernel/src/store/methods/vat.test.ts index c1345c2e66..d04c609896 100644 --- a/packages/ocap-kernel/src/store/methods/vat.test.ts +++ b/packages/ocap-kernel/src/store/methods/vat.test.ts @@ -7,7 +7,7 @@ import * as promiseModule from './promise.ts'; import * as reachableModule from './reachable.ts'; import * as refCountModule from './refcount.ts'; import { getVatMethods } from './vat.ts'; -import type { VatConfig, VatId } from '../../types.ts'; +import type { RemoteId, VatConfig, VatId } from '../../types.ts'; import type { StoreContext } from '../types.ts'; // Mock the parseRef function @@ -19,6 +19,16 @@ vi.mock('../utils/parse-ref.ts', () => ({ return { context: 'vat', direction: 'import', isPromise: false }; } else if (ref.startsWith('p+')) { return { context: 'vat', direction: 'export', isPromise: true }; + } else if (ref.startsWith('p-')) { + return { context: 'vat', direction: 'import', isPromise: true }; + } else if (ref.startsWith('ro+')) { + return { context: 'remote', direction: 'export', isPromise: false }; + } else if (ref.startsWith('ro-')) { + return { context: 'remote', direction: 'import', isPromise: false }; + } else if (ref.startsWith('rp+')) { + return { context: 'remote', direction: 'export', isPromise: true }; + } else if (ref.startsWith('rp-')) { + return { context: 
'remote', direction: 'import', isPromise: true }; } else if (ref.startsWith('ko')) { return { context: 'kernel', direction: null, isPromise: false }; } else if (ref.startsWith('kp')) { @@ -526,4 +536,171 @@ describe('vat store methods', () => { expect(result).toBe(false); }); }); + + describe('forgetEndpointImports', () => { + const endpointId = 'r1' as RemoteId; + + /** + * Seed the c-list-keyed entries the function iterates and arrange + * `getPrefixedKeys` to return them in the order they would appear in + * storage. Erefs use the realistic remote-style polarity (`ro+`, `ro-`, + * `rp+`, `rp-`) since the function is scoped to RemoteId endpoints. + * + * @param erefs - Pairs of [eref, kref] to seed, in lexicographic order. + */ + function seedClist(erefs: [string, string][]): void { + for (const [eref, kref] of erefs) { + mockKV.set(`slot.${endpointId}.${eref}`, kref); + mockKV.set(`slot.${endpointId}.${kref}`, eref); + } + mockGetPrefixedKeys.mockImplementation((prefix: string) => { + if (prefix === `${endpointId}.c.`) { + // Emit both eref-keyed and kref-keyed entries so the function's + // "skip kref-keyed" branch is exercised. 
+ return erefs.flatMap(([eref, kref]) => [ + `${endpointId}.c.${eref}`, + `${endpointId}.c.${kref}`, + ]); + } + return []; + }); + } + + it("decrements the decider refcount for the peer's promise exports", () => { + seedClist([['rp+1', 'kp123']]); + mockGetKernelPromise.mockReturnValue({ decider: endpointId }); + + vatMethods.forgetEndpointImports(endpointId); + + expect(mockDeleteCListEntry).toHaveBeenCalledWith( + endpointId, + 'kp123', + 'rp+1', + ); + expect(mockDecrementRefCount).toHaveBeenCalledWith( + 'kp123', + 'cleanup|peerRestart|promise|decider', + ); + }); + + it('skips the decider decrement when the peer is no longer the decider', () => { + seedClist([['rp+1', 'kp123']]); + mockGetKernelPromise.mockReturnValue({ decider: 'someoneElse' }); + + vatMethods.forgetEndpointImports(endpointId); + + expect(mockDeleteCListEntry).toHaveBeenCalledWith( + endpointId, + 'kp123', + 'rp+1', + ); + expect(mockDecrementRefCount).not.toHaveBeenCalled(); + }); + + it("releases the peer's object exports: owner, c-list, baseline refcount, GC", () => { + seedClist([['ro+7', 'ko42']]); + mockKV.set(`owner.ko42`, endpointId); + mockGetReachableAndVatSlot.mockReturnValue({ vatSlot: 'ro+7' }); + + vatMethods.forgetEndpointImports(endpointId); + + expect(mockKV.has(`owner.ko42`)).toBe(false); + expect(mockKV.has(`slot.${endpointId}.ko42`)).toBe(false); + expect(mockKV.has(`slot.${endpointId}.ro+7`)).toBe(false); + expect(mockDecrementRefCount).toHaveBeenCalledWith( + 'ko42', + 'cleanup|peerRestart|export|baseline', + ); + expect(mockMaybeFreeKrefs.add).toHaveBeenCalledWith('ko42'); + // Object-export tear-down handles the c-list pair directly; we don't + // also call deleteCListEntry (which uses the recognizable-only path + // and would corrupt the count). 
+ expect(mockDeleteCListEntry).not.toHaveBeenCalled(); + }); + + it('preserves baseline refcount when ownership has migrated', () => { + seedClist([['ro+7', 'ko42']]); + mockKV.set(`owner.ko42`, 'someoneElse'); + mockGetReachableAndVatSlot.mockReturnValue({ vatSlot: 'ro+7' }); + + vatMethods.forgetEndpointImports(endpointId); + + // Foreign owner survives — the baseline reference is theirs now. + expect(mockKV.get(`owner.ko42`)).toBe('someoneElse'); + // Our c-list pair is still torn down (the peer can't reach the kref + // through us anymore), but the refcount stays untouched so we don't + // corrupt the new owner's accounting. + expect(mockKV.has(`slot.${endpointId}.ko42`)).toBe(false); + expect(mockKV.has(`slot.${endpointId}.ro+7`)).toBe(false); + expect(mockDecrementRefCount).not.toHaveBeenCalled(); + expect(mockMaybeFreeKrefs.add).not.toHaveBeenCalled(); + }); + + it('preserves our exports to the peer (import-direction entries)', () => { + // Both object and promise imports — neither should be touched. + seedClist([ + ['ro-3', 'ko99'], + ['rp-2', 'kp77'], + ]); + + vatMethods.forgetEndpointImports(endpointId); + + expect(mockDeleteCListEntry).not.toHaveBeenCalled(); + expect(mockDecrementRefCount).not.toHaveBeenCalled(); + expect(mockMaybeFreeKrefs.add).not.toHaveBeenCalled(); + // Mappings stay so a fresh incarnation can keep referring to alice etc. 
+ expect(mockKV.get(`slot.${endpointId}.ro-3`)).toBe('ko99'); + expect(mockKV.get(`slot.${endpointId}.ko99`)).toBe('ro-3'); + expect(mockKV.get(`slot.${endpointId}.rp-2`)).toBe('kp77'); + expect(mockKV.get(`slot.${endpointId}.kp77`)).toBe('rp-2'); + }); + + it('processes mixed entries in one pass without disturbing imports', () => { + seedClist([ + ['ro+7', 'ko42'], // peer's object export — clean up + ['ro-3', 'ko99'], // our export to the peer — keep + ['rp+1', 'kp123'], // peer's promise export — clean up + ]); + mockKV.set(`owner.ko42`, endpointId); + mockGetKernelPromise.mockReturnValue({ decider: endpointId }); + mockGetReachableAndVatSlot.mockReturnValue({ vatSlot: 'ro+7' }); + + vatMethods.forgetEndpointImports(endpointId); + + // Peer's exports gone. + expect(mockKV.has(`slot.${endpointId}.ro+7`)).toBe(false); + expect(mockDeleteCListEntry).toHaveBeenCalledWith( + endpointId, + 'kp123', + 'rp+1', + ); + // Our export retained. + expect(mockKV.get(`slot.${endpointId}.ro-3`)).toBe('ko99'); + }); + + it('logs and skips when an eref entry has no matching kref slot', () => { + // Seed only the eref-keyed side (simulating c-list inconsistency). + mockKV.set(`slot.${endpointId}.rp+1`, 'kp123'); + mockKV.delete(`slot.${endpointId}.rp+1`); + mockGetPrefixedKeys.mockImplementation((prefix: string) => { + if (prefix === `${endpointId}.c.`) { + return [`${endpointId}.c.rp+1`]; + } + return []; + }); + const warnSpy = vi.fn(); + // Override the logger on context with a spy by re-creating vatMethods + // pointing at a context with a logger that captures warns. 
+ const ctxWithLogger = { ...context, logger: { warn: warnSpy } }; + const localVatMethods = getVatMethods(ctxWithLogger as StoreContext); + + expect(() => + localVatMethods.forgetEndpointImports(endpointId), + ).not.toThrow(); + expect(warnSpy).toHaveBeenCalledWith( + expect.stringContaining('has no kref slot'), + ); + expect(mockDeleteCListEntry).not.toHaveBeenCalled(); + }); + }); }); diff --git a/packages/ocap-kernel/src/store/methods/vat.ts b/packages/ocap-kernel/src/store/methods/vat.ts index 9ed504c756..84e065ee55 100644 --- a/packages/ocap-kernel/src/store/methods/vat.ts +++ b/packages/ocap-kernel/src/store/methods/vat.ts @@ -6,7 +6,14 @@ import { getObjectMethods } from './object.ts'; import { getPromiseMethods } from './promise.ts'; import { getReachableMethods } from './reachable.ts'; import { getRefCountMethods } from './refcount.ts'; -import type { EndpointId, KRef, VatConfig, VatId, ERef } from '../../types.ts'; +import type { + EndpointId, + KRef, + RemoteId, + VatConfig, + VatId, + ERef, +} from '../../types.ts'; import type { StoreContext, VatCleanupWork } from '../types.ts'; import { parseRef } from '../utils/parse-ref.ts'; import { parseReachableAndVatSlot } from '../utils/reachable.ts'; @@ -318,6 +325,103 @@ export function getVatMethods(ctx: StoreContext) { return getTerminatedVats().length > 0; } + /** + * Clean up the c-list entries an endpoint introduced into the kernel — the + * "+"-direction erefs the endpoint allocated and shared with us. These + * become dead the moment the endpoint restarts: their kernel objects have + * no live owner, their kernel promises can never be resolved by their + * original decider, and any future c-list lookup for one of those reused + * eref labels would route a fresh incarnation's traffic into stale state. + * + * Mirrors the export/promise legs of {@link cleanupTerminatedVat} but + * scoped to a single endpoint and only its own contributions, so our + * exports to that endpoint (alice's root, etc.) 
stay reachable when a + * fresh incarnation reconnects with the same peer ID. + * + * The caller (RemoteManager.#handleIncarnationChange) rejects promises the + * endpoint was deciding *before* invoking this, so its + * `getPromisesByDecider` query can still find them through the c-list this + * function is about to tear down. + * + * @param endpointId - The endpoint whose contributions are to be dropped. + */ + function forgetEndpointImports(endpointId: RemoteId): void { + const clistPrefix = `${endpointId}.c.`; + const erefsToForget: ERef[] = []; + for (const key of getPrefixedKeys(clistPrefix)) { + const ref = key.slice(clistPrefix.length); + // The c-list stores both directions of each pair (kref-keyed and + // eref-keyed). Iterate by eref only; deleteCListEntry handles the + // kref-keyed counterpart. + if (ref.startsWith('k')) { + continue; + } + const { direction } = parseRef(ref); + if (direction === 'export') { + erefsToForget.push(ref as ERef); + } + } + + for (const eref of erefsToForget) { + const slotKey = getSlotKey(endpointId, eref); + const kref = ctx.kv.get(slotKey); + if (!kref) { + ctx.logger?.warn( + `forgetEndpointImports: c-list entry ${eref} for endpoint ${endpointId} ` + + `has no kref slot — skipping (possible c-list inconsistency)`, + ); + continue; + } + const { isPromise } = parseRef(eref); + if (isPromise) { + // deleteCListEntry decrements the promise refcount via the + // recognizable path. Additionally, if the endpoint was still + // recorded as decider, drop the decider's reference too. + deleteCListEntry(endpointId, kref, eref); + const kp = getKernelPromise(kref); + if (kp.decider === endpointId) { + decrementRefCount(kref, 'cleanup|peerRestart|promise|decider'); + } + } else { + // Object exports: drop the owner mapping if it still names the + // restarting endpoint, decrement the baseline refcount the kernel + // implicitly held while the endpoint owned the object, and queue + // it for GC. Then tear down the c-list pair. 
+ // + // We deliberately do NOT call deleteCListEntry here: that path uses + // `onlyRecognizable: true`, which is the right semantics for an + // endpoint dropping its imports but the wrong semantics for + // releasing an export the endpoint owned. The baseline decrement + // below corresponds to the implicit reference exportFromEndpoint + // installed when the kernel object was first created. + const ownerKey = getOwnerKey(kref); + const currentOwner = ctx.kv.get(ownerKey); + const stillOwned = currentOwner === endpointId; + if (stillOwned) { + ctx.kv.delete(ownerKey); + } else if (currentOwner !== undefined) { + // Ownership has migrated (e.g. via a kernel-internal handoff). + // The baseline reference is now owed to the new owner; do not + // decrement against their accounting. Tear down our c-list pair + // and stop — the new owner is responsible for the kref's lifetime. + ctx.logger?.warn( + `forgetEndpointImports: kref ${kref} was exported by ${endpointId} ` + + `but is now owned by ${currentOwner}; preserving baseline refcount`, + ); + const { vatSlot } = getReachableAndVatSlot(endpointId, kref); + ctx.kv.delete(getSlotKey(endpointId, kref)); + ctx.kv.delete(getSlotKey(endpointId, vatSlot)); + continue; + } + const { vatSlot } = getReachableAndVatSlot(endpointId, kref); + ctx.kv.delete(getSlotKey(endpointId, kref)); + ctx.kv.delete(getSlotKey(endpointId, vatSlot)); + decrementRefCount(kref, 'cleanup|peerRestart|export|baseline'); + ctx.maybeFreeKrefs.add(kref); + } + } + } + /** * Create the kernel's representation of an export from an endpoint. * @@ -366,5 +470,6 @@ export function getVatMethods(ctx: StoreContext) { nextTerminatedVatCleanup, isVatActive, exportFromEndpoint, + forgetEndpointImports, }; }