From 3d6eb40c04606b28986fc57dcff48c42bffde3bf Mon Sep 17 00:00:00 2001 From: "Kamat, Trivikram" <16024985+trivikr@users.noreply.github.com> Date: Tue, 26 May 2026 21:46:05 -0700 Subject: [PATCH 1/4] stream: fast path single-destination pipe Avoid EventEmitter dispatch for the readable pipe data handler when it is the only data listener. Fall back to normal data event emission when additional data listeners are present to preserve listener ordering. Track the pipe data handler lazily so non-piped readable streams do not pay an extra ReadableState property initialization or data listener identity check. Signed-off-by: Kamat, Trivikram <16024985+trivikr@users.noreply.github.com> Assisted-by: openai:gpt-5.5 --- lib/internal/streams/readable.js | 21 ++++++++++--- test/parallel/test-stream2-basic.js | 48 +++++++++++++++++++++++++++++ 2 files changed, 65 insertions(+), 4 deletions(-) diff --git a/lib/internal/streams/readable.js b/lib/internal/streams/readable.js index 57aa71817e4fba..b0d69dddd5bef6 100644 --- a/lib/internal/streams/readable.js +++ b/lib/internal/streams/readable.js @@ -111,6 +111,7 @@ const kErroredValue = Symbol('kErroredValue'); const kDefaultEncodingValue = Symbol('kDefaultEncodingValue'); const kDecoderValue = Symbol('kDecoderValue'); const kEncodingValue = Symbol('kEncodingValue'); +const kPipeData = Symbol('kPipeData'); const kEnded = 1 << 9; const kEndEmitted = 1 << 10; @@ -289,6 +290,7 @@ function ReadableState(options, stream, isDuplex) { this.bufferIndex = 0; this.length = 0; this.pipes = []; + this[kPipeData] = null; // Should close be emitted on destroy. Defaults to true. if (options && options.emitClose === false) this[kState] &= ~kEmitClose; @@ -564,8 +566,7 @@ function addChunk(stream, state, chunk, addToFront) { state.awaitDrainWriters = null; } - state[kState] |= kDataEmitted; - stream.emit('data', chunk); + emitData(stream, state, chunk); } else { // Update the buffer info. state.length += (state[kState] & kObjectMode) !== 0 ? 1 : chunk.length; @@ -790,13 +791,21 @@ Readable.prototype.read = function(n) { } if (ret !== null && (state[kState] & (kErrorEmitted | kCloseEmitted)) === 0) { - state[kState] |= kDataEmitted; - this.emit('data', ret); + emitData(this, state, ret); } return ret; }; +function emitData(stream, state, chunk) { + state[kState] |= kDataEmitted; + if (stream._events.data === state[kPipeData]) { + state[kPipeData](chunk); + } else { + stream.emit('data', chunk); + } +} + function onEofChunk(stream, state) { debug('onEofChunk'); if ((state[kState] & kEnded) !== 0) return; @@ -976,6 +985,8 @@ Readable.prototype.pipe = function(dest, pipeOpts) { src.removeListener('end', onend); src.removeListener('end', unpipe); src.removeListener('data', ondata); + if (state[kPipeData] === ondata) + state[kPipeData] = null; cleanedUp = true; @@ -1016,6 +1027,8 @@ Readable.prototype.pipe = function(dest, pipeOpts) { } src.on('data', ondata); + if (src._events.data === ondata) + state[kPipeData] = ondata; function ondata(chunk) { debug('ondata'); try { diff --git a/test/parallel/test-stream2-basic.js b/test/parallel/test-stream2-basic.js index f51009241da56e..d71ee0a0a7f4cf 100644 --- a/test/parallel/test-stream2-basic.js +++ b/test/parallel/test-stream2-basic.js @@ -146,6 +146,54 @@ class TestWriter extends EE { r.pipe(w); } +{ + // Verify data listener ordering around pipe. + function makeWritable(events) { + return new W({ + write(chunk, enc, cb) { + events.push('write'); + cb(); + }, + }); + } + + { + const events = []; + const r = R.from(['x']); + const w = makeWritable(events); + + r.on('data', () => events.push('data')); + r.pipe(w); + w.on('finish', common.mustCall(() => { + assert.deepStrictEqual(events, ['data', 'write']); + })); + } + + { + const events = []; + const r = R.from(['x']); + const w = makeWritable(events); + + r.pipe(w); + r.on('data', () => events.push('data')); + w.on('finish', common.mustCall(() => { + assert.deepStrictEqual(events, ['write', 'data']); + })); + } + + { + const events = []; + const r = R.from(['x']); + const w = makeWritable(events); + + r.pipe(w); + r.prependListener('data', () => events.push('data')); + w.on('finish', common.mustCall(() => { + assert.deepStrictEqual(events, ['data', 'write']); + })); + } +} + [1, 2, 3, 4, 5, 6, 7, 8, 9].forEach(function(SPLIT) { // Verify unpipe From 9f0dfce0a87474edb7a92c8756ff0a686e050699 Mon Sep 17 00:00:00 2001 From: "Kamat, Trivikram" <16024985+trivikr@users.noreply.github.com> Date: Tue, 26 May 2026 22:41:09 -0700 Subject: [PATCH 2/4] test: cover data listener added after pipe --- test/parallel/test-stream2-basic.js | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/test/parallel/test-stream2-basic.js b/test/parallel/test-stream2-basic.js index d71ee0a0a7f4cf..a1dc9058b00e72 100644 --- a/test/parallel/test-stream2-basic.js +++ b/test/parallel/test-stream2-basic.js @@ -192,6 +192,27 @@ class TestWriter extends EE { assert.deepStrictEqual(events, ['data', 'write']); })); } + + { + const events = []; + const r = R.from(['a', 'b']); + const w = new W({ + write: common.mustCall((chunk, enc, cb) => { + events.push(`write:${chunk}`); + if (String(chunk) === 'a') { + r.on('data', common.mustCall((chunk) => { + events.push(`data:${chunk}`); + })); + } + cb(); + }, 2), + }); + + r.pipe(w); + w.on('finish', common.mustCall(() => { + assert.deepStrictEqual(events, ['write:a', 'write:b', 'data:b']); + })); + } } From 654d19685609530f7542c20cd1a3d404e15d573d Mon Sep 17 00:00:00 2001 From: "Kamat, Trivikram" <16024985+trivikr@users.noreply.github.com> Date: Wed, 27 May 2026 18:33:34 -0700 Subject: [PATCH 3/4] streams: make pipe data fast path state lazy --- lib/internal/streams/readable.js | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/lib/internal/streams/readable.js b/lib/internal/streams/readable.js index b0d69dddd5bef6..c9e909627128a7 100644 --- a/lib/internal/streams/readable.js +++ b/lib/internal/streams/readable.js @@ -132,6 +132,7 @@ const kFlowing = 1 << 24; const kHasPaused = 1 << 25; const kPaused = 1 << 26; const kDataListening = 1 << 27; +const kHasPipeData = 1 << 28; // TODO(benjamingr) it is likely slower to do it this way than with free functions function makeBitMapDescriptor(bit) { @@ -290,7 +291,6 @@ function ReadableState(options, stream, isDuplex) { this.bufferIndex = 0; this.length = 0; this.pipes = []; - this[kPipeData] = null; // Should close be emitted on destroy. Defaults to true. if (options && options.emitClose === false) this[kState] &= ~kEmitClose; @@ -799,7 +799,8 @@ Readable.prototype.read = function(n) { function emitData(stream, state, chunk) { state[kState] |= kDataEmitted; - if (stream._events.data === state[kPipeData]) { + if ((state[kState] & kHasPipeData) !== 0 && + stream._events.data === state[kPipeData]) { state[kPipeData](chunk); } else { stream.emit('data', chunk); @@ -986,7 +987,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) { src.removeListener('end', unpipe); src.removeListener('data', ondata); if (state[kPipeData] === ondata) - state[kPipeData] = null; + state[kState] &= ~kHasPipeData; cleanedUp = true; @@ -1027,8 +1028,10 @@ Readable.prototype.pipe = function(dest, pipeOpts) { } src.on('data', ondata); - if (src._events.data === ondata) + if (src._events.data === ondata) { state[kPipeData] = ondata; + state[kState] |= kHasPipeData; + } function ondata(chunk) { debug('ondata'); try { From 4708ae2a5eb730a2392e33723ecca07f86e1e51a Mon Sep 17 00:00:00 2001 From: "Kamat, Trivikram" <16024985+trivikr@users.noreply.github.com> Date: Wed, 27 May 2026 20:00:13 -0700 Subject: [PATCH 4/4] streams: avoid helper for non-pipe data emission --- lib/internal/streams/readable.js | 26 ++++++++++++++------------ 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/lib/internal/streams/readable.js b/lib/internal/streams/readable.js index c9e909627128a7..9c2a15ecabe216 100644 --- a/lib/internal/streams/readable.js +++ b/lib/internal/streams/readable.js @@ -566,7 +566,13 @@ function addChunk(stream, state, chunk, addToFront) { state.awaitDrainWriters = null; } - emitData(stream, state, chunk); + state[kState] |= kDataEmitted; + if ((state[kState] & kHasPipeData) !== 0 && + stream._events.data === state[kPipeData]) { + state[kPipeData](chunk); + } else { + stream.emit('data', chunk); + } } else { // Update the buffer info. state.length += (state[kState] & kObjectMode) !== 0 ? 1 : chunk.length; @@ -791,22 +797,18 @@ Readable.prototype.read = function(n) { } if (ret !== null && (state[kState] & (kErrorEmitted | kCloseEmitted)) === 0) { - emitData(this, state, ret); + state[kState] |= kDataEmitted; + if ((state[kState] & kHasPipeData) !== 0 && + this._events.data === state[kPipeData]) { + state[kPipeData](ret); + } else { + this.emit('data', ret); + } } return ret; }; -function emitData(stream, state, chunk) { - state[kState] |= kDataEmitted; - if ((state[kState] & kHasPipeData) !== 0 && - stream._events.data === state[kPipeData]) { - state[kPipeData](chunk); - } else { - stream.emit('data', chunk); - } -} - function onEofChunk(stream, state) { debug('onEofChunk'); if ((state[kState] & kEnded) !== 0) return;