diff --git a/.cargo/config.toml b/.cargo/config.toml new file mode 100644 index 0000000..634eba2 --- /dev/null +++ b/.cargo/config.toml @@ -0,0 +1,10 @@ +[build] +#rustc-wrapper = "sccache" +# Unlocks reqwest's experimental HTTP/3 client, used only by tuic-tests' +# `h3-masquerade-test` feature. Inert unless reqwest is built with its `http3` +# feature (which only that opt-in test feature enables), so default builds are +# unaffected — no extra deps, no http3 code. +rustflags = ["--cfg", "reqwest_unstable"] + +[env] +RUSTC_BOOTSTRAP = { value = "1" } diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 04788d8..de796df 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -29,7 +29,11 @@ jobs: rust-toolchain: "stable" packages: "wind,tuic-server,tuic-client" target-config-file: ".github/target.toml" - rustflags: "" + # The reusable workflow exports this as the `RUSTFLAGS` env var, which + # overrides `.cargo/config.toml [build] rustflags`; mirror the committed + # `--cfg reqwest_unstable` here so it stays enabled in CI (inert unless a + # crate also turns on reqwest's `http3` feature). + rustflags: "--cfg reqwest_unstable" enable-tmate: false only-clippy-tests-on-pr: false # GitHub caps the per-repo Actions cache at 10 GB and each matrix target diff --git a/.gitignore b/.gitignore index 2f66e01..5100169 100644 --- a/.gitignore +++ b/.gitignore @@ -3,4 +3,8 @@ target/ **/*.rs.bk /target config.toml +# ...but the workspace's `.cargo/config.toml` IS tracked: it carries +# `RUSTC_BOOTSTRAP=1` (needed for the repo's nightly rustfmt.toml options) and the +# `--cfg reqwest_unstable` flag that the `h3-masquerade-test` feature relies on. +!.cargo/config.toml .claude \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index 6ddbafe..bec4cad 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1815,6 +1815,34 @@ dependencies = [ "tracing", ] +[[package]] +name = "h3" +version = "0.0.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "10872b55cfb02a821b69dc7cf8dc6a71d6af25eb9a79662bec4a9d016056b3be" +dependencies = [ + "bytes", + "fastrand", + "futures-util", + "http", + "pin-project-lite", + "tokio", +] + +[[package]] +name = "h3-quinn" +version = "0.0.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b2e732c8d91a74731663ac8479ab505042fbf547b9a207213ab7fbcbfc4f8b4" +dependencies = [ + "bytes", + "futures", + "h3", + "quinn 0.11.9", + "tokio", + "tokio-util", +] + [[package]] name = "half" version = "2.7.1" @@ -3402,6 +3430,7 @@ checksum = "b9e20a958963c291dc322d98411f541009df2ced7b5a4f2bd52337638cfccf20" dependencies = [ "bytes", "cfg_aliases", + "futures-io", "pin-project-lite", "quinn-proto 0.11.14", "quinn-udp 0.5.14", @@ -3754,6 +3783,10 @@ dependencies = [ "base64", "bytes", "futures-core", + "futures-util", + "h2", + "h3", + "h3-quinn", "http", "http-body", "http-body-util", @@ -3773,12 +3806,14 @@ dependencies = [ "sync_wrapper", "tokio", "tokio-rustls", + "tokio-util", "tower", "tower-http", "tower-service", "url", "wasm-bindgen", "wasm-bindgen-futures", + "wasm-streams", "web-sys", "webpki-roots 1.0.7", ] @@ -5350,10 +5385,12 @@ dependencies = [ "bytes", "eyre", "fast-socks5", + "http", "json5 1.3.1", "lexopt", "quinn 0.12.0", "rcgen 0.14.8", + "reqwest", "rustls", "serial_test", "tokio", @@ -5605,6 +5642,19 @@ dependencies = [ "wasmparser", ] +[[package]] +name = "wasm-streams" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "15053d8d85c7eccdbefef60f06769760a563c7f0a9d6902a13d35c7800b0ad65" +dependencies = [ + "futures-util", + "js-sys", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", +] + [[package]] name = "wasmparser" version = "0.244.0" @@ -5817,6 +5867,7 @@ dependencies = [ "eyre", "foreign-types-shared", "futures-util", + "h3", "pin-project", "quinn 0.12.0", "quinn-congestions", @@ -5889,12 +5940,15 @@ dependencies = [ "enum_dispatch", "eyre", "futures-util", + "h3", + "http", "moka", "pin-project", "portable-atomic", "quinn 0.12.0", "quinn-congestions", "rcgen 0.14.8", + "reqwest", "rustls", "rustls-platform-verifier", "secrecy", diff --git a/crates/tuic-server/src/config.rs b/crates/tuic-server/src/config.rs index ba490e5..4118345 100644 --- a/crates/tuic-server/src/config.rs +++ b/crates/tuic-server/src/config.rs @@ -87,6 +87,12 @@ pub struct Config { pub users: HashMap, pub tls: TlsConfig, + /// HTTP/3 masquerade: reverse-proxy non-TUIC (HTTP/3 probe) connections to + /// a real upstream site so the server is indistinguishable from a web + /// server. + #[serde(default)] + pub masquerade: MasqueradeConfig, + #[educe(Default = "")] pub data_dir: PathBuf, @@ -257,6 +263,23 @@ pub struct TlsConfig { pub acme_staging: bool, } +/// HTTP/3 masquerade configuration. +/// +/// When `enabled`, a connection that isn't TUIC (its first stream byte isn't +/// the TUIC version `0x05` — i.e. an active prober speaking real HTTP/3) is +/// served as a reverse proxy to `upstream`, so the server is indistinguishable +/// from a normal HTTP/3 website instead of resetting the connection. +#[derive(Deserialize, Serialize, Educe)] +#[educe(Default)] +#[serde(default, deny_unknown_fields)] +pub struct MasqueradeConfig { + #[educe(Default(expression = false))] + pub enabled: bool, + /// Upstream site to reverse-proxy to, e.g. `https://example.com`. + #[educe(Default(expression = "https://example.com"))] + pub upstream: String, +} + /// Transport tuning for the quinn backend (`wind-tuic`). #[derive(Deserialize, Serialize, Educe)] #[educe(Default)] diff --git a/crates/tuic-server/src/wind_adapter.rs b/crates/tuic-server/src/wind_adapter.rs index 0f793ba..6421da5 100644 --- a/crates/tuic-server/src/wind_adapter.rs +++ b/crates/tuic-server/src/wind_adapter.rs @@ -284,6 +284,9 @@ async fn create_quinn_inbound(ctx: &Arc) -> eyre::Result) -> eyre::Result SocketAddr { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + tokio::spawn(async move { + while let Ok((mut sock, _)) = listener.accept().await { + tokio::spawn(async move { + // A probe GET has no body, so a single read drains the request line + // + headers; we don't need to parse it. + let mut buf = [0u8; 8192]; + let _ = sock.read(&mut buf).await; + let resp = format!( + "HTTP/1.1 200 OK\r\ncontent-type: text/plain\r\ncontent-length: {}\r\nconnection: close\r\n\r\n{}", + UPSTREAM_BODY.len(), + UPSTREAM_BODY + ); + let _ = sock.write_all(resp.as_bytes()).await; + let _ = sock.shutdown().await; + }); + } + }); + addr +} + +#[tokio::test(flavor = "multi_thread")] +async fn masquerade_reverse_proxies_http3_probes() -> eyre::Result<()> { + install_crypto_provider(); + + let upstream = start_upstream().await; + + // Fixed port (matches the repo's other e2e tests) so we can form the client + // URL before the server reports its address. + let server_addr: SocketAddr = "127.0.0.1:8471".parse().unwrap(); + let uuid = Uuid::new_v4(); + + let cfg = tuic_server::Config { + log_level: tuic_server::config::LogLevel::Debug, + server: server_addr, + users: { + let mut users = HashMap::new(); + users.insert(uuid, "pw".to_string()); + users + }, + tls: tuic_server::config::TlsConfig { + self_sign: true, + hostname: "localhost".to_string(), + // The server must advertise the `h3` ALPN for an HTTP/3 client to + // negotiate at all — this is also what TUIC uses to disguise itself. + alpn: vec!["h3".to_string()], + ..Default::default() + }, + masquerade: tuic_server::config::MasqueradeConfig { + enabled: true, + upstream: format!("http://{upstream}"), + }, + data_dir: std::env::temp_dir().join("wind-masquerade-test"), + experimental: tuic_server::config::ExperimentalConfig { + drop_loopback: false, + drop_private: false, + }, + ..Default::default() + }; + + // `run` blocks forever on success; bound it so a hung server can't wedge the + // test runner, and treat the safety-timeout as "still serving". + let mut server = tokio::spawn(async move { + match timeout(Duration::from_secs(20), tuic_server::run(cfg)).await { + Ok(res) => res, + Err(_) => Ok(()), + } + }); + + // Wait for the server to bind. If it instead exits early (e.g. the fixed port + // is already in use), surface that real error now rather than letting the + // HTTP/3 request below fail with an opaque 10s timeout. + tokio::select! { + joined = &mut server => match joined { + Ok(Ok(())) => eyre::bail!("tuic-server exited before serving any request"), + Ok(Err(e)) => eyre::bail!("tuic-server failed to start: {e:?}"), + Err(e) => eyre::bail!("tuic-server task panicked: {e}"), + }, + _ = tokio::time::sleep(Duration::from_secs(1)) => {} + } + + // reqwest as a real HTTP/3 prober. `danger_accept_invalid_certs` because the + // server uses a self-signed cert; `http3_prior_knowledge` forces h3. + let client = reqwest::Client::builder() + .danger_accept_invalid_certs(true) + .http3_prior_knowledge() + .build()?; + + let url = format!("https://{server_addr}/some/secret/path?probe=1"); + let res = timeout( + Duration::from_secs(10), + client.get(&url).version(http::Version::HTTP_3).send(), + ) + .await + .map_err(|_| eyre::eyre!("HTTP/3 request to the masquerade timed out"))??; + + assert_eq!(res.version(), http::Version::HTTP_3, "response must be HTTP/3"); + assert_eq!(res.status(), 200, "masquerade should return the upstream's 200"); + let body = res.text().await?; + assert_eq!(body, UPSTREAM_BODY, "masquerade must relay the upstream body"); + + Ok(()) +} diff --git a/crates/wind-quic/Cargo.toml b/crates/wind-quic/Cargo.toml index 7bc0d89..1c70586 100644 --- a/crates/wind-quic/Cargo.toml +++ b/crates/wind-quic/Cargo.toml @@ -35,6 +35,11 @@ quiche = [ "dep:arc-swap", ] +# HTTP/3 masquerade adapter: an `h3::quic` server surface over `QuicConnection`, +# used by the TUIC server to pose as a real HTTP/3 web server for non-TUIC +# clients. Backend-agnostic, so it works over either the quinn or quiche backend. +h3 = ["dep:h3"] + [dependencies] wind-core = { version = "0.1.1", path = "../wind-core", default-features = false } @@ -58,6 +63,9 @@ rustls-pemfile = { version = "2", optional = true } rustls-platform-verifier = { version = "0.7", default-features = false, optional = true } aws-lc-rs = { version = "*", default-features = false, optional = true } +# HTTP/3 (masquerade) — transport-agnostic h3 over the `QuicConnection` adapter. +h3 = { version = "0.0.8", optional = true } + # QUIC with quiche / tokio-quiche tokio-quiche = { version = "0.19", optional = true } boring = { version = "4", default-features = false, optional = true } diff --git a/crates/wind-quic/src/h3_adapter.rs b/crates/wind-quic/src/h3_adapter.rs new file mode 100644 index 0000000..79d21d1 --- /dev/null +++ b/crates/wind-quic/src/h3_adapter.rs @@ -0,0 +1,427 @@ +//! An [`h3::quic`] server surface implemented over [`QuicConnection`]. +//! +//! The TUIC server poses as a real HTTP/3 web server for clients that speak +//! actual HTTP/3 instead of TUIC (see the masquerade in `wind-tuic`). Rather +//! than bind to a specific QUIC engine, this adapter implements the hyperium +//! [`h3`] crate's transport traits over our backend-neutral +//! [`QuicConnection`], so the same HTTP/3 server runs over either the quinn or +//! quiche backend. +//! +//! Only the **server** surface is implemented: accepting peer-initiated uni +//! (control / QPACK) and bidi (request) streams, and opening our own uni +//! streams (control / QPACK). The per-stream classifier in `wind-tuic` reads +//! each accepted stream's prefix, and feeds the ones it classified as h3 into +//! this adapter over two channels (`recv_rx` / `bidi_rx`) — already accepted +//! off the connection, with their peeked prefix replayed via +//! [`PrefixedRecv`](crate::PrefixedRecv). So the adapter just pulls streams +//! from the channels; there is no "first stream" special case. Every recv +//! stream is a `PrefixedRecv`, so the adapter is generic over the backend's +//! concrete stream types — no boxing or dynamic dispatch. +//! +//! The bridge is mechanical: our streams are `AsyncRead`/`AsyncWrite`, while +//! `h3::quic` is poll- and `Buf`-based. Recv streams read into a scratch buffer +//! and hand back `Bytes`; send streams buffer one `WriteBuf` and drain it +//! through `poll_write`. Our `QuicConnection` accept/open methods are `async +//! fn`, so each is driven as a boxed in-flight future stored on the +//! connection/opener. + +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; + +use bytes::{Buf, Bytes}; +use h3::quic::{ + BidiStream, Connection, ConnectionErrorIncoming, OpenStreams, RecvStream, SendStream, StreamErrorIncoming, StreamId, + WriteBuf, +}; +use tokio::{ + io::{AsyncRead, AsyncWrite, ReadBuf}, + sync::mpsc, +}; + +use crate::{PrefixedRecv, QuicConnection, QuicError, QuicRecvStream, QuicSendStream}; + +/// Scratch buffer size for a single `poll_data` read. +const RECV_CHUNK: usize = 16 * 1024; + +type BoxFut = Pin + Send>>; + +/// A boxed in-flight `open_bi` future. Aliased because the bidi case returns a +/// `(SendStream, RecvStream)` tuple, which trips clippy's `type_complexity` +/// lint when written inline in every slot/signature. +type BoxBiFut = BoxFut::SendStream, ::RecvStream), QuicError>>; + +/// Channel of accepted (prefix-replayed) recv streams the `wind-tuic` +/// per-stream router feeds to the masquerade h3 server. +type RecvRx = mpsc::UnboundedReceiver::RecvStream>>; +/// Channel of accepted bidi (request) streams — `(send half, prefix-replayed +/// recv)`. +type BidiRx = mpsc::UnboundedReceiver<( + ::SendStream, + PrefixedRecv<::RecvStream>, +)>; + +/// Build an HTTP/3 server connection over `conn`. +/// +/// Accepted streams are pulled from `recv_rx` / `bidi_rx`, which the +/// `wind-tuic` per-stream router fills with the streams it classified as h3 +/// (already accepted off the connection, with their peeked prefix replayed via +/// [`PrefixedRecv`]). `conn` is kept only for *opening* the server's own +/// control / QPACK streams. There is no "first stream" special case — every +/// accepted stream arrives the same way. +pub fn server_connection(conn: C, recv_rx: RecvRx, bidi_rx: BidiRx) -> H3Conn { + H3Conn { + conn, + recv_rx, + bidi_rx, + open_uni_fut: None, + open_bi_fut: None, + } +} + +fn conn_err(e: QuicError) -> ConnectionErrorIncoming { + match e { + QuicError::TimedOut => ConnectionErrorIncoming::Timeout, + QuicError::ApplicationClosed { .. } | QuicError::LocallyClosed => { + ConnectionErrorIncoming::ApplicationClose { error_code: 0 } + } + other => ConnectionErrorIncoming::InternalError(other.to_string()), + } +} + +fn stream_err(e: QuicError) -> StreamErrorIncoming { + StreamErrorIncoming::ConnectionErrorIncoming { + connection_error: conn_err(e), + } +} + +// --------------------------------------------------------------------------- +// Recv stream — `PrefixedRecv` *is* the adapter's recv stream +// --------------------------------------------------------------------------- + +/// [`PrefixedRecv`] doubles as the adapter's `h3::quic::RecvStream`: it already +/// owns the (possibly empty) replayed prefix plus the backend recv stream, so +/// no separate wrapper type is needed. Fresh accepted streams are wrapped with +/// an empty prefix. +impl RecvStream for PrefixedRecv { + type Buf = Bytes; + + fn poll_data(&mut self, cx: &mut Context<'_>) -> Poll, StreamErrorIncoming>> { + let mut scratch = [0u8; RECV_CHUNK]; + let mut rb = ReadBuf::new(&mut scratch); + match Pin::new(&mut *self).poll_read(cx, &mut rb) { + Poll::Ready(Ok(())) => { + let filled = rb.filled(); + if filled.is_empty() { + // Clean EOF (peer FIN). + Poll::Ready(Ok(None)) + } else { + Poll::Ready(Ok(Some(Bytes::copy_from_slice(filled)))) + } + } + Poll::Ready(Err(e)) => Poll::Ready(Err(StreamErrorIncoming::Unknown(Box::new(e)))), + Poll::Pending => Poll::Pending, + } + } + + fn stop_sending(&mut self, error_code: u64) { + self.stop(error_code); + } + + fn recv_id(&self) -> StreamId { + stream_id(self.id()) + } +} + +// --------------------------------------------------------------------------- +// Send stream +// --------------------------------------------------------------------------- + +/// `h3::quic::SendStream` over the backend's send stream. +pub struct H3Send { + inner: C::SendStream, + id: u64, + /// At most one `WriteBuf` is buffered at a time; `poll_ready`/`poll_finish` + /// drain it through the underlying `AsyncWrite`. + pending: Option>, +} + +impl H3Send { + fn new(inner: C::SendStream) -> Self { + let id = inner.id(); + Self { + inner, + id, + pending: None, + } + } + + /// Flush the buffered `WriteBuf` (if any) to the underlying stream. Returns + /// `Ready(Ok)` once nothing is buffered. + fn poll_flush_pending(&mut self, cx: &mut Context<'_>) -> Poll> { + if let Some(buf) = self.pending.as_mut() { + while buf.has_remaining() { + let chunk = buf.chunk(); + match Pin::new(&mut self.inner).poll_write(cx, chunk) { + Poll::Ready(Ok(0)) => { + return Poll::Ready(Err(StreamErrorIncoming::Unknown(Box::new(std::io::Error::new( + std::io::ErrorKind::WriteZero, + "h3 send stream wrote zero bytes", + ))))); + } + Poll::Ready(Ok(n)) => buf.advance(n), + Poll::Ready(Err(e)) => return Poll::Ready(Err(StreamErrorIncoming::Unknown(Box::new(e)))), + Poll::Pending => return Poll::Pending, + } + } + } + self.pending = None; + Poll::Ready(Ok(())) + } +} + +impl SendStream for H3Send { + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.poll_flush_pending(cx) + } + + fn send_data>>(&mut self, data: T) -> Result<(), StreamErrorIncoming> { + // h3 always polls `poll_ready` to readiness before `send_data`, so the + // previous buffer has drained. + self.pending = Some(data.into()); + Ok(()) + } + + fn poll_finish(&mut self, cx: &mut Context<'_>) -> Poll> { + match self.poll_flush_pending(cx) { + Poll::Ready(Ok(())) => match Pin::new(&mut self.inner).poll_flush(cx) { + Poll::Ready(Ok(())) => { + let _ = self.inner.finish(); + Poll::Ready(Ok(())) + } + Poll::Ready(Err(e)) => Poll::Ready(Err(StreamErrorIncoming::Unknown(Box::new(e)))), + Poll::Pending => Poll::Pending, + }, + other => other, + } + } + + fn reset(&mut self, reset_code: u64) { + self.inner.reset(reset_code); + } + + fn send_id(&self) -> StreamId { + stream_id(self.id) + } +} + +// --------------------------------------------------------------------------- +// Bidi stream (request streams) +// --------------------------------------------------------------------------- + +/// `h3::quic::BidiStream` joining an [`H3Send`] and a [`PrefixedRecv`]. +pub struct H3Bidi { + send: H3Send, + recv: PrefixedRecv, +} + +impl SendStream for H3Bidi { + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.send.poll_ready(cx) + } + + fn send_data>>(&mut self, data: T) -> Result<(), StreamErrorIncoming> { + self.send.send_data(data) + } + + fn poll_finish(&mut self, cx: &mut Context<'_>) -> Poll> { + self.send.poll_finish(cx) + } + + fn reset(&mut self, reset_code: u64) { + self.send.reset(reset_code); + } + + fn send_id(&self) -> StreamId { + self.send.send_id() + } +} + +impl RecvStream for H3Bidi { + type Buf = Bytes; + + fn poll_data(&mut self, cx: &mut Context<'_>) -> Poll, StreamErrorIncoming>> { + self.recv.poll_data(cx) + } + + fn stop_sending(&mut self, error_code: u64) { + self.recv.stop_sending(error_code); + } + + fn recv_id(&self) -> StreamId { + self.recv.recv_id() + } +} + +impl BidiStream for H3Bidi { + type RecvStream = PrefixedRecv; + type SendStream = H3Send; + + fn split(self) -> (Self::SendStream, Self::RecvStream) { + (self.send, self.recv) + } +} + +fn into_bidi((send, recv): (C::SendStream, C::RecvStream)) -> H3Bidi { + H3Bidi { + send: H3Send::new(send), + recv: PrefixedRecv::new(Bytes::new(), recv), + } +} + +// --------------------------------------------------------------------------- +// Opener +// --------------------------------------------------------------------------- + +/// Opens local uni/bidi streams (HTTP/3 control + QPACK streams). Produced by +/// [`Connection::opener`]. +pub struct H3Opener { + conn: C, + open_uni_fut: Option>>, + open_bi_fut: Option>, +} + +impl OpenStreams for H3Opener { + type BidiStream = H3Bidi; + type SendStream = H3Send; + + fn poll_open_bidi(&mut self, cx: &mut Context<'_>) -> Poll> { + poll_open_bidi(&self.conn, &mut self.open_bi_fut, cx) + } + + fn poll_open_send(&mut self, cx: &mut Context<'_>) -> Poll> { + poll_open_send(&self.conn, &mut self.open_uni_fut, cx) + } + + fn close(&mut self, _code: h3::error::Code, reason: &[u8]) { + self.conn.close(0, reason); + } +} + +// --------------------------------------------------------------------------- +// Connection +// --------------------------------------------------------------------------- + +/// `h3::quic::Connection` over a [`QuicConnection`] handle. Accepts streams +/// from the router's channels; opens streams directly on `conn`. +pub struct H3Conn { + conn: C, + recv_rx: RecvRx, + bidi_rx: BidiRx, + open_uni_fut: Option>>, + open_bi_fut: Option>, +} + +impl OpenStreams for H3Conn { + type BidiStream = H3Bidi; + type SendStream = H3Send; + + fn poll_open_bidi(&mut self, cx: &mut Context<'_>) -> Poll> { + poll_open_bidi(&self.conn, &mut self.open_bi_fut, cx) + } + + fn poll_open_send(&mut self, cx: &mut Context<'_>) -> Poll> { + poll_open_send(&self.conn, &mut self.open_uni_fut, cx) + } + + fn close(&mut self, _code: h3::error::Code, reason: &[u8]) { + self.conn.close(0, reason); + } +} + +impl Connection for H3Conn { + type OpenStreams = H3Opener; + type RecvStream = PrefixedRecv; + + fn poll_accept_recv(&mut self, cx: &mut Context<'_>) -> Poll> { + match self.recv_rx.poll_recv(cx) { + Poll::Ready(Some(recv)) => Poll::Ready(Ok(recv)), + // Channel closed → the router (and the connection) is gone. + Poll::Ready(None) => Poll::Ready(Err(ConnectionErrorIncoming::Timeout)), + Poll::Pending => Poll::Pending, + } + } + + fn poll_accept_bidi(&mut self, cx: &mut Context<'_>) -> Poll> { + match self.bidi_rx.poll_recv(cx) { + Poll::Ready(Some((send, recv))) => Poll::Ready(Ok(H3Bidi { + send: H3Send::new(send), + recv, + })), + Poll::Ready(None) => Poll::Ready(Err(ConnectionErrorIncoming::Timeout)), + Poll::Pending => Poll::Pending, + } + } + + fn opener(&self) -> Self::OpenStreams { + H3Opener { + conn: self.conn.clone(), + open_uni_fut: None, + open_bi_fut: None, + } + } +} + +// --------------------------------------------------------------------------- +// Shared poll helpers +// --------------------------------------------------------------------------- + +fn stream_id(id: u64) -> StreamId { + // QUIC stream ids fit the h3 `StreamId` invariant (< 2^62); fall back to 0 + // only if a backend ever surfaces something out of range. + StreamId::try_from(id).unwrap_or_else(|_| StreamId::try_from(0).expect("0 is a valid stream id")) +} + +fn poll_open_send( + conn: &C, + slot: &mut Option>>, + cx: &mut Context<'_>, +) -> Poll, StreamErrorIncoming>> { + let poll = { + let fut = slot.get_or_insert_with(|| { + let conn = conn.clone(); + Box::pin(async move { conn.open_uni().await }) + }); + fut.as_mut().poll(cx) + }; + match poll { + Poll::Ready(res) => { + *slot = None; + Poll::Ready(res.map(H3Send::new).map_err(stream_err)) + } + Poll::Pending => Poll::Pending, + } +} + +fn poll_open_bidi( + conn: &C, + slot: &mut Option>, + cx: &mut Context<'_>, +) -> Poll, StreamErrorIncoming>> { + let poll = { + let fut = slot.get_or_insert_with(|| { + let conn = conn.clone(); + Box::pin(async move { conn.open_bi().await }) + }); + fut.as_mut().poll(cx) + }; + match poll { + Poll::Ready(res) => { + *slot = None; + Poll::Ready(res.map(into_bidi).map_err(stream_err)) + } + Poll::Pending => Poll::Pending, + } +} diff --git a/crates/wind-quic/src/lib.rs b/crates/wind-quic/src/lib.rs index a688aa2..e701232 100644 --- a/crates/wind-quic/src/lib.rs +++ b/crates/wind-quic/src/lib.rs @@ -33,11 +33,17 @@ pub mod config; pub mod error; +pub mod prefixed; pub mod traits; pub use config::{CertSource, ClientTlsConfig, ServerTlsConfig, TransportConfig}; pub use error::{QuicError, Result}; +pub use prefixed::PrefixedRecv; pub use traits::{QuicConnection, QuicRecvStream, QuicSendStream}; + +/// HTTP/3 `h3::quic` adapter over [`QuicConnection`] (masquerade support). +#[cfg(feature = "h3")] +pub mod h3_adapter; // Re-export the shared congestion-control selector so consumers configure the // transport without depending on `wind-core` directly. pub use wind_core::quic::QuicCongestionControl; diff --git a/crates/wind-quic/src/prefixed.rs b/crates/wind-quic/src/prefixed.rs new file mode 100644 index 0000000..a6214e0 --- /dev/null +++ b/crates/wind-quic/src/prefixed.rs @@ -0,0 +1,67 @@ +//! A recv stream that replays a small buffered prefix before its inner stream. +//! +//! The TUIC server multiplexes the real TUIC protocol and an HTTP/3 masquerade +//! on the same QUIC port (both negotiate the `h3` ALPN). To classify a +//! connection it peeks the first byte(s) of the first stream; [`PrefixedRecv`] +//! then lets that consumed prefix be re-read transparently, whether the stream +//! is handed to the TUIC header parser or to the HTTP/3 adapter. + +use std::{ + io, + pin::Pin, + task::{Context, Poll}, +}; + +use bytes::Bytes; +use tokio::io::{AsyncRead, ReadBuf}; + +use crate::traits::QuicRecvStream; + +/// Wraps a recv stream so a buffered `prefix` is yielded before the inner +/// stream's own data. Once the prefix is drained, reads delegate straight to +/// the inner stream. +pub struct PrefixedRecv { + prefix: Bytes, + inner: R, +} + +impl PrefixedRecv { + /// Build a `PrefixedRecv` that replays `prefix`, then `inner`. + pub fn new(prefix: impl Into, inner: R) -> Self { + Self { + prefix: prefix.into(), + inner, + } + } + + /// Consume the wrapper, returning the inner stream. The (possibly partial) + /// unread prefix is returned alongside so callers don't silently drop it. + pub fn into_parts(self) -> (Bytes, R) { + (self.prefix, self.inner) + } +} + +impl AsyncRead for PrefixedRecv { + fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll> { + let this = self.get_mut(); + if !this.prefix.is_empty() { + let n = this.prefix.len().min(buf.remaining()); + if n > 0 { + let chunk = this.prefix.split_to(n); + buf.put_slice(&chunk); + } + return Poll::Ready(Ok(())); + } + Pin::new(&mut this.inner).poll_read(cx, buf) + } +} + +impl QuicRecvStream for PrefixedRecv { + fn stop(&mut self, code: u64) { + self.inner.stop(code); + } + + fn id(&self) -> u64 { + self.inner.id() + } +} diff --git a/crates/wind-quic/src/quiche/stream.rs b/crates/wind-quic/src/quiche/stream.rs index 1568efb..b57a3e9 100644 --- a/crates/wind-quic/src/quiche/stream.rs +++ b/crates/wind-quic/src/quiche/stream.rs @@ -100,6 +100,10 @@ impl QuicSendStream for QuicheSend { self.tx.close(); self.finished = true; } + + fn id(&self) -> u64 { + self.sid + } } /// Recv half of a quiche stream. @@ -148,4 +152,8 @@ impl QuicRecvStream for QuicheRecv { code, }); } + + fn id(&self) -> u64 { + self.sid + } } diff --git a/crates/wind-quic/src/quinn/mod.rs b/crates/wind-quic/src/quinn/mod.rs index ad2fc6a..05eb00d 100644 --- a/crates/wind-quic/src/quinn/mod.rs +++ b/crates/wind-quic/src/quinn/mod.rs @@ -62,6 +62,10 @@ impl QuicSendStream for QuinnSend { fn reset(&mut self, code: u64) { let _ = self.0.reset(VarInt::from_u64(code).unwrap_or(VarInt::MAX)); } + + fn id(&self) -> u64 { + self.0.id().into() + } } impl AsyncRead for QuinnRecv { @@ -74,6 +78,10 @@ impl QuicRecvStream for QuinnRecv { fn stop(&mut self, code: u64) { let _ = self.0.stop(VarInt::from_u64(code).unwrap_or(VarInt::MAX)); } + + fn id(&self) -> u64 { + self.0.id().into() + } } // --------------------------------------------------------------------------- diff --git a/crates/wind-quic/src/traits.rs b/crates/wind-quic/src/traits.rs index 78dc8de..a0c503a 100644 --- a/crates/wind-quic/src/traits.rs +++ b/crates/wind-quic/src/traits.rs @@ -24,12 +24,21 @@ pub trait QuicSendStream: AsyncWrite + Unpin + Send + 'static { /// Abruptly reset the stream with `code`, discarding unsent data. fn reset(&mut self, code: u64); + + /// The QUIC stream id of this send half. + /// + /// Needed by protocol layers (e.g. the HTTP/3 masquerade's `h3::quic` + /// adapter) that must report stream ids to the peer. + fn id(&self) -> u64; } /// The receive half of a QUIC stream. pub trait QuicRecvStream: AsyncRead + Unpin + Send + 'static { /// Ask the peer to stop sending, with error `code`. fn stop(&mut self, code: u64); + + /// The QUIC stream id of this recv half. + fn id(&self) -> u64; } /// A cheaply-cloneable handle to an established QUIC connection. diff --git a/crates/wind-tuic/Cargo.toml b/crates/wind-tuic/Cargo.toml index a5bf2c9..ba0191b 100644 --- a/crates/wind-tuic/Cargo.toml +++ b/crates/wind-tuic/Cargo.toml @@ -7,7 +7,7 @@ description.workspace = true license = "MIT OR Apache-2.0" [features] -default = ["server", "client", "quinn", "aws-lc-rs"] +default = ["server", "client", "quinn", "aws-lc-rs", "masquerade"] decode = ["tuic-core/decode"] encode = ["tuic-core/encode"] # The server decodes client requests AND encodes UDP response datagrams, so it @@ -15,6 +15,19 @@ encode = ["tuic-core/encode"] server = ["decode", "encode"] client = ["encode"] +# HTTP/3 masquerade: when a connecting client speaks real HTTP/3 instead of TUIC, +# pose as a genuine HTTP/3 web server by reverse-proxying to a configured upstream +# site. Backend-agnostic via the `wind-quic` h3 adapter, so it covers both the +# quinn and quiche backends. The runtime `[masquerade].enabled` flag gates it; the +# feature only compiles the code + deps in. +masquerade = [ + "server", + "wind-quic/h3", + "dep:h3", + "dep:http", + "dep:reqwest", +] + # Quinn Backend quinn = [ "wind-quic/quinn", @@ -78,6 +91,13 @@ rustls = { version = "0.23", default-features = false, optional = true } rustls-platform-verifier = { version = "0.7", default-features = false, optional = true } aws-lc-rs = { version = "*", optional = true, default-features = false } +# HTTP/3 masquerade: the transport-agnostic h3 server engine plus a `reqwest` +# reverse-proxy client to the upstream site (reqwest owns TLS/roots/pooling, so +# no hyper/rustls plumbing here). Gated behind the `masquerade` feature. +h3 = { version = "0.0.8", optional = true } +http = { version = "1", optional = true } +reqwest = { version = "0.12", default-features = false, features = ["rustls-tls", "stream", "http2"], optional = true } + [dev-dependencies] tokio = { version = "1", features = ["full", "test-util", "macros", "rt-multi-thread", "io-util"] } eyre = "0.6" diff --git a/crates/wind-tuic/src/quiche/inbound.rs b/crates/wind-tuic/src/quiche/inbound.rs index eefe3b5..e938c1b 100644 --- a/crates/wind-tuic/src/quiche/inbound.rs +++ b/crates/wind-tuic/src/quiche/inbound.rs @@ -40,6 +40,9 @@ pub struct TuicheInbound { /// Root cancellation token: cancelling it stops the accept loop and tears /// down every live connection (each gets a child token). cancel: CancellationToken, + /// HTTP/3 masquerade config; when `Some`, non-TUIC connections are served + /// as a reverse-proxy HTTP/3 web server. + masquerade: Option, } impl TuicheInbound { @@ -97,7 +100,10 @@ impl AbstractInbound for TuicheInbound { let users = users.clone(); let cb = cb.clone(); let cancel = root_cancel.child_token(); - conn_tasks.spawn(crate::server::serve_connection(conn, remote, users, AUTH_TIMEOUT, cb, cancel).instrument(span)); + let masquerade = self.masquerade.clone(); + conn_tasks.spawn( + crate::server::serve_connection(conn, remote, users, AUTH_TIMEOUT, cb, cancel, masquerade).instrument(span), + ); } conn_tasks.close(); @@ -115,6 +121,7 @@ pub struct TuicheInboundBuilder { private_key_path: Option, opts: ConnectionOpts, cancel: Option, + masquerade: Option, } impl TuicheInboundBuilder { @@ -127,9 +134,17 @@ impl TuicheInboundBuilder { private_key_path: None, opts: ConnectionOpts::default(), cancel: None, + masquerade: None, } } + /// Enable the HTTP/3 masquerade: non-TUIC connections are reverse-proxied + /// to the configured upstream site instead of being dropped. + pub fn masquerade(mut self, masquerade: Option) -> Self { + self.masquerade = masquerade; + self + } + /// Set the cancellation token driving graceful shutdown. Cancelling it /// stops the accept loop and closes every live connection. Defaults to a /// fresh token (i.e. the server only stops when the acceptor closes). @@ -198,6 +213,7 @@ impl TuicheInboundBuilder { private_key_path, cert_store, cancel: self.cancel.unwrap_or_default(), + masquerade: self.masquerade, }) } } diff --git a/crates/wind-tuic/src/quinn/inbound.rs b/crates/wind-tuic/src/quinn/inbound.rs index bdd7a4e..6b217fb 100644 --- a/crates/wind-tuic/src/quinn/inbound.rs +++ b/crates/wind-tuic/src/quinn/inbound.rs @@ -69,6 +69,11 @@ pub struct TuicInboundOpts { /// connections (e.g. one TCP-over-QUIC stream per browser request) ramp out /// of slow-start faster instead of trickling the first few round trips. pub initial_window: u64, + + /// HTTP/3 masquerade. When `Some`, connections that aren't TUIC (their + /// first stream byte isn't `0x05`) are served as a reverse-proxy HTTP/3 + /// web server instead of being dropped. + pub masquerade: Option, } impl Default for TuicInboundOpts { @@ -92,6 +97,7 @@ impl Default for TuicInboundOpts { gso: true, congestion_control: CongestionControl::Bbr, initial_window: 1024 * 1024, + masquerade: None, } } } @@ -217,6 +223,7 @@ impl AbstractInbound for TuicInbound { let users = users.clone(); let auth_timeout = opts.auth_timeout; let zero_rtt = opts.zero_rtt; + let masquerade = opts.masquerade.clone(); let cb = cb.clone(); let conn_cancel = self.cancel.child_token(); let remote = incoming.remote_address(); @@ -227,7 +234,7 @@ impl AbstractInbound for TuicInbound { // `tasks.close()` + `tasks.wait()` after cancelling). self.ctx.tasks.spawn(spawn_logged( "Connection handler", - handle_connection(incoming, users, auth_timeout, zero_rtt, cb, conn_cancel), + handle_connection(incoming, users, auth_timeout, zero_rtt, masquerade, cb, conn_cancel), ).instrument(span)); } else => { @@ -255,6 +262,7 @@ async fn handle_connection( users: Arc>, auth_timeout: Duration, zero_rtt: bool, + masquerade: Option, callback: C, cancel: CancellationToken, ) -> eyre::Result<()> { @@ -295,7 +303,16 @@ async fn handle_connection( }; // Hand the established connection to the shared, backend-agnostic core. - crate::server::serve_connection(QuinnConnection::new(conn), remote_addr, users, auth_timeout, callback, cancel).await; + crate::server::serve_connection( + QuinnConnection::new(conn), + remote_addr, + users, + auth_timeout, + callback, + cancel, + masquerade, + ) + .await; Ok(()) } diff --git a/crates/wind-tuic/src/server/masquerade.rs b/crates/wind-tuic/src/server/masquerade.rs new file mode 100644 index 0000000..faa1915 --- /dev/null +++ b/crates/wind-tuic/src/server/masquerade.rs @@ -0,0 +1,246 @@ +//! HTTP/3 masquerade: serve non-TUIC (real HTTP/3) clients as a reverse proxy. +//! +//! When the connection classifier in [`super`] decides a peer is speaking +//! actual HTTP/3 rather than TUIC (its first stream's leading bytes aren't +//! valid TUIC framing), it hands the connection here. We run a real HTTP/3 +//! server over the +//! backend-agnostic [`wind_quic::h3_adapter`] and reverse-proxy every request +//! to a configured upstream site with a `reqwest` client, relaying the response +//! back. To an active prober the server is indistinguishable from a normal +//! HTTP/3 web server. + +use std::{ + sync::{Arc, OnceLock}, + time::Duration, +}; + +use bytes::{Buf as _, Bytes}; +use http::header::HeaderName; +use reqwest::{Client, Url}; +use tokio::sync::{Notify, mpsc}; +use tokio_stream::StreamExt as _; +use tokio_util::sync::CancellationToken; +use tracing::debug; +use wind_quic::{ + PrefixedRecv, QuicConnection, + h3_adapter::{self, H3Conn}, +}; + +use super::MasqueradeConfig; + +/// Cap on a single proxied request body (bounds per-request memory). +const MAX_REQUEST_BODY_SIZE: usize = 16 * 1024 * 1024; +/// Cap on a single proxied response body. +const MAX_RESPONSE_BODY_SIZE: usize = 64 * 1024 * 1024; +/// Upstream request timeout. +const REQUEST_TIMEOUT: Duration = Duration::from_secs(30); + +/// The shared reverse-proxy client. Upstream-independent (it dials per request +/// URL), built once — probers are rare. `reqwest` owns TLS, roots, pooling and +/// ALPN, so there is no rustls/provider plumbing here. +fn client() -> Client { + static CLIENT: OnceLock = OnceLock::new(); + CLIENT + .get_or_init(|| { + Client::builder() + .timeout(REQUEST_TIMEOUT) + .build() + .expect("building the masquerade reqwest client") + }) + .clone() +} + +/// Run the HTTP/3 masquerade server over `conn`. The per-stream router in +/// [`super`] feeds h3 streams to `recv_rx` / `bidi_rx` and fires `go` on the +/// first one. We wait for `go` before building the h3 server (whose setup opens +/// our control stream + SETTINGS), so a pure-TUIC connection — which never +/// fires `go` — never has h3 streams opened on it. Returns when the peer +/// disconnects or `cancel` fires. +pub async fn run_masquerade( + conn: C, + recv_rx: mpsc::UnboundedReceiver>, + bidi_rx: mpsc::UnboundedReceiver<(C::SendStream, PrefixedRecv)>, + go: Arc, + cfg: MasqueradeConfig, + cancel: CancellationToken, +) -> eyre::Result<()> { + tokio::select! { + _ = cancel.cancelled() => return Ok(()), + _ = go.notified() => {} + } + + let backend = Url::parse(&cfg.upstream).map_err(|e| eyre::eyre!("invalid masquerade upstream {:?}: {e}", cfg.upstream))?; + let client = client(); + + let adapter = h3_adapter::server_connection(conn, recv_rx, bidi_rx); + let mut h3conn = h3::server::Connection::new(adapter) + .await + .map_err(|e| eyre::eyre!("h3 server setup failed: {e}"))?; + + loop { + let resolver = tokio::select! { + _ = cancel.cancelled() => break, + res = h3conn.accept() => match res { + Ok(Some(r)) => r, + Ok(None) => break, + Err(e) => { + debug!("masquerade accept ended: {e}"); + break; + } + }, + }; + + let backend = backend.clone(); + let client = client.clone(); + tokio::spawn(async move { + if let Err(e) = handle_request::(resolver, &client, &backend).await { + debug!("masquerade request error: {e:?}"); + } + }); + } + + Ok(()) +} + +/// Resolve one HTTP/3 request and reverse-proxy it; on any failure answer `502` +/// like a real web server rather than resetting the stream. +async fn handle_request( + resolver: h3::server::RequestResolver, Bytes>, + client: &Client, + backend: &Url, +) -> eyre::Result<()> { + let (request, mut stream) = resolver.resolve_request().await?; + + if let Err(e) = forward(client, backend, request, &mut stream).await { + debug!("masquerade upstream failed: {e}"); + let resp = http::Response::builder() + .status(http::StatusCode::BAD_GATEWAY) + .body(()) + .expect("502 response is valid"); + let _ = stream.send_response(resp).await; + let _ = stream.finish().await; + } + Ok(()) +} + +/// Translate the HTTP/3 request into a `reqwest` call to `backend`, stream the +/// response body back (size-capped), and finish the stream. +async fn forward( + client: &Client, + backend: &Url, + request: http::Request<()>, + stream: &mut h3::server::RequestStream, +) -> eyre::Result<()> +where + S: h3::quic::BidiStream, +{ + let target = rewrite_target(backend, request.uri())?; + let mut req = client.request(request.method().clone(), target); + for (name, value) in request.headers() { + if is_forwardable(name) { + req = req.header(name, value); + } + } + + let body = read_request_body(stream).await?; + if !body.is_empty() { + req = req.body(body); + } + + let resp = req.send().await?; + + // Reject an over-cap response *before* committing the response head, so the + // `502` fallback in `handle_request` can still fire cleanly. (A streamed + // response that under-reports or omits `Content-Length` is still bounded + // mid-relay below.) + if let Some(len) = resp.content_length() + && len > MAX_RESPONSE_BODY_SIZE as u64 + { + return Err(eyre::eyre!( + "upstream Content-Length {len} exceeds {MAX_RESPONSE_BODY_SIZE} bytes" + )); + } + + // From here the response head is committed to the h3 stream. A later failure + // (over-cap body, send error) must NOT become a second `502` response on a + // stream that already sent `200` — that yields a truncated-but-"successful" + // reply. Reset the stream instead, so the prober sees an aborted response. + if let Err(e) = relay_response(stream, resp).await { + debug!("masquerade response failed after the head was sent; resetting h3 stream: {e}"); + stream.stop_stream(h3::error::Code::H3_INTERNAL_ERROR); + } + Ok(()) +} + +/// Send the upstream response head, stream its body back (size-capped), and +/// finish the stream. Any error leaves the already-committed response +/// incomplete; the caller resets the stream rather than sending a fresh status. +async fn relay_response(stream: &mut h3::server::RequestStream, resp: reqwest::Response) -> eyre::Result<()> +where + S: h3::quic::BidiStream, +{ + let mut builder = http::Response::builder().status(resp.status()); + for (name, value) in resp.headers() { + if is_forwardable(name) { + builder = builder.header(name, value); + } + } + stream.send_response(builder.body(())?).await?; + + let mut sent = 0usize; + let mut body_stream = resp.bytes_stream(); + while let Some(chunk) = body_stream.next().await { + let chunk = chunk?; + sent += chunk.len(); + if sent > MAX_RESPONSE_BODY_SIZE { + return Err(eyre::eyre!("upstream response body exceeds {MAX_RESPONSE_BODY_SIZE} bytes")); + } + if !chunk.is_empty() { + stream.send_data(chunk).await?; + } + } + stream.finish().await?; + Ok(()) +} + +/// Drain the HTTP/3 request body (usually empty for a probe's GET), capped. +async fn read_request_body(stream: &mut h3::server::RequestStream) -> eyre::Result +where + S: h3::quic::BidiStream, +{ + let mut body = Vec::new(); + while let Some(mut chunk) = stream.recv_data().await? { + let n = chunk.remaining(); + body.extend_from_slice(chunk.copy_to_bytes(n).as_ref()); + if body.len() > MAX_REQUEST_BODY_SIZE { + return Err(eyre::eyre!("request body exceeds {MAX_REQUEST_BODY_SIZE} bytes")); + } + } + let _ = stream.recv_trailers().await?; + Ok(Bytes::from(body)) +} + +/// Point the request at the backend: keep the backend's scheme/host/port, +/// append the incoming path and query. +fn rewrite_target(backend: &Url, uri: &http::Uri) -> eyre::Result { + let path_and_query = uri.path_and_query().map(|v| v.as_str()).unwrap_or("/"); + let mut target = backend.clone(); + target.set_path(""); + target.set_query(None); + Ok(target.join(path_and_query)?) +} + +/// Whether a header may cross the proxy: drops hop-by-hop headers (RFC 9110 +/// §7.6.1) plus framing headers each side manages itself. +fn is_forwardable(name: &HeaderName) -> bool { + !matches!( + name.as_str().to_ascii_lowercase().as_str(), + "connection" + | "keep-alive" + | "proxy-connection" + | "transfer-encoding" + | "upgrade" + | "te" | "trailer" + | "host" | "content-length" + ) +} diff --git a/crates/wind-tuic/src/server/mod.rs b/crates/wind-tuic/src/server/mod.rs index f3f5535..1428951 100644 --- a/crates/wind-tuic/src/server/mod.rs +++ b/crates/wind-tuic/src/server/mod.rs @@ -11,7 +11,15 @@ //! (the reference behavior) and the replacement for the bespoke `wind-tuiche` //! driver. -use std::{collections::HashMap, net::SocketAddr, sync::Arc, time::Duration}; +use std::{ + collections::HashMap, + net::SocketAddr, + sync::{ + Arc, + atomic::{AtomicBool, Ordering}, + }, + time::Duration, +}; use arc_swap::ArcSwapOption; use eyre::Context as _; @@ -31,6 +39,22 @@ use wind_quic::{QuicConnection, QuicError}; use crate::proto::{CmdType, Command, UdpStream}; +#[cfg(feature = "masquerade")] +mod masquerade; + +/// Configuration for the HTTP/3 masquerade. +/// +/// Kept dependency-free (just the upstream URL) so it threads through the +/// always-compiled [`serve_connection`] even when the `masquerade` feature is +/// off. The actual reverse-proxy engine lives in the feature-gated +/// [`masquerade`] module. +#[derive(Clone, Debug)] +pub struct MasqueradeConfig { + /// Upstream site to reverse-proxy non-TUIC HTTP/3 requests to, + /// e.g. `https://example.com`. + pub upstream: String, +} + async fn spawn_logged(label: &str, fut: impl std::future::Future>) { if let Err(err) = fut.await { error!("{label} error: {err:?}"); @@ -131,6 +155,102 @@ impl Clone for UdpSession { /// would let one authenticated peer pin a large amount of background work. const MAX_UDP_SESSIONS_PER_CONN: u64 = 1024; +/// Per-connection senders for the lazily-started HTTP/3 masquerade. The +/// per-stream router pushes streams it classified as h3 here; `run_masquerade` +/// (spawned parked) pulls them after `go` fires on the first one. `None` +/// everywhere when the masquerade is disabled or not compiled in. +#[cfg_attr(not(feature = "masquerade"), allow(dead_code))] +struct H3Senders { + uni_tx: mpsc::UnboundedSender>, + bidi_tx: mpsc::UnboundedSender<(C::SendStream, wind_quic::PrefixedRecv)>, + go: Arc, +} + +/// A non-TUIC stream handed to the h3 masquerade by the per-stream router. +enum H3Stream { + Uni(wind_quic::PrefixedRecv), + Bi(C::SendStream, wind_quic::PrefixedRecv), +} + +/// Whether a 2-byte prefix is TUIC framing: `[VER, CmdType]` with `CmdType` in +/// `Auth..=Heartbeat` (0..=4). An HTTP/3 stream-type / frame-type byte won't +/// satisfy both, so this distinguishes the two even when an h3 stream happens +/// to start with `VER`. +fn is_tuic_prefix(prefix: [u8; 2]) -> bool { + prefix[0] == crate::proto::VER && prefix[1] <= u8::from(CmdType::Heartbeat) +} + +/// Read the 2-byte classifier prefix from a stream (`None` if it closes first). +async fn read_prefix(recv: &mut R) -> Option<[u8; 2]> { + let mut prefix = [0u8; 2]; + recv.read_exact(&mut prefix).await.ok().map(|_| prefix) +} + +/// Build this connection's HTTP/3 masquerade router: two channels feeding a +/// **parked** `run_masquerade` task (it only builds the h3 server once the +/// router wakes it on the first h3 stream). Returns `None` (and spawns nothing) +/// when the masquerade is disabled. +#[cfg(feature = "masquerade")] +fn spawn_h3_router( + conn: C, + masq: Option, + cancel: CancellationToken, +) -> Option>> { + let cfg = masq?; + let (uni_tx, uni_rx) = mpsc::unbounded_channel(); + let (bidi_tx, bidi_rx) = mpsc::unbounded_channel(); + let go = Arc::new(Notify::new()); + // Run the masquerade parked. If it fails (invalid upstream URL, h3 setup + // error) the connection would otherwise leak: a non-TUIC stream has already + // flipped `h3_active`, so the auth-timeout guard won't reap it. Log the error + // and close the connection ourselves, mirroring that guard's cleanup. + let close_conn = conn.clone(); + let go_task = go.clone(); + tokio::spawn(async move { + if let Err(e) = masquerade::run_masquerade(conn, uni_rx, bidi_rx, go_task, cfg, cancel).await { + warn!("HTTP/3 masquerade task failed; closing connection: {e:?}"); + close_conn.close(0, b""); + } + }); + Some(Arc::new(H3Senders { uni_tx, bidi_tx, go })) +} + +/// No router when the masquerade isn't compiled in. +#[cfg(not(feature = "masquerade"))] +fn spawn_h3_router( + _conn: C, + _masq: Option, + _cancel: CancellationToken, +) -> Option>> { + None +} + +/// Route a stream the classifier decided is **not** TUIC: hand it to the h3 +/// masquerade (waking the parked server), or close the connection if the +/// masquerade is off. +fn route_non_tuic( + ctx: &InboundCtx, + h3: Option<&Arc>>, + active: &AtomicBool, + stream: H3Stream, +) { + if let Some(s) = h3 { + active.store(true, Ordering::Relaxed); + match stream { + H3Stream::Uni(recv) => { + let _ = s.uni_tx.send(recv); + } + H3Stream::Bi(send, recv) => { + let _ = s.bidi_tx.send((send, recv)); + } + } + s.go.notify_one(); + } else { + drop(stream); + ctx.conn.close(0, b""); + } +} + /// Drive an established TUIC connection: spawn the auth-timeout guard and the /// datagram/uni/bi accept loops, then run until the peer disconnects or /// `cancel` fires. Backend-agnostic — both backends call this after their @@ -142,6 +262,7 @@ pub async fn serve_connection( auth_timeout: Duration, callback: CB, cancel: CancellationToken, + masq: Option, ) where C: QuicConnection, CB: InboundCallback, @@ -171,15 +292,24 @@ pub async fn serve_connection( udp_root_cancel, }); - // Authentication timeout: close the connection if no UUID is set in time. + // Per-connection HTTP/3 masquerade router: a parked `run_masquerade` task plus + // two channels the acceptor loops feed. `None` when masquerade is disabled. + // `h3_active` flips true once any stream classifies as h3 so the auth-timeout + // guard knows not to close what is actually an HTTP/3 connection. + let h3 = spawn_h3_router(connection.conn.clone(), masq, cancel.clone()); + let h3_active = Arc::new(AtomicBool::new(false)); + + // Authentication timeout: close the connection if it never authenticated AND + // never turned out to be an HTTP/3 (masquerade) connection. { let conn_auth = connection.clone(); let auth_cancel = cancel.clone(); + let active = h3_active.clone(); tokio::spawn( async move { tokio::select! { _ = tokio::time::sleep(auth_timeout) => { - if conn_auth.uuid.load().is_none() { + if conn_auth.uuid.load().is_none() && !active.load(Ordering::Relaxed) { warn!("Connection from {} authentication timeout", remote_addr); conn_auth.conn.close(0, b"auth timeout"); } @@ -197,10 +327,11 @@ pub async fn serve_connection( // cache) is dropped instead of leaking until server shutdown. let acceptor_cancel = cancel.child_token(); - // Datagram acceptor. Pre-auth datagrams are handled inline (serially) so an - // unauthenticated peer can't spawn unbounded tasks parked on `auth_notify`; - // once authed, each datagram is dispatched in parallel so a slow outbound - // queue can't block the read loop. + // Datagram acceptor. Classify each datagram by its first two bytes: TUIC + // datagrams (heartbeat / native-mode UDP) are handled inline pre-auth (so an + // unauthenticated peer can't spawn unbounded tasks parked on `auth_notify`) + // and spawned post-auth; non-TUIC datagrams are dropped — the masquerade + // serves no QUIC datagrams. { let conn = connection.clone(); let cb = callback.clone(); @@ -215,6 +346,9 @@ pub async fn serve_connection( let conn = conn.clone(); let cb = cb.clone(); async move { + if datagram.len() < 2 || !is_tuic_prefix([datagram[0], datagram[1]]) { + return; + } if conn.uuid.load().is_some() { tokio::spawn( spawn_logged("Datagram", handle_datagram(conn, datagram, cb)) @@ -232,11 +366,16 @@ pub async fn serve_connection( ); } - // Uni stream acceptor. + // Uni stream acceptor. Every accepted stream reads its 2-byte prefix and + // routes itself: TUIC (`is_tuic_prefix`) → `handle_uni_stream`, otherwise → + // the h3 masquerade (or close). The peeked bytes are replayed via + // `PrefixedRecv` so the chosen handler reads from byte 0. { let conn = connection.clone(); let cb = callback.clone(); let uni_cancel = acceptor_cancel.clone(); + let h3 = h3.clone(); + let active = h3_active.clone(); tokio::spawn( async move { acceptor_loop( @@ -246,10 +385,23 @@ pub async fn serve_connection( |recv| { let conn = conn.clone(); let cb = cb.clone(); + let h3 = h3.clone(); + let active = active.clone(); async move { tokio::spawn( - spawn_logged("Uni stream", handle_uni_stream(conn, recv, cb)) - .instrument(tracing::debug_span!("uni_stream")), + async move { + let mut recv = recv; + let Some(prefix) = read_prefix(&mut recv).await else { return }; + let recv = wind_quic::PrefixedRecv::new(bytes::Bytes::copy_from_slice(&prefix), recv); + if is_tuic_prefix(prefix) { + if let Err(e) = handle_uni_stream(conn, recv, cb).await { + error!("Uni stream error: {e:?}"); + } + } else { + route_non_tuic(&conn, h3.as_ref(), &active, H3Stream::Uni(recv)); + } + } + .instrument(tracing::debug_span!("uni_stream")), ); } }, @@ -260,11 +412,13 @@ pub async fn serve_connection( ); } - // Bi stream acceptor. + // Bi stream acceptor — same per-stream classify + route. { let conn = connection.clone(); let cb = callback.clone(); let bi_cancel = acceptor_cancel.clone(); + let h3 = h3.clone(); + let active = h3_active.clone(); tokio::spawn( async move { acceptor_loop( @@ -274,10 +428,23 @@ pub async fn serve_connection( |(send, recv)| { let conn = conn.clone(); let cb = cb.clone(); + let h3 = h3.clone(); + let active = active.clone(); async move { tokio::spawn( - spawn_logged("Bi stream", handle_bi_stream(conn, send, recv, cb)) - .instrument(tracing::debug_span!("bi_stream")), + async move { + let mut recv = recv; + let Some(prefix) = read_prefix(&mut recv).await else { return }; + let recv = wind_quic::PrefixedRecv::new(bytes::Bytes::copy_from_slice(&prefix), recv); + if is_tuic_prefix(prefix) { + if let Err(e) = handle_bi_stream(conn, send, recv, cb).await { + error!("Bi stream error: {e:?}"); + } + } else { + route_non_tuic(&conn, h3.as_ref(), &active, H3Stream::Bi(send, recv)); + } + } + .instrument(tracing::debug_span!("bi_stream")), ); } }, @@ -303,7 +470,7 @@ pub async fn serve_connection( async fn handle_uni_stream( ctx: Arc>, - mut recv: C::RecvStream, + mut recv: impl AsyncRead + Unpin + Send + 'static, callback: CB, ) -> eyre::Result<()> { let mut header_buf = [0u8; 2]; @@ -397,7 +564,7 @@ async fn handle_uni_stream( async fn handle_bi_stream( connection: Arc>, send: C::SendStream, - mut recv: C::RecvStream, + mut recv: impl AsyncRead + Unpin + Send + 'static, callback: CB, ) -> eyre::Result<()> { if !ensure_authed(&connection).await {