From 2efe4765d423b5bbc67e2be1ed1a1c90d53ecf5c Mon Sep 17 00:00:00 2001 From: Chip Morningstar Date: Tue, 20 Jan 2026 16:45:49 -0800 Subject: [PATCH 1/4] feat: Persist pending messages and sequence state in RemoteHandle Remove in-memory pendingMessages array in favor of deriving queue state from persisted startSeq/nextSendSeq values. Messages are stored as plain strings without sendTimestamp. Implements crash-safe persistence for remote message queues (issue #691). Co-Authored-By: Claude Opus 4.5 --- .../src/remotes/RemoteHandle.test.ts | 181 ++++++++++++++++++ .../ocap-kernel/src/remotes/RemoteHandle.ts | 167 +++++++++++----- packages/ocap-kernel/src/store/index.test.ts | 8 + packages/ocap-kernel/src/store/index.ts | 4 + .../src/store/methods/remote.test.ts | 155 +++++++++++++++ .../ocap-kernel/src/store/methods/remote.ts | 147 ++++++++++++++ 6 files changed, 611 insertions(+), 51 deletions(-) diff --git a/packages/ocap-kernel/src/remotes/RemoteHandle.test.ts b/packages/ocap-kernel/src/remotes/RemoteHandle.test.ts index ce128bbfa..579f029b8 100644 --- a/packages/ocap-kernel/src/remotes/RemoteHandle.test.ts +++ b/packages/ocap-kernel/src/remotes/RemoteHandle.test.ts @@ -928,4 +928,185 @@ describe('RemoteHandle', () => { expect(JSON.parse(calls[2]![1]).seq).toBe(3); }); }); + + describe('message persistence', () => { + it('persists pending messages to storage on send', async () => { + const remote = makeRemote(); + const promiseRRef = 'rp+3'; + const resolutions: VatOneResolution[] = [ + [promiseRRef, false, { body: '"resolved value"', slots: [] }], + ]; + + await remote.deliverNotify(resolutions); + + // Verify message was persisted (as a plain string) + const pendingMsgString = mockKernelStore.getPendingMessage( + mockRemoteId, + 1, + ); + expect(pendingMsgString).toBeDefined(); + expect(pendingMsgString).toContain('"seq":1'); + + // Verify seq state was persisted + const seqState = mockKernelStore.getRemoteSeqState(mockRemoteId); + expect(seqState).toStrictEqual({ 
+ nextSendSeq: 1, + highestReceivedSeq: 0, + startSeq: 1, + }); + }); + + it('persists highestReceivedSeq when receiving messages', async () => { + const remote = makeRemote(); + const promiseRRef = 'rp+3'; + const resolutions: VatOneResolution[] = [ + [promiseRRef, false, { body: '"resolved value"', slots: [] }], + ]; + + // Receive a message with seq=5 + await remote.handleRemoteMessage( + JSON.stringify({ + seq: 5, + method: 'deliver', + params: ['notify', resolutions], + }), + ); + + const seqState = mockKernelStore.getRemoteSeqState(mockRemoteId); + expect(seqState?.highestReceivedSeq).toBe(5); + }); + + it('deletes persisted messages when ACKed', async () => { + const remote = makeRemote(); + const promiseRRef = 'rp+3'; + const resolutions: VatOneResolution[] = [ + [promiseRRef, false, { body: '"resolved value"', slots: [] }], + ]; + + // Send two messages + await remote.deliverNotify(resolutions); + await remote.deliverNotify(resolutions); + + // Verify both are persisted + expect(mockKernelStore.getPendingMessage(mockRemoteId, 1)).toBeDefined(); + expect(mockKernelStore.getPendingMessage(mockRemoteId, 2)).toBeDefined(); + + // ACK the first message + await remote.handleRemoteMessage(JSON.stringify({ ack: 1 })); + + // First message should be deleted, second should remain + expect( + mockKernelStore.getPendingMessage(mockRemoteId, 1), + ).toBeUndefined(); + expect(mockKernelStore.getPendingMessage(mockRemoteId, 2)).toBeDefined(); + + // startSeq should be updated + const seqState = mockKernelStore.getRemoteSeqState(mockRemoteId); + expect(seqState?.startSeq).toBe(2); + }); + + it('restores pending messages on startup', async () => { + // Pre-populate storage with persisted state (messages stored as plain strings) + mockKernelStore.setRemoteNextSendSeq(mockRemoteId, 3); + mockKernelStore.setRemoteHighestReceivedSeq(mockRemoteId, 2); + mockKernelStore.setRemoteStartSeq(mockRemoteId, 2); + mockKernelStore.setPendingMessage( + mockRemoteId, + 2, + 
'{"seq":2,"method":"deliver","params":["notify",[]]}', + ); + mockKernelStore.setPendingMessage( + mockRemoteId, + 3, + '{"seq":3,"method":"deliver","params":["notify",[]]}', + ); + + // Create a new RemoteHandle - should restore state + const remote = makeRemote(); + + // Send another message - should get seq 4 + const promiseRRef = 'rp+3'; + const resolutions: VatOneResolution[] = [ + [promiseRRef, false, { body: '"resolved value"', slots: [] }], + ]; + + // Verify restore happened by checking the next seq number assigned + await remote.deliverNotify(resolutions); + + const sentString = vi.mocked(mockRemoteComms.sendRemoteMessage).mock + .calls[0]?.[1]; + expect(sentString).toBeDefined(); + const parsed = JSON.parse(sentString as string); + expect(parsed.seq).toBe(4); + expect(parsed.ack).toBe(2); // Should have restored highestReceivedSeq + }); + + it('repairs nextSendSeq from scanned messages on crash recovery', async () => { + // Simulate crash during enqueue: message written but nextSendSeq not updated + mockKernelStore.setRemoteNextSendSeq(mockRemoteId, 2); + mockKernelStore.setRemoteStartSeq(mockRemoteId, 1); + // But messages 1, 2, and 3 exist (3 was written but seq not incremented) + mockKernelStore.setPendingMessage(mockRemoteId, 1, '{"seq":1}'); + mockKernelStore.setPendingMessage(mockRemoteId, 2, '{"seq":2}'); + mockKernelStore.setPendingMessage(mockRemoteId, 3, '{"seq":3}'); + + // Create RemoteHandle - should detect and repair + const remote = makeRemote(); + + // Next message should get seq 4 (repaired from scanning) + const promiseRRef = 'rp+3'; + const resolutions: VatOneResolution[] = [ + [promiseRRef, false, { body: '"resolved value"', slots: [] }], + ]; + await remote.deliverNotify(resolutions); + + const sentString = vi.mocked(mockRemoteComms.sendRemoteMessage).mock + .calls[0]?.[1]; + expect(sentString).toBeDefined(); + const parsed = JSON.parse(sentString as string); + expect(parsed.seq).toBe(4); + + // Verify nextSendSeq was repaired in 
storage + const seqState = mockKernelStore.getRemoteSeqState(mockRemoteId); + expect(seqState?.nextSendSeq).toBe(4); // Updated after the new send + }); + + it('ignores orphan messages (seq < startSeq) on recovery', () => { + // Simulate crash during ACK: startSeq updated but message not deleted + mockKernelStore.setRemoteNextSendSeq(mockRemoteId, 3); + mockKernelStore.setRemoteStartSeq(mockRemoteId, 2); + // Orphan message at seq 1 (already acked per startSeq=2) + mockKernelStore.setPendingMessage(mockRemoteId, 1, '{"seq":1}'); + // Valid pending at seq 2 and 3 + mockKernelStore.setPendingMessage(mockRemoteId, 2, '{"seq":2}'); + mockKernelStore.setPendingMessage(mockRemoteId, 3, '{"seq":3}'); + + // Create RemoteHandle - orphan is ignored (lazy cleanup) + makeRemote(); + + // Orphan still exists in storage (cleaned lazily when remote is deleted) + expect(mockKernelStore.getPendingMessage(mockRemoteId, 1)).toBeDefined(); + // Valid pending messages should remain + expect(mockKernelStore.getPendingMessage(mockRemoteId, 2)).toBeDefined(); + expect(mockKernelStore.getPendingMessage(mockRemoteId, 3)).toBeDefined(); + }); + + it('handles fresh remote with no persisted state', async () => { + // No pre-populated storage - should start fresh + const remote = makeRemote(); + + const promiseRRef = 'rp+3'; + const resolutions: VatOneResolution[] = [ + [promiseRRef, false, { body: '"resolved value"', slots: [] }], + ]; + await remote.deliverNotify(resolutions); + + const sentString = vi.mocked(mockRemoteComms.sendRemoteMessage).mock + .calls[0]?.[1]; + expect(sentString).toBeDefined(); + const parsed = JSON.parse(sentString as string); + expect(parsed.seq).toBe(1); // Fresh start + expect(parsed.ack).toBeUndefined(); // No highestReceivedSeq + }); + }); }); diff --git a/packages/ocap-kernel/src/remotes/RemoteHandle.ts b/packages/ocap-kernel/src/remotes/RemoteHandle.ts index 338b6ec52..b3740078d 100644 --- a/packages/ocap-kernel/src/remotes/RemoteHandle.ts +++ 
b/packages/ocap-kernel/src/remotes/RemoteHandle.ts @@ -31,15 +31,6 @@ const MAX_RETRIES = 3; /** Maximum number of pending messages awaiting ACK. */ const MAX_PENDING_MESSAGES = 200; -/** - * Pending message awaiting acknowledgment. - */ -type PendingMessage = { - messageString: string; // Serialized message (with seq/ack) - sendTimestamp: number; // When first sent (for metrics) - retryCount: number; // 0 on first send, incremented on retry -}; - type RemoteHandleConstructorProps = { remoteId: RemoteId; peerId: string; @@ -134,12 +125,12 @@ export class RemoteHandle implements EndpointHandle { /** Highest sequence number received from remote (for piggyback ACK). */ #highestReceivedSeq: number = 0; - /** Queue of messages awaiting ACK, in sequence order. */ - readonly #pendingMessages: PendingMessage[] = []; - /** Sequence number of first message in pending queue. */ #startSeq: number = 0; + /** Retry count for pending messages (reset on ACK). */ + #retryCount: number = 0; + /** Timer handle for ACK timeout (retransmission). */ #ackTimeoutHandle: ReturnType | undefined; @@ -200,11 +191,65 @@ export class RemoteHandle implements EndpointHandle { */ static make(params: RemoteHandleConstructorProps): RemoteHandle { const remote = new RemoteHandle(params); + remote.#restorePersistedState(); return remote; } + /** + * Restore persisted state from storage on startup. 
+ */ + #restorePersistedState(): void { + const seqState = this.#kernelStore.getRemoteSeqState(this.remoteId); + if (!seqState) { + // No persisted state - this is a fresh remote or pre-persistence remote + return; + } + + // Restore sequence state + this.#highestReceivedSeq = seqState.highestReceivedSeq; + this.#startSeq = seqState.startSeq; + this.#nextSendSeq = seqState.nextSendSeq; + + // Check for crash during enqueue: message written but nextSendSeq not updated + if ( + this.#kernelStore.getPendingMessage(this.remoteId, this.#nextSendSeq + 1) + ) { + this.#nextSendSeq += 1; + this.#kernelStore.setRemoteNextSendSeq(this.remoteId, this.#nextSendSeq); + } + + // If we have pending messages after recovery, start ACK timeout for retransmission + if (this.#hasPendingMessages()) { + this.#logger.log( + `${this.#peerId.slice(0, 8)}:: restored ${this.#getPendingCount()} pending messages from persistence`, + ); + this.#startAckTimeout(); + } + } + // --- Sequence/ACK management methods --- + /** + * Check if there are pending messages awaiting ACK. + * + * @returns True if there are pending messages. + */ + #hasPendingMessages(): boolean { + return this.#nextSendSeq > 0 && this.#startSeq <= this.#nextSendSeq; + } + + /** + * Get the number of pending messages awaiting ACK. + * + * @returns The count of pending messages. + */ + #getPendingCount(): number { + if (!this.#hasPendingMessages()) { + return 0; + } + return this.#nextSendSeq - this.#startSeq + 1; + } + /** * Get the next sequence number and increment the counter. * @@ -226,19 +271,33 @@ export class RemoteHandle implements EndpointHandle { /** * Process an incoming ACK (cumulative - acknowledges all messages up to ackSeq). + * Uses crash-safe ordering: update startSeq first, then delete acked messages. * * @param ackSeq - The highest sequence number being acknowledged. 
*/ #handleAck(ackSeq: number): void { - while (this.#startSeq <= ackSeq && this.#pendingMessages.length > 0) { - const pending = this.#pendingMessages.shift(); - if (pending) { - this.#logger.log( - `${this.#peerId.slice(0, 8)}:: message ${this.#startSeq} acknowledged (${Date.now() - pending.sendTimestamp}ms)`, - ); - } + const seqsToDelete: number[] = []; + const originalStartSeq = this.#startSeq; + + while (this.#startSeq <= ackSeq && this.#hasPendingMessages()) { + seqsToDelete.push(this.#startSeq); + this.#logger.log( + `${this.#peerId.slice(0, 8)}:: message ${this.#startSeq} acknowledged`, + ); this.#startSeq += 1; } + + // Crash-safe dequeue: persist updated startSeq first, then delete messages + // On crash recovery, orphan entries (seq < startSeq) will be cleaned lazily + if (this.#startSeq !== originalStartSeq) { + this.#kernelStore.setRemoteStartSeq(this.remoteId, this.#startSeq); + for (const seq of seqsToDelete) { + this.#kernelStore.deletePendingMessage(this.remoteId, seq); + } + // Reset retry count when messages are acknowledged + this.#retryCount = 0; + } + // Restart or clear ACK timeout based on remaining pending messages this.#startAckTimeout(); } @@ -249,7 +308,7 @@ export class RemoteHandle implements EndpointHandle { */ #startAckTimeout(): void { this.#clearAckTimeout(); - if (this.#pendingMessages.length > 0) { + if (this.#hasPendingMessages()) { this.#ackTimeoutHandle = setTimeout(() => { this.#handleAckTimeout(); }, ACK_TIMEOUT_MS); @@ -271,15 +330,14 @@ export class RemoteHandle implements EndpointHandle { */ #handleAckTimeout(): void { this.#ackTimeoutHandle = undefined; - const head = this.#pendingMessages[0]; - if (!head) { + if (!this.#hasPendingMessages()) { return; } - if (head.retryCount >= MAX_RETRIES) { + if (this.#retryCount >= MAX_RETRIES) { // Give up - reject all pending messages, URL redemptions, and notify RemoteManager this.#logger.log( - `${this.#peerId.slice(0, 8)}:: gave up after ${MAX_RETRIES} retries, rejecting 
${this.#pendingMessages.length} pending messages`, + `${this.#peerId.slice(0, 8)}:: gave up after ${MAX_RETRIES} retries, rejecting ${this.#getPendingCount()} pending messages`, ); this.#rejectAllPending(`not acknowledged after ${MAX_RETRIES} retries`); this.rejectPendingRedemptions( @@ -290,10 +348,9 @@ export class RemoteHandle implements EndpointHandle { } // Retransmit - head.retryCount += 1; - head.sendTimestamp = Date.now(); + this.#retryCount += 1; this.#logger.log( - `${this.#peerId.slice(0, 8)}:: retransmitting ${this.#pendingMessages.length} pending messages (attempt ${head.retryCount + 1})`, + `${this.#peerId.slice(0, 8)}:: retransmitting ${this.#getPendingCount()} pending messages (attempt ${this.#retryCount + 1})`, ); this.#retransmitPending(); } @@ -302,12 +359,18 @@ export class RemoteHandle implements EndpointHandle { * Retransmit all pending messages. */ #retransmitPending(): void { - for (const pending of this.#pendingMessages) { - this.#remoteComms - .sendRemoteMessage(this.#peerId, pending.messageString) - .catch((error) => { - this.#logger.error('Error retransmitting message:', error); - }); + for (let seq = this.#startSeq; seq <= this.#nextSendSeq; seq += 1) { + const messageString = this.#kernelStore.getPendingMessage( + this.remoteId, + seq, + ); + if (messageString) { + this.#remoteComms + .sendRemoteMessage(this.#peerId, messageString) + .catch((error) => { + this.#logger.error('Error retransmitting message:', error); + }); + } } this.#startAckTimeout(); } @@ -318,13 +381,16 @@ export class RemoteHandle implements EndpointHandle { * @param reason - The reason for failure. 
*/ #rejectAllPending(reason: string): void { - for (let i = 0; i < this.#pendingMessages.length; i += 1) { + const pendingCount = this.#getPendingCount(); + for (let i = 0; i < pendingCount; i += 1) { this.#logger.warn( `Message ${this.#startSeq + i} delivery failed: ${reason}`, ); } - this.#pendingMessages.length = 0; - this.#startSeq = this.#nextSendSeq; + // Mark all as rejected by advancing startSeq past all pending messages + this.#startSeq = this.#nextSendSeq + 1; + this.#kernelStore.setRemoteStartSeq(this.remoteId, this.#startSeq); + this.#retryCount = 0; } /** @@ -404,12 +470,15 @@ export class RemoteHandle implements EndpointHandle { } // Check queue capacity before consuming any resources (seq number, ACK timer) - if (this.#pendingMessages.length >= MAX_PENDING_MESSAGES) { + if (this.#getPendingCount() >= MAX_PENDING_MESSAGES) { throw Error( `Message rejected: pending queue at capacity (${MAX_PENDING_MESSAGES})`, ); } + // Track whether this is the first pending message (before incrementing seq) + const wasEmpty = !this.#hasPendingMessages(); + // Build full message with seq and optional piggyback ack const seq = this.#getNextSeq(); const ack = this.#getAckValue(); @@ -422,21 +491,16 @@ export class RemoteHandle implements EndpointHandle { // Clear delayed ACK timer - we're piggybacking the ACK on this message this.#clearDelayedAck(); - // Track message for ACK - const pending: PendingMessage = { - messageString, - sendTimestamp: Date.now(), - retryCount: 0, - }; + // Crash-safe enqueue: persist message first, then update nextSendSeq + // On crash recovery, we can derive nextSendSeq by scanning pending messages + this.#kernelStore.setPendingMessage(this.remoteId, seq, messageString); + this.#kernelStore.setRemoteNextSendSeq(this.remoteId, this.#nextSendSeq); - // If queue was empty, set startSeq to this message's sequence number - if (this.#pendingMessages.length === 0) { + // If queue was empty, set startSeq to this message's sequence number and persist 
+ if (wasEmpty) { this.#startSeq = seq; - } - this.#pendingMessages.push(pending); - - // Start ACK timeout if this is the first pending message - if (this.#pendingMessages.length === 1) { + this.#kernelStore.setRemoteStartSeq(this.remoteId, seq); + // Start ACK timeout for the first pending message this.#startAckTimeout(); } @@ -732,9 +796,10 @@ export class RemoteHandle implements EndpointHandle { const remoteCommand = parsed as RemoteCommand; const { seq, ack, method, params } = remoteCommand; - // Track received sequence number for piggyback ACK + // Track received sequence number for piggyback ACK and persist if (seq > this.#highestReceivedSeq) { this.#highestReceivedSeq = seq; + this.#kernelStore.setRemoteHighestReceivedSeq(this.remoteId, seq); } // Start delayed ACK timer - will send standalone ACK if no outgoing traffic diff --git a/packages/ocap-kernel/src/store/index.test.ts b/packages/ocap-kernel/src/store/index.test.ts index 9dbff2557..7e305f7cc 100644 --- a/packages/ocap-kernel/src/store/index.test.ts +++ b/packages/ocap-kernel/src/store/index.test.ts @@ -58,7 +58,9 @@ describe('kernel store', () => { 'deleteEndpoint', 'deleteKernelObject', 'deleteKernelPromise', + 'deletePendingMessage', 'deleteRemoteInfo', + 'deleteRemotePendingState', 'deleteSubcluster', 'deleteSubclusterVat', 'deleteVat', @@ -85,6 +87,7 @@ describe('kernel store', () => { 'getNextVatId', 'getObjectRefCount', 'getOwner', + 'getPendingMessage', 'getPinnedObjects', 'getPromisesByDecider', 'getQueueLength', @@ -92,6 +95,7 @@ describe('kernel store', () => { 'getReachableFlag', 'getRefCount', 'getRemoteInfo', + 'getRemoteSeqState', 'getRootObject', 'getSubcluster', 'getSubclusterVats', @@ -133,8 +137,12 @@ describe('kernel store', () => { 'scheduleReap', 'setGCActions', 'setObjectRefCount', + 'setPendingMessage', 'setPromiseDecider', + 'setRemoteHighestReceivedSeq', 'setRemoteInfo', + 'setRemoteNextSendSeq', + 'setRemoteStartSeq', 'setRevoked', 'setVatConfig', 'startCrank', diff --git 
a/packages/ocap-kernel/src/store/index.ts b/packages/ocap-kernel/src/store/index.ts index eec357d57..c84bfff7f 100644 --- a/packages/ocap-kernel/src/store/index.ts +++ b/packages/ocap-kernel/src/store/index.ts @@ -49,6 +49,10 @@ * * Remote bookkeeping * remote.${remoteId} = JSON(INFO) // remote's connection info + * remoteSeq.${remoteId}.nextSendSeq = NN // next outgoing sequence number + * remoteSeq.${remoteId}.highestReceivedSeq = NN // highest received seq (for ACK) + * remoteSeq.${remoteId}.startSeq = NN // seq of first pending message + * remotePending.${remoteId}.${seq} = messageString // pending message, stored as a plain serialized string * * Kernel bookkeeping * initialized = true // if set, indicates the store has been initialized diff --git a/packages/ocap-kernel/src/store/methods/remote.test.ts b/packages/ocap-kernel/src/store/methods/remote.test.ts index b5ad90e9f..12f6df0ad 100644 --- a/packages/ocap-kernel/src/store/methods/remote.test.ts +++ b/packages/ocap-kernel/src/store/methods/remote.test.ts @@ -102,6 +102,7 @@ describe('remote store methods', () => { describe('deleteRemoteInfo', () => { it('removes remote info from storage', () => { mockKV.set(`remote.${remoteId1}`, JSON.stringify(remoteInfo1)); + mockGetPrefixedKeys.mockReturnValue([]); remoteMethods.deleteRemoteInfo(remoteId1); @@ -109,8 +110,33 @@ describe('remote store methods', () => { }); it('does nothing if remote info does not exist', () => { + mockGetPrefixedKeys.mockReturnValue([]); expect(() => remoteMethods.deleteRemoteInfo(remoteId1)).not.toThrow(); }); + + it('cleans up pending state when deleting remote info', () => { + mockKV.set(`remote.${remoteId1}`, JSON.stringify(remoteInfo1)); + mockKV.set(`remoteSeq.${remoteId1}.nextSendSeq`, '5'); + mockKV.set(`remoteSeq.${remoteId1}.highestReceivedSeq`, '3'); + mockKV.set(`remoteSeq.${remoteId1}.startSeq`, '2'); + mockKV.set(`remotePending.${remoteId1}.2`, '{"seq":2}'); + mockKV.set(`remotePending.${remoteId1}.3`, '{"seq":3}'); + 
mockGetPrefixedKeys.mockReturnValue([ + `remotePending.${remoteId1}.2`, + `remotePending.${remoteId1}.3`, + ]); + + remoteMethods.deleteRemoteInfo(remoteId1); + + expect(mockKV.has(`remote.${remoteId1}`)).toBe(false); + expect(mockKV.has(`remoteSeq.${remoteId1}.nextSendSeq`)).toBe(false); + expect(mockKV.has(`remoteSeq.${remoteId1}.highestReceivedSeq`)).toBe( + false, + ); + expect(mockKV.has(`remoteSeq.${remoteId1}.startSeq`)).toBe(false); + expect(mockKV.has(`remotePending.${remoteId1}.2`)).toBe(false); + expect(mockKV.has(`remotePending.${remoteId1}.3`)).toBe(false); + }); }); describe('getAllRemoteRecords', () => { @@ -140,4 +166,133 @@ describe('remote store methods', () => { expect(records).toStrictEqual([]); }); }); + + describe('getRemoteSeqState', () => { + it('returns undefined when no state exists', () => { + const result = remoteMethods.getRemoteSeqState(remoteId1); + expect(result).toBeUndefined(); + }); + + it('returns sequence state when all values exist', () => { + mockKV.set(`remoteSeq.${remoteId1}.nextSendSeq`, '10'); + mockKV.set(`remoteSeq.${remoteId1}.highestReceivedSeq`, '5'); + mockKV.set(`remoteSeq.${remoteId1}.startSeq`, '3'); + + const result = remoteMethods.getRemoteSeqState(remoteId1); + + expect(result).toStrictEqual({ + nextSendSeq: 10, + highestReceivedSeq: 5, + startSeq: 3, + }); + }); + + it('returns defaults for missing values when some state exists', () => { + mockKV.set(`remoteSeq.${remoteId1}.nextSendSeq`, '10'); + + const result = remoteMethods.getRemoteSeqState(remoteId1); + + expect(result).toStrictEqual({ + nextSendSeq: 10, + highestReceivedSeq: 0, + startSeq: 0, + }); + }); + }); + + describe('setRemoteNextSendSeq', () => { + it('sets nextSendSeq', () => { + remoteMethods.setRemoteNextSendSeq(remoteId1, 42); + expect(mockKV.get(`remoteSeq.${remoteId1}.nextSendSeq`)).toBe('42'); + }); + }); + + describe('setRemoteHighestReceivedSeq', () => { + it('sets highestReceivedSeq', () => { + 
remoteMethods.setRemoteHighestReceivedSeq(remoteId1, 15); + expect(mockKV.get(`remoteSeq.${remoteId1}.highestReceivedSeq`)).toBe( + '15', + ); + }); + }); + + describe('setRemoteStartSeq', () => { + it('sets startSeq', () => { + remoteMethods.setRemoteStartSeq(remoteId1, 7); + expect(mockKV.get(`remoteSeq.${remoteId1}.startSeq`)).toBe('7'); + }); + }); + + describe('getPendingMessage', () => { + it('returns undefined for non-existent message', () => { + const result = remoteMethods.getPendingMessage(remoteId1, 1); + expect(result).toBeUndefined(); + }); + + it('returns stored pending message string', () => { + const messageString = '{"seq":1,"method":"deliver"}'; + mockKV.set(`remotePending.${remoteId1}.1`, messageString); + + const result = remoteMethods.getPendingMessage(remoteId1, 1); + + expect(result).toBe(messageString); + }); + }); + + describe('setPendingMessage', () => { + it('stores pending message string', () => { + const messageString = '{"seq":5,"method":"deliver"}'; + + remoteMethods.setPendingMessage(remoteId1, 5, messageString); + + expect(mockKV.get(`remotePending.${remoteId1}.5`)).toBe(messageString); + }); + }); + + describe('deletePendingMessage', () => { + it('deletes pending message entry', () => { + mockKV.set(`remotePending.${remoteId1}.3`, '{"seq":3}'); + + remoteMethods.deletePendingMessage(remoteId1, 3); + + expect(mockKV.has(`remotePending.${remoteId1}.3`)).toBe(false); + }); + + it('does nothing if message does not exist', () => { + expect(() => + remoteMethods.deletePendingMessage(remoteId1, 99), + ).not.toThrow(); + }); + }); + + describe('deleteRemotePendingState', () => { + it('deletes all seq state and pending messages', () => { + mockKV.set(`remoteSeq.${remoteId1}.nextSendSeq`, '10'); + mockKV.set(`remoteSeq.${remoteId1}.highestReceivedSeq`, '5'); + mockKV.set(`remoteSeq.${remoteId1}.startSeq`, '2'); + mockKV.set(`remotePending.${remoteId1}.2`, '{"seq":2}'); + mockKV.set(`remotePending.${remoteId1}.3`, '{"seq":3}'); + 
mockGetPrefixedKeys.mockReturnValue([ + `remotePending.${remoteId1}.2`, + `remotePending.${remoteId1}.3`, + ]); + + remoteMethods.deleteRemotePendingState(remoteId1); + + expect(mockKV.has(`remoteSeq.${remoteId1}.nextSendSeq`)).toBe(false); + expect(mockKV.has(`remoteSeq.${remoteId1}.highestReceivedSeq`)).toBe( + false, + ); + expect(mockKV.has(`remoteSeq.${remoteId1}.startSeq`)).toBe(false); + expect(mockKV.has(`remotePending.${remoteId1}.2`)).toBe(false); + expect(mockKV.has(`remotePending.${remoteId1}.3`)).toBe(false); + }); + + it('does nothing when no pending state exists', () => { + mockGetPrefixedKeys.mockReturnValue([]); + expect(() => + remoteMethods.deleteRemotePendingState(remoteId1), + ).not.toThrow(); + }); + }); }); diff --git a/packages/ocap-kernel/src/store/methods/remote.ts b/packages/ocap-kernel/src/store/methods/remote.ts index 1c093ba43..4511600a3 100644 --- a/packages/ocap-kernel/src/store/methods/remote.ts +++ b/packages/ocap-kernel/src/store/methods/remote.ts @@ -8,8 +8,19 @@ export type RemoteRecord = { remoteInfo: RemoteInfo; }; +/** + * Sequence tracking state for a remote. + */ +export type RemoteSeqState = { + nextSendSeq: number; + highestReceivedSeq: number; + startSeq: number; +}; + const REMOTE_INFO_BASE = 'remote.'; const REMOTE_INFO_BASE_LEN = REMOTE_INFO_BASE.length; +const REMOTE_SEQ_BASE = 'remoteSeq.'; +const REMOTE_PENDING_BASE = 'remotePending.'; /** * Get a kernel store object that provides functionality for managing remote records. @@ -65,6 +76,133 @@ export function getRemoteMethods(ctx: StoreContext) { */ function deleteRemoteInfo(remoteID: RemoteId): void { kv.delete(`${REMOTE_INFO_BASE}${remoteID}`); + deleteRemotePendingState(remoteID); + } + + // --- Sequence/ACK persistence methods --- + + /** + * Get the sequence tracking state for a remote. + * + * @param remoteId - The remote whose seq state is sought. + * @returns The seq state, or undefined if not yet persisted. 
+ */ + function getRemoteSeqState(remoteId: RemoteId): RemoteSeqState | undefined { + const prefix = `${REMOTE_SEQ_BASE}${remoteId}.`; + const nextSendSeqStr = kv.get(`${prefix}nextSendSeq`); + const highestReceivedSeqStr = kv.get(`${prefix}highestReceivedSeq`); + const startSeqStr = kv.get(`${prefix}startSeq`); + + // If none of the keys exist, there's no persisted state + if ( + nextSendSeqStr === undefined && + highestReceivedSeqStr === undefined && + startSeqStr === undefined + ) { + return undefined; + } + + return { + nextSendSeq: nextSendSeqStr === undefined ? 0 : Number(nextSendSeqStr), + highestReceivedSeq: + highestReceivedSeqStr === undefined ? 0 : Number(highestReceivedSeqStr), + startSeq: startSeqStr === undefined ? 0 : Number(startSeqStr), + }; + } + + /** + * Set the next outgoing sequence number for a remote. + * + * @param remoteId - The remote whose state is to be updated. + * @param value - The value to set. + */ + function setRemoteNextSendSeq(remoteId: RemoteId, value: number): void { + kv.set(`${REMOTE_SEQ_BASE}${remoteId}.nextSendSeq`, String(value)); + } + + /** + * Set the highest received sequence number for a remote. + * + * @param remoteId - The remote whose state is to be updated. + * @param value - The value to set. + */ + function setRemoteHighestReceivedSeq( + remoteId: RemoteId, + value: number, + ): void { + kv.set(`${REMOTE_SEQ_BASE}${remoteId}.highestReceivedSeq`, String(value)); + } + + /** + * Set the start sequence number (first pending message) for a remote. + * + * @param remoteId - The remote whose state is to be updated. + * @param value - The value to set. + */ + function setRemoteStartSeq(remoteId: RemoteId, value: number): void { + kv.set(`${REMOTE_SEQ_BASE}${remoteId}.startSeq`, String(value)); + } + + /** + * Get a pending message by remote and sequence number. + * + * @param remoteId - The remote to get the message for. + * @param seq - The sequence number of the message. 
+ * @returns The pending message string, or undefined if not found. + */ + function getPendingMessage( + remoteId: RemoteId, + seq: number, + ): string | undefined { + const key = `${REMOTE_PENDING_BASE}${remoteId}.${seq}`; + return kv.get(key); + } + + /** + * Store a pending message. + * + * @param remoteId - The remote to store the message for. + * @param seq - The sequence number of the message. + * @param messageString - The serialized message to store. + */ + function setPendingMessage( + remoteId: RemoteId, + seq: number, + messageString: string, + ): void { + const key = `${REMOTE_PENDING_BASE}${remoteId}.${seq}`; + kv.set(key, messageString); + } + + /** + * Delete a pending message entry. + * + * @param remoteId - The remote to delete the message for. + * @param seq - The sequence number of the message to delete. + */ + function deletePendingMessage(remoteId: RemoteId, seq: number): void { + const key = `${REMOTE_PENDING_BASE}${remoteId}.${seq}`; + kv.delete(key); + } + + /** + * Delete all pending state for a remote (seq state + all pending messages). + * Called when a remote relationship is terminated. + * + * @param remoteId - The remote whose pending state is to be deleted. 
+ */ + function deleteRemotePendingState(remoteId: RemoteId): void { + // Delete seq state + const seqPrefix = `${REMOTE_SEQ_BASE}${remoteId}.`; + kv.delete(`${seqPrefix}nextSendSeq`); + kv.delete(`${seqPrefix}highestReceivedSeq`); + kv.delete(`${seqPrefix}startSeq`); + + // Delete all pending messages + const pendingPrefix = `${REMOTE_PENDING_BASE}${remoteId}.`; + for (const key of getPrefixedKeys(pendingPrefix)) { + kv.delete(key); + } } return { @@ -72,5 +210,14 @@ export function getRemoteMethods(ctx: StoreContext) { getRemoteInfo, setRemoteInfo, deleteRemoteInfo, + // Sequence/ACK persistence + getRemoteSeqState, + setRemoteNextSendSeq, + setRemoteHighestReceivedSeq, + setRemoteStartSeq, + getPendingMessage, + setPendingMessage, + deletePendingMessage, + deleteRemotePendingState, }; } From f6dbfc5cee45a4051860acec464c3ade78dc93c1 Mon Sep 17 00:00:00 2001 From: Chip Morningstar Date: Tue, 20 Jan 2026 19:57:43 -0800 Subject: [PATCH 2/4] fix: Persist startSeq before nextSendSeq for crash safety MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fixes crash-safe write ordering bug where startSeq was persisted after nextSendSeq. Now: message → startSeq (if first) → nextSendSeq. 
Co-Authored-By: Claude Opus 4.5 --- .../src/remotes/RemoteHandle.test.ts | 30 +++++++++++++++++++ .../ocap-kernel/src/remotes/RemoteHandle.ts | 15 ++++++---- 2 files changed, 40 insertions(+), 5 deletions(-) diff --git a/packages/ocap-kernel/src/remotes/RemoteHandle.test.ts b/packages/ocap-kernel/src/remotes/RemoteHandle.test.ts index 579f029b8..3f76db49f 100644 --- a/packages/ocap-kernel/src/remotes/RemoteHandle.test.ts +++ b/packages/ocap-kernel/src/remotes/RemoteHandle.test.ts @@ -1071,6 +1071,36 @@ describe('RemoteHandle', () => { expect(seqState?.nextSendSeq).toBe(4); // Updated after the new send }); + it('repairs missing startSeq on crash recovery for first message', async () => { + // Simulate crash during first enqueue: message and startSeq written, but + // nextSendSeq not updated. This tests the crash-safe write ordering where + // startSeq is persisted before nextSendSeq. + mockKernelStore.setRemoteStartSeq(mockRemoteId, 1); + // nextSendSeq not written (defaults to 0) + mockKernelStore.setPendingMessage(mockRemoteId, 1, '{"seq":1}'); + + // Create RemoteHandle - should detect message at nextSendSeq+1 and repair + const remote = makeRemote(); + + // Next message should get seq 2 (repaired nextSendSeq from 0 to 1, then +1) + const promiseRRef = 'rp+3'; + const resolutions: VatOneResolution[] = [ + [promiseRRef, false, { body: '"resolved value"', slots: [] }], + ]; + await remote.deliverNotify(resolutions); + + const sentString = vi.mocked(mockRemoteComms.sendRemoteMessage).mock + .calls[0]?.[1]; + expect(sentString).toBeDefined(); + const parsed = JSON.parse(sentString as string); + expect(parsed.seq).toBe(2); + + // Verify state is correct: 2 pending messages (seq 1 and 2) + const seqState = mockKernelStore.getRemoteSeqState(mockRemoteId); + expect(seqState?.startSeq).toBe(1); + expect(seqState?.nextSendSeq).toBe(2); + }); + it('ignores orphan messages (seq < startSeq) on recovery', () => { // Simulate crash during ACK: startSeq updated but message not 
deleted mockKernelStore.setRemoteNextSendSeq(mockRemoteId, 3); diff --git a/packages/ocap-kernel/src/remotes/RemoteHandle.ts b/packages/ocap-kernel/src/remotes/RemoteHandle.ts index b3740078d..7e88186e8 100644 --- a/packages/ocap-kernel/src/remotes/RemoteHandle.ts +++ b/packages/ocap-kernel/src/remotes/RemoteHandle.ts @@ -491,16 +491,21 @@ export class RemoteHandle implements EndpointHandle { // Clear delayed ACK timer - we're piggybacking the ACK on this message this.#clearDelayedAck(); - // Crash-safe enqueue: persist message first, then update nextSendSeq - // On crash recovery, we can derive nextSendSeq by scanning pending messages + // Crash-safe enqueue order: + // 1. Persist message first + // 2. If first message, persist startSeq (so recovery knows where queue begins) + // 3. Persist nextSendSeq last (recovery can repair this by scanning) this.#kernelStore.setPendingMessage(this.remoteId, seq, messageString); - this.#kernelStore.setRemoteNextSendSeq(this.remoteId, this.#nextSendSeq); - // If queue was empty, set startSeq to this message's sequence number and persist if (wasEmpty) { this.#startSeq = seq; this.#kernelStore.setRemoteStartSeq(this.remoteId, seq); - // Start ACK timeout for the first pending message + } + + this.#kernelStore.setRemoteNextSendSeq(this.remoteId, this.#nextSendSeq); + + // Start ACK timeout if this is the first pending message + if (wasEmpty) { this.#startAckTimeout(); } From 1d44443ff16fd963c2783e0eb36b0ef29926250a Mon Sep 17 00:00:00 2001 From: Chip Morningstar Date: Tue, 20 Jan 2026 20:01:23 -0800 Subject: [PATCH 3/4] fix: Recover orphan message when no seq state exists Scan for message at seq 1 even when getRemoteSeqState returns undefined. 
Co-Authored-By: Claude Opus 4.5 --- .../src/remotes/RemoteHandle.test.ts | 33 +++++++++++++++++++ .../ocap-kernel/src/remotes/RemoteHandle.ts | 16 ++++++++- 2 files changed, 48 insertions(+), 1 deletion(-) diff --git a/packages/ocap-kernel/src/remotes/RemoteHandle.test.ts b/packages/ocap-kernel/src/remotes/RemoteHandle.test.ts index 3f76db49f..be504a770 100644 --- a/packages/ocap-kernel/src/remotes/RemoteHandle.test.ts +++ b/packages/ocap-kernel/src/remotes/RemoteHandle.test.ts @@ -1101,6 +1101,39 @@ describe('RemoteHandle', () => { expect(seqState?.nextSendSeq).toBe(2); }); + it('recovers orphan message when no seq state exists', async () => { + // Simulate crash during first enqueue: message written but NO seq state + // persisted at all. This can happen if crash occurs after setPendingMessage + // but before setRemoteStartSeq. + mockKernelStore.setPendingMessage(mockRemoteId, 1, '{"seq":1}'); + // No seq state set - getRemoteSeqState will return undefined + + // Create RemoteHandle - should scan and find orphan message at seq 1 + const remote = makeRemote(); + + // Next message should get seq 2 (recovered seq 1, then +1) + const promiseRRef = 'rp+3'; + const resolutions: VatOneResolution[] = [ + [promiseRRef, false, { body: '"resolved value"', slots: [] }], + ]; + await remote.deliverNotify(resolutions); + + const sentString = vi.mocked(mockRemoteComms.sendRemoteMessage).mock + .calls[0]?.[1]; + expect(sentString).toBeDefined(); + const parsed = JSON.parse(sentString as string); + expect(parsed.seq).toBe(2); + + // Verify state is correct: seq state recovered and 2 pending messages + const seqState = mockKernelStore.getRemoteSeqState(mockRemoteId); + expect(seqState?.startSeq).toBe(1); + expect(seqState?.nextSendSeq).toBe(2); + // Original orphan message still exists + expect(mockKernelStore.getPendingMessage(mockRemoteId, 1)).toBe( + '{"seq":1}', + ); + }); + it('ignores orphan messages (seq < startSeq) on recovery', () => { // Simulate crash during ACK: 
startSeq updated but message not deleted mockKernelStore.setRemoteNextSendSeq(mockRemoteId, 3); diff --git a/packages/ocap-kernel/src/remotes/RemoteHandle.ts b/packages/ocap-kernel/src/remotes/RemoteHandle.ts index 7e88186e8..a831fb2b6 100644 --- a/packages/ocap-kernel/src/remotes/RemoteHandle.ts +++ b/packages/ocap-kernel/src/remotes/RemoteHandle.ts @@ -200,8 +200,22 @@ export class RemoteHandle implements EndpointHandle { */ #restorePersistedState(): void { const seqState = this.#kernelStore.getRemoteSeqState(this.remoteId); + if (!seqState) { - // No persisted state - this is a fresh remote or pre-persistence remote + // No persisted seq state. Check for crash during first message enqueue: + // Message may have been written but no seq state persisted yet. + // First message always has seq 1 (since #nextSendSeq starts at 0, +1 = 1) + if (this.#kernelStore.getPendingMessage(this.remoteId, 1)) { + // Found orphan message - recover by setting up state + this.#startSeq = 1; + this.#nextSendSeq = 1; + this.#kernelStore.setRemoteStartSeq(this.remoteId, 1); + this.#kernelStore.setRemoteNextSendSeq(this.remoteId, 1); + this.#logger.log( + `${this.#peerId.slice(0, 8)}:: recovered orphan message at seq 1 from crash during first enqueue`, + ); + this.#startAckTimeout(); + } return; } From c2a6d68fe9ce97d0eb5ce0a025eb472c9e678f49 Mon Sep 17 00:00:00 2001 From: Chip Morningstar Date: Wed, 21 Jan 2026 09:19:16 -0800 Subject: [PATCH 4/4] copy editing and documentation --- docs/ken-protocol-assessment.md | 192 ++++++++++++++++++++++++ packages/ocap-kernel/src/store/index.ts | 2 +- 2 files changed, 193 insertions(+), 1 deletion(-) create mode 100644 docs/ken-protocol-assessment.md diff --git a/docs/ken-protocol-assessment.md b/docs/ken-protocol-assessment.md new file mode 100644 index 000000000..371721c61 --- /dev/null +++ b/docs/ken-protocol-assessment.md @@ -0,0 +1,192 @@ +# Ken Protocol Assessment + +This document assesses our current remote messaging system against the ideals 
of the Ken protocol, as described in HP Labs Tech Report HPL-2010-155: "Output-Valid Rollback-Recovery" by Kelly, Karp, Stiegler, Close, and Cho. + +## Ken Protocol Key Properties + +The Ken protocol provides these guarantees for fault-tolerant distributed computing: + +1. **Exactly-once delivery** in process-pairwise FIFO order +2. **Output validity**: Outputs could have resulted from failure-free execution +3. **Transactional turns**: One message delivered → processing → checkpoint → transmit outputs +4. **Consistent frontier**: Most-recent per-process checkpoints always form a recovery line +5. **Local recovery**: Crashes cause only local rollbacks, no domino effect +6. **Sender-based message logging**: Messages persisted in sender's output queue until ACKed +7. **Deferred transmission**: Outputs buffered during turn, transmitted only after checkpoint + +### Ken's Turn Model + +A "turn" in Ken is the fundamental unit of computation: + +``` +turn_start(deliver exactly one message to processing_function) + → processing_function executes + → outputs buffered in Q_out (not transmitted yet) +turn_end: + → atomically persist(turn, app_state, Q_out, Done) + → THEN transmit buffered messages +``` + +Key aspects: +- Only one message delivered per turn +- All outputs buffered until end of turn +- Atomic checkpoint includes application state AND output queue +- Transmission happens only after checkpoint completes +- `Done` table tracks which messages have been processed to completion + +## Assessment of Our Current System + +### What We Have (Aligned with Ken) + +| Property | Status | Implementation | +|----------|--------|----------------| +| Sender-based logging | ✓ | Messages persisted at `remotePending.${remoteId}.${seq}` | +| Sequence numbers | ✓ | `seq` on outgoing, `highestReceivedSeq` for incoming | +| Cumulative ACK | ✓ | Piggyback ACKs acknowledge all messages up to seq | +| Retransmission | ✓ | Timeout-based retransmit until ACK or max retries | +| Crash-safe 
persistence | ✓ | Write message first, then update nextSendSeq | +| Local recovery | ✓ | Restore seq state, restart ACK timeout | + +### What We're Missing or Where We Differ + +#### 1. Transactional Turns (Major Gap) + +**Ken's model:** +``` +turn_start(deliver one message) + → processing_function executes + → outputs buffered in Q_out +turn_end: + → atomically persist(turn, app_state, Q_out, Done) + → THEN transmit buffered messages +``` + +**Our model:** +``` +message received → kernel processes → sends outputs immediately +each output: persist message → update seq → transmit +``` + +The kernel's "crank" mechanism may provide turn-like boundaries, but `RemoteHandle` doesn't coordinate with it. Messages are transmitted immediately after being persisted, not deferred until end of turn. + +#### 2. Done Table / Duplicate Detection (Gap) + +Ken maintains a `Done` table ensuring: +- Each message delivered to application **at most once** +- FIFO ordering enforced via `next_ready()` considering seq + sender ID + +We track `highestReceivedSeq` but only for ACK purposes. We don't have explicit duplicate detection for incoming messages. If the remote retransmits a message we already processed but had not yet ACKed, we could deliver it twice. + +#### 3. Output Validity (Partial) + +Ken guarantees outputs could have resulted from failure-free execution because: +- Outputs are buffered during a turn +- A crash during processing loses all outputs from that turn +- Only committed outputs escape to the outside world + +Our system transmits immediately after persisting, so a crash mid-crank could result in: +- Some messages transmitted to remote +- But kernel state not yet committed +- On recovery, kernel re-executes and sends different/duplicate messages + +#### 4. Atomic Checkpoint (Gap) + +Ken atomically checkpoints `(turn, app_state, Q_out, Done)` together at end of turn. + +Our system persists messages individually as sent.
There's no atomic boundary coordinating kernel state with outgoing message state. + +#### 5. Deferred Transmission (Gap) + +**Ken:** `persist checkpoint → THEN transmit` + +**Ours:** `persist message → transmit immediately` + +Ken's approach ensures the "send" is recorded in checkpoint before any transmission. This is crucial for the consistent frontier property. + +#### 6. Input Queue Handling (Gap) + +Ken can opportunistically persist incoming messages before delivery. On crash, the input queue is reconstructed from sender retransmissions. + +We don't persist incoming messages. On crash, we rely entirely on senders to retransmit. + +### Summary Table + +| Ken Property | Our System | Notes | +|--------------|------------|-------| +| Exactly-once delivery | **Partial** | At-least-once with no duplicate detection | +| Output validity | **No** | Immediate transmission, no turn boundaries | +| Transactional turns | **No** | No coordination with kernel cranks | +| Consistent frontier | **Partial** | No atomic checkpoint across kernel+remote state | +| Local recovery | **Yes** | Crashes don't affect other processes | +| Sender-based logging | **Yes** | Messages persisted until ACKed | +| FIFO ordering | **Partial** | Per-sender seq, but no enforcement on receive side | + +## What Would Be Needed to Achieve Ken Properties + +### 1. Coordinate with Kernel Crank Boundaries + +Buffer outgoing messages during crank execution, persist and transmit only at crank commit. This would require: +- `RemoteHandle` to be aware of crank boundaries +- Outgoing messages buffered in memory during crank +- Batch persist + transmit at crank commit + +### 2. Add Done Table + +Track processed message IDs, deduplicate on receive: +- Persist `Done` table entries for processed messages +- On receive, check if message already in `Done` before delivering +- ACK messages in `Done` without re-delivering + +### 3. 
Atomic Checkpoint + +Persist kernel state and output queue together: +- Single atomic write at end of crank +- Include: kernel state, outgoing messages, Done table updates +- Requires coordination between kernel store and remote message persistence + +### 4. Defer Transmission + +Transmit only after checkpoint completes: +- Buffer messages during turn +- After atomic checkpoint succeeds, release messages for transmission +- This ensures "send" is recorded before any transmission occurs + +### 5. FIFO Enforcement on Receive + +Hold out-of-order messages until predecessors processed: +- Track expected next seq per sender +- Buffer messages that arrive out of order +- Deliver in sequence order only + +## Architectural Implications + +The most significant change would be integrating `RemoteHandle` with the kernel's crank/commit cycle. Currently: + +``` +Kernel Crank: + process message → syscalls may send to remote + +RemoteHandle (independent): + persist each outgoing message → transmit immediately +``` + +Ken-style architecture: + +``` +Kernel Crank: + process message → syscalls buffer outputs + +Crank Commit (atomic): + persist(kernel_state, buffered_outputs, done_table) + +Post-Commit: + transmit buffered outputs +``` + +This would require the kernel to control when `RemoteHandle` actually transmits, rather than `RemoteHandle` transmitting independently. 
+ +## References + +- HP Labs Tech Report HPL-2010-155: "Output-Valid Rollback-Recovery" +- Ken project: https://web.eecs.umich.edu/~tpkelly/Ken/ +- Waterken (Ken implementation in Java): http://waterken.sourceforge.net/ diff --git a/packages/ocap-kernel/src/store/index.ts b/packages/ocap-kernel/src/store/index.ts index c84bfff7f..f058e9d89 100644 --- a/packages/ocap-kernel/src/store/index.ts +++ b/packages/ocap-kernel/src/store/index.ts @@ -52,7 +52,7 @@ * remoteSeq.${remoteId}.nextSendSeq = NN // next outgoing sequence number * remoteSeq.${remoteId}.highestReceivedSeq = NN // highest received seq (for ACK) * remoteSeq.${remoteId}.startSeq = NN // seq of first pending message - * remotePending.${remoteId}.${seq} = JSON({messageString, sendTimestamp}) // pending message + * remotePending.${remoteId}.${seq} = ${messageString} // pending message * * Kernel bookkeeping * initialized = true // if set, indicates the store has been initialized