From a581189747b7d9debd03e41583550753c4357d13 Mon Sep 17 00:00:00 2001 From: marcopiraccini Date: Thu, 5 Mar 2026 14:06:34 +0100 Subject: [PATCH 1/3] stream: preserve error over AbortError in pipeline Signed-off-by: marcopiraccini --- lib/internal/streams/pipeline.js | 2 +- test/parallel/test-stream-pipeline.js | 32 +++++++++++++++++++++++++++ 2 files changed, 33 insertions(+), 1 deletion(-) diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index ad3f0796875aae..546ff579d8ebbd 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -227,7 +227,7 @@ function pipelineImpl(streams, callback, opts) { } function finishImpl(err, final) { - if (err && (!error || error.code === 'ERR_STREAM_PREMATURE_CLOSE')) { + if (err && (!error || error.code === 'ERR_STREAM_PREMATURE_CLOSE' || error.name === 'AbortError')) { error = err; } diff --git a/test/parallel/test-stream-pipeline.js b/test/parallel/test-stream-pipeline.js index 6220bc15365361..6c94428d1941cf 100644 --- a/test/parallel/test-stream-pipeline.js +++ b/test/parallel/test-stream-pipeline.js @@ -1749,3 +1749,35 @@ tmpdir.refresh(); assert.deepStrictEqual(err, new Error('booom')); })); } + +{ + // Errors thrown in Readable.map inside pipeline should not be + // swallowed by AbortError when the source is an infinite stream. + function createInfiniteReadable() { + return new Readable({ + read() { + this.push('data'); + }, + }); + } + + function createObjectTransform() { + return new Transform({ + readableObjectMode: true, + transform(chunk, encoding, callback) { + this.push({}); + callback(); + }, + }); + } + + pipelinep( + createInfiniteReadable(), + createObjectTransform(), + (readable) => readable.map(async () => { + throw new Error('Boom!'); + }), + ).then(common.mustNotCall(), common.mustCall((err) => { + assert.strictEqual(err.message, 'Boom!'); + })); +} From 0dc1ebcbff38c4aec644d70589ad15b45c98f86b Mon Sep 17 00:00:00 2001 From: marcopiraccini Date: Thu, 5 Mar 2026 14:13:18 +0100 Subject: [PATCH 2/3] stream: fix lint error in pipeline test Signed-off-by: marcopiraccini --- test/parallel/test-stream-pipeline.js | 43 +++++++++++---------------- 1 file changed, 18 insertions(+), 25 deletions(-) diff --git a/test/parallel/test-stream-pipeline.js b/test/parallel/test-stream-pipeline.js index 6c94428d1941cf..ee2bafb4c2aee8 100644 --- a/test/parallel/test-stream-pipeline.js +++ b/test/parallel/test-stream-pipeline.js @@ -1753,31 +1753,24 @@ tmpdir.refresh(); { // Errors thrown in Readable.map inside pipeline should not be // swallowed by AbortError when the source is an infinite stream. - function createInfiniteReadable() { - return new Readable({ - read() { - this.push('data'); - }, - }); - } - - function createObjectTransform() { - return new Transform({ - readableObjectMode: true, - transform(chunk, encoding, callback) { - this.push({}); - callback(); - }, - }); + async function run() { + await assert.rejects( + pipelinep( + new Readable({ read() { this.push('data'); } }), + new Transform({ + readableObjectMode: true, + transform(chunk, encoding, callback) { + this.push({}); + callback(); + }, + }), + (readable) => readable.map(async () => { + throw new Error('Boom!'); + }), + ), + { message: 'Boom!' }, + ); } - pipelinep( - createInfiniteReadable(), - createObjectTransform(), - (readable) => readable.map(async () => { - throw new Error('Boom!'); - }), - ).then(common.mustNotCall(), common.mustCall((err) => { - assert.strictEqual(err.message, 'Boom!'); - })); + run().then(common.mustCall()); } From 543ce4e1c2726e55dbb1741525bea004200f29d5 Mon Sep 17 00:00:00 2001 From: marcopiraccini Date: Thu, 5 Mar 2026 18:03:38 +0100 Subject: [PATCH 3/3] test fixup Signed-off-by: marcopiraccini --- test/parallel/test-stream-pipeline.js | 36 ++++++++++++--------------- 1 file changed, 16 insertions(+), 20 deletions(-) diff --git a/test/parallel/test-stream-pipeline.js b/test/parallel/test-stream-pipeline.js index ee2bafb4c2aee8..8852f24a75050f 100644 --- a/test/parallel/test-stream-pipeline.js +++ b/test/parallel/test-stream-pipeline.js @@ -1753,24 +1753,20 @@ tmpdir.refresh(); { // Errors thrown in Readable.map inside pipeline should not be // swallowed by AbortError when the source is an infinite stream. - async function run() { - await assert.rejects( - pipelinep( - new Readable({ read() { this.push('data'); } }), - new Transform({ - readableObjectMode: true, - transform(chunk, encoding, callback) { - this.push({}); - callback(); - }, - }), - (readable) => readable.map(async () => { - throw new Error('Boom!'); - }), - ), - { message: 'Boom!' }, - ); - } - - run().then(common.mustCall()); + pipeline( + new Readable({ read() { this.push('data'); } }), + new Transform({ + readableObjectMode: true, + transform(chunk, encoding, callback) { + this.push({}); + callback(); + }, + }), + (readable) => readable.map(async () => { + throw new Error('Boom!'); + }), + common.mustCall((err) => { + assert.strictEqual(err.message, 'Boom!'); + }), + ); }