Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 44 additions & 9 deletions lib/internal/webstreams/adapters.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ const {
SafePromiseAll,
SafePromisePrototypeFinally,
SafeSet,
StringPrototypeStartsWith,
Symbol,
TypeError,
TypedArrayPrototypeGetBuffer,
TypedArrayPrototypeGetByteLength,
Expand Down Expand Up @@ -94,6 +96,9 @@ const { UV_EOF } = internalBinding('uv');

const encoder = new TextEncoder();

const kValidateChunk = Symbol('kValidateChunk');
const kDestroyOnSyncError = Symbol('kDestroyOnSyncError');

// Collect all negative (error) ZLIB codes and Z_NEED_DICT
const ZLIB_FAILURES = new SafeSet([
...ArrayPrototypeFilter(
Expand All @@ -115,7 +120,14 @@ function handleKnownInternalErrors(cause) {
case cause?.code === 'ERR_STREAM_PREMATURE_CLOSE': {
return new AbortError(undefined, { cause });
}
case ZLIB_FAILURES.has(cause?.code): {
case ZLIB_FAILURES.has(cause?.code):
// Brotli decoder error codes are formatted as 'ERR_' +
// BrotliDecoderErrorString(), where the latter returns strings like
// '_ERROR_FORMAT_...', '_ERROR_ALLOC_...', '_ERROR_UNREACHABLE', etc.
// The resulting JS error codes all start with 'ERR__ERROR_'.
// Falls through
case cause?.code != null &&
StringPrototypeStartsWith(cause.code, 'ERR__ERROR_'): {
// eslint-disable-next-line no-restricted-syntax
const error = new TypeError(undefined, { cause });
error.code = cause.code;
Expand All @@ -139,9 +151,10 @@ function handleKnownInternalErrors(cause) {

/**
* @param {Writable} streamWritable
* @param {object} [options]
* @returns {WritableStream}
*/
function newWritableStreamFromStreamWritable(streamWritable) {
function newWritableStreamFromStreamWritable(streamWritable, options = kEmptyObject) {
// Not using the internal/streams/utils isWritableNodeStream utility
// here because it will return false if streamWritable is a Duplex
// whose writable option is false. For a Duplex that is not writable,
Expand Down Expand Up @@ -220,12 +233,26 @@ function newWritableStreamFromStreamWritable(streamWritable) {
if (!streamWritable.writableObjectMode && isArrayBuffer(chunk)) {
chunk = new Uint8Array(chunk);
}
if (streamWritable.writableNeedDrain || !streamWritable.write(chunk)) {
backpressurePromise = PromiseWithResolvers();
return SafePromisePrototypeFinally(
backpressurePromise.promise, () => {
backpressurePromise = undefined;
});
try {
options[kValidateChunk]?.(chunk);
if (streamWritable.writableNeedDrain || !streamWritable.write(chunk)) {
backpressurePromise = PromiseWithResolvers();
return SafePromisePrototypeFinally(
backpressurePromise.promise, () => {
backpressurePromise = undefined;
});
}
} catch (error) {
// When the kDestroyOnSyncError flag is set (e.g. for
// CompressionStream), a sync throw must also destroy the
// stream so the readable side is errored too. Without this
// the readable side hangs forever. This replicates the
// TransformStream semantics: error both sides on any throw
// in the transform path.
if (options[kDestroyOnSyncError]) {
destroy(streamWritable, error);
}
throw error;
}
},

Expand Down Expand Up @@ -662,9 +689,15 @@ function newReadableWritablePairFromDuplex(duplex, options = kEmptyObject) {
return { readable, writable };
}

const writableOptions = {
__proto__: null,
[kValidateChunk]: options[kValidateChunk],
[kDestroyOnSyncError]: options[kDestroyOnSyncError],
};

const writable =
isWritable(duplex) ?
newWritableStreamFromStreamWritable(duplex) :
newWritableStreamFromStreamWritable(duplex, writableOptions) :
new WritableStream();

if (!isWritable(duplex))
Expand Down Expand Up @@ -1064,4 +1097,6 @@ module.exports = {
newStreamDuplexFromReadableWritablePair,
newWritableStreamFromStreamBase,
newReadableStreamFromStreamBase,
kValidateChunk,
kDestroyOnSyncError,
};
49 changes: 38 additions & 11 deletions lib/internal/webstreams/compression.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,28 @@ const {

const {
newReadableWritablePairFromDuplex,
kValidateChunk,
kDestroyOnSyncError,
} = require('internal/webstreams/adapters');

const { customInspect } = require('internal/webstreams/util');

const {
isArrayBufferView,
isSharedArrayBuffer,
} = require('internal/util/types');

const {
customInspectSymbol: kInspect,
kEnumerableProperty,
} = require('internal/util');

const {
codes: {
ERR_INVALID_ARG_TYPE,
},
} = require('internal/errors');

const { createEnumConverter } = require('internal/webidl');

let zlib;
Expand All @@ -24,6 +37,18 @@ function lazyZlib() {
return zlib;
}

// Per the Compression Streams spec, chunks must be BufferSource
// (ArrayBuffer or ArrayBufferView not backed by SharedArrayBuffer).
function validateBufferSourceChunk(chunk) {
if (isArrayBufferView(chunk) && isSharedArrayBuffer(chunk.buffer)) {
throw new ERR_INVALID_ARG_TYPE(
'chunk',
['ArrayBuffer', 'Buffer', 'TypedArray', 'DataView'],
chunk,
);
}
}

const formatConverter = createEnumConverter('CompressionFormat', [
'deflate',
'deflate-raw',
Expand Down Expand Up @@ -62,7 +87,10 @@ class CompressionStream {
this.#handle = lazyZlib().createBrotliCompress();
break;
}
this.#transform = newReadableWritablePairFromDuplex(this.#handle);
this.#transform = newReadableWritablePairFromDuplex(this.#handle, {
[kValidateChunk]: validateBufferSourceChunk,
[kDestroyOnSyncError]: true,
});
}

/**
Expand Down Expand Up @@ -108,25 +136,24 @@ class DecompressionStream {
});
break;
case 'deflate-raw':
this.#handle = lazyZlib().createInflateRaw();
this.#handle = lazyZlib().createInflateRaw({
rejectGarbageAfterEnd: true,
});
break;
case 'gzip':
this.#handle = lazyZlib().createGunzip({
rejectGarbageAfterEnd: true,
});
break;
case 'brotli':
this.#handle = lazyZlib().createBrotliDecompress();
this.#handle = lazyZlib().createBrotliDecompress({
rejectGarbageAfterEnd: true,
});
break;
}
this.#transform = newReadableWritablePairFromDuplex(this.#handle);

this.#handle.on('error', (err) => {
if (this.#transform?.writable &&
!this.#transform.writable.locked &&
typeof this.#transform.writable.abort === 'function') {
this.#transform.writable.abort(err);
}
this.#transform = newReadableWritablePairFromDuplex(this.#handle, {
[kValidateChunk]: validateBufferSourceChunk,
[kDestroyOnSyncError]: true,
});
}

Expand Down
8 changes: 8 additions & 0 deletions test/common/wpt.js
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ class ResourceLoader {
return {
ok: true,
arrayBuffer() { return data.buffer; },
bytes() { return new Uint8Array(data); },
json() { return JSON.parse(data.toString()); },
text() { return data.toString(); },
};
Expand Down Expand Up @@ -721,6 +722,7 @@ class WPTRunner {
// Mark the whole test as failed in wpt.fyi report.
reportResult?.finish('ERROR');
this.inProgress.delete(spec);
this.report?.write();
});

await events.once(worker, 'exit').catch(() => {});
Expand Down Expand Up @@ -787,6 +789,9 @@ class WPTRunner {
}
}

// Write the report on clean exit. The report is also written
// incrementally after each spec completes (see completionCallback)
// so that results survive if the process is killed.
this.report?.write();

const ran = queue.length;
Expand Down Expand Up @@ -873,6 +878,9 @@ class WPTRunner {
reportResult?.finish();
}
this.inProgress.delete(spec);
// Write report incrementally so results survive even if the process
// is killed before the exit handler runs.
this.report?.write();
// Always force termination of the worker. Some tests allocate resources
// that would otherwise keep it alive.
this.workers.get(spec).terminate();
Expand Down
2 changes: 1 addition & 1 deletion test/fixtures/wpt/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ See [test/wpt](../../wpt/README.md) for information on how these tests are run.
Last update:

- common: https://github.com/web-platform-tests/wpt/tree/dbd648158d/common
- compression: https://github.com/web-platform-tests/wpt/tree/67880a4eb8/compression
- compression: https://github.com/web-platform-tests/wpt/tree/ae05f5cb53/compression
- console: https://github.com/web-platform-tests/wpt/tree/e48251b778/console
- dom/abort: https://github.com/web-platform-tests/wpt/tree/dc928169ee/dom/abort
- dom/events: https://github.com/web-platform-tests/wpt/tree/0a811c5161/dom/events
Expand Down
57 changes: 57 additions & 0 deletions test/fixtures/wpt/compression/compression-bad-chunks.any.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// META: global=window,worker,shadowrealm
// META: script=resources/formats.js

'use strict';

const badChunks = [
{
name: 'undefined',
value: undefined
},
{
name: 'null',
value: null
},
{
name: 'numeric',
value: 3.14
},
{
name: 'object, not BufferSource',
value: {}
},
{
name: 'array',
value: [65]
},
{
name: 'SharedArrayBuffer',
// Use a getter to postpone construction so that all tests don't fail where
// SharedArrayBuffer is not yet implemented.
get value() {
// See https://github.com/whatwg/html/issues/5380 for why not `new SharedArrayBuffer()`
return new WebAssembly.Memory({ shared:true, initial:1, maximum:1 }).buffer;
}
},
{
name: 'shared Uint8Array',
get value() {
// See https://github.com/whatwg/html/issues/5380 for why not `new SharedArrayBuffer()`
return new Uint8Array(new WebAssembly.Memory({ shared:true, initial:1, maximum:1 }).buffer)
}
},
];

for (const format of formats) {
for (const chunk of badChunks) {
promise_test(async t => {
const cs = new CompressionStream(format);
const reader = cs.readable.getReader();
const writer = cs.writable.getWriter();
const writePromise = writer.write(chunk.value);
const readPromise = reader.read();
await promise_rejects_js(t, TypeError, writePromise, 'write should reject');
await promise_rejects_js(t, TypeError, readPromise, 'read should reject');
}, `chunk of type ${chunk.name} should error the stream for ${format}`);
}
}

This file was deleted.

Loading
Loading