Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions packages/extension/test/e2e/remote-comms.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { rm } from 'node:fs/promises';

import { loadExtension, sessionPath } from '../helpers.ts';

test.describe.configure({ mode: 'serial', timeout: 60_000 });
test.describe.configure({ mode: 'serial', timeout: 90_000 });

/**
* End-to-end tests for remote communications functionality.
Expand Down Expand Up @@ -140,7 +140,11 @@ test.describe('Remote Communications', () => {
const messageResponse = popupPage1.locator(
'[data-testid="message-response"]',
);
await expect(messageResponse).toBeVisible({ timeout: 30_000 });
// Budget: redemption timeout is `ackTimeoutMs * (MAX_RETRIES + 1)` =
// 40s with prod defaults; the response (success or rejection) is
// guaranteed to render before then. 50s gives 10s headroom for the
// post-redemption render path under CI load.
await expect(messageResponse).toBeVisible({ timeout: 50_000 });
await expect(messageResponse).toContainText(
// eslint-disable-next-line no-useless-escape
`Response:{\"body\":\"#\\\"vat Bob got \\\\\\\"hello\\\\\\\" from remote Alice\\\"\",\"slots\":[]}`,
Expand Down
4 changes: 4 additions & 0 deletions packages/kernel-browser-runtime/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Fixed

- Process platform-services RPC request handlers in the background so a request handler that fires a reentrant outbound RPC (e.g. transport handshake calling back into the kernel) cannot deadlock waiting for its response ([#948](https://github.com/MetaMask/ocap-kernel/pull/948))

## [0.6.0]

### Added
Expand Down
191 changes: 191 additions & 0 deletions packages/kernel-browser-runtime/src/PlatformServicesClient.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,197 @@ describe('PlatformServicesClient', () => {
expect(successResponse).toBeDefined();
});
});

describe('remoteIncarnationChange', () => {
it('forwards args to handler and returns its boolean verdict', async () => {
const outputs: MessageEventWithPayload[] = [];
const testStream = await TestDuplexStream.make((message) => {
outputs.push(message as unknown as MessageEventWithPayload);
});
const testClient = new PlatformServicesClient(
testStream as unknown as PlatformServicesClientStream,
clientLogger,
);
await delay(10);

const remoteHandler = vi.fn(async () => 'response');
const giveUpHandler = vi.fn();
const incarnationHandler = vi.fn(async () => true);
const initP = testClient.initializeRemoteComms(
'0xabcd',
{},
remoteHandler,
giveUpHandler,
'local-incarnation',
incarnationHandler,
);
await testStream.receiveInput(makeNullReply('m1'));
await initP;

await testStream.receiveInput(
new MessageEvent('message', {
data: {
id: 'm2',
jsonrpc: '2.0',
method: 'remoteIncarnationChange',
params: {
peerId: 'peer-456',
observedIncarnation: 'incarnation-X',
},
},
}),
);
await delay(50);

expect(incarnationHandler).toHaveBeenCalledExactlyOnceWith(
'peer-456',
'incarnation-X',
);
const successResponse = outputs.find(
(message) =>
message.payload?.id === 'm2' &&
'result' in message.payload &&
message.payload.result === true,
);
expect(successResponse).toBeDefined();
});

it('returns false when no handler is registered', async () => {
const outputs: MessageEventWithPayload[] = [];
const newStream = await TestDuplexStream.make((message) => {
outputs.push(message as unknown as MessageEventWithPayload);
});
// eslint-disable-next-line no-new -- test setup
new PlatformServicesClient(
newStream as unknown as PlatformServicesClientStream,
);

await newStream.receiveInput(
new MessageEvent('message', {
data: {
id: 'm1',
jsonrpc: '2.0',
method: 'remoteIncarnationChange',
params: {
peerId: 'peer-789',
observedIncarnation: 'incarnation-Y',
},
},
}),
);
await delay(10);

const response = outputs.find(
(message) =>
message.payload?.id === 'm1' &&
'result' in message.payload &&
message.payload.result === false,
);
expect(response).toBeDefined();
});

it('coerces non-boolean handler return to true (fail closed)', async () => {
const outputs: MessageEventWithPayload[] = [];
const testStream = await TestDuplexStream.make((message) => {
outputs.push(message as unknown as MessageEventWithPayload);
});
const testClient = new PlatformServicesClient(
testStream as unknown as PlatformServicesClientStream,
clientLogger,
);
await delay(10);

const remoteHandler = vi.fn(async () => 'response');
// Handler returns a non-boolean truthy value (a buggy caller).
const incarnationHandler = vi.fn(
async () => 'oops' as unknown as boolean,
);
const initP = testClient.initializeRemoteComms(
'0xabcd',
{},
remoteHandler,
undefined,
'local-incarnation',
incarnationHandler,
);
await testStream.receiveInput(makeNullReply('m1'));
await initP;

await testStream.receiveInput(
new MessageEvent('message', {
data: {
id: 'm2',
jsonrpc: '2.0',
method: 'remoteIncarnationChange',
params: {
peerId: 'peer-789',
observedIncarnation: 'incarnation-Z',
},
},
}),
);
await delay(50);

// Fail closed → resolve to true so the transport drops the outbound.
const response = outputs.find(
(message) =>
message.payload?.id === 'm2' &&
'result' in message.payload &&
message.payload.result === true,
);
expect(response).toBeDefined();
});

it('coerces a throwing handler to true (fail closed)', async () => {
const outputs: MessageEventWithPayload[] = [];
const testStream = await TestDuplexStream.make((message) => {
outputs.push(message as unknown as MessageEventWithPayload);
});
const testClient = new PlatformServicesClient(
testStream as unknown as PlatformServicesClientStream,
clientLogger,
);
await delay(10);

const remoteHandler = vi.fn(async () => 'response');
const incarnationHandler = vi.fn(async () => {
throw new Error('handler exploded');
});
const initP = testClient.initializeRemoteComms(
'0xabcd',
{},
remoteHandler,
undefined,
'local-incarnation',
incarnationHandler,
);
await testStream.receiveInput(makeNullReply('m1'));
await initP;

await testStream.receiveInput(
new MessageEvent('message', {
data: {
id: 'm2',
jsonrpc: '2.0',
method: 'remoteIncarnationChange',
params: {
peerId: 'peer-789',
observedIncarnation: 'incarnation-Z',
},
},
}),
);
await delay(50);

const response = outputs.find(
(message) =>
message.payload?.id === 'm2' &&
'result' in message.payload &&
message.payload.result === true,
);
expect(response).toBeDefined();
});
});
});
});
});
91 changes: 68 additions & 23 deletions packages/kernel-browser-runtime/src/PlatformServicesClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import type {
PostMessageTarget,
} from '@metamask/streams/browser';
import { isJsonRpcResponse, isJsonRpcRequest } from '@metamask/utils';
import type { JsonRpcRequest } from '@metamask/utils';
import type { JsonRpcId } from '@metamask/utils';

// Appears in the docs.
Expand Down Expand Up @@ -329,16 +330,48 @@ export class PlatformServicesClient implements PlatformServices {
}

/**
* Handle a remote incarnation change notification from the server.
* Forward an incarnationId observed during a peer handshake to the kernel
* layer. Fires on every successful handshake; the kernel decides whether
* it represents a peer restart and returns the verdict so the transport
* can suppress stale outbound messages.
*
* @param peerId - The peer ID of the remote that restarted.
* @returns A promise that resolves when handling is complete.
* Fails closed: handler exceptions and non-boolean returns coerce to
* `true` so the transport drops the outbound rather than letting a
* potentially stale message through. Returns `false` only when no handler
* is registered (transport has no kernel to consult, so the verdict is
* effectively unknown — the receiver-side persisted check still gates
* correctness).
*
* @param peerId - The peer that completed the handshake.
* @param observedIncarnation - The incarnationId reported by the peer.
* @returns Whether the kernel detected a peer restart.
*/
async #remoteIncarnationChange(peerId: string): Promise<null> {
if (this.#remoteIncarnationChangeHandler) {
this.#remoteIncarnationChangeHandler(peerId);
async #remoteIncarnationChange(
peerId: string,
observedIncarnation: string,
): Promise<boolean> {
if (!this.#remoteIncarnationChangeHandler) {
return false;
}
try {
const verdict = await this.#remoteIncarnationChangeHandler(
peerId,
observedIncarnation,
);
if (typeof verdict !== 'boolean') {
this.#logger.error(
`incarnation handler returned non-boolean ${typeof verdict} for ${peerId.slice(0, 8)}; treating as restart`,
);
return true;
}
return verdict;
} catch (error) {
this.#logger.error(
`incarnation handler threw for ${peerId.slice(0, 8)}; treating as restart:`,
error,
);
return true;
}
return null;
}

