Skip to content

Commit ae779b6

Browse files
committed
stream/iter: keep overlapping broadcast reads pending
Broadcast consumers may receive overlapping next() calls on the same iterator. Queue those reads so chunks satisfy them in call order. A single written chunk should resolve the earliest pending next(); later next() calls remain pending until more data is written or the broadcast completes. Fixes: #63499 Signed-off-by: Kamat, Trivikram <16024985+trivikr@users.noreply.github.com> Assisted-by: openai:gpt-5.5
1 parent 8d3245e commit ae779b6

2 files changed

Lines changed: 90 additions & 3 deletions

File tree

lib/internal/streams/iter/broadcast.js

Lines changed: 56 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
const {
1010
ArrayIsArray,
1111
ArrayPrototypePush,
12+
ArrayPrototypeShift,
1213
MathMax,
1314
PromisePrototypeThen,
1415
PromiseReject,
@@ -146,6 +147,7 @@ class BroadcastImpl {
146147
cursor: this.#bufferStart,
147148
resolve: null,
148149
reject: null,
150+
pending: [],
149151
detached: false,
150152
};
151153

@@ -165,8 +167,10 @@ class BroadcastImpl {
165167

166168
function detach() {
167169
state.detached = true;
168-
state.resolve = null;
169-
state.reject = null;
170+
if (state.resolve) {
171+
state.resolve({ __proto__: null, done: true, value: undefined });
172+
}
173+
self.#resolvePendingDone(state);
170174
if (self.#deleteConsumer(state)) {
171175
self.#tryTrimBuffer();
172176
}
@@ -207,6 +211,13 @@ class BroadcastImpl {
207211
return kDone;
208212
}
209213

214+
if (state.resolve) {
215+
const { promise, resolve, reject } = PromiseWithResolvers();
216+
ArrayPrototypePush(state.pending,
217+
{ __proto__: null, resolve, reject });
218+
return promise;
219+
}
220+
210221
const { promise, resolve, reject } = PromiseWithResolvers();
211222
state.resolve = resolve;
212223
state.reject = reject;
@@ -250,6 +261,11 @@ class BroadcastImpl {
250261
consumer.resolve = null;
251262
consumer.reject = null;
252263
}
264+
if (reason !== undefined) {
265+
this.#rejectPending(consumer, reason);
266+
} else {
267+
this.#resolvePendingDone(consumer);
268+
}
253269
consumer.detached = true;
254270
}
255271
this.#consumers.clear();
@@ -296,7 +312,7 @@ class BroadcastImpl {
296312
this.#ended = true;
297313

298314
for (const consumer of this.#consumers) {
299-
if (consumer.resolve) {
315+
while (consumer.resolve) {
300316
const bufferIndex = consumer.cursor - this.#bufferStart;
301317
if (bufferIndex < this.#buffer.length) {
302318
const chunk = this.#buffer.get(bufferIndex);
@@ -309,9 +325,15 @@ class BroadcastImpl {
309325
consumer.resolve({ __proto__: null, done: false, value: chunk });
310326
} else {
311327
consumer.resolve({ __proto__: null, done: true, value: undefined });
328+
this.#resolvePendingDone(consumer);
329+
consumer.detached = true;
312330
}
313331
consumer.resolve = null;
314332
consumer.reject = null;
333+
if (consumer.detached && this.#deleteConsumer(consumer)) {
334+
this.#tryTrimBuffer();
335+
break;
336+
}
315337
}
316338
}
317339
}
@@ -328,6 +350,7 @@ class BroadcastImpl {
328350
consumer.resolve = null;
329351
consumer.reject = null;
330352
}
353+
this.#rejectPending(consumer, reason);
331354
consumer.detached = true;
332355
}
333356
this.#consumers.clear();
@@ -396,6 +419,11 @@ class BroadcastImpl {
396419
consumer.resolve = null;
397420
consumer.reject = null;
398421
resolve({ __proto__: null, done: false, value: chunk });
422+
if (consumer.detached && this.#deleteConsumer(consumer)) {
423+
this.#tryTrimBuffer();
424+
} else if (this.#promotePending(consumer)) {
425+
ArrayPrototypePush(this.#waiters, consumer);
426+
}
399427
} else {
400428
// Still waiting -- put back
401429
ArrayPrototypePush(this.#waiters, consumer);
@@ -418,6 +446,31 @@ class BroadcastImpl {
418446
}
419447
return false;
420448
}
449+
450+
#promotePending(consumer) {
451+
const next = ArrayPrototypeShift(consumer.pending);
452+
if (next === undefined) return false;
453+
consumer.resolve = next.resolve;
454+
consumer.reject = next.reject;
455+
return true;
456+
}
457+
458+
#resolvePendingDone(consumer) {
459+
if (consumer.resolve) {
460+
consumer.resolve = null;
461+
consumer.reject = null;
462+
}
463+
while (consumer.pending.length > 0) {
464+
ArrayPrototypeShift(consumer.pending).resolve(
465+
{ __proto__: null, done: true, value: undefined });
466+
}
467+
}
468+
469+
#rejectPending(consumer, reason) {
470+
while (consumer.pending.length > 0) {
471+
ArrayPrototypeShift(consumer.pending).reject(reason);
472+
}
473+
}
421474
}
422475

423476
// =============================================================================

test/parallel/test-stream-iter-broadcast-basic.js

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
const common = require('../common');
55
const assert = require('assert');
6+
const { setTimeout } = require('timers/promises');
67
const { broadcast, text } = require('stream/iter');
78

89
// =============================================================================
@@ -243,6 +244,38 @@ async function testLateJoinerSeesBufferedData() {
243244
assert.strictEqual(result, 'before-join');
244245
}
245246

247+
async function testOverlappingNextKeepsEarlierRead() {
248+
const { writer, broadcast: bc } = broadcast();
249+
const it = bc.push()[Symbol.asyncIterator]();
250+
251+
const first = it.next();
252+
const second = it.next();
253+
254+
await writer.write('x');
255+
256+
const secondResult = await Promise.race([
257+
second.then((value) => ({ __proto__: null, settled: true, value })),
258+
setTimeout(common.platformTimeout(50),
259+
{ __proto__: null, settled: false }),
260+
]);
261+
assert.deepStrictEqual(secondResult, {
262+
__proto__: null,
263+
settled: false,
264+
});
265+
266+
const result = await first;
267+
assert.strictEqual(result.done, false);
268+
assert.strictEqual(Buffer.concat(result.value).toString(), 'x');
269+
270+
writer.endSync();
271+
assert.deepStrictEqual(await second, {
272+
__proto__: null,
273+
done: true,
274+
value: undefined,
275+
});
276+
assert.strictEqual(bc.consumerCount, 0);
277+
}
278+
246279
Promise.all([
247280
testBasicBroadcast(),
248281
testMultipleWrites(),
@@ -257,4 +290,5 @@ Promise.all([
257290
testFailDetachesConsumers(),
258291
testWriterFailIdempotent(),
259292
testLateJoinerSeesBufferedData(),
293+
testOverlappingNextKeepsEarlierRead(),
260294
]).then(common.mustCall());

0 commit comments

Comments
 (0)