diff --git a/src/workerd/server/container-client-test.c++ b/src/workerd/server/container-client-test.c++
index 2e60addb3b7..846a7b1455b 100644
--- a/src/workerd/server/container-client-test.c++
+++ b/src/workerd/server/container-client-test.c++
@@ -13,11 +13,50 @@

 #include "container-client.h"

+#include
 #include

 namespace workerd::server {
 namespace {

+void addDockerFrame(kj::Vector& frames, kj::byte streamId, kj::StringPtr payload) {
+  auto size = static_cast(payload.size());  // payload length, written into the frame header below
+  frames.add(streamId);  // header byte 0: stream id
+  frames.add(0);  // header bytes 1-3: always zero here
+  frames.add(0);
+  frames.add(0);
+  frames.add(static_cast((size >> 24) & 0xff));  // header bytes 4-7: length, high byte first
+  frames.add(static_cast((size >> 16) & 0xff));
+  frames.add(static_cast((size >> 8) & 0xff));
+  frames.add(static_cast(size & 0xff));
+  frames.addAll(payload.asBytes());  // payload bytes follow the 8-byte header
+}
+
+kj::Array readDockerExecOutputInChunks(
+    kj::ArrayPtr input, size_t chunkSize) {  // returns everything the wrapped stream yields
+  auto io = kj::setupAsyncIo();
+  auto pipe = kj::newTwoWayPipe();
+  auto stream = newDockerExecOutputStream(kj::mv(pipe.ends[0]));  // stream under test
+  auto writer = kj::mv(pipe.ends[1]);  // `input` is fed in through this end
+  auto& writerRef = *writer;  // reference taken before `writer` is moved into the lambda
+  auto writeTask = writerRef.write(input)
+                       .then([writer = kj::mv(writer)]() mutable { writer->shutdownWrite(); })
+                       .eagerlyEvaluate(nullptr);  // shutdownWrite() runs once the write completes
+
+  kj::Vector result;
+  auto chunk = kj::heapArray(chunkSize);  // every read asks for between 1 and chunkSize bytes
+  while (true) {
+    auto amount = stream->tryRead(chunk.begin(), 1, chunk.size()).wait(io.waitScope);
+    if (amount == 0) {  // a zero-byte read ends the loop
+      break;
+    }
+    result.addAll(chunk.asPtr().first(amount));
+  }
+
+  writeTask.wait(io.waitScope);  // rethrows if writing or shutdown failed
+  return result.releaseAsArray();
+}
+
 // Regression test for VULN-127728: ContainerInspectResponse decode must not
 // use-after-free. With the old buggy code every field access after decode
 // dereferences freed heap memory (detectable by ASAN).
@@ -195,5 +234,56 @@ KJ_TEST("ContainerCreateRequest encodes HostConfig Dns") {
   KJ_EXPECT(decodedDns[1] == "8.8.8.8");
 }

+KJ_TEST("newDockerExecOutputStream strips Docker attach frame headers") {
+  kj::Vector frames;
+  addDockerFrame(frames, 1, "he"_kj);
+  addDockerFrame(frames, 1, ""_kj);
+  addDockerFrame(frames, 2, "ignored stderr"_kj);
+  addDockerFrame(frames, 1, "llo"_kj);
+  addDockerFrame(frames, 1, " world"_kj);
+
+  auto output = readDockerExecOutputInChunks(frames.asPtr(), 2);
+  KJ_EXPECT(kj::str(output.asChars()) == "hello world");
+}
+
+KJ_TEST("newDockerExecOutputStream handles clean EOF without stdout payload") {
+  kj::Vector frames;
+  addDockerFrame(frames, 2, "ignored stderr"_kj);
+  addDockerFrame(frames, 1, ""_kj);
+
+  auto output = readDockerExecOutputInChunks(frames.asPtr(), 3);
+  KJ_EXPECT(output.size() == 0);
+}
+
+KJ_TEST("newDockerExecOutputStream rejects truncated Docker attach frames") {
+  kj::Vector frames;
+  frames.add(1);
+  frames.add(0);
+  frames.add(0);
+  frames.add(0);
+  frames.add(0);
+  frames.add(0);
+  frames.add(0);
+  frames.add(5);
+  frames.addAll("he"_kj.asBytes());
+
+  KJ_EXPECT_THROW_MESSAGE(
+      "Docker exec raw stream ended in the middle of a frame",
+      readDockerExecOutputInChunks(frames.asPtr(), 2));
+}
+
+KJ_TEST("newDockerExecOutputStream rejects truncated Docker attach frame headers") {
+  kj::Vector<kj::byte> frames;
+  addDockerFrame(frames, 1, "ok"_kj);
+  // Only three of the eight header bytes of the next frame arrive before EOF.
+  frames.add(1);
+  frames.add(0);
+  frames.add(0);
+
+  KJ_EXPECT_THROW_MESSAGE(
+      "Docker exec raw stream ended in the middle of a frame",
+      readDockerExecOutputInChunks(frames.asPtr(), 2));
+}
+
 }  // namespace
 }  // namespace workerd::server
