feat(scaling): serialize per-socket fan-out + NEW_CHANGES_BATCH #7881
deepshekhardas wants to merge 1 commit into
Review Summary by Qodo (Agentic_describe updated until commit d239203)
Implement NEW_CHANGES_BATCH wire format for scaled fan-out optimization
Walkthroughs
Description
• Implement NEW_CHANGES_BATCH wire format to reduce packet count
• Serialize per-socket fan-out with queued revision batching
• Add client-side support for batched revision messages
• Include configuration flag and comprehensive test coverage
Diagram
flowchart LR
A["Server collects<br/>queued revisions"] -->|"batchEnabled &&<br/>multiple revs"| B["NEW_CHANGES_BATCH<br/>single emit"]
A -->|"single rev or<br/>batching disabled"| C["NEW_CHANGES<br/>per revision"]
B -->|"client receives"| D["Apply all changes<br/>in order"]
C -->|"client receives"| D
E["Settings flag<br/>newChangesBatch"] -->|"controls"| A
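The decision in the flowchart above can be sketched roughly as follows. This is a minimal illustration, not the PR's actual `NewChangesPacker.ts`: the `QueuedRevision` fields, the `OutgoingMessage` payload shapes, and the `packRevisions` name are all assumptions, since the real wire format is not shown in this conversation.

```typescript
// Hypothetical revision shape; field names are assumptions for illustration.
interface QueuedRevision {
  newRev: number;
  changeset: string;
  author: string;
  timeDelta: number;
}

// Hypothetical message shapes; the PR's real payload layout may differ.
type OutgoingMessage =
  | {type: 'NEW_CHANGES'; data: QueuedRevision}
  | {type: 'NEW_CHANGES_BATCH'; data: {changes: QueuedRevision[]}};

// Pack queued revisions into as few messages as the flag allows.
function packRevisions(
  queued: QueuedRevision[],
  batchEnabled: boolean,
): OutgoingMessage[] {
  // Batch only when the flag is on and there is more than one revision.
  if (batchEnabled && queued.length > 1) {
    return [{type: 'NEW_CHANGES_BATCH', data: {changes: queued}}];
  }
  // Otherwise fall back to one NEW_CHANGES message per revision, in order.
  return queued.map((rev): OutgoingMessage => ({type: 'NEW_CHANGES', data: rev}));
}
```

A single queued revision still goes out as plain `NEW_CHANGES` even with batching enabled, which matches the "single rev or batching disabled" branch of the diagram.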
File Changes
1. src/node/handler/NewChangesPacker.ts
Code Review by Qodo
1. Premature rev claim
Code Review by Qodo
1. Lost revisions on race
// Snapshot the local state so a concurrent updatePadClients() can't make
// us double-emit. We hold the "I'm responsible for revs (startRev,
// headRev]" claim by reading sessioninfo.rev once and overwriting it
// before any await. A second invocation arriving mid-loop will see the
// bumped rev and skip those revisions; if our emit fails the catch
// below rolls sessioninfo.rev back so they aren't lost.
const startRev = sessioninfo.rev;
const headRev = pad.getHeadRevisionNumber();
if (startRev >= headRev) return;
const startTime = sessioninfo.time;
// Claim the range immediately so concurrent runs skip it.
sessioninfo.rev = headRev;
1. Lost revisions on race 🐞 Bug ≡ Correctness
updatePadClients() sets sessioninfo.rev=headRev before awaiting revision loads and emitting, so a concurrent updatePadClients() can advance sessioninfo.rev past headRev and prevent the rollback, permanently skipping some revisions for that socket. The client drops out-of-sequence NEW_CHANGES/NEW_CHANGES_BATCH (warn+return) and will not recover because the server believes the client is already at the newer rev.
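To make the failure mode concrete, here is a minimal sketch of a client that enforces strict revision sequencing, as the comment describes. `makeClient` and `receive` are hypothetical names, not the real client code; the sketch only shows why a skipped revision is unrecoverable once the server believes the client is ahead.

```typescript
// Hypothetical model of the client's sequencing rule (not the real client).
function makeClient(startRev: number) {
  let rev = startRev;
  return {
    get rev(): number {
      return rev;
    },
    // Returns true if the revision was applied, false if it was dropped.
    receive(newRev: number): boolean {
      // Strict sequencing: only rev + 1 is accepted; anything else is dropped.
      if (newRev !== rev + 1) return false;
      rev = newRev;
      return true;
    },
  };
}
```

If the server skips revision 7 for a client at revision 6, every later message (8, 9, ...) is dropped too, and the client stays at 6 until something resends 7.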
Agent Prompt
## Issue description
`updatePadClients()` pre-claims `sessioninfo.rev` (`sessioninfo.rev = headRev`) before any `await` and before `socket.emit()`. If another `updatePadClients()` runs concurrently for the same socket (from code paths not serialized by `padChannels`), it can advance `sessioninfo.rev` beyond `headRev` while the first call is still in-flight. If the first call then throws (revision fetch or emit), the rollback guard `if (sessioninfo.rev === headRev)` prevents restoring `startRev`, leaving a permanent gap: the server will never resend the skipped revisions, and the client rejects subsequent revisions as out-of-sequence.
## Issue Context
Some call sites invoke `updatePadClients()` outside the per-pad `padChannels` queue (and at least one does not `await` it), so overlap is possible.
## Fix Focus Areas
- src/node/handler/PadMessageHandler.ts[1023-1086]
### Suggested fix approach
- Introduce per-socket serialization for fan-out (similar to the existing `Channels` pattern), e.g. a `socketChannels.enqueue(socket.id, async () => { ... })` wrapper around the per-socket catch-up logic.
- With serialization in place, remove the early `sessioninfo.rev = headRev` claim.
- Only update `sessioninfo.rev`/`sessioninfo.time` after successful emits; on failure, leave them unchanged so the next run retries from the last confirmed sent revision.
- Ensure ordering guarantees: for a given socket, never emit revision N+1 before N has been successfully emitted (and ideally, treat the emit as “committed” only after the send loop completes without throwing).
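The fix approach above might look roughly like the following sketch. `KeyedQueue` is a hypothetical per-key promise chain written for this example and is not the project's existing `Channels` class; `SocketLike`, `SessionInfo`, `catchUp`, the `loadChangeset` callback, and the emitted payload shape are likewise assumptions.

```typescript
// Hypothetical per-key promise chain; NOT the project's `Channels` class.
class KeyedQueue {
  private readonly tails = new Map<string, Promise<void>>();

  enqueue(key: string, task: () => Promise<void>): Promise<void> {
    const prev = this.tails.get(key) ?? Promise.resolve();
    // `prev` never rejects (see `tail` below), so the task always runs.
    const run = prev.then(task);
    // Swallow errors on the chain so one failure does not block later tasks.
    const tail = run.catch(() => undefined);
    this.tails.set(key, tail);
    // Drop the entry once the queue for this key has drained.
    void tail.then(() => {
      if (this.tails.get(key) === tail) this.tails.delete(key);
    });
    return run; // The caller still observes the task's own failure.
  }
}

interface SessionInfo { rev: number }
interface SocketLike { id: string; emit(event: string, msg: unknown): void }

const socketQueue = new KeyedQueue();

// Catch one socket up to head; runs for the same socket never overlap.
function catchUp(
  socket: SocketLike,
  session: SessionInfo,
  getHeadRev: () => number,
  loadChangeset: (rev: number) => Promise<string>,
): Promise<void> {
  return socketQueue.enqueue(socket.id, async () => {
    // Read state inside the task so it reflects what earlier tasks delivered.
    const headRev = getHeadRev();
    for (let r = session.rev + 1; r <= headRev; r++) {
      const changeset = await loadChangeset(r);
      socket.emit('message', {type: 'NEW_CHANGES', newRev: r, changeset});
      // Commit only after the emit returned without throwing.
      session.rev = r;
    }
  });
}
```

Because `session.rev` is advanced only after each emit, a failed run leaves it at the last delivered revision and the next queued run retries from there, with no early claim and no rollback guard.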
const startRev = sessioninfo.rev;
const headRev = pad.getHeadRevisionNumber();
if (startRev >= headRev) return;
const startTime = sessioninfo.time;
// Claim the range immediately so concurrent runs skip it.
sessioninfo.rev = headRev;
1. Premature rev claim 🐞 Bug ≡ Correctness
updatePadClients() sets sessioninfo.rev=headRev before emitting any NEW_CHANGES/NEW_CHANGES_BATCH, so other server logic can believe a client is caught up when it is not. This can cause ACCEPT_COMMIT (or subsequent NEW_CHANGES) to arrive with newRev > clientRev+1, which the client treats as a bad revision and stops processing messages.
Agent Prompt
## Issue description
`updatePadClients()` preemptively assigns `sessioninfo.rev = headRev` to “claim” a range before doing any `await` or `socket.emit()`. This changes `sessioninfo.rev` semantics from “last revision actually delivered to this client” to “range I plan to deliver”, breaking existing ordering guarantees (notably the `handleUserChanges()` ACCEPT_COMMIT ordering assertion) and can lead to clients rejecting messages.
## Issue Context
The client enforces strict revision sequencing for both NEW_CHANGES and ACCEPT_COMMIT. The server currently relies on `thisSession.rev` to ensure all prior updates have been delivered before sending ACCEPT_COMMIT.
## Fix Focus Areas
- src/node/handler/PadMessageHandler.ts[1028-1085]
### Suggested fix direction
- Do **not** mutate `sessioninfo.rev` until after the corresponding message(s) have been emitted successfully.
- If you need to prevent concurrent fan-out per socket, introduce a separate per-socket serialization mechanism (e.g., a per-socket promise chain/mutex or `sessioninfo._fanoutInProgress` + queued target rev) instead of reusing `sessioninfo.rev`.
- If partial sending is possible (non-batched path), track `lastSuccessfullyEmittedRev`/`lastSuccessfullyEmittedTime` and roll back to that on error (never to `startRev` after some messages may have been emitted).
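For the non-batched path, the last bullet could be sketched as below. This is an illustration under assumptions: `sendPending`, the `Revision` fields, and the `emit` callback are hypothetical names, and the real handler would load revisions asynchronously.

```typescript
interface SessionInfo { rev: number; time: number }
// Hypothetical revision record for this sketch.
interface Revision { rev: number; changeset: string; timestamp: number }

// Emit pending revisions in order. On failure, the session reflects the last
// revision that was actually emitted, never the planned end of the range.
function sendPending(
  session: SessionInfo,
  pending: Revision[],
  emit: (r: Revision) => void,
): void {
  let lastRev = session.rev;
  let lastTime = session.time;
  try {
    for (const r of pending) {
      // Never emit N+1 before N: refuse to send across a gap.
      if (r.rev !== lastRev + 1) {
        throw new Error(`gap: expected ${lastRev + 1}, got ${r.rev}`);
      }
      emit(r);
      lastRev = r.rev;
      lastTime = r.timestamp;
    }
  } finally {
    // Runs on success and on error: record what was really delivered.
    session.rev = lastRev;
    session.time = lastTime;
  }
}
```

With this shape, `session.rev` keeps its original meaning of "last revision delivered to this client", so logic such as the ACCEPT_COMMIT ordering check can keep relying on it.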
I don't want to discourage you but this is not okay. Please read your PRs
Port of #7768
Rebased onto latest develop for mergeability.