/**
Expand Down Expand Up @@ -383,22 +416,34 @@ export class PlatformServicesClient implements PlatformServices {

this.#rpcClient.handleResponse(id, event.data);
} else if (isJsonRpcRequest(event.data)) {
const { id, method, params } = event.data;
try {
this.#rpcServer.assertHasMethod(method);
const result = await this.#rpcServer.execute(method, params);
await this.#sendMessage({
id,
result,
jsonrpc: '2.0',
});
} catch (error) {
await this.#sendMessage({
id,
error: serializeError(error),
jsonrpc: '2.0',
});
}
// Run the request handler in the background instead of awaiting it
// inside the drain. The drain processes responses too, and a request
// handler that fires an outbound RPC back to the other side would
// deadlock waiting for its response — the drain can't get to that
// response until the request handler returns.
this.#executeRequest(event.data).catch(() => undefined);
}
}

/**
* Execute a JSON-RPC request and write the response back. Errors during
* execution are serialized into a JSON-RPC error response; errors during
* response delivery are swallowed.
*
* @param request - The JSON-RPC request to execute.
*/
async #executeRequest(request: JsonRpcRequest): Promise<void> {
const { id, method, params } = request;
try {
this.#rpcServer.assertHasMethod(method);
const result = await this.#rpcServer.execute(method, params);
await this.#sendMessage({ id, result, jsonrpc: '2.0' });
} catch (error) {
await this.#sendMessage({
id,
error: serializeError(error),
jsonrpc: '2.0',
}).catch(() => undefined);
}
}
}
Expand Down
Loading
Loading