
feat(scaling): serialize per-socket fan-out + NEW_CHANGES_BATCH #7881

Closed
deepshekhardas wants to merge 1 commit into ether:develop from deepshekhardas:fix/pr-7768-socket-fanout-batch

Conversation

@deepshekhardas

Port of #7768

Rebased onto latest develop for mergeability.

@qodo-code-review

qodo-code-review Bot commented Jun 2, 2026

Review Summary by Qodo

(Agentic_describe updated until commit d239203)

Implement NEW_CHANGES_BATCH wire format for scaled fan-out optimization

✨ Enhancement


Walkthroughs

Description
• Implement NEW_CHANGES_BATCH wire format to reduce packet count
• Serialize per-socket fan-out with queued revision batching
• Add client-side support for batched revision messages
• Include configuration flag and comprehensive test coverage
Diagram
flowchart LR
  A["Server collects<br/>queued revisions"] -->|"batchEnabled &&<br/>multiple revs"| B["NEW_CHANGES_BATCH<br/>single emit"]
  A -->|"single rev or<br/>batching disabled"| C["NEW_CHANGES<br/>per revision"]
  B -->|"client receives"| D["Apply all changes<br/>in order"]
  C -->|"client receives"| D
  E["Settings flag<br/>newChangesBatch"] -->|"controls"| A
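The decision in the flowchart can be sketched as a small pure function. This is only an illustration of the described behaviour: `QueuedRevision`, `Packet`, and `packNewChanges` are hypothetical names, and the actual `NewChangesPacker.ts` in this PR may use different shapes.

```typescript
// Hypothetical shapes for illustration; not taken from the PR's source.
interface QueuedRevision {
  newRev: number;
  changeset: string;
}

type Packet =
  | {type: 'NEW_CHANGES'; data: QueuedRevision}
  | {type: 'NEW_CHANGES_BATCH'; changes: QueuedRevision[]};

// Follow the flowchart: emit one batch packet when batching is enabled and
// more than one revision is queued; otherwise emit one NEW_CHANGES packet
// per revision.
function packNewChanges(queued: QueuedRevision[], batchEnabled: boolean): Packet[] {
  if (batchEnabled && queued.length > 1) {
    return [{type: 'NEW_CHANGES_BATCH', changes: queued.slice()}];
  }
  return queued.map((data): Packet => ({type: 'NEW_CHANGES', data}));
}
```

Keeping the decision free of I/O is one way to make it easy to cover in a unit test, which appears to be the intent of the separate packer module.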


File Changes

1. src/node/handler/NewChangesPacker.ts ✨ Enhancement +41/-0
   New module for batching decision logic

2. src/node/handler/PadMessageHandler.ts ✨ Enhancement +59/-27
   Refactor updatePadClients to batch revisions

3. src/node/prom-instruments.ts ✨ Enhancement +1/-0
   Add NEW_CHANGES_BATCH to metrics allowlist

4. src/node/utils/Settings.ts ⚙️ Configuration changes +16/-0
   Add newChangesBatch configuration setting

5. src/static/js/broadcast.ts ✨ Enhancement +17/-11
   Handle NEW_CHANGES_BATCH message type

6. src/static/js/collab_client.ts ✨ Enhancement +15/-8
   Process batched revisions in order

7. src/static/js/types/SocketIOMessage.ts ✨ Enhancement +14/-0
   Add NEW_CHANGES_BATCH type definitions

8. src/tests/backend-new/specs/new-changes-batch.test.ts 🧪 Tests +48/-0
   Add regression tests for batching logic

9. settings.json.template 📝 Documentation +12/-0
   Document newChangesBatch configuration option



@qodo-free-for-open-source-projects

qodo-free-for-open-source-projects Bot commented Jun 2, 2026

Code Review by Qodo

🐞 Bugs (4) 📘 Rule violations (0)



Action required

1. Premature rev claim 🐞 Bug ≡ Correctness ⭐ New
Description
updatePadClients() sets sessioninfo.rev=headRev before emitting any NEW_CHANGES/NEW_CHANGES_BATCH,
so other server logic can believe a client is caught up when it is not. This can cause ACCEPT_COMMIT
(or subsequent NEW_CHANGES) to arrive with newRev > clientRev+1, which the client treats as a bad
revision and stops processing messages.
Code

src/node/handler/PadMessageHandler.ts[R1034-1040]

Evidence
The server now bumps sessioninfo.rev to headRev before any awaits/emits, but handleUserChanges uses
thisSession.rev to assert ordering before sending ACCEPT_COMMIT; the client rejects ACCEPT_COMMIT
and NEW_CHANGES if the revision is not exactly rev or rev+1. This combination means a client that is
actually behind can receive ACCEPT_COMMIT (or further updates) with an unexpected revision number
and stop processing messages.

src/node/handler/PadMessageHandler.ts[1028-1040]
src/node/handler/PadMessageHandler.ts[1051-1085]
src/node/handler/PadMessageHandler.ts[977-987]
src/static/js/collab_client.ts[191-230]

Agent prompt
The issue below was found during a code review. Follow the provided context and guidance below and implement a solution

## Issue description
`updatePadClients()` preemptively assigns `sessioninfo.rev = headRev` to “claim” a range before doing any `await` or `socket.emit()`. This changes `sessioninfo.rev` semantics from “last revision actually delivered to this client” to “range I plan to deliver”, breaking existing ordering guarantees (notably the `handleUserChanges()` ACCEPT_COMMIT ordering assertion) and can lead to clients rejecting messages.

## Issue Context
The client enforces strict revision sequencing for both NEW_CHANGES and ACCEPT_COMMIT. The server currently relies on `thisSession.rev` to ensure all prior updates have been delivered before sending ACCEPT_COMMIT.

## Fix Focus Areas
- src/node/handler/PadMessageHandler.ts[1028-1085]

### Suggested fix direction
- Do **not** mutate `sessioninfo.rev` until after the corresponding message(s) have been emitted successfully.
- If you need to prevent concurrent fan-out per socket, introduce a separate per-socket serialization mechanism (e.g., a per-socket promise chain/mutex or `sessioninfo._fanoutInProgress` + queued target rev) instead of reusing `sessioninfo.rev`.
- If partial sending is possible (non-batched path), track `lastSuccessfullyEmittedRev`/`lastSuccessfullyEmittedTime` and roll back to that on error (never to `startRev` after some messages may have been emitted).
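
A minimal sketch of the first and third points, assuming a simplified synchronous send loop. `SessionInfo`, `RevisionMsg`, `sendPending`, `loadRevision`, and `emit` are hypothetical stand-ins for the session record, the pad revision lookup, and `socket.emit()`; the real code is asynchronous and has more fields.

```typescript
// Hypothetical, simplified shapes for illustration only.
interface SessionInfo {
  rev: number;
  time: number;
}

interface RevisionMsg {
  newRev: number;
  currentTime: number;
  changeset: string;
}

// Send every revision in (session.rev, headRev], advancing session.rev only
// after the corresponding emit has returned without throwing.
function sendPending(
    session: SessionInfo,
    headRev: number,
    loadRevision: (rev: number) => RevisionMsg,
    emit: (msg: RevisionMsg) => void): void {
  for (let r = session.rev + 1; r <= headRev; r++) {
    const msg = loadRevision(r);
    // If this throws, session.rev still names the last revision that was
    // actually emitted, so the next run resumes from there.
    emit(msg);
    session.rev = msg.newRev;
    session.time = msg.currentTime;
  }
}
```

With this shape no rollback is needed: a failure part-way through leaves `session.rev` at the last emitted revision rather than at `startRev` or `headRev`.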



2. Lost revisions on race 🐞 Bug ≡ Correctness
Description
updatePadClients() sets sessioninfo.rev=headRev before awaiting revision loads and emitting, so a
concurrent updatePadClients() can advance sessioninfo.rev past headRev and prevent the rollback,
permanently skipping some revisions for that socket. The client drops out-of-sequence
NEW_CHANGES/NEW_CHANGES_BATCH (warn+return) and will not recover because the server believes the
client is already at the newer rev.
Code

src/node/handler/PadMessageHandler.ts[R1028-1040]

Evidence
The server advances sessioninfo.rev before any awaits/emits and only rolls it back if it still
equals the originally claimed headRev, which fails if another concurrent invocation has advanced
it. Concurrent invocation is plausible because some call sites do not await updatePadClients().
The client explicitly returns (drops processing) on revision mismatch, so skipped/out-of-order
revisions can stall a session permanently.

src/node/handler/PadMessageHandler.ts[1019-1085]
src/node/db/API.ts[315-333]
src/node/handler/PadMessageHandler.ts[169-209]
src/static/js/collab_client.ts[191-215]

Agent prompt
The issue below was found during a code review. Follow the provided context and guidance below and implement a solution

## Issue description
`updatePadClients()` pre-claims `sessioninfo.rev` (`sessioninfo.rev = headRev`) before any `await` and before `socket.emit()`. If another `updatePadClients()` runs concurrently for the same socket (from code paths not serialized by `padChannels`), it can advance `sessioninfo.rev` beyond `headRev` while the first call is still in-flight. If the first call then throws (revision fetch or emit), the rollback guard `if (sessioninfo.rev === headRev)` prevents restoring `startRev`, leaving a permanent gap: the server will never resend the skipped revisions, and the client rejects subsequent revisions as out-of-sequence.

## Issue Context
Some call sites invoke `updatePadClients()` outside the per-pad `padChannels` queue (and at least one does not `await` it), so overlap is possible.

## Fix Focus Areas
- src/node/handler/PadMessageHandler.ts[1023-1086]

### Suggested fix approach
- Introduce per-socket serialization for fan-out (similar to the existing `Channels` pattern), e.g. a `socketChannels.enqueue(socket.id, async () => { ... })` wrapper around the per-socket catch-up logic.
- With serialization in place, remove the early `sessioninfo.rev = headRev` claim.
- Only update `sessioninfo.rev`/`sessioninfo.time` after successful emits; on failure, leave them unchanged so the next run retries from the last confirmed sent revision.
- Ensure ordering guarantees: for a given socket, never emit revision N+1 before N has been successfully emitted (and ideally, treat the emit as “committed” only after the send loop completes without throwing).
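
The per-socket serialization suggested above could look roughly like the following promise-chain queue. `KeyedQueue` is a hypothetical helper written for this illustration; it is not Etherpad's `Channels` implementation, whose API may differ.

```typescript
// Hypothetical per-key task queue: tasks enqueued under the same key run one
// at a time, in enqueue order. Tasks under different keys are independent.
class KeyedQueue {
  private tails = new Map<string, Promise<void>>();

  enqueue<T>(key: string, task: () => Promise<T>): Promise<T> {
    const prev = this.tails.get(key) ?? Promise.resolve();
    // `prev` never rejects (see `tail` below), so `task` always runs once
    // the previous task for this key has settled.
    const run = prev.then(task);
    // Swallow the outcome so one failed task does not block later ones.
    const tail: Promise<void> = run.then(() => {}, () => {});
    this.tails.set(key, tail);
    // Drop the entry once the queue drains so the map does not grow forever.
    tail.then(() => {
      if (this.tails.get(key) === tail) this.tails.delete(key);
    });
    return run;
  }
}
```

Under that assumption, the fan-out for one socket would be wrapped as `queue.enqueue(socket.id, async () => { ... })`, so a second `updatePadClients()` run for the same socket starts only after the first has finished or failed.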




Remediation recommended

3. Batch hook payload missing 🐞 Bug ⚙ Maintainability ⭐ New
Description
collab_client now processes NEW_CHANGES_BATCH, but it only populates msg.payload for NEW_CHANGES, so
NEW_CHANGES_BATCH triggers handleClientMessage_NEW_CHANGES_BATCH with payload undefined and does not
trigger existing handleClientMessage_NEW_CHANGES hooks. Enabling batching can silently break client
plugins that depend on NEW_CHANGES hooks/payloads.
Code

src/static/js/collab_client.ts[R191-196]

Evidence
The code now accepts NEW_CHANGES_BATCH in the main message handler, but the later hook payload
normalization only handles NEW_CHANGES. Therefore hooks for NEW_CHANGES_BATCH will receive undefined
payload, and existing NEW_CHANGES hooks will never run for batched updates.

src/static/js/collab_client.ts[191-197]
src/static/js/collab_client.ts[302-313]

Agent prompt
The issue below was found during a code review. Follow the provided context and guidance below and implement a solution

## Issue description
`collab_client.ts` added support for `NEW_CHANGES_BATCH`, but the hook compatibility shim that sets `msg.payload` is still only applied to `NEW_CHANGES`. As a result, `hooks.callAll('handleClientMessage_NEW_CHANGES_BATCH', {payload: undefined})` is invoked, and existing plugins listening to `handleClientMessage_NEW_CHANGES` won’t see updates when batching is enabled.

## Issue Context
Plugins often rely on the `handleClientMessage_${type}` hook pattern and expect `payload` to be present for message types.

## Fix Focus Areas
- src/static/js/collab_client.ts[191-216]
- src/static/js/collab_client.ts[302-313]

### Suggested fix direction
- Extend the payload shim to include `NEW_CHANGES_BATCH` (set `msg.payload = msg`), and/or
- For backwards compatibility, when receiving `NEW_CHANGES_BATCH`, additionally invoke `handleClientMessage_NEW_CHANGES` for each contained change (or emit a synthetic `NEW_CHANGES`-shaped payload) so existing plugins continue to work.
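
One way to combine both suggestions is sketched below. The message shapes and `fireHooks` are hypothetical, and `callAll` is a stand-in parameter for the client hook dispatcher; the wire format of `NEW_CHANGES_BATCH` in this PR is assumed here to be a `changes` array and may differ.

```typescript
// Hypothetical message shapes for illustration only.
interface NewChangesMsg {
  type: 'NEW_CHANGES';
  newRev: number;
  changeset: string;
}

interface NewChangesBatchMsg {
  type: 'NEW_CHANGES_BATCH';
  changes: Array<{newRev: number; changeset: string}>;
}

type CallAll = (hookName: string, ctx: {payload: unknown}) => void;

// Always pass a defined payload, and replay each batched change through the
// existing NEW_CHANGES hook so plugins written before batching still fire.
function fireHooks(msg: NewChangesMsg | NewChangesBatchMsg, callAll: CallAll): void {
  if (msg.type === 'NEW_CHANGES') {
    callAll('handleClientMessage_NEW_CHANGES', {payload: msg});
    return;
  }
  callAll('handleClientMessage_NEW_CHANGES_BATCH', {payload: msg});
  for (const change of msg.changes) {
    const synthetic: NewChangesMsg = {type: 'NEW_CHANGES', ...change};
    callAll('handleClientMessage_NEW_CHANGES', {payload: synthetic});
  }
}
```

Whether plugins should see both the batch hook and the per-change hooks is a design choice for the maintainers; the sketch only shows that the two options are compatible.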



4. Unused type import 🐞 Bug ⚙ Maintainability
Description
PadMessageHandler imports the NewChangesItem type but never uses it, which is likely to fail linting
and adds maintenance noise. This should be fixed by typing the pending array as NewChangesItem[] (or
removing the import).
Code

src/node/handler/PadMessageHandler.ts[R48-52]

Evidence
The file imports NewChangesItem but defines pending with an inline type and never references
NewChangesItem elsewhere.

src/node/handler/PadMessageHandler.ts[34-56]
src/node/handler/PadMessageHandler.ts[1041-1049]

Agent prompt
The issue below was found during a code review. Follow the provided context and guidance below and implement a solution

## Issue description
`PadMessageHandler.ts` imports `type NewChangesItem` but the code uses an inline object type for `pending` instead, leaving `NewChangesItem` unused.

## Issue Context
This is likely to trip `no-unused-vars` / unused-import lint rules and makes the code harder to keep consistent with `NewChangesPacker`.

## Fix Focus Areas
- src/node/handler/PadMessageHandler.ts[48-56]
- src/node/handler/PadMessageHandler.ts[1041-1049]

## Recommended change
- Prefer: `const pending: NewChangesItem[] = [];` and delete the inline type.
- Alternatively: remove `type NewChangesItem` from the import if you want to keep the inline type.





Comment on lines +1028 to +1040
// Snapshot the local state so a concurrent updatePadClients() can't make
// us double-emit. We hold the "I'm responsible for revs (startRev,
// headRev]" claim by reading sessioninfo.rev once and overwriting it
// before any await. A second invocation arriving mid-loop will see the
// bumped rev and skip those revisions; if our emit fails the catch
// below rolls sessioninfo.rev back so they aren't lost.
const startRev = sessioninfo.rev;
const headRev = pad.getHeadRevisionNumber();
if (startRev >= headRev) return;
const startTime = sessioninfo.time;
// Claim the range immediately so concurrent runs skip it.
sessioninfo.rev = headRev;
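
The failure mode flagged on this excerpt can be reproduced in a small simulation. This is a sketch, not the PR's code: `claimAndSend` and `demo` are hypothetical, and the rollback guard is modelled on the `if (sessioninfo.rev === headRev)` check described in the review.

```typescript
interface Session {
  rev: number;
}

// Mimics the claim-then-rollback pattern: claim the range before awaiting,
// roll back only if nobody else has moved the revision in the meantime.
async function claimAndSend(
    session: Session, headRev: number, send: () => Promise<void>): Promise<void> {
  const startRev = session.rev;
  if (startRev >= headRev) return;
  session.rev = headRev; // claim (startRev, headRev]
  try {
    await send();
  } catch (_err) {
    if (session.rev === headRev) session.rev = startRev;
  }
}

async function demo(): Promise<{rev: number; delivered: number[]}> {
  const session: Session = {rev: 10};
  const delivered: number[] = [];
  // Run A claims (10, 12], then fails after yielding once.
  const a = claimAndSend(session, 12, async () => {
    await Promise.resolve();
    throw new Error('emit failed');
  });
  // Run B starts while A is in flight, sees rev 12, and sends only rev 13.
  const b = claimAndSend(session, 13, async () => {
    delivered.push(13);
  });
  await Promise.all([a, b]);
  return {rev: session.rev, delivered};
}
```

In this run the session ends at rev 13 with only revision 13 delivered: run A's rollback is skipped because run B has already moved `session.rev`, so revisions 11 and 12 are never resent.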


Action required

1. Lost revisions on race 🐞 Bug ≡ Correctness

updatePadClients() sets sessioninfo.rev=headRev before awaiting revision loads and emitting, so a
concurrent updatePadClients() can advance sessioninfo.rev past headRev and prevent the rollback,
permanently skipping some revisions for that socket. The client drops out-of-sequence
NEW_CHANGES/NEW_CHANGES_BATCH (warn+return) and will not recover because the server believes the
client is already at the newer rev.

Comment on lines +1034 to +1040
const startRev = sessioninfo.rev;
const headRev = pad.getHeadRevisionNumber();
if (startRev >= headRev) return;
const startTime = sessioninfo.time;
// Claim the range immediately so concurrent runs skip it.
sessioninfo.rev = headRev;


Action required

1. Premature rev claim 🐞 Bug ≡ Correctness

updatePadClients() sets sessioninfo.rev=headRev before emitting any NEW_CHANGES/NEW_CHANGES_BATCH,
so other server logic can believe a client is caught up when it is not. This can cause ACCEPT_COMMIT
(or subsequent NEW_CHANGES) to arrive with newRev > clientRev+1, which the client treats as a bad
revision and stops processing messages.

@JohnMcLear
Member

I don't want to discourage you but this is not okay. Please read your PRs

@JohnMcLear JohnMcLear marked this pull request as draft June 2, 2026 09:38
@JohnMcLear JohnMcLear closed this Jun 2, 2026