Skip to content

Commit e41bcda

Browse files
committed
Restore original PumpToReader and place pumpToImpl behind autogate
1 parent a0e8735 commit e41bcda

1 file changed

Lines changed: 207 additions & 7 deletions

File tree

src/workerd/api/streams/standard.c++

Lines changed: 207 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
#include <workerd/io/features.h>
1111
#include <workerd/jsg/jsg.h>
12+
#include <workerd/util/autogate.h>
1213
#include <workerd/util/state-machine.h>
1314
#include <workerd/util/weak-refs.h>
1415

@@ -3322,6 +3323,196 @@ class AllReader {
33223323
}
33233324
};
33243325

3326+
// PumpToReader implements the original JS promise-loop approach to pumping data from
3327+
// a ReadableStream to a WritableStreamSink. It reads one chunk at a time using the
3328+
// standard read() API, writes each chunk to the sink, and loops until done or errored.
3329+
// This is the fallback path used when the ENABLE_DRAINING_READ_ON_STANDARD_STREAMS
3330+
// autogate is not enabled.
3331+
class PumpToReader {
3332+
public:
3333+
PumpToReader(jsg::Ref<ReadableStream> stream, kj::Own<WritableStreamSink> sink, bool end)
3334+
: ioContext(IoContext::current()),
3335+
state(State::create<jsg::Ref<ReadableStream>>(kj::mv(stream))),
3336+
sink(kj::mv(sink)),
3337+
self(kj::refcounted<WeakRef<PumpToReader>>(kj::Badge<PumpToReader>{}, *this)),
3338+
end(end) {}
3339+
KJ_DISALLOW_COPY_AND_MOVE(PumpToReader);
3340+
3341+
~PumpToReader() noexcept(false) {
3342+
self->invalidate();
3343+
// Ensure that if a write promise is pending it is proactively canceled.
3344+
canceler.cancel("PumpToReader was destroyed");
3345+
}
3346+
3347+
kj::Promise<void> pumpTo(jsg::Lock& js) {
3348+
ioContext.requireCurrentOrThrowJs();
3349+
KJ_SWITCH_ONEOF(state) {
3350+
KJ_CASE_ONEOF(stream, jsg::Ref<ReadableStream>) {
3351+
auto readable = stream.addRef();
3352+
state.template transitionTo<Pumping>();
3353+
return ioContext.awaitJs(
3354+
js, pumpLoop(js, ioContext, kj::mv(readable), ioContext.addObject(self->addRef())));
3355+
}
3356+
KJ_CASE_ONEOF(pumping, Pumping) {
3357+
return KJ_EXCEPTION(FAILED, "pumping is already in progress");
3358+
}
3359+
KJ_CASE_ONEOF(closed, StreamStates::Closed) {
3360+
return KJ_EXCEPTION(FAILED, "stream has already been consumed");
3361+
}
3362+
KJ_CASE_ONEOF(errored, kj::Exception) {
3363+
return kj::cp(errored);
3364+
}
3365+
}
3366+
KJ_UNREACHABLE;
3367+
}
3368+
3369+
private:
3370+
struct Pumping {
3371+
static constexpr kj::StringPtr NAME KJ_UNUSED = "pumping"_kj;
3372+
};
3373+
IoContext& ioContext;
3374+
3375+
using State = StateMachine<TerminalStates<StreamStates::Closed>,
3376+
ErrorState<kj::Exception>,
3377+
Pumping,
3378+
StreamStates::Closed,
3379+
kj::Exception,
3380+
jsg::Ref<ReadableStream>>;
3381+
State state;
3382+
kj::Own<WritableStreamSink> sink;
3383+
kj::Own<WeakRef<PumpToReader>> self;
3384+
kj::Canceler canceler;
3385+
bool end;
3386+
3387+
bool isErroredOrClosed() {
3388+
return state.isTerminal();
3389+
}
3390+
3391+
jsg::Promise<void> pumpLoop(jsg::Lock& js,
3392+
IoContext& ioContext,
3393+
jsg::Ref<ReadableStream> readable,
3394+
IoOwn<WeakRef<PumpToReader>> pumpToReader) {
3395+
ioContext.requireCurrentOrThrowJs();
3396+
3397+
KJ_SWITCH_ONEOF(state) {
3398+
KJ_CASE_ONEOF(ready, jsg::Ref<ReadableStream>) {
3399+
KJ_UNREACHABLE;
3400+
}
3401+
KJ_CASE_ONEOF(closed, StreamStates::Closed) {
3402+
return end ? ioContext.awaitIoLegacy(js, sink->end().attach(kj::mv(sink)))
3403+
: js.resolvedPromise();
3404+
}
3405+
KJ_CASE_ONEOF(errored, kj::Exception) {
3406+
if (end) {
3407+
sink->abort(kj::cp(errored));
3408+
}
3409+
return js.rejectedPromise<void>(kj::cp(errored));
3410+
}
3411+
KJ_CASE_ONEOF(pumping, Pumping) {
3412+
using Result = kj::OneOf<Pumping, kj::Array<kj::byte>, StreamStates::Closed, jsg::Value>;
3413+
3414+
return KJ_ASSERT_NONNULL(readable->getController().read(js, kj::none))
3415+
.then(js,
3416+
ioContext.addFunctor([byteStream = readable->getController().isByteOriented()](
3417+
auto& js, ReadResult result) mutable -> Result {
3418+
if (result.done) {
3419+
return StreamStates::Closed();
3420+
}
3421+
3422+
auto handle = KJ_ASSERT_NONNULL(result.value).getHandle(js);
3423+
if (!handle->IsArrayBufferView() && !handle->IsArrayBuffer()) {
3424+
return js.v8Ref(js.v8TypeError("This ReadableStream did not return bytes."));
3425+
}
3426+
3427+
jsg::BufferSource bufferSource(js, handle);
3428+
if (bufferSource.size() == 0) {
3429+
return Pumping{};
3430+
}
3431+
3432+
if (byteStream) {
3433+
jsg::BackingStore backing = bufferSource.detach(js);
3434+
return backing.asArrayPtr().attach(kj::mv(backing));
3435+
}
3436+
return bufferSource.asArrayPtr().attach(kj::mv(bufferSource));
3437+
}),
3438+
[](auto& js, jsg::Value exception) mutable -> Result { return kj::mv(exception); })
3439+
.then(js, ioContext.addFunctor( JSG_VISITABLE_LAMBDA((readable = kj::mv(readable), pumpToReader = kj::mv(pumpToReader)), (readable), (jsg::Lock & js, Result result) mutable {
3440+
KJ_IF_SOME(reader, pumpToReader->tryGet()) {
3441+
reader.ioContext.requireCurrentOrThrowJs();
3442+
auto& ioContext = IoContext::current();
3443+
KJ_SWITCH_ONEOF(result) {
3444+
KJ_CASE_ONEOF(bytes, kj::Array<kj::byte>) {
3445+
auto promise = reader.sink->write(bytes).attach(kj::mv(bytes));
3446+
return ioContext.awaitIo(js, reader.canceler.wrap(kj::mv(promise)))
3447+
.then(js,
3448+
[](jsg::Lock& js) -> kj::Maybe<jsg::Value> {
3449+
return kj::Maybe<jsg::Value>(kj::none);
3450+
},
3451+
[](jsg::Lock& js, jsg::Value exception) mutable -> kj::Maybe<jsg::Value> {
3452+
return kj::mv(exception);
3453+
})
3454+
.then(js,
3455+
ioContext.addFunctor(JSG_VISITABLE_LAMBDA(
3456+
(readable = readable.addRef(), pumpToReader = kj::mv(pumpToReader)),
3457+
(readable),
3458+
(jsg::Lock & js, kj::Maybe<jsg::Value> maybeException) mutable {
3459+
KJ_IF_SOME(reader, pumpToReader->tryGet()) {
3460+
auto& ioContext = reader.ioContext;
3461+
ioContext.requireCurrentOrThrowJs();
3462+
KJ_IF_SOME(exception, maybeException) {
3463+
if (!reader.isErroredOrClosed()) {
3464+
reader.state.transitionTo<kj::Exception>(
3465+
js.exceptionToKj(kj::mv(exception)));
3466+
}
3467+
} else {
3468+
// Else block to avert dangling else compiler warning.
3469+
}
3470+
return reader.pumpLoop(
3471+
js, ioContext, readable.addRef(), kj::mv(pumpToReader));
3472+
} else {
3473+
return readable->getController().cancel(js,
3474+
maybeException.map(
3475+
[&](jsg::Value& ex) { return ex.getHandle(js); }));
3476+
}
3477+
})));
3478+
}
3479+
KJ_CASE_ONEOF(pumping, Pumping) {}
3480+
KJ_CASE_ONEOF(closed, StreamStates::Closed) {
3481+
if (!reader.isErroredOrClosed()) {
3482+
reader.state.transitionTo<StreamStates::Closed>();
3483+
}
3484+
}
3485+
KJ_CASE_ONEOF(exception, jsg::Value) {
3486+
if (!reader.isErroredOrClosed()) {
3487+
reader.state.transitionTo<kj::Exception>(js.exceptionToKj(kj::mv(exception)));
3488+
}
3489+
}
3490+
}
3491+
return reader.pumpLoop(js, ioContext, readable.addRef(), kj::mv(pumpToReader));
3492+
} else {
3493+
KJ_SWITCH_ONEOF(result) {
3494+
KJ_CASE_ONEOF(bytes, kj::Array<kj::byte>) {
3495+
return readable->getController().cancel(js, kj::none);
3496+
}
3497+
KJ_CASE_ONEOF(pumping, Pumping) {
3498+
return readable->getController().cancel(js, kj::none);
3499+
}
3500+
KJ_CASE_ONEOF(closed, StreamStates::Closed) {
3501+
return js.resolvedPromise();
3502+
}
3503+
KJ_CASE_ONEOF(exception, jsg::Value) {
3504+
return readable->getController().cancel(js, exception.getHandle(js));
3505+
}
3506+
}
3507+
}
3508+
KJ_UNREACHABLE;
3509+
})));
3510+
}
3511+
}
3512+
KJ_UNREACHABLE;
3513+
}
3514+
};
3515+
33253516
// pumpToCoroutine uses a DrainingReader to efficiently pull all synchronously available
33263517
// data from the stream in each iteration, then writes it to the sink using vectored
33273518
// I/O. This minimizes isolate lock acquisitions by batching: each time the lock is
@@ -3527,15 +3718,24 @@ kj::Promise<DeferredProxy<void>> ReadableStreamJsController::pumpTo(
35273718
disturbed = true;
35283719

35293720
// This operation will leave the ReadableStream locked and disturbed. It will consume
3530-
// the stream until it either closed or errors. If the returned promise (or its inner
3531-
// promise) is dropped, the coroutine frame is destroyed, which drops the DrainingReader
3532-
// (releasing the stream lock) and the sink, canceling any in-flight operations.
3721+
// the stream until it either closed or errors.
3722+
//
3723+
// When the ENABLE_DRAINING_READ_ON_STANDARD_STREAMS autogate is enabled, uses the new
3724+
// pumpToImpl coroutine with DrainingReader for batched reads and vectored writes.
3725+
// Otherwise, falls back to the original PumpToReader JS promise loop that reads one
3726+
// chunk at a time.
35333727

35343728
const auto handlePump = [&] {
3535-
auto reader = KJ_ASSERT_NONNULL(DrainingReader::create(js, *this->addRef()),
3536-
"Failed to create DrainingReader — stream should not be locked");
3537-
auto& ioContext = IoContext::current();
3538-
return addNoopDeferredProxy(pumpToImpl(ioContext, kj::mv(reader), kj::mv(sink), end));
3729+
if (util::Autogate::isEnabled(util::AutogateKey::ENABLE_DRAINING_READ_ON_STANDARD_STREAMS)) {
3730+
auto reader = KJ_ASSERT_NONNULL(DrainingReader::create(js, *this->addRef()),
3731+
"Failed to create DrainingReader — stream should not be locked");
3732+
auto& ioContext = IoContext::current();
3733+
return addNoopDeferredProxy(pumpToImpl(ioContext, kj::mv(reader), kj::mv(sink), end));
3734+
} else {
3735+
KJ_ASSERT(lock.lock());
3736+
auto reader = kj::heap<PumpToReader>(addRef(), kj::mv(sink), end);
3737+
return addNoopDeferredProxy(reader->pumpTo(js).attach(kj::mv(reader)));
3738+
}
35393739
};
35403740

35413741
KJ_SWITCH_ONEOF(state) {

0 commit comments

Comments
 (0)