Add Socket::proxyTo() API #6299

Draft

fhanau wants to merge 1 commit into felix/experimental-tcp-ingress from felix/030226-proxyTo

Conversation

@fhanau (Contributor) commented Mar 11, 2026

This makes proxying sockets more convenient and should be especially useful for the connect handler.

Follow-up to #6059.

// Proxies to the other socket. Equivalent to:
// a.readable.pipeTo(b.writable); b.readable.pipeTo(a.writable);
// TODO: May want to add jsg::Optional<PipeToOptions> options?
Contributor Author

Need to think about this – one issue with taking options is that we can't apply them to both streams, since the AbortSignal field can't be copied/shared between the two streams(?). Having two options parameters instead seems highly inelegant.

Member

Why can't the single AbortSignal be used to abort both directions? I think that's what people would want.

That said, I don't think we necessarily need to support the options. People can always call pipeTo() manually if they want options.

Collaborator

It should absolutely be just a single AbortSignal. Users should not have to pass two and there really should not be a reason to.

Collaborator

And I agree, there is no reason to pass PipeToOptions. AbortSignal is really the only option in this case that matters.

Member

If we accept an AbortSignal it should still be part of an options struct.

If we're accepting an options struct then I think it might as well be PipeToOptions, and we might as well pass all the options into both pipeTo()s.

@ask-bonk bot left a comment

I'm Bonk, and I've done a quick review of your PR.

Adds a Socket::proxyTo() convenience method that chains two pipeTo calls to proxy bidirectional traffic between sockets.

Issues found (ranked by severity):

  1. [HIGH] proxyTo() runs pipes sequentially, not concurrently — breaks bidirectional proxying.
  2. [MEDIUM] The first pipeTo (with default options) closes this->writable on completion, which may tear down the underlying TCP connection before the second pipe starts.

This review was generated by an AI assistant and may contain inaccuracies.