diff --git a/src/workerd/server/container-client.c++ b/src/workerd/server/container-client.c++
index 69f2f0c6201..a92243af3ff 100644
--- a/src/workerd/server/container-client.c++
+++ b/src/workerd/server/container-client.c++
@@ -220,6 +220,141 @@ class BufferedAsyncIoStream final: public kj::AsyncIoStream {
   size_t bufferedOffset = 0;
 };

+}  // namespace
+
+// Docker exec attach multiplexes stdout/stderr with 8-byte frame headers when TTY is disabled.
+// This wrapper exposes the exec output as a raw byte stream while keeping stdin writes unframed.
+// Only stdout frames (stream id 1) are surfaced to readers; frames with any other id are dropped.
+class DockerExecOutputStreamImpl final: public kj::AsyncIoStream {
+ public:
+  explicit DockerExecOutputStreamImpl(kj::Own inner): inner(kj::mv(inner)) {}
+
+  // Copies stdout payload into `dst`, pulling in the next stdout frame whenever the current one
+  // is used up. Returns once at least minBytes were copied or `dst` is full; a shorter result
+  // means the underlying stream ended cleanly between frames.
+  kj::Promise tryRead(void* dst, size_t minBytes, size_t maxBytes) override {
+    KJ_REQUIRE(minBytes <= maxBytes, minBytes, maxBytes);
+
+    auto out = kj::arrayPtr(reinterpret_cast(dst), maxBytes);
+    size_t copied = 0;
+
+    while (copied < minBytes) {
+      if (frameOffset == currentFrame.size()) {
+        auto hasFrame = co_await readNextStdoutFrame();
+        if (!hasFrame) {
+          co_return copied;
+        }
+      }
+
+      auto remaining = currentFrame.size() - frameOffset;
+      auto toCopy = kj::min(maxBytes - copied, remaining);
+      out.slice(copied, copied + toCopy).copyFrom(currentFrame.asPtr().slice(
+          frameOffset, frameOffset + toCopy));
+      frameOffset += toCopy;
+      copied += toCopy;
+
+      if (copied == maxBytes) {
+        co_return copied;
+      }
+    }
+
+    co_return copied;
+  }
+
+  // Writes and socket-level calls are forwarded to the wrapped stream unchanged.
+  kj::Promise write(kj::ArrayPtr buffer) override {
+    return inner->write(buffer);
+  }
+  kj::Promise write(kj::ArrayPtr> pieces) override {
+    return inner->write(pieces);
+  }
+  kj::Maybe> tryPumpFrom(
+      kj::AsyncInputStream& input, uint64_t amount = kj::maxValue) override {
+    return inner->tryPumpFrom(input, amount);
+  }
+  kj::Promise whenWriteDisconnected() override {
+    return inner->whenWriteDisconnected();
+  }
+  void abortWrite(kj::Exception&& exception) override {
+    inner->abortWrite(kj::mv(exception));
+  }
+
+  void shutdownWrite() override {
+    inner->shutdownWrite();
+  }
+  void abortRead() override {
+    inner->abortRead();
+  }
+  void getsockopt(int level, int option, void* value, kj::uint* length) override {
+    inner->getsockopt(level, option, value, length);
+  }
+  void setsockopt(int level, int option, const void* value, kj::uint length) override {
+    inner->setsockopt(level, option, value, length);
+  }
+  void getsockname(struct sockaddr* addr, kj::uint* length) override {
+    inner->getsockname(addr, length);
+  }
+  void getpeername(struct sockaddr* addr, kj::uint* length) override {
+    inner->getpeername(addr, length);
+  }
+  kj::Maybe getFd() const override {
+    return inner->getFd();
+  }
+
+ private:
+  // Fills `dst` completely and returns true. Returns false when the stream is already at EOF
+  // before the first byte; throws when it ends after only part of `dst` was read, since that
+  // means a frame header or payload was cut short.
+  kj::Promise readFully(kj::ArrayPtr dst) {
+    size_t offset = 0;
+    while (offset < dst.size()) {
+      auto read = co_await inner->tryRead(dst.begin() + offset, 1, dst.size() - offset);
+      if (read == 0) {
+        KJ_REQUIRE(offset == 0, "Docker exec raw stream ended in the middle of a frame");
+        co_return false;
+      }
+      offset += read;
+    }
+    co_return true;
+  }
+
+  // Reads frames until a stdout frame arrives and makes its payload the current frame. Payloads
+  // of other streams are read and discarded. Returns false on clean EOF.
+  kj::Promise readNextStdoutFrame() {
+    kj::byte header[8];
+    while (co_await readFully(kj::arrayPtr(header, 8))) {
+      // header[0] is the stream id; header[4..7] is the payload length, most significant first.
+      auto streamId = header[0];
+      auto size = (static_cast(header[4]) << 24) |
+          (static_cast(header[5]) << 16) |
+          (static_cast(header[6]) << 8) | static_cast(header[7]);
+      auto payload = kj::heapArray(size);
+      if (size > 0) {
+        KJ_REQUIRE(co_await readFully(payload.asPtr()),
+            "Docker exec raw stream ended in the middle of a frame");
+      }
+
+      if (streamId == 1) {
+        currentFrame = kj::mv(payload);
+        frameOffset = 0;
+        co_return true;
+      }
+    }
+
+    co_return false;
+  }
+
+  kj::Own inner;
+  kj::Array currentFrame;
+  size_t frameOffset = 0;
+};
+
+kj::Own newDockerExecOutputStream(kj::Own inner) {
+  return kj::heap(kj::mv(inner));
+}
+
+namespace {
+
 // Docker exec uses a single hijacked stream for stdin and stdout/stderr. Keep that stream in a
 // small refcounted holder so the returned stdin ByteStream and the output demux task can share it.
class SharedExecConnection final: public kj::Refcounted {
@@ -910,6 +1033,50 @@ kj::Maybe tryParsePublishedHostPort(capnp::json::Value::Reader portMap
   KJ_FAIL_REQUIRE("Malformed ContainerInspect port binding response: missing HostPort");
 }

+uint64_t parseTarOctalSize(kj::ArrayPtr header) {  // reads the octal field at bytes [124, 136)
+  KJ_REQUIRE(header.size() >= 136, "Malformed tar header");
+
+  uint64_t size = 0;
+  for (auto c: header.slice(124, 136)) {
+    if (c == 0 || c == ' ') {  // NUL and space bytes are skipped wherever they appear
+      continue;  // NOTE(review): digits after a NUL/space are still accumulated — confirm intended
+    }
+    KJ_REQUIRE(c >= '0' && c <= '7', "Malformed tar size");
+    size = size * 8 + (c - '0');
+  }
+  return size;
+}
+
+kj::Array extractSingleFileFromTar(kj::Array tarBytes) {  // copies out the first entry's data
+  constexpr size_t TAR_BLOCK_SIZE = 512;  // NOTE(review): only the first header is read; entry type is not checked
+  KJ_REQUIRE(tarBytes.size() >= TAR_BLOCK_SIZE, "Malformed tar archive");
+
+  auto contentSize = parseTarOctalSize(tarBytes.asPtr().first(TAR_BLOCK_SIZE));
+  KJ_REQUIRE(tarBytes.size() >= TAR_BLOCK_SIZE + contentSize, "Malformed tar archive");  // data must be fully present
+
+  auto result = kj::heapArray(contentSize);
+  result.asPtr().copyFrom(tarBytes.asPtr().slice(TAR_BLOCK_SIZE, TAR_BLOCK_SIZE + contentSize));
+  return result;
+}
+
+kj::String shellSingleQuote(kj::StringPtr value) {  // wraps `value` in single quotes
+  kj::Vector result;
+  result.add('\'');
+  for (char c: value) {
+    if (c == '\'') {  // an embedded quote is emitted as the four characters '\''
+      result.add('\'');
+      result.add('\\');
+      result.add('\'');
+      result.add('\'');
+    } else {
+      result.add(c);
+    }
+  }
+  result.add('\'');
+  result.add('\0');  // terminator is appended before the array is handed to kj::String
+  return kj::String(result.releaseAsArray());
+}
+
 }  // namespace

 // Represents a parsed egress mapping. IP/CIDR mappings match destination IPs,
@@ -985,8 +1152,10 @@ ContainerClient::~ContainerClient() noexcept(false) {
   cleanupCallback(kj::joinPromises(kj::arr(kj::mv(sidecarCleanup), kj::mv(mainCleanup))));
 }

-// Docker-specific Port implementation that implements rpc::Container::Port::Server
-// It does a HTTP CONNECT to the proxy-everything sidecar port.
+// Docker-specific Port implementation that implements rpc::Container::Port::Server.
+// In local Docker, host-originated traffic to the sidecar's published ingress port can be
+// intercepted by proxy-everything's TPROXY rules, so port connections are bridged from inside the
+// sidecar's network namespace instead.
 class ContainerClient::DockerPort final: public rpc::Container::Port::Server {
  public:
   DockerPort(ContainerClient& containerClient, kj::String containerHost, uint16_t containerPort)
@@ -995,42 +1164,8 @@ class ContainerClient::DockerPort final: public rpc::Container::Port::Server {
         containerPort(containerPort) {}

   kj::Promise connect(ConnectContext context) override {
-    auto mappedPort = JSG_REQUIRE_NONNULL(containerClient.sidecarIngressHostPort, Error,
-        "connect(): Container ingress proxy is not running.");
-
-    auto dstAddr = kj::str(containerHost, ":", containerPort);
-
-    auto address = co_await containerClient.network.parseAddress(kj::str("127.0.0.1:", mappedPort));
-
-    kj::HttpHeaderTable::Builder headerTableBuilder;
-    auto xDstAddrHeader = headerTableBuilder.add("X-Dst-Addr");
-    auto headerTable = headerTableBuilder.build();
-    kj::HttpHeaders headers(*headerTable);
-    headers.set(xDstAddrHeader, kj::str(dstAddr));
-
-    auto proxyConnection = co_await address->connect();
-    auto httpClient = kj::newHttpClient(*headerTable, *proxyConnection)
-                          .attach(kj::mv(proxyConnection), kj::mv(headerTable));
-    auto connectRequest = httpClient->connect(dstAddr, headers, {});
-    auto status = co_await kj::mv(connectRequest.status);
-
-    if (status.statusCode == 400) {
-      throw JSG_KJ_EXCEPTION(
-          DISCONNECTED, Error, "Container is not listening to port ", containerPort);
-    }
-
-    if (status.statusCode < 200 || status.statusCode >= 300) {
-      KJ_IF_SOME(errorBody, status.errorBody) {
-        auto errorBodyText = co_await errorBody->readAllText();
-        JSG_FAIL_REQUIRE(Error, "Connecting to container port through proxy-everything failed: [",
-            status.statusCode, "] ", status.statusText, " ", errorBodyText);
-      }
-
-      JSG_FAIL_REQUIRE(Error, "Connecting to container port through proxy-everything failed: [",
-          status.statusCode, "] ", status.statusText);
-    }
-
-    auto connection = kj::mv(connectRequest.connection);
+    auto connection =  // stream to containerHost:containerPort, opened from the sidecar
+        co_await containerClient.connectToContainerPortFromSidecar(containerHost, containerPort);
     auto upPipe = kj::newOneWayPipe();
     auto upEnd = kj::mv(upPipe.in);
     auto results = context.getResults();
@@ -1040,7 +1175,7 @@ class ContainerClient::DockerPort final: public rpc::Container::Port::Server {
     pumpTask =
         kj::joinPromisesFailFast(kj::arr(upEnd->pumpTo(*connection), connection->pumpTo(*downEnd)))
             .ignoreResult()
-            .attach(kj::mv(httpClient), kj::mv(upEnd), kj::mv(connection), kj::mv(downEnd));
+            .attach(kj::mv(upEnd), kj::mv(connection), kj::mv(downEnd));

     co_return;
   }
@@ -1486,16 +1621,15 @@ static constexpr kj::StringPtr cloudflareCaFilename =
     "cloudflare/certs/cloudflare-containers-ca.crt"_kj;

 kj::Promise ContainerClient::readCACert() {
-  auto ingressPort = KJ_REQUIRE_NONNULL(
-      sidecarIngressHostPort, "Cannot read CA cert: sidecar ingress port not known");
-
-  auto response = co_await dockerApiRequest(
-      network, kj::str("127.0.0.1:", ingressPort), kj::HttpMethod::GET, kj::str("/ca"));
+  auto response = co_await dockerApiBinaryRequest(network, kj::str(dockerPath), kj::HttpMethod::GET,
+      kj::str("/containers/", sidecarContainerName,
+          "/archive?path=", kj::encodeUriComponent("/ca/ca.crt"_kj)),
+      kj::none, MAX_JSON_RESPONSE_SIZE);  // NOTE(review): JSON size limit reused for this body — confirm

   JSG_REQUIRE(response.statusCode == 200, Error,
-      "Failed to read CA cert from sidecar: ", response.statusCode, " ", response.body);
+      "Failed to read CA cert from sidecar: ", response.statusCode);

-  caCert = kj::mv(response.body);
+  caCert = kj::str(extractSingleFileFromTar(kj::mv(response.body)).asChars());  // body is parsed as tar
 }

 kj::Promise ContainerClient::injectCACert() {
@@ -1612,8 +1746,28 @@ kj::Promise> ContainerClient:
   };
 }

+kj::Promise ContainerClient::putSidecarEgressConfig(kj::String body) {  // PUT body to /egress via sh + nc
+  auto script = kj::str("body=", shellSingleQuote(body),
+      "\n"
+      "len=${#body}\n"  // NOTE(review): ${#body} may count characters, not bytes — confirm for non-ASCII
+      "response=$( { printf 'PUT /egress HTTP/1.1\\r\\nHost: localhost\\r\\n"
+      "Content-Type: application/json\\r\\nContent-Length: %s\\r\\n"
+      "Connection: close\\r\\n\\r\\n' \"$len\"; printf '%s' \"$body\"; } | "
+      "nc -w 5 127.0.0.1 ",
+      SIDECAR_INGRESS_PORT,
+      " )\n"
+      "case \"$response\" in HTTP/1.1\\ 2*|HTTP/1.0\\ 2*) exit 0;; "  // exit 0 only for a 2xx status line
+      "*) printf '%s\\n' \"$response\" >&2; exit 1;; esac\n");
+  auto cmd = kj::arr(kj::str("sh"), kj::str("-c"), kj::mv(script));
+  co_await runSimpleExecInContainer(sidecarContainerName, cmd.asPtr());  // script runs in the sidecar
+}
+
 kj::Promise ContainerClient::updateSidecarEgressPort(
     uint16_t ingressHostPort, uint16_t egressPort) {
+  // The published ingress port is intentionally not used for local Docker control requests because
+  // proxy-everything's TPROXY rules can intercept host-originated traffic to that published port.
+  (void)ingressHostPort;
+
   capnp::JsonCodec codec;
   codec.handleByAnnotation();
   capnp::MallocMessageBuilder message;
@@ -1621,15 +1775,15 @@ kj::Promise ContainerClient::updateSidecarEgressPort(
   jsonRoot.setPort(egressPort);

   auto body = codec.encode(jsonRoot);
-  auto response = co_await dockerApiRequest(network, kj::str("127.0.0.1:", ingressHostPort),
-      kj::HttpMethod::PUT, kj::str("/egress"), kj::mv(body));
-
-  JSG_REQUIRE(response.statusCode >= 200 && response.statusCode < 300, Error,
-      "Updating sidecar egress port failed with: ", response.statusCode, " ", response.body);
+  co_await putSidecarEgressConfig(kj::mv(body));
 }

 kj::Promise ContainerClient::updateSidecarEgressConfig(
     uint16_t ingressHostPort, uint16_t egressPort) {
+  // The published ingress port is intentionally not used for local Docker control requests because
+  // proxy-everything's TPROXY rules can intercept host-originated traffic to that published port.
+  (void)ingressHostPort;
+
   capnp::JsonCodec codec;
   codec.handleByAnnotation();
   capnp::MallocMessageBuilder message;
@@ -1648,11 +1802,7 @@ kj::Promise ContainerClient::updateSidecarEgressConfig(
   }

   auto body = codec.encode(jsonRoot);
-  auto response = co_await dockerApiRequest(network, kj::str("127.0.0.1:", ingressHostPort),
-      kj::HttpMethod::PUT, kj::str("/egress"), kj::mv(body));
-
-  JSG_REQUIRE(response.statusCode >= 200 && response.statusCode < 300, Error,
-      "Updating sidecar egress config failed with: ", response.statusCode, " ", response.body);
+  co_await putSidecarEgressConfig(kj::mv(body));
 }

 kj::Promise ContainerClient::createContainer(kj::StringPtr effectiveImage,
@@ -1826,6 +1976,38 @@ kj::Promise> ContainerClient::startExec(kj::String ex
   co_return kj::mv(response.connection);
 }

+kj::Promise> ContainerClient::connectToContainerPortFromSidecar(
+    kj::StringPtr containerHost, uint16_t containerPort) {  // bridges to host:port via an exec'd nc
+  capnp::JsonCodec codec;
+  codec.handleByAnnotation();
+
+  capnp::MallocMessageBuilder createMessage;
+  auto createRequest = createMessage.initRoot();
+  createRequest.setAttachStdin(true);
+  createRequest.setAttachStdout(true);
+  createRequest.setAttachStderr(false);  // only stdin and stdout are attached
+  // Keep TTY disabled so Docker does not apply pseudo-terminal line discipline to HTTP bytes.
+  createRequest.setTty(false);
+
+  auto cmd = createRequest.initCmd(3);  // argv: nc <containerHost> <containerPort>
+  cmd.set(0, "nc");
+  cmd.set(1, containerHost);
+  cmd.set(2, kj::str(containerPort));
+
+  auto createResponse =
+      co_await dockerApiRequest(network, kj::str(dockerPath), kj::HttpMethod::POST,
+          kj::str("/containers/", sidecarContainerName, "/exec"), codec.encode(createRequest));
+  JSG_REQUIRE(createResponse.statusCode == 201, Error,
+      "Creating sidecar port Docker exec failed with [", createResponse.statusCode, "] ",
+      createResponse.body);
+
+  auto parsedCreate =
+      decodeJsonResponse(createResponse.body);
+  auto execId = kj::str(parsedCreate->getRoot().getId());
+  auto connection = co_await startExec(kj::mv(execId));
+  co_return newDockerExecOutputStream(kj::mv(connection));  // output is returned with frame headers stripped
+}
+
 kj::Promise ContainerClient::inspectExec(
     kj::StringPtr execId) {
   auto response = co_await dockerApiRequest(
@@ -1845,6 +2027,11 @@ kj::Promise ContainerClient::inspectExec(
 }

 kj::Promise ContainerClient::runSimpleExec(kj::ArrayPtr cmd) {
+  co_await runSimpleExecInContainer(containerName, cmd);  // same helper, targeting containerName
+}
+
+kj::Promise ContainerClient::runSimpleExecInContainer(
+    kj::StringPtr targetContainerName, kj::ArrayPtr cmd) {
   capnp::JsonCodec codec;
   codec.handleByAnnotation();

@@ -1862,7 +2049,7 @@ kj::Promise ContainerClient::runSimpleExec(kj::ArrayPtr

   auto createResponse =
       co_await dockerApiRequest(network, kj::str(dockerPath), kj::HttpMethod::POST,
-          kj::str("/containers/", containerName, "/exec"), codec.encode(createRequest));
+          kj::str("/containers/", targetContainerName, "/exec"), codec.encode(createRequest));
   JSG_REQUIRE(createResponse.statusCode == 201, Error,
       "Creating helper Docker exec failed with [", createResponse.statusCode, "] ",
       createResponse.body);
diff --git a/src/workerd/server/container-client.h b/src/workerd/server/container-client.h
index 1cc6c244bd2..af187876a60 100644
--- a/src/workerd/server/container-client.h
+++ b/src/workerd/server/container-client.h
@@ -48,6 +48,9 @@ kj::Own decodeJsonResponse(kj::StringPtr response)
   return message;
 }

+// Wraps Docker exec's TTY-disabled attach stream and exposes stdout frames as a raw stream.
+kj::Own newDockerExecOutputStream(kj::Own inner);
+
 // Docker-based implementation that implements the rpc::Container::Server interface
 // so it can be used as a rpc::Container::Client via kj::heap().
 // This allows the Container JSG class to use Docker directly without knowing
@@ -163,6 +166,7 @@ class ContainerClient final: public rpc::Container::Server, public kj::Refcounte

   kj::Promise> inspectContainer();

+  kj::Promise putSidecarEgressConfig(kj::String body);  // shared by both updateSidecarEgress* methods
   kj::Promise updateSidecarEgressPort(uint16_t ingressHostPort, uint16_t egressPort);
   kj::Promise updateSidecarEgressConfig(uint16_t ingressHostPort, uint16_t egressPort);
   kj::Promise createContainer(kj::StringPtr effectiveImage,
@@ -175,8 +179,12 @@ class ContainerClient final: public rpc::Container::Server, public kj::Refcounte
       bool attachStdout,
       bool attachStderr);
   kj::Promise> startExec(kj::String execId);
+  kj::Promise> connectToContainerPortFromSidecar(
+      kj::StringPtr containerHost, uint16_t containerPort);
   kj::Promise inspectExec(kj::StringPtr execId);
   kj::Promise runSimpleExec(kj::ArrayPtr cmd);
+  kj::Promise runSimpleExecInContainer(
+      kj::StringPtr targetContainerName, kj::ArrayPtr cmd);
   kj::Promise startContainer();
   kj::Promise stopContainer();
   kj::Promise killContainer(uint32_t signal);