Add Socket::proxyTo() API #6299
This makes proxying sockets more convenient, should be especially useful for the connect handler.
// Proxies to the other socket. Equivalent to:
//   a.readable.pipeTo(b.writable); b.readable.pipeTo(a.writable);
// TODO: May want to add jsg::Optional<PipeToOptions> options?
There was a problem hiding this comment.
Need to think about this. One issue with taking options is that we can't apply them to both streams, since the AbortSignal field can't be copied/shared between the two streams(?). Having two options parameters instead seems highly inelegant.
There was a problem hiding this comment.
Why can't the single AbortSignal be used to abort both directions? I think that's what people would want.
That said, I don't think we necessarily need to support the options. People can always call pipeTo() manually if they want options.
There was a problem hiding this comment.
It should absolutely be just a single AbortSignal. Users should not have to pass two and there really should not be a reason to.
There was a problem hiding this comment.
And I agree, there is no reason to pass PipeToOptions. AbortSignal is really the only option in this case that matters.
There was a problem hiding this comment.
If we accept an AbortSignal it should still be part of an options struct.
If we're accepting an options struct then I think it might as well be PipeToOptions, and we might as well pass all the options into both pipeTo()s.
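To make the single-signal idea concrete, here is a minimal user-land sketch in JavaScript. It is not the PR's implementation: `fakeSocket` and `proxyWithOptions` are hypothetical names used only for illustration, and a "socket" is modeled as any object with `readable` and `writable` streams. It relies only on the standard `pipeTo()` options, and shows that one options object (including one `AbortSignal`) can be handed to both directions.

```javascript
// Hypothetical stand-in for a Socket: any object with { readable, writable }.
function fakeSocket() {
  return {
    readable: new ReadableStream(), // never yields data and never closes
    writable: new WritableStream({ write() {} }),
  };
}

// Hypothetical helper (an assumption, not the PR's code): passes the same
// PipeToOptions, including one shared AbortSignal, to both directions.
function proxyWithOptions(a, b, options = {}) {
  return Promise.all([
    a.readable.pipeTo(b.writable, options),
    b.readable.pipeTo(a.writable, options),
  ]);
}

// Usage: a single controller is enough to stop both pumps.
const controller = new AbortController();
const proxied = proxyWithOptions(fakeSocket(), fakeSocket(), {
  signal: controller.signal,
});
proxied.catch(() => console.log('proxy aborted'));
controller.abort();
```

Whether the other options (`preventClose`, `preventAbort`, `preventCancel`) make sense when applied identically to both directions is a separate design question that this sketch does not settle.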
There was a problem hiding this comment.
I'm Bonk, and I've done a quick review of your PR.
Adds a Socket::proxyTo() convenience method that chains two pipeTo calls to proxy bidirectional traffic between sockets.
Issues found (ranked by severity):
- [HIGH] proxyTo() runs pipes sequentially, not concurrently, which breaks bidirectional proxying.
- [MEDIUM] The first pipeTo (with default options) closes this->writable on completion, which may tear down the underlying TCP connection before the second pipe starts.
This review was generated by an AI assistant and may contain inaccuracies.
jsg::Promise<void> Socket::proxyTo(jsg::Lock& js, jsg::Ref<Socket> sock) {
  return sock->readable->pipeTo(js, writable.addRef(), {})
      .then(js, [this, sock = kj::mv(sock)](jsg::Lock& js) mutable {
    return readable->pipeTo(js, sock->writable.addRef(), {});
  });
There was a problem hiding this comment.
The two pipes are chained sequentially: the second pipeTo (this→sock) doesn't start until the first (sock→this) fully completes. For a bidirectional TCP proxy, both directions need to flow concurrently — the original test code used Promise.all for exactly this reason.
Additionally, the first pipeTo uses default PipeToOptions, so when sock->readable ends, it will close this->writable. Depending on socket semantics, this could tear down the underlying connection before the second pipe runs.
The fix should run both pipes concurrently (like the original Promise.all pattern) and use preventClose/preventAbort/preventCancel as appropriate:
- jsg::Promise<void> Socket::proxyTo(jsg::Lock& js, jsg::Ref<Socket> sock) {
-   return sock->readable->pipeTo(js, writable.addRef(), {})
-       .then(js, [this, sock = kj::mv(sock)](jsg::Lock& js) mutable {
-     return readable->pipeTo(js, sock->writable.addRef(), {});
-   });
+ jsg::Promise<void> Socket::proxyTo(jsg::Lock& js, jsg::Ref<Socket> sock) {
+   auto pipeIn = sock->readable->pipeTo(js, writable.addRef(), {});
+   auto pipeOut = readable->pipeTo(js, sock->writable.addRef(), {});
+   return pipeIn.then(js, [pipeOut = kj::mv(pipeOut)](jsg::Lock& js) mutable {
+     return kj::mv(pipeOut);
+   });
+ }
This is the same Promise.all-equivalent pattern used in Socket::close() (lines 306-309). You may also want to consider whether preventClose should be set on either or both pipes depending on the desired socket lifecycle semantics.
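The concern about default options closing the destination can be observed with plain web streams. The following is a small JavaScript sketch, using a hypothetical helper name (`destinationClosedAfterPipe`) and ordinary in-memory streams rather than real sockets, so it says nothing about how workerd's Socket reacts to the close. It only shows the standard `pipeTo()` behavior: the destination is closed when the source ends, unless `preventClose` is set.

```javascript
// Pipes a short, finite source into a destination and reports whether the
// destination's close() hook ran as a result of the pipe finishing.
async function destinationClosedAfterPipe(options) {
  let closed = false;
  const source = new ReadableStream({
    start(controller) {
      controller.enqueue('hello');
      controller.close(); // the source ends right after one chunk
    },
  });
  const destination = new WritableStream({
    write() {},
    close() { closed = true; },
  });
  await source.pipeTo(destination, options);
  return closed;
}

// Usage: compare default options with preventClose.
destinationClosedAfterPipe({}).then((closed) =>
  console.log('default options, destination closed:', closed));
destinationClosedAfterPipe({ preventClose: true }).then((closed) =>
  console.log('preventClose, destination closed:', closed));
```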
There was a problem hiding this comment.
Bonk is mostly right, but I'd suggest not using .then() at all and instead join the promises.
We could either use kj::joinPromises(), or kj::joinPromisesFailFast(). It's actually not entirely clear which is better here. FailFast means that if either direction throws an exception, we'd immediately cancel the opposite pump and the proxyTo() would fail out. With the non-failfast version, upon one direction failing, we'd still give the other direction a chance to finish.
I think failfast seems probably better but I'm a bit torn.
There was a problem hiding this comment.
Agreed. But just to clarify, jsg::Promise (which is what pipeTo returns) does not support kj::joinPromises. With @kentonv's suggestion here, we would essentially need to drop down to piping the underlying kj streams and not go through the web streams wrappers, which I think we likely want to do regardless so that the proxy pipeline runs much more efficiently.
There was a problem hiding this comment.
Ah good point.
So what you really want here is actually Promise.all but unfortunately V8 doesn't seem to give us a direct way to call that.
I don't think the code here needs to dig down to the KJ level. The two pipeTo() calls should automatically figure out, under the hood, whether they can delegate to KJ pumpTo() on underlying streams.
That said if we don't go directly to KJ, then we can't cancel one direction when the other direction fails. Maybe that's fine, I'm not sure we really needed to.
Still need to decide if we want to propagate errors immediately or only after both directions settle. Probably immediately, though it's a little weird that this means the other direction could still be pumping.
Note that Bonk's suggested code would eagerly propagate an error from pipeIn but not from pipeOut. That seems definitely wrong -- the two pumps should be symmetric.
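The difference between these options can be sketched with plain JavaScript promises standing in for the two pumps. This is only an illustration of the semantics under discussion, not workerd code: `chained`, `failFast`, and `settleBoth` are hypothetical names, and none of them cancels the other direction, which matches the limitation mentioned above.

```javascript
// Chained form: a failure in pipeOut is only observed once pipeIn settles,
// so the two directions are not treated symmetrically.
function chained(pipeIn, pipeOut) {
  return pipeIn.then(() => pipeOut);
}

// Symmetric, fail-fast: rejects as soon as either direction rejects.
// The other direction keeps running; nothing here cancels it.
function failFast(pipeIn, pipeOut) {
  return Promise.all([pipeIn, pipeOut]).then(() => undefined);
}

// Symmetric, settle-both: waits for both directions to settle, then
// reports the first failure in argument order, if there was one.
async function settleBoth(pipeIn, pipeOut) {
  const results = await Promise.allSettled([pipeIn, pipeOut]);
  const failed = results.find((r) => r.status === 'rejected');
  if (failed) throw failed.reason;
}
```

With a `pipeIn` that never settles and a `pipeOut` that rejects, `failFast` rejects right away, while `chained` and `settleBoth` both stay pending.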
// Proxies to the other socket. Equivalent to:
//   a.readable.pipeTo(b.writable); b.readable.pipeTo(a.writable);
// TODO: May want to add jsg::Optional<PipeToOptions> options?
There was a problem hiding this comment.
Nit: The doc comment says this is equivalent to a.readable.pipeTo(b.writable); b.readable.pipeTo(a.writable); (which reads as concurrent), but the implementation chains them sequentially. If the implementation is intentionally sequential, the comment should say so; if concurrent, the implementation needs fixing (see comment on the .c++ file).
// Proxies to the other socket. Equivalent to:
//   a.readable.pipeTo(b.writable); b.readable.pipeTo(a.writable);
// TODO: May want to add jsg::Optional<PipeToOptions> options?
jsg::Promise<void> proxyTo(jsg::Lock& js, jsg::Ref<Socket> sock);
There was a problem hiding this comment.
I'm not convinced this should be a method on Socket itself, which would require that we also add it to the pending standard draft (https://sockets-api.proposal.wintertc.org/). Instead, I'd prefer an API on the cloudflare:sockets module namespace, like:
import { connect, proxy } from 'cloudflare:sockets';
// ...
export default {
  async connect(socket) {
    proxy(socket, connect('...'));
  }
}
Notice I also dropped the To off of that... Since the proxy is bidirectional, I'm not sure if proxyTo is the ideal name here when just proxy is descriptive enough.
Having proxyTo be a method on the Socket itself sets up a bit of an awkward question: if I have socketA or socketB, should I use socketA.proxyTo(socketB) or socketB.proxyTo(socketA)? But with a proxy export the semantic just becomes proxy(socketA, socketB) or proxy(socketB, socketA) where both are exactly equivalent and the order does not matter.
There was a problem hiding this comment.
I think it's OK to add methods without standardizing them first. In fact that's how it's supposed to work -- somebody implements the new feature first, then if desired it gets standardized later.
That said I'm not necessarily against making proxy a freestanding function. I think it's less convenient but it's not a hill I'd die on.
I don't think the a.proxyTo(b) vs. b.proxyTo(a) thing is that confusing -- they are in fact equivalent.
There was a problem hiding this comment.
Wouldn't the spec also cover whatever methods are in the sockets module? I'd expect it to be added to the spec eventually too even if it isn't a method on Socket.
There was a problem hiding this comment.
"Wouldn't the spec also cover whatever methods are in the sockets module?"
Not necessarily. The spec currently covers only connect and Socket; helper/convenience functions are not necessarily in scope.
-   socket.readable.pipeTo(upstream.writable),
-   upstream.readable.pipeTo(socket.writable),
- ]);
+ await socket.proxyTo(upstream);
There was a problem hiding this comment.
This pattern with the await will make this difficult to optimize using deferred proxying. More specifically, it requires the entire proxy to run with the isolate kept around for the lifetime of the pipeline, which is not what we want.
If a worker is just connecting two sockets together and not doing anything else, we need to be able to make the connection at the kj streams level, allow the IoContext to go away while the proxy pipeline is still flowing, just like we would do if we had something like return fetch('whatever') in a regular handler.
There was a problem hiding this comment.
Hmm, this is arguably an advantage of the version of the connect handler that returns a socket instead of receiving it as a parameter, and if we can't figure out a way to make this work, we may have to switch back to that approach. Deferred proxying is very important here.
That said, I wonder if there's any way we can dig through the promise chain and discover if we're waiting on a promise that is actually just a KJ promise.
This could benefit the return fetch(req) use case too. Currently, we cannot start deferred proxying for HTTP until fetch() returns (i.e. response headers are received), because we have to await the fetch promise in JS land. Once we get a response, then we can deferred-proxy its body. But it would be really nice if we could start deferred proxying much earlier, by figuring out that the promise we're waiting on is just a fetch promise and therefore we don't need to keep running JS.
Or another idea: Maybe proxyTo() just shouldn't return a promise. We could document that there's no way to wait for it to complete. The connection has been handed off to the runtime to proxy, and it'll push bytes until there are no more bytes to push, then close.
The latter idea is obviously easier to implement and seems pretty OK to me for now (though it would still be useful to explore promise-unwrapping in the future).
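As a rough picture of what "no promise" would look like from the caller's side, here is a user-land JavaScript approximation. It is purely illustrative: `startProxy` is a hypothetical name, the sockets are modeled as plain `{ readable, writable }` objects, and the sketch cannot reproduce the actual benefit being discussed (letting the runtime keep proxying after the isolate is no longer needed). It only shows the calling convention: start both pumps, return nothing to await.

```javascript
// Hypothetical user-land approximation (not the runtime's deferred
// proxying): starts both pumps and deliberately returns nothing to await.
function startProxy(a, b) {
  // Errors are dropped here because the caller has nowhere to observe them.
  const ignore = () => {};
  a.readable.pipeTo(b.writable).catch(ignore);
  b.readable.pipeTo(a.writable).catch(ignore);
}
```

A handler using this shape would call `startProxy(socket, upstream)` and return immediately, with no completion or error reporting available to the caller.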
There was a problem hiding this comment.
hmm... that might be a bit tricky. Imagine a case like:
// In the fetch handler
const responsePromise = fetch('...');
responsePromise.then((response) => {
  // This is guaranteed to run before our internal handling of the response
  // and may have observable side effects
});
return responsePromise;
I think the only way for it to work would be also determining that there are no other reactions on the promise that could trigger observable side effects.
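The ordering guarantee behind this concern is standard promise behavior and can be checked in isolation. In the JavaScript sketch below, a resolved promise stands in for `fetch('...')`, and a second `.then()` stands in for whatever the runtime would do with the returned promise; both stand-ins and the function name `reactionOrder` are assumptions made for illustration.

```javascript
// Reactions on the same promise run in the order they were registered, so
// a user reaction attached first is observed before any later consumer.
async function reactionOrder() {
  const order = [];
  const responsePromise = Promise.resolve('response'); // stand-in for fetch('...')

  // User code attaches a reaction before returning the promise.
  responsePromise.then(() => { order.push('user reaction'); });

  // Stand-in for the runtime consuming the returned promise afterwards.
  await responsePromise.then(() => { order.push('runtime handling'); });

  return order; // ['user reaction', 'runtime handling']
}
```

Skipping the JS-side wait would therefore be observable whenever such a reaction exists, which is why the runtime would need to prove that none is attached.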
"Maybe proxyTo() just shouldn't return a promise."
I'm leaning in this direction also. So long as there's a way to optionally cancel the proxy (e.g. proxyTo(A, B, { signal: abortSignal })), I don't actually think there's a really strong reason this MUST return a promise at all.
But then again, there's a certain simple elegance if we could just return the Socket we're wiring up and have the runtime set it up... But this raises a question: do we want to support cases like:
export default {
  fetch() {
    // Intentionally using fetch here, not connect
    const socketA = connect('...');
    const socketB = connect('...');
    socketA.proxyTo(socketB);
    // ...
  }
}
There was a problem hiding this comment.
And to be clear, I know that latter pattern is already possible with the current API... I just don't know if we want to encourage it by making it easier and more ergonomic. I don't have an immediate good reason to say no, just being thorough in thinking through the new API.
This makes proxying sockets more convenient, should be especially useful for the connect handler.
Follow-up to #6059.