Comment on lines +321 to +325
jsg::Promise<void> Socket::proxyTo(jsg::Lock& js, jsg::Ref<Socket> sock) {
  return sock->readable->pipeTo(js, writable.addRef(), {})
      .then(js, [this, sock = kj::mv(sock)](jsg::Lock& js) mutable {
    return readable->pipeTo(js, sock->writable.addRef(), {});
  });
Contributor

The two pipes are chained sequentially: the second pipeTo (this→sock) doesn't start until the first (sock→this) fully completes. For a bidirectional TCP proxy, both directions need to flow concurrently — the original test code used Promise.all for exactly this reason.

Additionally, the first pipeTo uses default PipeToOptions, so when sock->readable ends, it will close this->writable. Depending on socket semantics, this could tear down the underlying connection before the second pipe runs.

The fix should run both pipes concurrently (like the original Promise.all pattern) and use preventClose/preventAbort/preventCancel as appropriate:

Suggested change

-jsg::Promise<void> Socket::proxyTo(jsg::Lock& js, jsg::Ref<Socket> sock) {
-  return sock->readable->pipeTo(js, writable.addRef(), {})
-      .then(js, [this, sock = kj::mv(sock)](jsg::Lock& js) mutable {
-    return readable->pipeTo(js, sock->writable.addRef(), {});
-  });
+jsg::Promise<void> Socket::proxyTo(jsg::Lock& js, jsg::Ref<Socket> sock) {
+  auto pipeIn = sock->readable->pipeTo(js, writable.addRef(), {});
+  auto pipeOut = readable->pipeTo(js, sock->writable.addRef(), {});
+  return pipeIn.then(js, [pipeOut = kj::mv(pipeOut)](jsg::Lock& js) mutable {
+    return kj::mv(pipeOut);
+  });
+}

This is the same Promise.all-equivalent pattern used in Socket::close() (lines 306-309). You may also want to consider whether preventClose should be set on either or both pipes depending on the desired socket lifecycle semantics.

Member

Bonk is mostly right, but I'd suggest not using .then() at all and instead join the promises.

We could either use kj::joinPromises(), or kj::joinPromisesFailFast(). It's actually not entirely clear which is better here. FailFast means that if either direction throws an exception, we'd immediately cancel the opposite pump and the proxyTo() would fail out. With the non-failfast version, upon one direction failing, we'd still give the other direction a chance to finish.

I think failfast seems probably better but I'm a bit torn.

@jasnell (Collaborator) commented Mar 12, 2026

Agreed. But just want to clarify: jsg::Promise (which is what pipeTo returns) does not support kj::joinPromises. With @kentonv's suggestion here, we would essentially need to drop down to piping the underlying kj streams and not go through the web streams wrappers... which I think we likely want to do regardless, so that the proxy pipeline runs much more efficiently.

Member

Ah good point.

So what you really want here is actually Promise.all but unfortunately V8 doesn't seem to give us a direct way to call that.

I don't think the code here needs to dig down to the KJ level. The two pipeTo() calls should automatically figure out, under the hood, whether they can delegate to KJ pumpTo() on underlying streams.

That said if we don't go directly to KJ, then we can't cancel one direction when the other direction fails. Maybe that's fine, I'm not sure we really needed to.

Still need to decide if we want to propagate errors immediately or only after both directions settle. Probably immediately, though it's a little weird that this means the other direction could still be pumping.

Note that Bonk's suggested code would eagerly propagate an error from pipeIn but not from pipeOut. That seems definitely wrong -- the two pumps should be symmetric.


// Proxies to the other socket. Equivalent to:
// a.readable.pipeTo(b.writable); b.readable.pipeTo(a.writable);
// TODO: May want to add jsg::Optional<PipeToOptions> options?
Contributor

Nit: The doc comment says this is equivalent to a.readable.pipeTo(b.writable); b.readable.pipeTo(a.writable); (which reads as concurrent), but the implementation chains them sequentially. If the implementation is intentionally sequential, the comment should say so; if concurrent, the implementation needs fixing (see comment on the .c++ file).

@ask-bonk bot commented Mar 11, 2026

I'm Bonk, and I've done a quick review of your PR.

This PR adds a Socket::proxyTo() convenience method. I found one high-severity issue and posted a review with 2 inline comments on PR #6299:

  1. [HIGH] The two pipeTo calls are chained sequentially (the second direction doesn't start until the first completes), which breaks bidirectional TCP proxying. The original code correctly used Promise.all for concurrent pipes. I posted a suggestion with the fix using the same Promise.all-equivalent pattern already used in Socket::close().

  2. [MEDIUM] The first pipeTo uses default options, so it closes this->writable when sock->readable ends, which could tear down the underlying connection before the second pipe runs.


// Proxies to the other socket. Equivalent to:
// a.readable.pipeTo(b.writable); b.readable.pipeTo(a.writable);
// TODO: May want to add jsg::Optional<PipeToOptions> options?
jsg::Promise<void> proxyTo(jsg::Lock& js, jsg::Ref<Socket> sock);
Collaborator

I'm not convinced this should be a method on Socket itself (which would require that we also add it to the pending standard draft, https://sockets-api.proposal.wintertc.org/). Instead, I think I'd rather prefer an API on the cloudflare:sockets module namespace like:

import { connect, proxy } from 'cloudflare:sockets';

// ...

export default {
  async connect(socket) {
    proxy(socket, connect('...'));
  }
}

Notice I also dropped the To off of that... Since the proxy is bidirectional, I'm not sure if proxyTo is the ideal name here when just proxy is descriptive enough.

Having proxyTo be a method on the Socket itself sets up a bit of an awkward question: if I have socketA or socketB, should I use socketA.proxyTo(socketB) or socketB.proxyTo(socketA)? But with a proxy export the semantic just becomes proxy(socketA, socketB) or proxy(socketB, socketA) where both are exactly equivalent and the order does not matter.

Member

I think it's OK to add methods without standardizing them first. In fact that's how it's supposed to work -- somebody implements the new feature first, then if desired it gets standardized later.

That said I'm not necessarily against making proxy a freestanding function. I think it's less convenient but it's not a hill I'd die on.

I don't think the a.proxyTo(b) vs. b.proxyTo(a) thing is that confusing -- they are in fact equivalent.

Contributor

Wouldn't the spec also cover whatever methods are in the sockets module? I'd expect it to be added to the spec eventually too even if it isn't a method on Socket.

Collaborator

Wouldn't the spec also cover whatever methods are in the sockets module?

Not necessarily. The spec currently only covers connect() and Socket; helper/convenience functions are not necessarily in scope.

-await Promise.all([
-  socket.readable.pipeTo(upstream.writable),
-  upstream.readable.pipeTo(socket.writable),
-]);
+await socket.proxyTo(upstream);
Collaborator

This pattern with the await will make this difficult to optimize using deferred proxy. Or, more specifically, this ends up requiring that the entire proxy run with the isolate kept around for the lifetime of the pipeline, which is not what we want.

If a worker is just connecting two sockets together and not doing anything else, we need to be able to make the connection at the kj streams level, allow the IoContext to go away while the proxy pipeline is still flowing, just like we would do if we had something like return fetch('whatever') in a regular handler.

Member

Hmm, this is arguably an advantage of the version of the connect handler that returns a socket instead of receiving it as a parameter, and if we can't figure out a way to make this work, we may have to switch back to that approach. Deferred proxying is very important here.

That said, I wonder if there's any way we can dig through the promise chain and discover if we're waiting on a promise that is actually just a KJ promise.

This could benefit the return fetch(req) use case too. Currently, we cannot start deferred proxying for HTTP until fetch() returns (i.e. response headers are received), because we have to await the fetch promise in JS land. Once we get a response, then we can deferred-proxy its body. But it would be really nice if we could start deferred proxying much earlier, by figuring out that the promise we're waiting on is just a fetch promise and therefore we don't need to keep running JS.

Or another idea: Maybe proxyTo() just shouldn't return a promise. We could document that there's no way to wait for it to complete. The connection has been handed off to the runtime to proxy, and it'll push bytes until there's no more bytes to push, then close.

The latter idea is obviously easier to implement and seems pretty OK to me for now (though it would still be useful to explore promise-unwrapping in the future).

@jasnell (Collaborator) commented Mar 14, 2026

hmm... that might be a bit tricky. Imagine a case like:

// In the fetch handler
const responsePromise = fetch('...');
responsePromise.then((response) => {
  // This is guaranteed to run before our internal handling of the response
  // and may have observable side effects
});
return responsePromise;

I think the only way for it to work would be also determining that there are no other reactions on the promise that could trigger observable side effects.

Maybe proxyTo() just shouldn't return a promise.

I'm leaning this direction also. So long as there's a way to optionally cancel the proxy (e.g. proxyTo(A, B, { signal: abortSignal })), I don't actually think there's a really strong reason this MUST return a promise at all.

But then again, there's a certain simple elegance if we could just return the Socket we're wiring up and have the runtime set it up... But this raises a question: do we want to support cases like:

export default {
  fetch() {
    // Intentionally using fetch here, not connect
    const socketA = connect('...');
    const socketB = connect('...');
    socketA.proxyTo(socketB);
    // ...
  }
}

Collaborator

And to be clear, I know that latter pattern is already possible with the current API... I just don't know if we want to encourage it by making it easier and more ergonomic. I don't have an immediate good reason to say no, just being thorough in thinking through the new API.
