From 567aa9228aecff24195c9eca289aff19e2336a3e Mon Sep 17 00:00:00 2001 From: iHsin Date: Sat, 13 Jun 2026 04:24:59 +0800 Subject: [PATCH 01/10] feat(wind-tuic): HTTP/3 masquerade for non-TUIC clients MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit TUIC advertises the `h3` ALPN to disguise its traffic, so real TUIC clients and active HTTP/3 probers negotiate the identical ALPN. Today a prober speaking real HTTP/3 (e.g. `curl --http3`) gets its QUIC handshake accepted but then every stream is reset — an "accept then reset" fingerprint that distinguishes the server from a real web server. This adds an HTTP/3 masquerade: when a connecting client isn't TUIC, the server poses as a genuine HTTP/3 web server by reverse-proxying the request to a configured upstream site and relaying the response. Approach (one engine, both backends): - A backend-agnostic `h3::quic` adapter over `wind_quic::QuicConnection` (new `wind-quic/src/h3_adapter.rs`, feature `h3`), so the hyperium `h3` crate drives the masquerade over either the quinn or quiche backend. - Detection lives once in `serve_connection`: peek the first uni stream's first byte — `0x05` (TUIC VER) routes to TUIC, anything else to the masquerade. The peeked byte is replayed via `PrefixedRecv` so both the TUIC parser and the h3 adapter read from byte 0. This is the key reason the adapter owns stream acceptance rather than using `quiche::h3` (which can't be handed back a consumed byte). - The masquerade runner (`wind-tuic/src/server/masquerade.rs`) runs an `h3::server` over the adapter and reverse-proxies each request to the upstream via a pooled hyper + rustls (platform-verifier) client; on upstream failure it returns 502 rather than resetting. Config: new `[masquerade] enabled/upstream` in tuic-server, threaded into both the quinn and quiche inbounds. Gated by the wind-tuic `masquerade` cargo feature (on by default); disabled at runtime by default. Verified: builds for quinn+masquerade, `--features quiche`, masquerade off, and the full workspace; clippy clean; existing tuic-tests (quinn relay, quiche relay, 0-RTT, cert-reload) still pass — the `0x05` discriminator routes real TUIC unchanged on both backends. Co-Authored-By: Claude Opus 4.8 --- Cargo.lock | 21 ++ crates/tuic-server/src/config.rs | 22 ++ crates/tuic-server/src/wind_adapter.rs | 6 + crates/wind-quic/Cargo.toml | 8 + crates/wind-quic/src/h3_adapter.rs | 441 ++++++++++++++++++++++ crates/wind-quic/src/lib.rs | 6 + crates/wind-quic/src/prefixed.rs | 67 ++++ crates/wind-quic/src/quiche/stream.rs | 8 + crates/wind-quic/src/quinn/mod.rs | 8 + crates/wind-quic/src/traits.rs | 9 + crates/wind-tuic/Cargo.toml | 31 +- crates/wind-tuic/src/quiche/inbound.rs | 18 +- crates/wind-tuic/src/quinn/inbound.rs | 21 +- crates/wind-tuic/src/server/masquerade.rs | 246 ++++++++++++ crates/wind-tuic/src/server/mod.rs | 83 +++- 15 files changed, 990 insertions(+), 5 deletions(-) create mode 100644 crates/wind-quic/src/h3_adapter.rs create mode 100644 crates/wind-quic/src/prefixed.rs create mode 100644 crates/wind-tuic/src/server/masquerade.rs diff --git a/Cargo.lock b/Cargo.lock index 6ddbafe..a00b637 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1815,6 +1815,20 @@ dependencies = [ "tracing", ] +[[package]] +name = "h3" +version = "0.0.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "10872b55cfb02a821b69dc7cf8dc6a71d6af25eb9a79662bec4a9d016056b3be" +dependencies = [ + "bytes", + "fastrand", + "futures-util", + "http", + "pin-project-lite", + "tokio", +] + [[package]] name = "half" version = "2.7.1" @@ -5817,6 +5831,7 @@ dependencies = [ "eyre", "foreign-types-shared", "futures-util", + "h3", "pin-project", "quinn 0.12.0", "quinn-congestions", @@ -5889,6 +5904,12 @@ dependencies = [ "enum_dispatch", "eyre", "futures-util", + "h3", + "http", + "http-body-util", + "hyper", + "hyper-rustls", + "hyper-util", "moka", "pin-project", "portable-atomic", diff --git a/crates/tuic-server/src/config.rs b/crates/tuic-server/src/config.rs index ba490e5..5c76fe6 100644 --- a/crates/tuic-server/src/config.rs +++ b/crates/tuic-server/src/config.rs @@ -87,6 +87,11 @@ pub struct Config { pub users: HashMap, pub tls: TlsConfig, + /// HTTP/3 masquerade: reverse-proxy non-TUIC (HTTP/3 probe) connections to a + /// real upstream site so the server is indistinguishable from a web server. + #[serde(default)] + pub masquerade: MasqueradeConfig, + #[educe(Default = "")] pub data_dir: PathBuf, @@ -257,6 +262,23 @@ pub struct TlsConfig { pub acme_staging: bool, } +/// HTTP/3 masquerade configuration. +/// +/// When `enabled`, a connection that isn't TUIC (its first stream byte isn't the +/// TUIC version `0x05` — i.e. an active prober speaking real HTTP/3) is served as +/// a reverse proxy to `upstream`, so the server is indistinguishable from a +/// normal HTTP/3 website instead of resetting the connection. +#[derive(Deserialize, Serialize, Educe)] +#[educe(Default)] +#[serde(default, deny_unknown_fields)] +pub struct MasqueradeConfig { + #[educe(Default(expression = false))] + pub enabled: bool, + /// Upstream site to reverse-proxy to, e.g. `https://example.com`. + #[educe(Default(expression = "https://example.com"))] + pub upstream: String, +} + /// Transport tuning for the quinn backend (`wind-tuic`). #[derive(Deserialize, Serialize, Educe)] #[educe(Default)] diff --git a/crates/tuic-server/src/wind_adapter.rs b/crates/tuic-server/src/wind_adapter.rs index 0f793ba..6421da5 100644 --- a/crates/tuic-server/src/wind_adapter.rs +++ b/crates/tuic-server/src/wind_adapter.rs @@ -284,6 +284,9 @@ async fn create_quinn_inbound(ctx: &Arc) -> eyre::Result) -> eyre::Result = Pin + Send>>; + +/// A boxed in-flight `accept_bi` / `open_bi` future. Aliased because the bidi +/// case returns a `(SendStream, RecvStream)` tuple, which trips clippy's +/// `type_complexity` lint when written inline in every slot/signature. +type BoxBiFut = BoxFut::SendStream, ::RecvStream), QuicError>>; + +/// Build an HTTP/3 server connection over `conn`, yielding `first_control` (the +/// peer's control stream, with its peeked byte already replayed) as the first +/// accepted recv stream. +pub fn server_connection(conn: C, first_control: Box) -> H3Conn { + H3Conn { + conn, + first_recv: Some(H3Recv::new(first_control)), + accept_recv_fut: None, + accept_bi_fut: None, + open_uni_fut: None, + open_bi_fut: None, + } +} + +fn conn_err(e: QuicError) -> ConnectionErrorIncoming { + match e { + QuicError::TimedOut => ConnectionErrorIncoming::Timeout, + QuicError::ApplicationClosed { .. } | QuicError::LocallyClosed => { + ConnectionErrorIncoming::ApplicationClose { error_code: 0 } + } + other => ConnectionErrorIncoming::InternalError(other.to_string()), + } +} + +fn stream_err(e: QuicError) -> StreamErrorIncoming { + StreamErrorIncoming::ConnectionErrorIncoming { + connection_error: conn_err(e), + } +} + +// --------------------------------------------------------------------------- +// Recv stream +// --------------------------------------------------------------------------- + +/// `h3::quic::RecvStream` over a boxed [`QuicRecvStream`]. +pub struct H3Recv { + inner: Box, + id: u64, + scratch: Vec, +} + +impl H3Recv { + fn new(inner: Box) -> Self { + let id = inner.id(); + Self { + inner, + id, + scratch: vec![0u8; RECV_CHUNK], + } + } +} + +impl RecvStream for H3Recv { + type Buf = Bytes; + + fn poll_data(&mut self, cx: &mut Context<'_>) -> Poll, StreamErrorIncoming>> { + let mut rb = ReadBuf::new(&mut self.scratch); + match Pin::new(&mut self.inner).poll_read(cx, &mut rb) { + Poll::Ready(Ok(())) => { + let filled = rb.filled(); + if filled.is_empty() { + // Clean EOF (peer FIN). + Poll::Ready(Ok(None)) + } else { + Poll::Ready(Ok(Some(Bytes::copy_from_slice(filled)))) + } + } + Poll::Ready(Err(e)) => Poll::Ready(Err(StreamErrorIncoming::Unknown(Box::new(e)))), + Poll::Pending => Poll::Pending, + } + } + + fn stop_sending(&mut self, error_code: u64) { + self.inner.stop(error_code); + } + + fn recv_id(&self) -> StreamId { + stream_id(self.id) + } +} + +// --------------------------------------------------------------------------- +// Send stream +// --------------------------------------------------------------------------- + +/// `h3::quic::SendStream` over a boxed [`QuicSendStream`]. +pub struct H3Send { + inner: Box, + id: u64, + /// At most one `WriteBuf` is buffered at a time; `poll_ready`/`poll_finish` + /// drain it through the underlying `AsyncWrite`. + pending: Option>, +} + +impl H3Send { + fn new(inner: Box) -> Self { + let id = inner.id(); + Self { + inner, + id, + pending: None, + } + } + + /// Flush the buffered `WriteBuf` (if any) to the underlying stream. Returns + /// `Ready(Ok)` once nothing is buffered. + fn poll_flush_pending(&mut self, cx: &mut Context<'_>) -> Poll> { + if let Some(buf) = self.pending.as_mut() { + while buf.has_remaining() { + let chunk = buf.chunk(); + match Pin::new(&mut self.inner).poll_write(cx, chunk) { + Poll::Ready(Ok(0)) => { + return Poll::Ready(Err(StreamErrorIncoming::Unknown(Box::new(std::io::Error::new( + std::io::ErrorKind::WriteZero, + "h3 send stream wrote zero bytes", + ))))); + } + Poll::Ready(Ok(n)) => buf.advance(n), + Poll::Ready(Err(e)) => return Poll::Ready(Err(StreamErrorIncoming::Unknown(Box::new(e)))), + Poll::Pending => return Poll::Pending, + } + } + } + self.pending = None; + Poll::Ready(Ok(())) + } +} + +impl SendStream for H3Send { + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.poll_flush_pending(cx) + } + + fn send_data>>(&mut self, data: T) -> Result<(), StreamErrorIncoming> { + // h3 always polls `poll_ready` to readiness before `send_data`, so the + // previous buffer has drained. + self.pending = Some(data.into()); + Ok(()) + } + + fn poll_finish(&mut self, cx: &mut Context<'_>) -> Poll> { + match self.poll_flush_pending(cx) { + Poll::Ready(Ok(())) => match Pin::new(&mut self.inner).poll_flush(cx) { + Poll::Ready(Ok(())) => { + let _ = self.inner.finish(); + Poll::Ready(Ok(())) + } + Poll::Ready(Err(e)) => Poll::Ready(Err(StreamErrorIncoming::Unknown(Box::new(e)))), + Poll::Pending => Poll::Pending, + }, + other => other, + } + } + + fn reset(&mut self, reset_code: u64) { + self.inner.reset(reset_code); + } + + fn send_id(&self) -> StreamId { + stream_id(self.id) + } +} + +// --------------------------------------------------------------------------- +// Bidi stream (request streams) +// --------------------------------------------------------------------------- + +/// `h3::quic::BidiStream` joining an [`H3Send`] and an [`H3Recv`]. +pub struct H3Bidi { + send: H3Send, + recv: H3Recv, +} + +impl SendStream for H3Bidi { + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.send.poll_ready(cx) + } + + fn send_data>>(&mut self, data: T) -> Result<(), StreamErrorIncoming> { + self.send.send_data(data) + } + + fn poll_finish(&mut self, cx: &mut Context<'_>) -> Poll> { + self.send.poll_finish(cx) + } + + fn reset(&mut self, reset_code: u64) { + self.send.reset(reset_code); + } + + fn send_id(&self) -> StreamId { + self.send.send_id() + } +} + +impl RecvStream for H3Bidi { + type Buf = Bytes; + + fn poll_data(&mut self, cx: &mut Context<'_>) -> Poll, StreamErrorIncoming>> { + self.recv.poll_data(cx) + } + + fn stop_sending(&mut self, error_code: u64) { + self.recv.stop_sending(error_code); + } + + fn recv_id(&self) -> StreamId { + self.recv.recv_id() + } +} + +impl BidiStream for H3Bidi { + type SendStream = H3Send; + type RecvStream = H3Recv; + + fn split(self) -> (Self::SendStream, Self::RecvStream) { + (self.send, self.recv) + } +} + +// --------------------------------------------------------------------------- +// Opener +// --------------------------------------------------------------------------- + +/// Opens local uni/bidi streams (HTTP/3 control + QPACK streams). Produced by +/// [`Connection::opener`]. +pub struct H3Opener { + conn: C, + open_uni_fut: Option>>, + open_bi_fut: Option>, +} + +impl OpenStreams for H3Opener { + type BidiStream = H3Bidi; + type SendStream = H3Send; + + fn poll_open_bidi(&mut self, cx: &mut Context<'_>) -> Poll> { + poll_open_bidi(&self.conn, &mut self.open_bi_fut, cx) + } + + fn poll_open_send(&mut self, cx: &mut Context<'_>) -> Poll> { + poll_open_send(&self.conn, &mut self.open_uni_fut, cx) + } + + fn close(&mut self, _code: h3::error::Code, reason: &[u8]) { + self.conn.close(0, reason); + } +} + +// --------------------------------------------------------------------------- +// Connection +// --------------------------------------------------------------------------- + +/// `h3::quic::Connection` over a [`QuicConnection`] handle. +pub struct H3Conn { + conn: C, + first_recv: Option, + accept_recv_fut: Option>>, + accept_bi_fut: Option>, + open_uni_fut: Option>>, + open_bi_fut: Option>, +} + +impl OpenStreams for H3Conn { + type BidiStream = H3Bidi; + type SendStream = H3Send; + + fn poll_open_bidi(&mut self, cx: &mut Context<'_>) -> Poll> { + poll_open_bidi(&self.conn, &mut self.open_bi_fut, cx) + } + + fn poll_open_send(&mut self, cx: &mut Context<'_>) -> Poll> { + poll_open_send(&self.conn, &mut self.open_uni_fut, cx) + } + + fn close(&mut self, _code: h3::error::Code, reason: &[u8]) { + self.conn.close(0, reason); + } +} + +impl Connection for H3Conn { + type RecvStream = H3Recv; + type OpenStreams = H3Opener; + + fn poll_accept_recv(&mut self, cx: &mut Context<'_>) -> Poll> { + if let Some(recv) = self.first_recv.take() { + return Poll::Ready(Ok(recv)); + } + let poll = { + let conn = &self.conn; + let fut = self + .accept_recv_fut + .get_or_insert_with(|| boxed_accept_uni(conn.clone())); + fut.as_mut().poll(cx) + }; + match poll { + Poll::Ready(res) => { + self.accept_recv_fut = None; + Poll::Ready(res.map(|r| H3Recv::new(Box::new(r))).map_err(conn_err)) + } + Poll::Pending => Poll::Pending, + } + } + + fn poll_accept_bidi(&mut self, cx: &mut Context<'_>) -> Poll> { + let poll = { + let conn = &self.conn; + let fut = self.accept_bi_fut.get_or_insert_with(|| boxed_accept_bi(conn.clone())); + fut.as_mut().poll(cx) + }; + match poll { + Poll::Ready(res) => { + self.accept_bi_fut = None; + Poll::Ready(res.map(into_bidi).map_err(conn_err)) + } + Poll::Pending => Poll::Pending, + } + } + + fn opener(&self) -> Self::OpenStreams { + H3Opener { + conn: self.conn.clone(), + open_uni_fut: None, + open_bi_fut: None, + } + } +} + +// --------------------------------------------------------------------------- +// Shared poll helpers +// --------------------------------------------------------------------------- + +fn stream_id(id: u64) -> StreamId { + // QUIC stream ids fit the h3 `StreamId` invariant (< 2^62); fall back to 0 + // only if a backend ever surfaces something out of range. + StreamId::try_from(id).unwrap_or_else(|_| StreamId::try_from(0).expect("0 is a valid stream id")) +} + +fn boxed_accept_uni(conn: C) -> BoxFut> { + Box::pin(async move { conn.accept_uni().await }) +} + +fn boxed_accept_bi(conn: C) -> BoxBiFut { + Box::pin(async move { conn.accept_bi().await }) +} + +fn into_bidi((send, recv): (S, R)) -> H3Bidi { + H3Bidi { + send: H3Send::new(Box::new(send)), + recv: H3Recv::new(Box::new(recv)), + } +} + +fn poll_open_send( + conn: &C, + slot: &mut Option>>, + cx: &mut Context<'_>, +) -> Poll> { + let poll = { + let fut = slot.get_or_insert_with(|| { + let conn = conn.clone(); + Box::pin(async move { conn.open_uni().await }) + }); + fut.as_mut().poll(cx) + }; + match poll { + Poll::Ready(res) => { + *slot = None; + Poll::Ready(res.map(|s| H3Send::new(Box::new(s))).map_err(stream_err)) + } + Poll::Pending => Poll::Pending, + } +} + +fn poll_open_bidi( + conn: &C, + slot: &mut Option>, + cx: &mut Context<'_>, +) -> Poll> { + let poll = { + let fut = slot.get_or_insert_with(|| { + let conn = conn.clone(); + Box::pin(async move { conn.open_bi().await }) + }); + fut.as_mut().poll(cx) + }; + match poll { + Poll::Ready(res) => { + *slot = None; + Poll::Ready(res.map(into_bidi).map_err(stream_err)) + } + Poll::Pending => Poll::Pending, + } +} diff --git a/crates/wind-quic/src/lib.rs b/crates/wind-quic/src/lib.rs index a688aa2..e701232 100644 --- a/crates/wind-quic/src/lib.rs +++ b/crates/wind-quic/src/lib.rs @@ -33,11 +33,17 @@ pub mod config; pub mod error; +pub mod prefixed; pub mod traits; pub use config::{CertSource, ClientTlsConfig, ServerTlsConfig, TransportConfig}; pub use error::{QuicError, Result}; +pub use prefixed::PrefixedRecv; pub use traits::{QuicConnection, QuicRecvStream, QuicSendStream}; + +/// HTTP/3 `h3::quic` adapter over [`QuicConnection`] (masquerade support). +#[cfg(feature = "h3")] +pub mod h3_adapter; // Re-export the shared congestion-control selector so consumers configure the // transport without depending on `wind-core` directly. pub use wind_core::quic::QuicCongestionControl; diff --git a/crates/wind-quic/src/prefixed.rs b/crates/wind-quic/src/prefixed.rs new file mode 100644 index 0000000..20d4b0f --- /dev/null +++ b/crates/wind-quic/src/prefixed.rs @@ -0,0 +1,67 @@ +//! A recv stream that replays a small buffered prefix before its inner stream. +//! +//! The TUIC server multiplexes the real TUIC protocol and an HTTP/3 masquerade +//! on the same QUIC port (both negotiate the `h3` ALPN). To classify a +//! connection it peeks the first byte(s) of the first stream; [`PrefixedRecv`] +//! then lets that consumed prefix be re-read transparently, whether the stream +//! is handed to the TUIC header parser or to the HTTP/3 adapter. + +use std::{ + io, + pin::Pin, + task::{Context, Poll}, +}; + +use bytes::Bytes; +use tokio::io::{AsyncRead, ReadBuf}; + +use crate::traits::QuicRecvStream; + +/// Wraps a recv stream so a buffered `prefix` is yielded before the inner +/// stream's own data. Once the prefix is drained, reads delegate straight to the +/// inner stream. +pub struct PrefixedRecv { + prefix: Bytes, + inner: R, +} + +impl PrefixedRecv { + /// Build a `PrefixedRecv` that replays `prefix`, then `inner`. + pub fn new(prefix: impl Into, inner: R) -> Self { + Self { + prefix: prefix.into(), + inner, + } + } + + /// Consume the wrapper, returning the inner stream. The (possibly partial) + /// unread prefix is returned alongside so callers don't silently drop it. + pub fn into_parts(self) -> (Bytes, R) { + (self.prefix, self.inner) + } +} + +impl AsyncRead for PrefixedRecv { + fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll> { + let this = self.get_mut(); + if !this.prefix.is_empty() { + let n = this.prefix.len().min(buf.remaining()); + if n > 0 { + let chunk = this.prefix.split_to(n); + buf.put_slice(&chunk); + } + return Poll::Ready(Ok(())); + } + Pin::new(&mut this.inner).poll_read(cx, buf) + } +} + +impl QuicRecvStream for PrefixedRecv { + fn stop(&mut self, code: u64) { + self.inner.stop(code); + } + + fn id(&self) -> u64 { + self.inner.id() + } +} diff --git a/crates/wind-quic/src/quiche/stream.rs b/crates/wind-quic/src/quiche/stream.rs index 1568efb..b57a3e9 100644 --- a/crates/wind-quic/src/quiche/stream.rs +++ b/crates/wind-quic/src/quiche/stream.rs @@ -100,6 +100,10 @@ impl QuicSendStream for QuicheSend { self.tx.close(); self.finished = true; } + + fn id(&self) -> u64 { + self.sid + } } /// Recv half of a quiche stream. @@ -148,4 +152,8 @@ impl QuicRecvStream for QuicheRecv { code, }); } + + fn id(&self) -> u64 { + self.sid + } } diff --git a/crates/wind-quic/src/quinn/mod.rs b/crates/wind-quic/src/quinn/mod.rs index ad2fc6a..05eb00d 100644 --- a/crates/wind-quic/src/quinn/mod.rs +++ b/crates/wind-quic/src/quinn/mod.rs @@ -62,6 +62,10 @@ impl QuicSendStream for QuinnSend { fn reset(&mut self, code: u64) { let _ = self.0.reset(VarInt::from_u64(code).unwrap_or(VarInt::MAX)); } + + fn id(&self) -> u64 { + self.0.id().into() + } } impl AsyncRead for QuinnRecv { @@ -74,6 +78,10 @@ impl QuicRecvStream for QuinnRecv { fn stop(&mut self, code: u64) { let _ = self.0.stop(VarInt::from_u64(code).unwrap_or(VarInt::MAX)); } + + fn id(&self) -> u64 { + self.0.id().into() + } } // --------------------------------------------------------------------------- diff --git a/crates/wind-quic/src/traits.rs b/crates/wind-quic/src/traits.rs index 78dc8de..a0c503a 100644 --- a/crates/wind-quic/src/traits.rs +++ b/crates/wind-quic/src/traits.rs @@ -24,12 +24,21 @@ pub trait QuicSendStream: AsyncWrite + Unpin + Send + 'static { /// Abruptly reset the stream with `code`, discarding unsent data. fn reset(&mut self, code: u64); + + /// The QUIC stream id of this send half. + /// + /// Needed by protocol layers (e.g. the HTTP/3 masquerade's `h3::quic` + /// adapter) that must report stream ids to the peer. + fn id(&self) -> u64; } /// The receive half of a QUIC stream. pub trait QuicRecvStream: AsyncRead + Unpin + Send + 'static { /// Ask the peer to stop sending, with error `code`. fn stop(&mut self, code: u64); + + /// The QUIC stream id of this recv half. + fn id(&self) -> u64; } /// A cheaply-cloneable handle to an established QUIC connection. diff --git a/crates/wind-tuic/Cargo.toml b/crates/wind-tuic/Cargo.toml index a5bf2c9..8bef7ff 100644 --- a/crates/wind-tuic/Cargo.toml +++ b/crates/wind-tuic/Cargo.toml @@ -7,7 +7,7 @@ description.workspace = true license = "MIT OR Apache-2.0" [features] -default = ["server", "client", "quinn", "aws-lc-rs"] +default = ["server", "client", "quinn", "aws-lc-rs", "masquerade"] decode = ["tuic-core/decode"] encode = ["tuic-core/encode"] # The server decodes client requests AND encodes UDP response datagrams, so it @@ -15,6 +15,25 @@ encode = ["tuic-core/encode"] server = ["decode", "encode"] client = ["encode"] +# HTTP/3 masquerade: when a connecting client speaks real HTTP/3 instead of TUIC, +# pose as a genuine HTTP/3 web server by reverse-proxying to a configured upstream +# site. Backend-agnostic via the `wind-quic` h3 adapter, so it covers both the +# quinn and quiche backends. The runtime `[masquerade].enabled` flag gates it; the +# feature only compiles the code + deps in. +masquerade = [ + "server", + "wind-quic/h3", + "dep:h3", + "dep:hyper", + "dep:hyper-util", + "dep:hyper-rustls", + "dep:http", + "dep:http-body-util", + "dep:rustls", + "rustls/tls12", + "dep:rustls-platform-verifier", +] + # Quinn Backend quinn = [ "wind-quic/quinn", @@ -78,6 +97,16 @@ rustls = { version = "0.23", default-features = false, optional = true } rustls-platform-verifier = { version = "0.7", default-features = false, optional = true } aws-lc-rs = { version = "*", optional = true, default-features = false } +# HTTP/3 masquerade: the transport-agnostic h3 server engine plus a hyper-based +# reverse-proxy client to the upstream site. All gated behind the `masquerade` +# feature. +h3 = { version = "0.0.8", optional = true } +hyper = { version = "1", default-features = false, features = ["client", "http1"], optional = true } +hyper-util = { version = "0.1", default-features = false, features = ["client-legacy", "http1", "tokio"], optional = true } +hyper-rustls = { version = "0.27", default-features = false, features = ["http1"], optional = true } +http = { version = "1", optional = true } +http-body-util = { version = "0.1", optional = true } + [dev-dependencies] tokio = { version = "1", features = ["full", "test-util", "macros", "rt-multi-thread", "io-util"] } eyre = "0.6" diff --git a/crates/wind-tuic/src/quiche/inbound.rs b/crates/wind-tuic/src/quiche/inbound.rs index eefe3b5..1615594 100644 --- a/crates/wind-tuic/src/quiche/inbound.rs +++ b/crates/wind-tuic/src/quiche/inbound.rs @@ -40,6 +40,9 @@ pub struct TuicheInbound { /// Root cancellation token: cancelling it stops the accept loop and tears /// down every live connection (each gets a child token). cancel: CancellationToken, + /// HTTP/3 masquerade config; when `Some`, non-TUIC connections are served as + /// a reverse-proxy HTTP/3 web server. + masquerade: Option, } impl TuicheInbound { @@ -97,7 +100,10 @@ impl AbstractInbound for TuicheInbound { let users = users.clone(); let cb = cb.clone(); let cancel = root_cancel.child_token(); - conn_tasks.spawn(crate::server::serve_connection(conn, remote, users, AUTH_TIMEOUT, cb, cancel).instrument(span)); + let masquerade = self.masquerade.clone(); + conn_tasks.spawn( + crate::server::serve_connection(conn, remote, users, AUTH_TIMEOUT, cb, cancel, masquerade).instrument(span), + ); } conn_tasks.close(); @@ -115,6 +121,7 @@ pub struct TuicheInboundBuilder { private_key_path: Option, opts: ConnectionOpts, cancel: Option, + masquerade: Option, } impl TuicheInboundBuilder { @@ -127,9 +134,17 @@ impl TuicheInboundBuilder { private_key_path: None, opts: ConnectionOpts::default(), cancel: None, + masquerade: None, } } + /// Enable the HTTP/3 masquerade: non-TUIC connections are reverse-proxied to + /// the configured upstream site instead of being dropped. + pub fn masquerade(mut self, masquerade: Option) -> Self { + self.masquerade = masquerade; + self + } + /// Set the cancellation token driving graceful shutdown. Cancelling it /// stops the accept loop and closes every live connection. Defaults to a /// fresh token (i.e. the server only stops when the acceptor closes). @@ -198,6 +213,7 @@ impl TuicheInboundBuilder { private_key_path, cert_store, cancel: self.cancel.unwrap_or_default(), + masquerade: self.masquerade, }) } } diff --git a/crates/wind-tuic/src/quinn/inbound.rs b/crates/wind-tuic/src/quinn/inbound.rs index bdd7a4e..5b9d30f 100644 --- a/crates/wind-tuic/src/quinn/inbound.rs +++ b/crates/wind-tuic/src/quinn/inbound.rs @@ -69,6 +69,11 @@ pub struct TuicInboundOpts { /// connections (e.g. one TCP-over-QUIC stream per browser request) ramp out /// of slow-start faster instead of trickling the first few round trips. pub initial_window: u64, + + /// HTTP/3 masquerade. When `Some`, connections that aren't TUIC (their first + /// stream byte isn't `0x05`) are served as a reverse-proxy HTTP/3 web server + /// instead of being dropped. + pub masquerade: Option, } impl Default for TuicInboundOpts { @@ -92,6 +97,7 @@ impl Default for TuicInboundOpts { gso: true, congestion_control: CongestionControl::Bbr, initial_window: 1024 * 1024, + masquerade: None, } } } @@ -217,6 +223,7 @@ impl AbstractInbound for TuicInbound { let users = users.clone(); let auth_timeout = opts.auth_timeout; let zero_rtt = opts.zero_rtt; + let masquerade = opts.masquerade.clone(); let cb = cb.clone(); let conn_cancel = self.cancel.child_token(); let remote = incoming.remote_address(); @@ -227,7 +234,7 @@ impl AbstractInbound for TuicInbound { // `tasks.close()` + `tasks.wait()` after cancelling). self.ctx.tasks.spawn(spawn_logged( "Connection handler", - handle_connection(incoming, users, auth_timeout, zero_rtt, cb, conn_cancel), + handle_connection(incoming, users, auth_timeout, zero_rtt, masquerade, cb, conn_cancel), ).instrument(span)); } else => { @@ -255,6 +262,7 @@ async fn handle_connection( users: Arc>, auth_timeout: Duration, zero_rtt: bool, + masquerade: Option, callback: C, cancel: CancellationToken, ) -> eyre::Result<()> { @@ -295,7 +303,16 @@ async fn handle_connection( }; // Hand the established connection to the shared, backend-agnostic core. - crate::server::serve_connection(QuinnConnection::new(conn), remote_addr, users, auth_timeout, callback, cancel).await; + crate::server::serve_connection( + QuinnConnection::new(conn), + remote_addr, + users, + auth_timeout, + callback, + cancel, + masquerade, + ) + .await; Ok(()) } diff --git a/crates/wind-tuic/src/server/masquerade.rs b/crates/wind-tuic/src/server/masquerade.rs new file mode 100644 index 0000000..491db77 --- /dev/null +++ b/crates/wind-tuic/src/server/masquerade.rs @@ -0,0 +1,246 @@ +//! HTTP/3 masquerade: serve non-TUIC (real HTTP/3) clients as a reverse proxy. +//! +//! When the connection classifier in [`super`] decides a peer is speaking actual +//! HTTP/3 rather than TUIC (its first stream byte isn't the TUIC version `0x05`), +//! it hands the connection here. We run a real HTTP/3 server over the +//! backend-agnostic [`wind_quic::h3_adapter`] and reverse-proxy every request to +//! a configured upstream site, relaying the response back. To an active prober +//! the server is indistinguishable from a normal HTTP/3 web server. + +use std::sync::{Arc, OnceLock}; + +use bytes::{Buf, Bytes, BytesMut}; +use http_body_util::{BodyExt as _, Full}; +use hyper_rustls::HttpsConnector; +use hyper_util::{ + client::legacy::{Client, connect::HttpConnector}, + rt::TokioExecutor, +}; +use tokio_util::sync::CancellationToken; +use tracing::debug; +use wind_quic::{ + QuicConnection, QuicRecvStream, + h3_adapter::{self, H3Conn}, +}; + +use super::MasqueradeConfig; + +/// Pooled HTTP/1.1(+TLS) client to the upstream site. +type HttpsClient = Client, Full>; + +/// Install the process-wide rustls crypto provider exactly once (mirrors +/// `wind_quic`'s `ensure_provider`). +fn ensure_provider() { + static INSTALLED: OnceLock<()> = OnceLock::new(); + INSTALLED.get_or_init(|| { + let _ = rustls::crypto::aws_lc_rs::default_provider().install_default(); + }); +} + +/// Build the upstream rustls config once (platform verifier + HTTP/1.1 ALPN). +fn build_tls() -> rustls::ClientConfig { + use rustls_platform_verifier::BuilderVerifierExt as _; + + let provider = rustls::crypto::CryptoProvider::get_default() + .expect("crypto provider installed by ensure_provider") + .clone(); + let mut cfg = rustls::ClientConfig::builder_with_provider(provider) + .with_protocol_versions(&[&rustls::version::TLS13, &rustls::version::TLS12]) + .expect("client protocol versions") + .with_platform_verifier() + .expect("client platform verifier") + .with_no_client_auth(); + cfg.alpn_protocols = vec![b"http/1.1".to_vec()]; + cfg +} + +/// A shared, lazily-built reverse-proxy client. The expensive part (loading the +/// platform root store) happens once; probers are rare, so a single global client +/// is plenty. +fn shared_client() -> HttpsClient { + static CLIENT: OnceLock = OnceLock::new(); + CLIENT + .get_or_init(|| { + ensure_provider(); + let https = hyper_rustls::HttpsConnectorBuilder::new() + .with_tls_config(build_tls()) + .https_or_http() + .enable_http1() + .build(); + Client::builder(TokioExecutor::new()).build(https) + }) + .clone() +} + +/// The parsed upstream target (scheme + authority). +struct Upstream { + scheme: http::uri::Scheme, + authority: http::uri::Authority, +} + +impl Upstream { + fn parse(s: &str) -> eyre::Result { + let uri: http::Uri = s.parse().map_err(|e| eyre::eyre!("invalid masquerade upstream URI {s:?}: {e}"))?; + let scheme = uri.scheme().cloned().unwrap_or(http::uri::Scheme::HTTPS); + let authority = uri + .authority() + .cloned() + .ok_or_else(|| eyre::eyre!("masquerade upstream {s:?} has no host"))?; + Ok(Self { scheme, authority }) + } +} + +/// Run the HTTP/3 masquerade server over `conn`. `first_control` is the peer's +/// first stream (its peeked byte already replayed). Returns when the peer +/// disconnects or `cancel` fires. +pub async fn run_masquerade( + conn: C, + first_control: Box, + cfg: &MasqueradeConfig, + cancel: CancellationToken, +) -> eyre::Result<()> { + let upstream = Arc::new(Upstream::parse(&cfg.upstream)?); + let client = shared_client(); + + let adapter = h3_adapter::server_connection(conn, first_control); + let mut h3conn = h3::server::builder() + .build(adapter) + .await + .map_err(|e| eyre::eyre!("h3 server build failed: {e}"))?; + + loop { + let resolver = tokio::select! { + _ = cancel.cancelled() => break, + res = h3conn.accept() => match res { + Ok(Some(r)) => r, + Ok(None) => break, + Err(e) => { + debug!("masquerade accept ended: {e}"); + break; + } + }, + }; + + let up = upstream.clone(); + let cl = client.clone(); + tokio::spawn(async move { + if let Err(e) = handle_request::(resolver, up, cl).await { + debug!("masquerade request error: {e:?}"); + } + }); + } + + Ok(()) +} + +/// Resolve one HTTP/3 request, forward it to the upstream, and relay the +/// response back over the request stream. +async fn handle_request( + resolver: h3::server::RequestResolver, Bytes>, + upstream: Arc, + client: HttpsClient, +) -> eyre::Result<()> { + let (req, mut stream) = resolver + .resolve_request() + .await + .map_err(|e| eyre::eyre!("resolve_request: {e}"))?; + + // Drain the request body (usually empty for a probe's GET). + let mut body = BytesMut::new(); + while let Some(mut chunk) = stream.recv_data().await.map_err(|e| eyre::eyre!("recv_data: {e}"))? { + while chunk.has_remaining() { + let c = chunk.chunk(); + body.extend_from_slice(c); + let n = c.len(); + chunk.advance(n); + } + } + + let out_req = build_upstream_request(&req, body.freeze(), &upstream)?; + + match client.request(out_req).await { + Ok(resp) => { + let (parts, body) = resp.into_parts(); + let collected = body + .collect() + .await + .map_err(|e| eyre::eyre!("reading upstream body: {e}"))? + .to_bytes(); + + let mut builder = http::Response::builder().status(parts.status); + for (k, v) in parts.headers.iter() { + if is_hop_by_hop(k) { + continue; + } + builder = builder.header(k, v); + } + let response = builder.body(()).map_err(|e| eyre::eyre!("building h3 response: {e}"))?; + + stream + .send_response(response) + .await + .map_err(|e| eyre::eyre!("send_response: {e}"))?; + if !collected.is_empty() { + stream.send_data(collected).await.map_err(|e| eyre::eyre!("send_data: {e}"))?; + } + stream.finish().await.map_err(|e| eyre::eyre!("finish: {e}"))?; + } + Err(e) => { + // Upstream unreachable: still answer like a web server (502) rather + // than resetting, so the masquerade holds. + debug!("masquerade upstream request failed: {e}"); + let response = http::Response::builder() + .status(http::StatusCode::BAD_GATEWAY) + .body(()) + .expect("502 response is valid"); + let _ = stream.send_response(response).await; + let _ = stream.finish().await; + } + } + + Ok(()) +} + +/// Translate the incoming HTTP/3 request into an HTTP/1.1 request to the +/// upstream: keep method + path + most headers, but point it at the upstream +/// authority and rewrite `Host`. +fn build_upstream_request( + req: &http::Request<()>, + body: Bytes, + up: &Upstream, +) -> eyre::Result>> { + let pq = req.uri().path_and_query().map(|p| p.as_str()).unwrap_or("/"); + let uri = http::Uri::builder() + .scheme(up.scheme.clone()) + .authority(up.authority.clone()) + .path_and_query(pq) + .build() + .map_err(|e| eyre::eyre!("building upstream URI: {e}"))?; + + let mut builder = http::Request::builder().method(req.method()).uri(uri); + for (k, v) in req.headers().iter() { + if is_hop_by_hop(k) || k == http::header::HOST || k == http::header::CONTENT_LENGTH { + continue; + } + builder = builder.header(k, v); + } + builder = builder.header(http::header::HOST, up.authority.as_str()); + + builder + .body(Full::new(body)) + .map_err(|e| eyre::eyre!("building upstream request: {e}")) +} + +/// Hop-by-hop headers that must not be forwarded across a proxy (RFC 9110 §7.6.1) +/// plus framing headers HTTP/3 manages itself. +fn is_hop_by_hop(name: &http::header::HeaderName) -> bool { + use http::header; + *name == header::CONNECTION + || *name == header::TRANSFER_ENCODING + || *name == header::UPGRADE + || *name == header::TE + || *name == header::TRAILER + || *name == header::PROXY_AUTHENTICATE + || *name == header::PROXY_AUTHORIZATION + || name.as_str().eq_ignore_ascii_case("keep-alive") +} diff --git a/crates/wind-tuic/src/server/mod.rs b/crates/wind-tuic/src/server/mod.rs index f3f5535..20e39ff 100644 --- a/crates/wind-tuic/src/server/mod.rs +++ b/crates/wind-tuic/src/server/mod.rs @@ -31,6 +31,22 @@ use wind_quic::{QuicConnection, QuicError}; use crate::proto::{CmdType, Command, UdpStream}; +#[cfg(feature = "masquerade")] +mod masquerade; + +/// Configuration for the HTTP/3 masquerade. +/// +/// Kept dependency-free (just the upstream URL) so it threads through the +/// always-compiled [`serve_connection`] even when the `masquerade` feature is +/// off. The actual reverse-proxy engine lives in the feature-gated +/// [`masquerade`] module. +#[derive(Clone, Debug)] +pub struct MasqueradeConfig { + /// Upstream site to reverse-proxy non-TUIC HTTP/3 requests to, + /// e.g. `https://example.com`. + pub upstream: String, +} + async fn spawn_logged(label: &str, fut: impl std::future::Future>) { if let Err(err) = fut.await { error!("{label} error: {err:?}"); @@ -142,10 +158,63 @@ pub async fn serve_connection( auth_timeout: Duration, callback: CB, cancel: CancellationToken, + masq: Option, ) where C: QuicConnection, CB: InboundCallback, { + // --- Classify the connection: real TUIC vs HTTP/3 masquerade --- + // + // Both negotiate the `h3` ALPN, so the discriminator is the first byte of the + // first uni stream: every TUIC stream begins with the version byte `0x05` + // (`proto::VER`); a real HTTP/3 client's first uni stream is its control + // stream, beginning with the stream-type varint `0x00`. Peeking it here (and + // replaying it via `PrefixedRecv`) keeps both the TUIC parser and the h3 + // adapter able to read the stream from byte 0. + let first_uni = tokio::select! { + _ = cancel.cancelled() => return, + r = tokio::time::timeout(auth_timeout, conn.accept_uni()) => r, + }; + let mut first_uni = match first_uni { + Ok(Ok(s)) => s, + Ok(Err(e)) => { + tracing::debug!("connection from {} closed before first stream: {e:?}", remote_addr); + return; + } + Err(_) => { + warn!("connection from {} opened no stream within {:?}; closing", remote_addr, auth_timeout); + conn.close(0, b"timeout"); + return; + } + }; + + let mut first_byte = [0u8; 1]; + if let Err(e) = first_uni.read_exact(&mut first_byte).await { + tracing::debug!("connection from {} closed reading first byte: {e}", remote_addr); + return; + } + + if first_byte[0] != crate::proto::VER { + // Not TUIC → HTTP/3 masquerade (when enabled), otherwise drop. + #[cfg(feature = "masquerade")] + if let Some(cfg) = masq.as_ref() { + let prefixed = wind_quic::PrefixedRecv::new(bytes::Bytes::copy_from_slice(&first_byte), first_uni); + info!("connection from {} is not TUIC; serving HTTP/3 masquerade", remote_addr); + if let Err(e) = masquerade::run_masquerade(conn, Box::new(prefixed), cfg, cancel).await { + tracing::debug!("masquerade for {} ended: {e:?}", remote_addr); + } + return; + } + let _ = &masq; // used only by the masquerade branch; silence unused warning + tracing::debug!("connection from {} is not TUIC and masquerade is disabled; closing", remote_addr); + conn.close(0, b""); + return; + } + + // TUIC: re-wrap the first uni stream so the peeked version byte is replayed to + // the Auth parser. + let first_uni = wind_quic::PrefixedRecv::new(bytes::Bytes::copy_from_slice(&first_byte), first_uni); + let udp_root_cancel = cancel.child_token(); // Eviction listener fires for both explicit `remove()` (via Dissociate) and @@ -288,6 +357,18 @@ pub async fn serve_connection( ); } + // Process the first uni stream already accepted during classification — for + // TUIC this is the Auth stream (with its peeked version byte replayed). + // Subsequent uni streams are handled by the acceptor loop above. + { + let conn = connection.clone(); + let cb = callback.clone(); + tokio::spawn( + spawn_logged("Uni stream", handle_uni_stream(conn, first_uni, cb)) + .instrument(tracing::debug_span!("uni_stream")), + ); + } + // Exit on either server shutdown or peer disconnect. tokio::select! { _ = cancel.cancelled() => { @@ -303,7 +384,7 @@ pub async fn serve_connection( async fn handle_uni_stream( ctx: Arc>, - mut recv: C::RecvStream, + mut recv: impl AsyncRead + Unpin + Send + 'static, callback: CB, ) -> eyre::Result<()> { let mut header_buf = [0u8; 2]; From ba48382405fdf907eb831b03d270cfa50a815876 Mon Sep 17 00:00:00 2001 From: iHsin Date: Sat, 13 Jun 2026 05:50:09 +0800 Subject: [PATCH 02/10] refactor(wind-tuic): reverse-proxy via reqwest + h3 masquerade e2e test MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Studied Itsusinn/tuic's `camouflage.rs`, which reverse-proxies with `reqwest` rather than a hand-rolled HTTP client. Apply the same simplification and add an end-to-end test that probes the masquerade with reqwest's HTTP/3 client. Simplify the masquerade reverse proxy: - Replace the hand-rolled hyper + hyper-rustls + rustls-platform-verifier client (ensure_provider / build_tls / shared_client / manual request building) with a single shared `reqwest::Client`; reqwest owns TLS, roots, pooling and ALPN. - Drop the hyper/hyper-util/hyper-rustls/http-body-util deps from the `masquerade` feature (reqwest was already in the tree). - Add robustness borrowed from upstream: stream the response body chunk-by-chunk (instead of buffering it whole) and cap request/response bodies (16 MB / 64 MB). - Keep the backend-agnostic `h3::quic` adapter (works over quinn and quiche), rather than the quinn-only adapter upstream uses. Add the end-to-end test (tuic-tests/tests/masquerade.rs): - A trivial HTTP/1.1 upstream + the quinn tuic-server with masquerade enabled. - reqwest's experimental HTTP/3 client (`http3_prior_knowledge`) sends a real HTTP/3 GET and asserts it gets back the upstream's 200 body over HTTP/3 — proving a non-TUIC prober is reverse-proxied, not reset. - Gated behind the opt-in `h3-masquerade-test` feature + the `--cfg reqwest_unstable` rustc flag (passed via RUSTFLAGS); without them the test is cfg'd out, so default builds pull no http3 stack and are unaffected. Run: RUSTFLAGS="--cfg reqwest_unstable" cargo test -p tuic-tests --features h3-masquerade-test Verified: the e2e test passes; default builds + clippy stay clean. Co-Authored-By: Claude Opus 4.8 --- Cargo.lock | 41 ++- crates/tuic-tests/Cargo.toml | 18 ++ crates/tuic-tests/tests/masquerade.rs | 117 +++++++++ crates/wind-tuic/Cargo.toml | 19 +- crates/wind-tuic/src/server/masquerade.rs | 295 +++++++++------------- 5 files changed, 302 insertions(+), 188 deletions(-) create mode 100644 crates/tuic-tests/tests/masquerade.rs diff --git a/Cargo.lock b/Cargo.lock index a00b637..bec4cad 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1829,6 +1829,20 @@ dependencies = [ "tokio", ] +[[package]] +name = "h3-quinn" +version = "0.0.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b2e732c8d91a74731663ac8479ab505042fbf547b9a207213ab7fbcbfc4f8b4" +dependencies = [ + "bytes", + "futures", + "h3", + "quinn 0.11.9", + "tokio", + "tokio-util", +] + [[package]] name = "half" version = "2.7.1" @@ -3416,6 +3430,7 @@ checksum = "b9e20a958963c291dc322d98411f541009df2ced7b5a4f2bd52337638cfccf20" dependencies = [ "bytes", "cfg_aliases", + "futures-io", "pin-project-lite", "quinn-proto 0.11.14", "quinn-udp 0.5.14", @@ -3768,6 +3783,10 @@ dependencies = [ "base64", "bytes", "futures-core", + "futures-util", + "h2", + "h3", + "h3-quinn", "http", "http-body", "http-body-util", @@ -3787,12 +3806,14 @@ dependencies = [ "sync_wrapper", "tokio", "tokio-rustls", + "tokio-util", "tower", "tower-http", "tower-service", "url", "wasm-bindgen", "wasm-bindgen-futures", + "wasm-streams", "web-sys", "webpki-roots 1.0.7", ] @@ -5364,10 +5385,12 @@ dependencies = [ "bytes", "eyre", "fast-socks5", + "http", "json5 1.3.1", "lexopt", "quinn 0.12.0", "rcgen 0.14.8", + "reqwest", "rustls", "serial_test", "tokio", @@ -5619,6 +5642,19 @@ dependencies = [ "wasmparser", ] +[[package]] +name = "wasm-streams" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "15053d8d85c7eccdbefef60f06769760a563c7f0a9d6902a13d35c7800b0ad65" +dependencies = [ + "futures-util", + "js-sys", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", +] + [[package]] name = "wasmparser" version = "0.244.0" @@ -5906,16 +5942,13 @@ dependencies = [ "futures-util", "h3", "http", - "http-body-util", - "hyper", - "hyper-rustls", - "hyper-util", "moka", "pin-project", "portable-atomic", "quinn 0.12.0", "quinn-congestions", "rcgen 0.14.8", + "reqwest", "rustls", "rustls-platform-verifier", "secrecy", diff --git a/crates/tuic-tests/Cargo.toml b/crates/tuic-tests/Cargo.toml index fed9475..453b095 100644 --- a/crates/tuic-tests/Cargo.toml +++ b/crates/tuic-tests/Cargo.toml @@ -13,6 +13,14 @@ default = ["aws-lc-rs"] ring = ["wind-tuic/ring", "rustls/ring", "tuic-client/ring", "tuic-server/ring"] aws-lc-rs = ["wind-tuic/aws-lc-rs", "dep:aws-lc-rs", "rustls/aws-lc-rs", "tuic-client/aws-lc-rs", "tuic-server/aws-lc-rs"] +# End-to-end HTTP/3 masquerade test (tests/masquerade.rs). Opt-in because it +# pulls reqwest's experimental HTTP/3 client (a second quinn/h3 stack) and needs +# the `--cfg reqwest_unstable` rustc flag. Run with: +# RUSTFLAGS="--cfg reqwest_unstable" cargo test -p tuic-tests --features h3-masquerade-test +# (without the flag the test is cfg'd out — it simply doesn't run, never breaks +# the build.) +h3-masquerade-test = ["wind-tuic/masquerade", "dep:reqwest", "dep:http"] + [dependencies] wind-core = { version = "0.1.1", path = "../wind-core" } tuic-core = { path = "../tuic-core" } @@ -34,6 +42,16 @@ aws-lc-rs = { version = "1", default-features = false, optional = true, features uuid = "1" tokio-util = { version = "0.7", features = ["codec"] } +# HTTP/3 client for the masquerade e2e test (gated behind `h3-masquerade-test`). +reqwest = { version = "0.12", default-features = false, features = ["http3", "rustls-tls"], optional = true } +http = { version = "1", optional = true } + +# `reqwest_unstable` is a rustc cfg (passed via RUSTFLAGS for the +# `h3-masquerade-test`) that gates reqwest's HTTP/3 client and the masquerade +# test; declare it so it isn't flagged as an unknown cfg. +[lints.rust] +unexpected_cfgs = { level = "warn", check-cfg = ['cfg(reqwest_unstable)'] } + # The quiche e2e tests only *run* on 64-bit (cross-emulated 32-bit test execution # is unreliable for real sockets). On 64-bit we enable tuic-server's `quiche` # feature (so the tests exercise it), add wind-tuic's `quiche` feature (for the diff --git a/crates/tuic-tests/tests/masquerade.rs b/crates/tuic-tests/tests/masquerade.rs new file mode 100644 index 0000000..033b76e --- /dev/null +++ b/crates/tuic-tests/tests/masquerade.rs @@ -0,0 +1,117 @@ +//! End-to-end test for the HTTP/3 masquerade, using **reqwest's HTTP/3 client** +//! as the "prober". +//! +//! A real HTTP/3 GET against the (quinn) `tuic-server` must come back as the +//! reverse-proxied upstream response — proving a non-TUIC client is served like +//! a genuine web server rather than reset. Exercises the whole path: QUIC +//! handshake, first-byte classification (`0x05` vs not), the `h3::quic` adapter, +//! the `h3` server, and the reqwest reverse proxy to the upstream. +//! +//! Opt-in (pulls reqwest's experimental HTTP/3 stack + needs the `--cfg +//! reqwest_unstable` rustc flag; without it the test is cfg'd out): +//! RUSTFLAGS="--cfg reqwest_unstable" cargo test -p tuic-tests --features h3-masquerade-test +#![cfg(all(feature = "h3-masquerade-test", reqwest_unstable, target_pointer_width = "64"))] + +use std::{collections::HashMap, net::SocketAddr, time::Duration}; + +use tokio::{ + io::{AsyncReadExt as _, AsyncWriteExt as _}, + net::TcpListener, + time::timeout, +}; +use tuic_tests::install_crypto_provider; +use uuid::Uuid; + +const UPSTREAM_BODY: &str = "wind masquerade upstream OK"; + +/// A trivial HTTP/1.1 upstream: answers every request with a fixed 200 body. +/// This is what the masquerade reverse-proxies to. +async fn start_upstream() -> SocketAddr { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + tokio::spawn(async move { + while let Ok((mut sock, _)) = listener.accept().await { + tokio::spawn(async move { + // A probe GET has no body, so a single read drains the request line + // + headers; we don't need to parse it. + let mut buf = [0u8; 8192]; + let _ = sock.read(&mut buf).await; + let resp = format!( + "HTTP/1.1 200 OK\r\ncontent-type: text/plain\r\ncontent-length: {}\r\nconnection: close\r\n\r\n{}", + UPSTREAM_BODY.len(), + UPSTREAM_BODY + ); + let _ = sock.write_all(resp.as_bytes()).await; + let _ = sock.shutdown().await; + }); + } + }); + addr +} + +#[tokio::test(flavor = "multi_thread")] +async fn masquerade_reverse_proxies_http3_probes() -> eyre::Result<()> { + install_crypto_provider(); + + let upstream = start_upstream().await; + + // Fixed port (matches the repo's other e2e tests) so we can form the client + // URL before the server reports its address. + let server_addr: SocketAddr = "127.0.0.1:8471".parse().unwrap(); + let uuid = Uuid::new_v4(); + + let cfg = tuic_server::Config { + log_level: tuic_server::config::LogLevel::Debug, + server: server_addr, + users: { + let mut users = HashMap::new(); + users.insert(uuid, "pw".to_string()); + users + }, + tls: tuic_server::config::TlsConfig { + self_sign: true, + hostname: "localhost".to_string(), + // The server must advertise the `h3` ALPN for an HTTP/3 client to + // negotiate at all — this is also what TUIC uses to disguise itself. + alpn: vec!["h3".to_string()], + ..Default::default() + }, + masquerade: tuic_server::config::MasqueradeConfig { + enabled: true, + upstream: format!("http://{upstream}"), + }, + data_dir: std::env::temp_dir().join("wind-masquerade-test"), + experimental: tuic_server::config::ExperimentalConfig { + drop_loopback: false, + drop_private: false, + }, + ..Default::default() + }; + + tokio::spawn(async move { + let _ = timeout(Duration::from_secs(20), tuic_server::run(cfg)).await; + }); + tokio::time::sleep(Duration::from_secs(1)).await; + + // reqwest as a real HTTP/3 prober. `danger_accept_invalid_certs` because the + // server uses a self-signed cert; `http3_prior_knowledge` forces h3. + let client = reqwest::Client::builder() + .danger_accept_invalid_certs(true) + .http3_prior_knowledge() + .build()?; + + let url = format!("https://{server_addr}/some/secret/path?probe=1"); + let res = timeout( + Duration::from_secs(10), + client.get(&url).version(http::Version::HTTP_3).send(), + ) + .await + .map_err(|_| eyre::eyre!("HTTP/3 request to the masquerade timed out"))??; + + assert_eq!(res.version(), http::Version::HTTP_3, "response must be HTTP/3"); + assert_eq!(res.status(), 200, "masquerade should return the upstream's 200"); + let body = res.text().await?; + assert_eq!(body, UPSTREAM_BODY, "masquerade must relay the upstream body"); + + Ok(()) +} diff --git a/crates/wind-tuic/Cargo.toml b/crates/wind-tuic/Cargo.toml index 8bef7ff..ba0191b 100644 --- a/crates/wind-tuic/Cargo.toml +++ b/crates/wind-tuic/Cargo.toml @@ -24,14 +24,8 @@ masquerade = [ "server", "wind-quic/h3", "dep:h3", - "dep:hyper", - "dep:hyper-util", - "dep:hyper-rustls", "dep:http", - "dep:http-body-util", - "dep:rustls", - "rustls/tls12", - "dep:rustls-platform-verifier", + "dep:reqwest", ] # Quinn Backend @@ -97,15 +91,12 @@ rustls = { version = "0.23", default-features = false, optional = true } rustls-platform-verifier = { version = "0.7", default-features = false, optional = true } aws-lc-rs = { version = "*", optional = true, default-features = false } -# HTTP/3 masquerade: the transport-agnostic h3 server engine plus a hyper-based -# reverse-proxy client to the upstream site. All gated behind the `masquerade` -# feature. +# HTTP/3 masquerade: the transport-agnostic h3 server engine plus a `reqwest` +# reverse-proxy client to the upstream site (reqwest owns TLS/roots/pooling, so +# no hyper/rustls plumbing here). Gated behind the `masquerade` feature. h3 = { version = "0.0.8", optional = true } -hyper = { version = "1", default-features = false, features = ["client", "http1"], optional = true } -hyper-util = { version = "0.1", default-features = false, features = ["client-legacy", "http1", "tokio"], optional = true } -hyper-rustls = { version = "0.27", default-features = false, features = ["http1"], optional = true } http = { version = "1", optional = true } -http-body-util = { version = "0.1", optional = true } +reqwest = { version = "0.12", default-features = false, features = ["rustls-tls", "stream", "http2"], optional = true } [dev-dependencies] tokio = { version = "1", features = ["full", "test-util", "macros", "rt-multi-thread", "io-util"] } diff --git a/crates/wind-tuic/src/server/masquerade.rs b/crates/wind-tuic/src/server/masquerade.rs index 491db77..227e832 100644 --- a/crates/wind-tuic/src/server/masquerade.rs +++ b/crates/wind-tuic/src/server/masquerade.rs @@ -4,18 +4,16 @@ //! HTTP/3 rather than TUIC (its first stream byte isn't the TUIC version `0x05`), //! it hands the connection here. We run a real HTTP/3 server over the //! backend-agnostic [`wind_quic::h3_adapter`] and reverse-proxy every request to -//! a configured upstream site, relaying the response back. To an active prober -//! the server is indistinguishable from a normal HTTP/3 web server. +//! a configured upstream site with a `reqwest` client, relaying the response +//! back. To an active prober the server is indistinguishable from a normal +//! HTTP/3 web server. -use std::sync::{Arc, OnceLock}; +use std::{sync::OnceLock, time::Duration}; -use bytes::{Buf, Bytes, BytesMut}; -use http_body_util::{BodyExt as _, Full}; -use hyper_rustls::HttpsConnector; -use hyper_util::{ - client::legacy::{Client, connect::HttpConnector}, - rt::TokioExecutor, -}; +use bytes::{Buf as _, Bytes}; +use http::header::HeaderName; +use reqwest::{Client, Url}; +use tokio_stream::StreamExt as _; use tokio_util::sync::CancellationToken; use tracing::debug; use wind_quic::{ @@ -25,71 +23,28 @@ use wind_quic::{ use super::MasqueradeConfig; -/// Pooled HTTP/1.1(+TLS) client to the upstream site. -type HttpsClient = Client, Full>; - -/// Install the process-wide rustls crypto provider exactly once (mirrors -/// `wind_quic`'s `ensure_provider`). -fn ensure_provider() { - static INSTALLED: OnceLock<()> = OnceLock::new(); - INSTALLED.get_or_init(|| { - let _ = rustls::crypto::aws_lc_rs::default_provider().install_default(); - }); -} - -/// Build the upstream rustls config once (platform verifier + HTTP/1.1 ALPN). -fn build_tls() -> rustls::ClientConfig { - use rustls_platform_verifier::BuilderVerifierExt as _; - - let provider = rustls::crypto::CryptoProvider::get_default() - .expect("crypto provider installed by ensure_provider") - .clone(); - let mut cfg = rustls::ClientConfig::builder_with_provider(provider) - .with_protocol_versions(&[&rustls::version::TLS13, &rustls::version::TLS12]) - .expect("client protocol versions") - .with_platform_verifier() - .expect("client platform verifier") - .with_no_client_auth(); - cfg.alpn_protocols = vec![b"http/1.1".to_vec()]; - cfg -} - -/// A shared, lazily-built reverse-proxy client. The expensive part (loading the -/// platform root store) happens once; probers are rare, so a single global client -/// is plenty. -fn shared_client() -> HttpsClient { - static CLIENT: OnceLock = OnceLock::new(); +/// Cap on a single proxied request body (bounds per-request memory). +const MAX_REQUEST_BODY_SIZE: usize = 16 * 1024 * 1024; +/// Cap on a single proxied response body. +const MAX_RESPONSE_BODY_SIZE: usize = 64 * 1024 * 1024; +/// Upstream request timeout. +const REQUEST_TIMEOUT: Duration = Duration::from_secs(30); + +/// The shared reverse-proxy client. Upstream-independent (it dials per request +/// URL), built once — probers are rare. `reqwest` owns TLS, roots, pooling and +/// ALPN, so there is no rustls/provider plumbing here. +fn client() -> Client { + static CLIENT: OnceLock = OnceLock::new(); CLIENT .get_or_init(|| { - ensure_provider(); - let https = hyper_rustls::HttpsConnectorBuilder::new() - .with_tls_config(build_tls()) - .https_or_http() - .enable_http1() - .build(); - Client::builder(TokioExecutor::new()).build(https) + Client::builder() + .timeout(REQUEST_TIMEOUT) + .build() + .expect("building the masquerade reqwest client") }) .clone() } -/// The parsed upstream target (scheme + authority). -struct Upstream { - scheme: http::uri::Scheme, - authority: http::uri::Authority, -} - -impl Upstream { - fn parse(s: &str) -> eyre::Result { - let uri: http::Uri = s.parse().map_err(|e| eyre::eyre!("invalid masquerade upstream URI {s:?}: {e}"))?; - let scheme = uri.scheme().cloned().unwrap_or(http::uri::Scheme::HTTPS); - let authority = uri - .authority() - .cloned() - .ok_or_else(|| eyre::eyre!("masquerade upstream {s:?} has no host"))?; - Ok(Self { scheme, authority }) - } -} - /// Run the HTTP/3 masquerade server over `conn`. `first_control` is the peer's /// first stream (its peeked byte already replayed). Returns when the peer /// disconnects or `cancel` fires. @@ -99,14 +54,13 @@ pub async fn run_masquerade( cfg: &MasqueradeConfig, cancel: CancellationToken, ) -> eyre::Result<()> { - let upstream = Arc::new(Upstream::parse(&cfg.upstream)?); - let client = shared_client(); + let backend = Url::parse(&cfg.upstream).map_err(|e| eyre::eyre!("invalid masquerade upstream {:?}: {e}", cfg.upstream))?; + let client = client(); let adapter = h3_adapter::server_connection(conn, first_control); - let mut h3conn = h3::server::builder() - .build(adapter) + let mut h3conn = h3::server::Connection::new(adapter) .await - .map_err(|e| eyre::eyre!("h3 server build failed: {e}"))?; + .map_err(|e| eyre::eyre!("h3 server setup failed: {e}"))?; loop { let resolver = tokio::select! { @@ -121,10 +75,10 @@ pub async fn run_masquerade( }, }; - let up = upstream.clone(); - let cl = client.clone(); + let backend = backend.clone(); + let client = client.clone(); tokio::spawn(async move { - if let Err(e) = handle_request::(resolver, up, cl).await { + if let Err(e) = handle_request::(resolver, &client, &backend).await { debug!("masquerade request error: {e:?}"); } }); @@ -133,114 +87,115 @@ pub async fn run_masquerade( Ok(()) } -/// Resolve one HTTP/3 request, forward it to the upstream, and relay the -/// response back over the request stream. +/// Resolve one HTTP/3 request and reverse-proxy it; on any failure answer `502` +/// like a real web server rather than resetting the stream. async fn handle_request( resolver: h3::server::RequestResolver, Bytes>, - upstream: Arc, - client: HttpsClient, + client: &Client, + backend: &Url, ) -> eyre::Result<()> { - let (req, mut stream) = resolver - .resolve_request() - .await - .map_err(|e| eyre::eyre!("resolve_request: {e}"))?; - - // Drain the request body (usually empty for a probe's GET). - let mut body = BytesMut::new(); - while let Some(mut chunk) = stream.recv_data().await.map_err(|e| eyre::eyre!("recv_data: {e}"))? { - while chunk.has_remaining() { - let c = chunk.chunk(); - body.extend_from_slice(c); - let n = c.len(); - chunk.advance(n); + let (request, mut stream) = resolver.resolve_request().await?; + + if let Err(e) = forward(client, backend, request, &mut stream).await { + debug!("masquerade upstream failed: {e}"); + let resp = http::Response::builder() + .status(http::StatusCode::BAD_GATEWAY) + .body(()) + .expect("502 response is valid"); + let _ = stream.send_response(resp).await; + let _ = stream.finish().await; + } + Ok(()) +} + +/// Translate the HTTP/3 request into a `reqwest` call to `backend`, stream the +/// response body back (size-capped), and finish the stream. +async fn forward( + client: &Client, + backend: &Url, + request: http::Request<()>, + stream: &mut h3::server::RequestStream, +) -> eyre::Result<()> +where + S: h3::quic::BidiStream, +{ + let target = rewrite_target(backend, request.uri())?; + let mut req = client.request(request.method().clone(), target); + for (name, value) in request.headers() { + if is_forwardable(name) { + req = req.header(name, value); } } - let out_req = build_upstream_request(&req, body.freeze(), &upstream)?; - - match client.request(out_req).await { - Ok(resp) => { - let (parts, body) = resp.into_parts(); - let collected = body - .collect() - .await - .map_err(|e| eyre::eyre!("reading upstream body: {e}"))? - .to_bytes(); - - let mut builder = http::Response::builder().status(parts.status); - for (k, v) in parts.headers.iter() { - if is_hop_by_hop(k) { - continue; - } - builder = builder.header(k, v); - } - let response = builder.body(()).map_err(|e| eyre::eyre!("building h3 response: {e}"))?; - - stream - .send_response(response) - .await - .map_err(|e| eyre::eyre!("send_response: {e}"))?; - if !collected.is_empty() { - stream.send_data(collected).await.map_err(|e| eyre::eyre!("send_data: {e}"))?; - } - stream.finish().await.map_err(|e| eyre::eyre!("finish: {e}"))?; + let body = read_request_body(stream).await?; + if !body.is_empty() { + req = req.body(body); + } + + let resp = req.send().await?; + + let mut builder = http::Response::builder().status(resp.status()); + for (name, value) in resp.headers() { + if is_forwardable(name) { + builder = builder.header(name, value); + } + } + stream.send_response(builder.body(())?).await?; + + let mut sent = 0usize; + let mut body_stream = resp.bytes_stream(); + while let Some(chunk) = body_stream.next().await { + let chunk = chunk?; + sent += chunk.len(); + if sent > MAX_RESPONSE_BODY_SIZE { + return Err(eyre::eyre!("upstream response body exceeds {MAX_RESPONSE_BODY_SIZE} bytes")); } - Err(e) => { - // Upstream unreachable: still answer like a web server (502) rather - // than resetting, so the masquerade holds. - debug!("masquerade upstream request failed: {e}"); - let response = http::Response::builder() - .status(http::StatusCode::BAD_GATEWAY) - .body(()) - .expect("502 response is valid"); - let _ = stream.send_response(response).await; - let _ = stream.finish().await; + if !chunk.is_empty() { + stream.send_data(chunk).await?; } } - + stream.finish().await?; Ok(()) } -/// Translate the incoming HTTP/3 request into an HTTP/1.1 request to the -/// upstream: keep method + path + most headers, but point it at the upstream -/// authority and rewrite `Host`. -fn build_upstream_request( - req: &http::Request<()>, - body: Bytes, - up: &Upstream, -) -> eyre::Result>> { - let pq = req.uri().path_and_query().map(|p| p.as_str()).unwrap_or("/"); - let uri = http::Uri::builder() - .scheme(up.scheme.clone()) - .authority(up.authority.clone()) - .path_and_query(pq) - .build() - .map_err(|e| eyre::eyre!("building upstream URI: {e}"))?; - - let mut builder = http::Request::builder().method(req.method()).uri(uri); - for (k, v) in req.headers().iter() { - if is_hop_by_hop(k) || k == http::header::HOST || k == http::header::CONTENT_LENGTH { - continue; +/// Drain the HTTP/3 request body (usually empty for a probe's GET), capped. +async fn read_request_body(stream: &mut h3::server::RequestStream) -> eyre::Result +where + S: h3::quic::BidiStream, +{ + let mut body = Vec::new(); + while let Some(mut chunk) = stream.recv_data().await? { + let n = chunk.remaining(); + body.extend_from_slice(chunk.copy_to_bytes(n).as_ref()); + if body.len() > MAX_REQUEST_BODY_SIZE { + return Err(eyre::eyre!("request body exceeds {MAX_REQUEST_BODY_SIZE} bytes")); } - builder = builder.header(k, v); } - builder = builder.header(http::header::HOST, up.authority.as_str()); + let _ = stream.recv_trailers().await?; + Ok(Bytes::from(body)) +} - builder - .body(Full::new(body)) - .map_err(|e| eyre::eyre!("building upstream request: {e}")) +/// Point the request at the backend: keep the backend's scheme/host/port, append +/// the incoming path and query. +fn rewrite_target(backend: &Url, uri: &http::Uri) -> eyre::Result { + let path_and_query = uri.path_and_query().map(|v| v.as_str()).unwrap_or("/"); + let mut target = backend.clone(); + target.set_path(""); + target.set_query(None); + Ok(target.join(path_and_query)?) } -/// Hop-by-hop headers that must not be forwarded across a proxy (RFC 9110 §7.6.1) -/// plus framing headers HTTP/3 manages itself. -fn is_hop_by_hop(name: &http::header::HeaderName) -> bool { - use http::header; - *name == header::CONNECTION - || *name == header::TRANSFER_ENCODING - || *name == header::UPGRADE - || *name == header::TE - || *name == header::TRAILER - || *name == header::PROXY_AUTHENTICATE - || *name == header::PROXY_AUTHORIZATION - || name.as_str().eq_ignore_ascii_case("keep-alive") +/// Whether a header may cross the proxy: drops hop-by-hop headers (RFC 9110 +/// §7.6.1) plus framing headers each side manages itself. +fn is_forwardable(name: &HeaderName) -> bool { + !matches!( + name.as_str().to_ascii_lowercase().as_str(), + "connection" + | "keep-alive" + | "proxy-connection" + | "transfer-encoding" + | "upgrade" | "te" + | "trailer" | "host" + | "content-length" + ) } From e62114dd365c09eabbca27f6187ce2f7fda6745f Mon Sep 17 00:00:00 2001 From: iHsin Date: Sat, 13 Jun 2026 06:22:27 +0800 Subject: [PATCH 03/10] style: cargo fmt (nightly rustfmt.toml options) The masquerade code was written without running the repo's formatter; apply `cargo +nightly fmt` so it matches CI's stable + RUSTC_BOOTSTRAP=1 fmt check (which honors the unstable rustfmt.toml options). No functional change. Co-Authored-By: Claude Opus 4.8 --- crates/tuic-server/src/config.rs | 13 +++++++------ crates/tuic-tests/tests/masquerade.rs | 7 ++++--- crates/wind-quic/src/h3_adapter.rs | 21 ++++++++++----------- crates/wind-quic/src/prefixed.rs | 4 ++-- crates/wind-tuic/src/quiche/inbound.rs | 8 ++++---- crates/wind-tuic/src/quinn/inbound.rs | 6 +++--- crates/wind-tuic/src/server/masquerade.rs | 20 ++++++++++---------- crates/wind-tuic/src/server/mod.rs | 13 +++++++++---- 8 files changed, 49 insertions(+), 43 deletions(-) diff --git a/crates/tuic-server/src/config.rs b/crates/tuic-server/src/config.rs index 5c76fe6..4118345 100644 --- a/crates/tuic-server/src/config.rs +++ b/crates/tuic-server/src/config.rs @@ -87,8 +87,9 @@ pub struct Config { pub users: HashMap, pub tls: TlsConfig, - /// HTTP/3 masquerade: reverse-proxy non-TUIC (HTTP/3 probe) connections to a - /// real upstream site so the server is indistinguishable from a web server. + /// HTTP/3 masquerade: reverse-proxy non-TUIC (HTTP/3 probe) connections to + /// a real upstream site so the server is indistinguishable from a web + /// server. #[serde(default)] pub masquerade: MasqueradeConfig, @@ -264,10 +265,10 @@ pub struct TlsConfig { /// HTTP/3 masquerade configuration. /// -/// When `enabled`, a connection that isn't TUIC (its first stream byte isn't the -/// TUIC version `0x05` — i.e. an active prober speaking real HTTP/3) is served as -/// a reverse proxy to `upstream`, so the server is indistinguishable from a -/// normal HTTP/3 website instead of resetting the connection. +/// When `enabled`, a connection that isn't TUIC (its first stream byte isn't +/// the TUIC version `0x05` — i.e. an active prober speaking real HTTP/3) is +/// served as a reverse proxy to `upstream`, so the server is indistinguishable +/// from a normal HTTP/3 website instead of resetting the connection. #[derive(Deserialize, Serialize, Educe)] #[educe(Default)] #[serde(default, deny_unknown_fields)] diff --git a/crates/tuic-tests/tests/masquerade.rs b/crates/tuic-tests/tests/masquerade.rs index 033b76e..121f3fc 100644 --- a/crates/tuic-tests/tests/masquerade.rs +++ b/crates/tuic-tests/tests/masquerade.rs @@ -4,12 +4,13 @@ //! A real HTTP/3 GET against the (quinn) `tuic-server` must come back as the //! reverse-proxied upstream response — proving a non-TUIC client is served like //! a genuine web server rather than reset. Exercises the whole path: QUIC -//! handshake, first-byte classification (`0x05` vs not), the `h3::quic` adapter, -//! the `h3` server, and the reqwest reverse proxy to the upstream. +//! handshake, first-byte classification (`0x05` vs not), the `h3::quic` +//! adapter, the `h3` server, and the reqwest reverse proxy to the upstream. //! //! Opt-in (pulls reqwest's experimental HTTP/3 stack + needs the `--cfg //! reqwest_unstable` rustc flag; without it the test is cfg'd out): -//! RUSTFLAGS="--cfg reqwest_unstable" cargo test -p tuic-tests --features h3-masquerade-test +//! RUSTFLAGS="--cfg reqwest_unstable" cargo test -p tuic-tests --features +//! h3-masquerade-test #![cfg(all(feature = "h3-masquerade-test", reqwest_unstable, target_pointer_width = "64"))] use std::{collections::HashMap, net::SocketAddr, time::Duration}; diff --git a/crates/wind-quic/src/h3_adapter.rs b/crates/wind-quic/src/h3_adapter.rs index e2d2d1c..3412871 100644 --- a/crates/wind-quic/src/h3_adapter.rs +++ b/crates/wind-quic/src/h3_adapter.rs @@ -8,17 +8,18 @@ //! quiche backend. //! //! Only the **server** surface is implemented: accepting peer-initiated uni -//! (control / QPACK) and bidi (request) streams, and opening our own uni streams -//! (control / QPACK). The classifier in `wind-tuic` peeks the first byte of the -//! peer's control stream; that consumed byte is replayed via +//! (control / QPACK) and bidi (request) streams, and opening our own uni +//! streams (control / QPACK). The classifier in `wind-tuic` peeks the first +//! byte of the peer's control stream; that consumed byte is replayed via //! [`PrefixedRecv`](crate::PrefixedRecv) and the (boxed) stream is handed to //! [`server_connection`] as the first stream the adapter yields. //! //! The bridge is mechanical: our streams are `AsyncRead`/`AsyncWrite`, while //! `h3::quic` is poll- and `Buf`-based. Recv streams read into a scratch buffer -//! and hand back `Bytes`; send streams buffer one `WriteBuf` and drain it through -//! `poll_write`. Our `QuicConnection` accept/open methods are `async fn`, so each -//! is driven as a boxed in-flight future stored on the connection/opener. +//! and hand back `Bytes`; send streams buffer one `WriteBuf` and drain it +//! through `poll_write`. Our `QuicConnection` accept/open methods are `async +//! fn`, so each is driven as a boxed in-flight future stored on the +//! connection/opener. use std::{ future::Future, @@ -257,8 +258,8 @@ impl RecvStream for H3Bidi { } impl BidiStream for H3Bidi { - type SendStream = H3Send; type RecvStream = H3Recv; + type SendStream = H3Send; fn split(self) -> (Self::SendStream, Self::RecvStream) { (self.send, self.recv) @@ -326,8 +327,8 @@ impl OpenStreams for H3Conn { } impl Connection for H3Conn { - type RecvStream = H3Recv; type OpenStreams = H3Opener; + type RecvStream = H3Recv; fn poll_accept_recv(&mut self, cx: &mut Context<'_>) -> Poll> { if let Some(recv) = self.first_recv.take() { @@ -335,9 +336,7 @@ impl Connection for H3Conn { } let poll = { let conn = &self.conn; - let fut = self - .accept_recv_fut - .get_or_insert_with(|| boxed_accept_uni(conn.clone())); + let fut = self.accept_recv_fut.get_or_insert_with(|| boxed_accept_uni(conn.clone())); fut.as_mut().poll(cx) }; match poll { diff --git a/crates/wind-quic/src/prefixed.rs b/crates/wind-quic/src/prefixed.rs index 20d4b0f..a6214e0 100644 --- a/crates/wind-quic/src/prefixed.rs +++ b/crates/wind-quic/src/prefixed.rs @@ -18,8 +18,8 @@ use tokio::io::{AsyncRead, ReadBuf}; use crate::traits::QuicRecvStream; /// Wraps a recv stream so a buffered `prefix` is yielded before the inner -/// stream's own data. Once the prefix is drained, reads delegate straight to the -/// inner stream. +/// stream's own data. Once the prefix is drained, reads delegate straight to +/// the inner stream. pub struct PrefixedRecv { prefix: Bytes, inner: R, diff --git a/crates/wind-tuic/src/quiche/inbound.rs b/crates/wind-tuic/src/quiche/inbound.rs index 1615594..e938c1b 100644 --- a/crates/wind-tuic/src/quiche/inbound.rs +++ b/crates/wind-tuic/src/quiche/inbound.rs @@ -40,8 +40,8 @@ pub struct TuicheInbound { /// Root cancellation token: cancelling it stops the accept loop and tears /// down every live connection (each gets a child token). cancel: CancellationToken, - /// HTTP/3 masquerade config; when `Some`, non-TUIC connections are served as - /// a reverse-proxy HTTP/3 web server. + /// HTTP/3 masquerade config; when `Some`, non-TUIC connections are served + /// as a reverse-proxy HTTP/3 web server. masquerade: Option, } @@ -138,8 +138,8 @@ impl TuicheInboundBuilder { } } - /// Enable the HTTP/3 masquerade: non-TUIC connections are reverse-proxied to - /// the configured upstream site instead of being dropped. + /// Enable the HTTP/3 masquerade: non-TUIC connections are reverse-proxied + /// to the configured upstream site instead of being dropped. pub fn masquerade(mut self, masquerade: Option) -> Self { self.masquerade = masquerade; self diff --git a/crates/wind-tuic/src/quinn/inbound.rs b/crates/wind-tuic/src/quinn/inbound.rs index 5b9d30f..6b217fb 100644 --- a/crates/wind-tuic/src/quinn/inbound.rs +++ b/crates/wind-tuic/src/quinn/inbound.rs @@ -70,9 +70,9 @@ pub struct TuicInboundOpts { /// of slow-start faster instead of trickling the first few round trips. pub initial_window: u64, - /// HTTP/3 masquerade. When `Some`, connections that aren't TUIC (their first - /// stream byte isn't `0x05`) are served as a reverse-proxy HTTP/3 web server - /// instead of being dropped. + /// HTTP/3 masquerade. When `Some`, connections that aren't TUIC (their + /// first stream byte isn't `0x05`) are served as a reverse-proxy HTTP/3 + /// web server instead of being dropped. pub masquerade: Option, } diff --git a/crates/wind-tuic/src/server/masquerade.rs b/crates/wind-tuic/src/server/masquerade.rs index 227e832..4293121 100644 --- a/crates/wind-tuic/src/server/masquerade.rs +++ b/crates/wind-tuic/src/server/masquerade.rs @@ -1,10 +1,10 @@ //! HTTP/3 masquerade: serve non-TUIC (real HTTP/3) clients as a reverse proxy. //! -//! When the connection classifier in [`super`] decides a peer is speaking actual -//! HTTP/3 rather than TUIC (its first stream byte isn't the TUIC version `0x05`), -//! it hands the connection here. We run a real HTTP/3 server over the -//! backend-agnostic [`wind_quic::h3_adapter`] and reverse-proxy every request to -//! a configured upstream site with a `reqwest` client, relaying the response +//! When the connection classifier in [`super`] decides a peer is speaking +//! actual HTTP/3 rather than TUIC (its first stream byte isn't the TUIC version +//! `0x05`), it hands the connection here. We run a real HTTP/3 server over the +//! backend-agnostic [`wind_quic::h3_adapter`] and reverse-proxy every request +//! to a configured upstream site with a `reqwest` client, relaying the response //! back. To an active prober the server is indistinguishable from a normal //! HTTP/3 web server. @@ -175,8 +175,8 @@ where Ok(Bytes::from(body)) } -/// Point the request at the backend: keep the backend's scheme/host/port, append -/// the incoming path and query. +/// Point the request at the backend: keep the backend's scheme/host/port, +/// append the incoming path and query. fn rewrite_target(backend: &Url, uri: &http::Uri) -> eyre::Result { let path_and_query = uri.path_and_query().map(|v| v.as_str()).unwrap_or("/"); let mut target = backend.clone(); @@ -194,8 +194,8 @@ fn is_forwardable(name: &HeaderName) -> bool { | "keep-alive" | "proxy-connection" | "transfer-encoding" - | "upgrade" | "te" - | "trailer" | "host" - | "content-length" + | "upgrade" + | "te" | "trailer" + | "host" | "content-length" ) } diff --git a/crates/wind-tuic/src/server/mod.rs b/crates/wind-tuic/src/server/mod.rs index 20e39ff..34ee5ef 100644 --- a/crates/wind-tuic/src/server/mod.rs +++ b/crates/wind-tuic/src/server/mod.rs @@ -182,7 +182,10 @@ pub async fn serve_connection( return; } Err(_) => { - warn!("connection from {} opened no stream within {:?}; closing", remote_addr, auth_timeout); + warn!( + "connection from {} opened no stream within {:?}; closing", + remote_addr, auth_timeout + ); conn.close(0, b"timeout"); return; } @@ -206,7 +209,10 @@ pub async fn serve_connection( return; } let _ = &masq; // used only by the masquerade branch; silence unused warning - tracing::debug!("connection from {} is not TUIC and masquerade is disabled; closing", remote_addr); + tracing::debug!( + "connection from {} is not TUIC and masquerade is disabled; closing", + remote_addr + ); conn.close(0, b""); return; } @@ -364,8 +370,7 @@ pub async fn serve_connection( let conn = connection.clone(); let cb = callback.clone(); tokio::spawn( - spawn_logged("Uni stream", handle_uni_stream(conn, first_uni, cb)) - .instrument(tracing::debug_span!("uni_stream")), + spawn_logged("Uni stream", handle_uni_stream(conn, first_uni, cb)).instrument(tracing::debug_span!("uni_stream")), ); } From 5769adf8133c30e5cb29a1cf8570752d3df74dff Mon Sep 17 00:00:00 2001 From: iHsin Date: Sat, 13 Jun 2026 07:30:00 +0800 Subject: [PATCH 04/10] refactor(wind-quic): make the h3 adapter generic over the connection MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The h3::quic adapter previously boxed every stream as `Box` / `Box` purely to unify the first (prefix-replayed) control stream with regular streams — costing a heap allocation + dynamic dispatch per stream and relying on the traits' object-safety. Make `H3Recv` / `H3Send` / `H3Bidi` generic over the connection, using the `C::RecvStream` / `C::SendStream` associated types. Every recv stream is now a `PrefixedRecv` (empty prefix when nothing was peeked), so they share one concrete type without boxing. `server_connection` / `run_masquerade` take `PrefixedRecv` directly instead of `Box`. No behavior change: the masquerade e2e test still passes, and both the quinn and quiche backends compile (the adapter stays backend-agnostic). Co-Authored-By: Claude Opus 4.8 --- crates/wind-quic/src/h3_adapter.rs | 91 +++++++++++++---------- crates/wind-tuic/src/server/masquerade.rs | 4 +- crates/wind-tuic/src/server/mod.rs | 2 +- 3 files changed, 53 insertions(+), 44 deletions(-) diff --git a/crates/wind-quic/src/h3_adapter.rs b/crates/wind-quic/src/h3_adapter.rs index 3412871..c034006 100644 --- a/crates/wind-quic/src/h3_adapter.rs +++ b/crates/wind-quic/src/h3_adapter.rs @@ -11,8 +11,11 @@ //! (control / QPACK) and bidi (request) streams, and opening our own uni //! streams (control / QPACK). The classifier in `wind-tuic` peeks the first //! byte of the peer's control stream; that consumed byte is replayed via -//! [`PrefixedRecv`](crate::PrefixedRecv) and the (boxed) stream is handed to -//! [`server_connection`] as the first stream the adapter yields. +//! [`PrefixedRecv`](crate::PrefixedRecv), which the adapter hands to +//! [`server_connection`] as the first stream it yields. Every recv stream is a +//! `PrefixedRecv` (with an empty prefix when none was consumed), so the adapter +//! is generic over the backend's concrete stream types — no boxing or dynamic +//! dispatch. //! //! The bridge is mechanical: our streams are `AsyncRead`/`AsyncWrite`, while //! `h3::quic` is poll- and `Buf`-based. Recv streams read into a scratch buffer @@ -34,7 +37,7 @@ use h3::quic::{ }; use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; -use crate::{QuicConnection, QuicError, QuicRecvStream, QuicSendStream}; +use crate::{PrefixedRecv, QuicConnection, QuicError, QuicRecvStream, QuicSendStream}; /// Scratch buffer size for a single `poll_data` read. const RECV_CHUNK: usize = 16 * 1024; @@ -49,7 +52,7 @@ type BoxBiFut = BoxFut::SendStream, (conn: C, first_control: Box) -> H3Conn { +pub fn server_connection(conn: C, first_control: PrefixedRecv) -> H3Conn { H3Conn { conn, first_recv: Some(H3Recv::new(first_control)), @@ -80,15 +83,16 @@ fn stream_err(e: QuicError) -> StreamErrorIncoming { // Recv stream // --------------------------------------------------------------------------- -/// `h3::quic::RecvStream` over a boxed [`QuicRecvStream`]. -pub struct H3Recv { - inner: Box, +/// `h3::quic::RecvStream` over the backend's recv stream (wrapped in +/// [`PrefixedRecv`] so a peeked control-stream byte can be replayed). +pub struct H3Recv { + inner: PrefixedRecv, id: u64, scratch: Vec, } -impl H3Recv { - fn new(inner: Box) -> Self { +impl H3Recv { + fn new(inner: PrefixedRecv) -> Self { let id = inner.id(); Self { inner, @@ -96,9 +100,14 @@ impl H3Recv { scratch: vec![0u8; RECV_CHUNK], } } + + /// Wrap a freshly-accepted stream that needs no replayed prefix. + fn passthrough(recv: C::RecvStream) -> Self { + Self::new(PrefixedRecv::new(Bytes::new(), recv)) + } } -impl RecvStream for H3Recv { +impl RecvStream for H3Recv { type Buf = Bytes; fn poll_data(&mut self, cx: &mut Context<'_>) -> Poll, StreamErrorIncoming>> { @@ -131,17 +140,17 @@ impl RecvStream for H3Recv { // Send stream // --------------------------------------------------------------------------- -/// `h3::quic::SendStream` over a boxed [`QuicSendStream`]. -pub struct H3Send { - inner: Box, +/// `h3::quic::SendStream` over the backend's send stream. +pub struct H3Send { + inner: C::SendStream, id: u64, /// At most one `WriteBuf` is buffered at a time; `poll_ready`/`poll_finish` /// drain it through the underlying `AsyncWrite`. pending: Option>, } -impl H3Send { - fn new(inner: Box) -> Self { +impl H3Send { + fn new(inner: C::SendStream) -> Self { let id = inner.id(); Self { inner, @@ -174,7 +183,7 @@ impl H3Send { } } -impl SendStream for H3Send { +impl SendStream for H3Send { fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { self.poll_flush_pending(cx) } @@ -214,12 +223,12 @@ impl SendStream for H3Send { // --------------------------------------------------------------------------- /// `h3::quic::BidiStream` joining an [`H3Send`] and an [`H3Recv`]. -pub struct H3Bidi { - send: H3Send, - recv: H3Recv, +pub struct H3Bidi { + send: H3Send, + recv: H3Recv, } -impl SendStream for H3Bidi { +impl SendStream for H3Bidi { fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { self.send.poll_ready(cx) } @@ -241,7 +250,7 @@ impl SendStream for H3Bidi { } } -impl RecvStream for H3Bidi { +impl RecvStream for H3Bidi { type Buf = Bytes; fn poll_data(&mut self, cx: &mut Context<'_>) -> Poll, StreamErrorIncoming>> { @@ -257,15 +266,22 @@ impl RecvStream for H3Bidi { } } -impl BidiStream for H3Bidi { - type RecvStream = H3Recv; - type SendStream = H3Send; +impl BidiStream for H3Bidi { + type RecvStream = H3Recv; + type SendStream = H3Send; fn split(self) -> (Self::SendStream, Self::RecvStream) { (self.send, self.recv) } } +fn into_bidi((send, recv): (C::SendStream, C::RecvStream)) -> H3Bidi { + H3Bidi { + send: H3Send::new(send), + recv: H3Recv::passthrough(recv), + } +} + // --------------------------------------------------------------------------- // Opener // --------------------------------------------------------------------------- @@ -279,8 +295,8 @@ pub struct H3Opener { } impl OpenStreams for H3Opener { - type BidiStream = H3Bidi; - type SendStream = H3Send; + type BidiStream = H3Bidi; + type SendStream = H3Send; fn poll_open_bidi(&mut self, cx: &mut Context<'_>) -> Poll> { poll_open_bidi(&self.conn, &mut self.open_bi_fut, cx) @@ -302,7 +318,7 @@ impl OpenStreams for H3Opener { /// `h3::quic::Connection` over a [`QuicConnection`] handle. pub struct H3Conn { conn: C, - first_recv: Option, + first_recv: Option>, accept_recv_fut: Option>>, accept_bi_fut: Option>, open_uni_fut: Option>>, @@ -310,8 +326,8 @@ pub struct H3Conn { } impl OpenStreams for H3Conn { - type BidiStream = H3Bidi; - type SendStream = H3Send; + type BidiStream = H3Bidi; + type SendStream = H3Send; fn poll_open_bidi(&mut self, cx: &mut Context<'_>) -> Poll> { poll_open_bidi(&self.conn, &mut self.open_bi_fut, cx) @@ -328,7 +344,7 @@ impl OpenStreams for H3Conn { impl Connection for H3Conn { type OpenStreams = H3Opener; - type RecvStream = H3Recv; + type RecvStream = H3Recv; fn poll_accept_recv(&mut self, cx: &mut Context<'_>) -> Poll> { if let Some(recv) = self.first_recv.take() { @@ -342,7 +358,7 @@ impl Connection for H3Conn { match poll { Poll::Ready(res) => { self.accept_recv_fut = None; - Poll::Ready(res.map(|r| H3Recv::new(Box::new(r))).map_err(conn_err)) + Poll::Ready(res.map(H3Recv::passthrough).map_err(conn_err)) } Poll::Pending => Poll::Pending, } @@ -390,18 +406,11 @@ fn boxed_accept_bi(conn: C) -> BoxBiFut { Box::pin(async move { conn.accept_bi().await }) } -fn into_bidi((send, recv): (S, R)) -> H3Bidi { - H3Bidi { - send: H3Send::new(Box::new(send)), - recv: H3Recv::new(Box::new(recv)), - } -} - fn poll_open_send( conn: &C, slot: &mut Option>>, cx: &mut Context<'_>, -) -> Poll> { +) -> Poll, StreamErrorIncoming>> { let poll = { let fut = slot.get_or_insert_with(|| { let conn = conn.clone(); @@ -412,7 +421,7 @@ fn poll_open_send( match poll { Poll::Ready(res) => { *slot = None; - Poll::Ready(res.map(|s| H3Send::new(Box::new(s))).map_err(stream_err)) + Poll::Ready(res.map(H3Send::new).map_err(stream_err)) } Poll::Pending => Poll::Pending, } @@ -422,7 +431,7 @@ fn poll_open_bidi( conn: &C, slot: &mut Option>, cx: &mut Context<'_>, -) -> Poll> { +) -> Poll, StreamErrorIncoming>> { let poll = { let fut = slot.get_or_insert_with(|| { let conn = conn.clone(); diff --git a/crates/wind-tuic/src/server/masquerade.rs b/crates/wind-tuic/src/server/masquerade.rs index 4293121..e736889 100644 --- a/crates/wind-tuic/src/server/masquerade.rs +++ b/crates/wind-tuic/src/server/masquerade.rs @@ -17,7 +17,7 @@ use tokio_stream::StreamExt as _; use tokio_util::sync::CancellationToken; use tracing::debug; use wind_quic::{ - QuicConnection, QuicRecvStream, + PrefixedRecv, QuicConnection, h3_adapter::{self, H3Conn}, }; @@ -50,7 +50,7 @@ fn client() -> Client { /// disconnects or `cancel` fires. pub async fn run_masquerade( conn: C, - first_control: Box, + first_control: PrefixedRecv, cfg: &MasqueradeConfig, cancel: CancellationToken, ) -> eyre::Result<()> { diff --git a/crates/wind-tuic/src/server/mod.rs b/crates/wind-tuic/src/server/mod.rs index 34ee5ef..5c1498f 100644 --- a/crates/wind-tuic/src/server/mod.rs +++ b/crates/wind-tuic/src/server/mod.rs @@ -203,7 +203,7 @@ pub async fn serve_connection( if let Some(cfg) = masq.as_ref() { let prefixed = wind_quic::PrefixedRecv::new(bytes::Bytes::copy_from_slice(&first_byte), first_uni); info!("connection from {} is not TUIC; serving HTTP/3 masquerade", remote_addr); - if let Err(e) = masquerade::run_masquerade(conn, Box::new(prefixed), cfg, cancel).await { + if let Err(e) = masquerade::run_masquerade(conn, prefixed, cfg, cancel).await { tracing::debug!("masquerade for {} ended: {e:?}", remote_addr); } return; From 07dbc3a4946bc0d4d70094abfcb8f13835730261 Mon Sep 17 00:00:00 2001 From: iHsin Date: Sat, 13 Jun 2026 09:46:14 +0800 Subject: [PATCH 05/10] fix(wind-tuic): classify on the first event of any kind, not just a uni stream MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The TUIC `Auth` command is not guaranteed to be the first thing the server observes — a `Connect` bidi stream, a heartbeat datagram, or plain QUIC reordering can arrive first. The classifier previously only `accept_uni()`'d and assumed the first uni stream was the discriminator, so a connection whose first event was a bidi/datagram could stall or misclassify (and for h3, the request bidi could be consumed and lost). Mirrors Itsusinn/tuic's classifier. - Race `accept_uni()` + `accept_bi()` + `read_datagram()` for the first event. - Classify on the first **two** bytes: `[VER, CmdType]` with `CmdType <= Heartbeat`, so an h3 stream that happens to start with `VER` (e.g. a stray PUSH_PROMISE frame type) isn't misread as TUIC. - Prefetch the consumed first event into the right handler so nothing is lost: TUIC uni/bi/datagram handlers, or the h3 adapter (which now also accepts a prefetched first **bidi** request stream, not just a uni control stream). - On classification timeout, fall back to the masquerade (a silent prober still sees a web server) instead of closing. Verified: masquerade e2e test, quinn + quiche TUIC relay tests, clippy, fmt. Co-Authored-By: Claude Opus 4.8 --- crates/wind-quic/src/h3_adapter.rs | 41 +++-- crates/wind-tuic/src/server/masquerade.rs | 15 +- crates/wind-tuic/src/server/mod.rs | 180 ++++++++++++++++------ 3 files changed, 167 insertions(+), 69 deletions(-) diff --git a/crates/wind-quic/src/h3_adapter.rs b/crates/wind-quic/src/h3_adapter.rs index c034006..53281eb 100644 --- a/crates/wind-quic/src/h3_adapter.rs +++ b/crates/wind-quic/src/h3_adapter.rs @@ -9,13 +9,14 @@ //! //! Only the **server** surface is implemented: accepting peer-initiated uni //! (control / QPACK) and bidi (request) streams, and opening our own uni -//! streams (control / QPACK). The classifier in `wind-tuic` peeks the first -//! byte of the peer's control stream; that consumed byte is replayed via -//! [`PrefixedRecv`](crate::PrefixedRecv), which the adapter hands to -//! [`server_connection`] as the first stream it yields. Every recv stream is a -//! `PrefixedRecv` (with an empty prefix when none was consumed), so the adapter -//! is generic over the backend's concrete stream types — no boxing or dynamic -//! dispatch. +//! streams (control / QPACK). The classifier in `wind-tuic` races the peer's +//! first uni/bi stream and peeks a couple of bytes to decide TUIC vs HTTP/3; +//! whichever stream it consumed for an h3 connection is handed back (with the +//! peeked bytes replayed via [`PrefixedRecv`](crate::PrefixedRecv)) as +//! `first_uni` / `first_bidi`, so the adapter yields it before accepting +//! anything new. Every recv stream is a `PrefixedRecv` (with an empty prefix +//! when none was consumed), so the adapter is generic over the backend's +//! concrete stream types — no boxing or dynamic dispatch. //! //! The bridge is mechanical: our streams are `AsyncRead`/`AsyncWrite`, while //! `h3::quic` is poll- and `Buf`-based. Recv streams read into a scratch buffer @@ -49,13 +50,25 @@ type BoxFut = Pin + Send>>; /// `type_complexity` lint when written inline in every slot/signature. type BoxBiFut = BoxFut::SendStream, ::RecvStream), QuicError>>; -/// Build an HTTP/3 server connection over `conn`, yielding `first_control` (the -/// peer's control stream, with its peeked byte already replayed) as the first -/// accepted recv stream. -pub fn server_connection(conn: C, first_control: PrefixedRecv) -> H3Conn { +/// Build an HTTP/3 server connection over `conn`. +/// +/// The classifier in `wind-tuic` races the connection's first uni/bi stream to +/// decide TUIC vs HTTP/3, consuming a couple of bytes to peek. Whichever stream +/// it consumed for an h3 connection is handed back here as `first_uni` and/or +/// `first_bidi` (with the peeked bytes replayed via [`PrefixedRecv`]), so the +/// h3 server yields it before accepting anything new — nothing is lost. +pub fn server_connection( + conn: C, + first_uni: Option>, + first_bidi: Option<(C::SendStream, PrefixedRecv)>, +) -> H3Conn { H3Conn { conn, - first_recv: Some(H3Recv::new(first_control)), + first_recv: first_uni.map(H3Recv::new), + first_bidi: first_bidi.map(|(send, recv)| H3Bidi { + send: H3Send::new(send), + recv: H3Recv::new(recv), + }), accept_recv_fut: None, accept_bi_fut: None, open_uni_fut: None, @@ -319,6 +332,7 @@ impl OpenStreams for H3Opener { pub struct H3Conn { conn: C, first_recv: Option>, + first_bidi: Option>, accept_recv_fut: Option>>, accept_bi_fut: Option>, open_uni_fut: Option>>, @@ -365,6 +379,9 @@ impl Connection for H3Conn { } fn poll_accept_bidi(&mut self, cx: &mut Context<'_>) -> Poll> { + if let Some(bidi) = self.first_bidi.take() { + return Poll::Ready(Ok(bidi)); + } let poll = { let conn = &self.conn; let fut = self.accept_bi_fut.get_or_insert_with(|| boxed_accept_bi(conn.clone())); diff --git a/crates/wind-tuic/src/server/masquerade.rs b/crates/wind-tuic/src/server/masquerade.rs index e736889..a87da8a 100644 --- a/crates/wind-tuic/src/server/masquerade.rs +++ b/crates/wind-tuic/src/server/masquerade.rs @@ -1,8 +1,9 @@ //! HTTP/3 masquerade: serve non-TUIC (real HTTP/3) clients as a reverse proxy. //! //! When the connection classifier in [`super`] decides a peer is speaking -//! actual HTTP/3 rather than TUIC (its first stream byte isn't the TUIC version -//! `0x05`), it hands the connection here. We run a real HTTP/3 server over the +//! actual HTTP/3 rather than TUIC (its first stream's leading bytes aren't +//! valid TUIC framing), it hands the connection here. We run a real HTTP/3 +//! server over the //! backend-agnostic [`wind_quic::h3_adapter`] and reverse-proxy every request //! to a configured upstream site with a `reqwest` client, relaying the response //! back. To an active prober the server is indistinguishable from a normal @@ -45,19 +46,21 @@ fn client() -> Client { .clone() } -/// Run the HTTP/3 masquerade server over `conn`. `first_control` is the peer's -/// first stream (its peeked byte already replayed). Returns when the peer +/// Run the HTTP/3 masquerade server over `conn`. `first_uni` / `first_bidi` are +/// the stream the classifier already consumed to peek (with its bytes +/// replayed), if any, so the h3 server doesn't lose it. Returns when the peer /// disconnects or `cancel` fires. pub async fn run_masquerade( conn: C, - first_control: PrefixedRecv, + first_uni: Option>, + first_bidi: Option<(C::SendStream, PrefixedRecv)>, cfg: &MasqueradeConfig, cancel: CancellationToken, ) -> eyre::Result<()> { let backend = Url::parse(&cfg.upstream).map_err(|e| eyre::eyre!("invalid masquerade upstream {:?}: {e}", cfg.upstream))?; let client = client(); - let adapter = h3_adapter::server_connection(conn, first_control); + let adapter = h3_adapter::server_connection(conn, first_uni, first_bidi); let mut h3conn = h3::server::Connection::new(adapter) .await .map_err(|e| eyre::eyre!("h3 server setup failed: {e}"))?; diff --git a/crates/wind-tuic/src/server/mod.rs b/crates/wind-tuic/src/server/mod.rs index 5c1498f..7197eb5 100644 --- a/crates/wind-tuic/src/server/mod.rs +++ b/crates/wind-tuic/src/server/mod.rs @@ -147,6 +147,66 @@ impl Clone for UdpSession { /// would let one authenticated peer pin a large amount of background work. const MAX_UDP_SESSIONS_PER_CONN: u64 = 1024; +/// The first thing a freshly-handshaked peer sends, raced across all three +/// transports to classify the connection — the TUIC `Auth` is **not** +/// guaranteed to arrive first (a `Connect` bidi, a heartbeat datagram, or QUIC +/// reordering can beat it), so we cannot assume the first event is a uni `Auth` +/// stream. +enum FirstEvent { + Uni(C::RecvStream), + Bi(C::SendStream, C::RecvStream), + Datagram(bytes::Bytes), +} + +/// A first event classified as TUIC, with the peeked header bytes replayed for +/// the stream variants, handed to the matching handler. +enum TuicFirst { + Uni(wind_quic::PrefixedRecv), + Bi(C::SendStream, wind_quic::PrefixedRecv), + Datagram(bytes::Bytes), +} + +/// Whether a 2-byte prefix is TUIC framing: `[VER, CmdType]` with `CmdType` in +/// `Auth..=Heartbeat` (0..=4). An HTTP/3 stream-type / frame-type byte won't +/// satisfy both, so this distinguishes the two even when an h3 stream happens +/// to start with `VER`. +fn is_tuic_prefix(prefix: [u8; 2]) -> bool { + prefix[0] == crate::proto::VER && prefix[1] <= u8::from(CmdType::Heartbeat) +} + +/// Read the 2-byte classifier prefix from a stream (`None` if it closes first). +async fn read_prefix(recv: &mut R) -> Option<[u8; 2]> { + let mut prefix = [0u8; 2]; + recv.read_exact(&mut prefix).await.ok().map(|_| prefix) +} + +/// Hand a non-TUIC connection to the HTTP/3 masquerade (when enabled), passing +/// any first stream the classifier already consumed so the h3 server can replay +/// it. Closes the connection when the masquerade is disabled or compiled out. +async fn dispatch_masquerade( + conn: C, + masq: Option<&MasqueradeConfig>, + first_uni: Option>, + first_bidi: Option<(C::SendStream, wind_quic::PrefixedRecv)>, + remote_addr: SocketAddr, + cancel: CancellationToken, +) { + #[cfg(feature = "masquerade")] + if let Some(cfg) = masq { + info!("connection from {} is not TUIC; serving HTTP/3 masquerade", remote_addr); + if let Err(e) = masquerade::run_masquerade(conn, first_uni, first_bidi, cfg, cancel).await { + tracing::debug!("masquerade for {} ended: {e:?}", remote_addr); + } + return; + } + let _ = (masq, first_uni, first_bidi, cancel); + tracing::debug!( + "connection from {} is not TUIC and masquerade is disabled; closing", + remote_addr + ); + conn.close(0, b""); +} + /// Drive an established TUIC connection: spawn the auth-timeout guard and the /// datagram/uni/bi accept loops, then run until the peer disconnects or /// `cancel` fires. Backend-agnostic — both backends call this after their @@ -165,61 +225,63 @@ pub async fn serve_connection( { // --- Classify the connection: real TUIC vs HTTP/3 masquerade --- // - // Both negotiate the `h3` ALPN, so the discriminator is the first byte of the - // first uni stream: every TUIC stream begins with the version byte `0x05` - // (`proto::VER`); a real HTTP/3 client's first uni stream is its control - // stream, beginning with the stream-type varint `0x00`. Peeking it here (and - // replaying it via `PrefixedRecv`) keeps both the TUIC parser and the h3 - // adapter able to read the stream from byte 0. - let first_uni = tokio::select! { + // Both negotiate the `h3` ALPN, so we inspect the first thing the peer sends. + // The first event may be a uni stream, a bidi stream, or a datagram (the TUIC + // `Auth` is not guaranteed to arrive first), so race all three. We then peek + // the first two bytes: TUIC framing is `[VER, CmdType]` (see `is_tuic_prefix`), + // while an HTTP/3 client's streams begin with an h3 stream-type / frame-type + // byte. The peeked bytes are replayed via `PrefixedRecv` so neither the TUIC + // parser nor the h3 adapter loses any data. + let first = tokio::select! { _ = cancel.cancelled() => return, - r = tokio::time::timeout(auth_timeout, conn.accept_uni()) => r, + r = tokio::time::timeout(auth_timeout, async { + tokio::select! { + res = conn.accept_uni() => res.map(FirstEvent::::Uni), + res = conn.accept_bi() => res.map(|(s, r)| FirstEvent::::Bi(s, r)), + res = conn.read_datagram() => res.map(FirstEvent::::Datagram), + } + }) => r, }; - let mut first_uni = match first_uni { - Ok(Ok(s)) => s, + let first = match first { + Ok(Ok(ev)) => ev, Ok(Err(e)) => { - tracing::debug!("connection from {} closed before first stream: {e:?}", remote_addr); + tracing::debug!("connection from {} closed before first event: {e:?}", remote_addr); return; } Err(_) => { - warn!( - "connection from {} opened no stream within {:?}; closing", - remote_addr, auth_timeout - ); - conn.close(0, b"timeout"); - return; + // Nothing within the window: classification is impossible, so fall back + // to the masquerade (a silent prober still sees a web server) or close. + return dispatch_masquerade(conn, masq.as_ref(), None, None, remote_addr, cancel).await; } }; - let mut first_byte = [0u8; 1]; - if let Err(e) = first_uni.read_exact(&mut first_byte).await { - tracing::debug!("connection from {} closed reading first byte: {e}", remote_addr); - return; - } - - if first_byte[0] != crate::proto::VER { - // Not TUIC → HTTP/3 masquerade (when enabled), otherwise drop. - #[cfg(feature = "masquerade")] - if let Some(cfg) = masq.as_ref() { - let prefixed = wind_quic::PrefixedRecv::new(bytes::Bytes::copy_from_slice(&first_byte), first_uni); - info!("connection from {} is not TUIC; serving HTTP/3 masquerade", remote_addr); - if let Err(e) = masquerade::run_masquerade(conn, prefixed, cfg, cancel).await { - tracing::debug!("masquerade for {} ended: {e:?}", remote_addr); + let tuic_first: TuicFirst = match first { + FirstEvent::Uni(mut recv) => { + let Some(prefix) = read_prefix(&mut recv).await else { return }; + let recv = wind_quic::PrefixedRecv::new(bytes::Bytes::copy_from_slice(&prefix), recv); + if is_tuic_prefix(prefix) { + TuicFirst::Uni(recv) + } else { + return dispatch_masquerade(conn, masq.as_ref(), Some(recv), None, remote_addr, cancel).await; } - return; } - let _ = &masq; // used only by the masquerade branch; silence unused warning - tracing::debug!( - "connection from {} is not TUIC and masquerade is disabled; closing", - remote_addr - ); - conn.close(0, b""); - return; - } - - // TUIC: re-wrap the first uni stream so the peeked version byte is replayed to - // the Auth parser. - let first_uni = wind_quic::PrefixedRecv::new(bytes::Bytes::copy_from_slice(&first_byte), first_uni); + FirstEvent::Bi(send, mut recv) => { + let Some(prefix) = read_prefix(&mut recv).await else { return }; + let recv = wind_quic::PrefixedRecv::new(bytes::Bytes::copy_from_slice(&prefix), recv); + if is_tuic_prefix(prefix) { + TuicFirst::Bi(send, recv) + } else { + return dispatch_masquerade(conn, masq.as_ref(), None, Some((send, recv)), remote_addr, cancel).await; + } + } + FirstEvent::Datagram(dg) => { + if dg.len() >= 2 && is_tuic_prefix([dg[0], dg[1]]) { + TuicFirst::Datagram(dg) + } else { + return dispatch_masquerade(conn, masq.as_ref(), None, None, remote_addr, cancel).await; + } + } + }; let udp_root_cancel = cancel.child_token(); @@ -363,15 +425,31 @@ pub async fn serve_connection( ); } - // Process the first uni stream already accepted during classification — for - // TUIC this is the Auth stream (with its peeked version byte replayed). - // Subsequent uni streams are handled by the acceptor loop above. + // Process the first event already consumed during classification (with any + // peeked header bytes replayed). Subsequent events are handled by the acceptor + // loops above. { let conn = connection.clone(); let cb = callback.clone(); - tokio::spawn( - spawn_logged("Uni stream", handle_uni_stream(conn, first_uni, cb)).instrument(tracing::debug_span!("uni_stream")), - ); + match tuic_first { + TuicFirst::Uni(recv) => { + tokio::spawn( + spawn_logged("Uni stream", handle_uni_stream(conn, recv, cb)) + .instrument(tracing::debug_span!("uni_stream")), + ); + } + TuicFirst::Bi(send, recv) => { + tokio::spawn( + spawn_logged("Bi stream", handle_bi_stream(conn, send, recv, cb)) + .instrument(tracing::debug_span!("bi_stream")), + ); + } + TuicFirst::Datagram(dg) => { + tokio::spawn( + spawn_logged("Datagram", handle_datagram(conn, dg, cb)).instrument(tracing::debug_span!("datagram")), + ); + } + } } // Exit on either server shutdown or peer disconnect. @@ -483,7 +561,7 @@ async fn handle_uni_stream( async fn handle_bi_stream( connection: Arc>, send: C::SendStream, - mut recv: C::RecvStream, + mut recv: impl AsyncRead + Unpin + Send + 'static, callback: CB, ) -> eyre::Result<()> { if !ensure_authed(&connection).await { From 02ec07561944ebe01753dddd7a221fd28c3ec381 Mon Sep 17 00:00:00 2001 From: iHsin Date: Sat, 13 Jun 2026 09:53:56 +0800 Subject: [PATCH 06/10] refactor(wind-quic): impl h3::quic::RecvStream directly on PrefixedRecv Following Itsusinn/tuic (whose peekable recv stream *is* its h3 RecvStream), implement `h3::quic::RecvStream` for `PrefixedRecv` itself instead of wrapping it in a separate `H3Recv` type. `PrefixedRecv` already owns the replayed prefix and the backend stream and exposes `id()`/`stop()`, so the wrapper added nothing but indirection. Fresh accepted streams use an empty-prefix `PrefixedRecv`. This deletes the `H3Recv` struct and its `new`/`passthrough` constructors; the adapter's `RecvStream` associated types are now `PrefixedRecv` directly. The send side keeps `H3Send` (the orphan rule forbids implementing the foreign `h3::quic::SendStream` on the backend's `C::SendStream`). No behavior change: masquerade e2e test, clippy, fmt, and the quiche backend all pass. Co-Authored-By: Claude Opus 4.8 --- crates/wind-quic/src/h3_adapter.rs | 59 ++++++++++-------------------- 1 file changed, 20 insertions(+), 39 deletions(-) diff --git a/crates/wind-quic/src/h3_adapter.rs b/crates/wind-quic/src/h3_adapter.rs index 53281eb..0febba7 100644 --- a/crates/wind-quic/src/h3_adapter.rs +++ b/crates/wind-quic/src/h3_adapter.rs @@ -64,10 +64,10 @@ pub fn server_connection( ) -> H3Conn { H3Conn { conn, - first_recv: first_uni.map(H3Recv::new), + first_recv: first_uni, first_bidi: first_bidi.map(|(send, recv)| H3Bidi { send: H3Send::new(send), - recv: H3Recv::new(recv), + recv, }), accept_recv_fut: None, accept_bi_fut: None, @@ -93,39 +93,20 @@ fn stream_err(e: QuicError) -> StreamErrorIncoming { } // --------------------------------------------------------------------------- -// Recv stream +// Recv stream — `PrefixedRecv` *is* the adapter's recv stream // --------------------------------------------------------------------------- -/// `h3::quic::RecvStream` over the backend's recv stream (wrapped in -/// [`PrefixedRecv`] so a peeked control-stream byte can be replayed). -pub struct H3Recv { - inner: PrefixedRecv, - id: u64, - scratch: Vec, -} - -impl H3Recv { - fn new(inner: PrefixedRecv) -> Self { - let id = inner.id(); - Self { - inner, - id, - scratch: vec![0u8; RECV_CHUNK], - } - } - - /// Wrap a freshly-accepted stream that needs no replayed prefix. - fn passthrough(recv: C::RecvStream) -> Self { - Self::new(PrefixedRecv::new(Bytes::new(), recv)) - } -} - -impl RecvStream for H3Recv { +/// [`PrefixedRecv`] doubles as the adapter's `h3::quic::RecvStream`: it already +/// owns the (possibly empty) replayed prefix plus the backend recv stream, so +/// no separate wrapper type is needed. Fresh accepted streams are wrapped with +/// an empty prefix. +impl RecvStream for PrefixedRecv { type Buf = Bytes; fn poll_data(&mut self, cx: &mut Context<'_>) -> Poll, StreamErrorIncoming>> { - let mut rb = ReadBuf::new(&mut self.scratch); - match Pin::new(&mut self.inner).poll_read(cx, &mut rb) { + let mut scratch = [0u8; RECV_CHUNK]; + let mut rb = ReadBuf::new(&mut scratch); + match Pin::new(&mut *self).poll_read(cx, &mut rb) { Poll::Ready(Ok(())) => { let filled = rb.filled(); if filled.is_empty() { @@ -141,11 +122,11 @@ impl RecvStream for H3Recv { } fn stop_sending(&mut self, error_code: u64) { - self.inner.stop(error_code); + self.stop(error_code); } fn recv_id(&self) -> StreamId { - stream_id(self.id) + stream_id(self.id()) } } @@ -235,10 +216,10 @@ impl SendStream for H3Send { // Bidi stream (request streams) // --------------------------------------------------------------------------- -/// `h3::quic::BidiStream` joining an [`H3Send`] and an [`H3Recv`]. +/// `h3::quic::BidiStream` joining an [`H3Send`] and a [`PrefixedRecv`]. pub struct H3Bidi { send: H3Send, - recv: H3Recv, + recv: PrefixedRecv, } impl SendStream for H3Bidi { @@ -280,7 +261,7 @@ impl RecvStream for H3Bidi { } impl BidiStream for H3Bidi { - type RecvStream = H3Recv; + type RecvStream = PrefixedRecv; type SendStream = H3Send; fn split(self) -> (Self::SendStream, Self::RecvStream) { @@ -291,7 +272,7 @@ impl BidiStream for H3Bidi { fn into_bidi((send, recv): (C::SendStream, C::RecvStream)) -> H3Bidi { H3Bidi { send: H3Send::new(send), - recv: H3Recv::passthrough(recv), + recv: PrefixedRecv::new(Bytes::new(), recv), } } @@ -331,7 +312,7 @@ impl OpenStreams for H3Opener { /// `h3::quic::Connection` over a [`QuicConnection`] handle. pub struct H3Conn { conn: C, - first_recv: Option>, + first_recv: Option>, first_bidi: Option>, accept_recv_fut: Option>>, accept_bi_fut: Option>, @@ -358,7 +339,7 @@ impl OpenStreams for H3Conn { impl Connection for H3Conn { type OpenStreams = H3Opener; - type RecvStream = H3Recv; + type RecvStream = PrefixedRecv; fn poll_accept_recv(&mut self, cx: &mut Context<'_>) -> Poll> { if let Some(recv) = self.first_recv.take() { @@ -372,7 +353,7 @@ impl Connection for H3Conn { match poll { Poll::Ready(res) => { self.accept_recv_fut = None; - Poll::Ready(res.map(H3Recv::passthrough).map_err(conn_err)) + Poll::Ready(res.map(|r| PrefixedRecv::new(Bytes::new(), r)).map_err(conn_err)) } Poll::Pending => Poll::Pending, } From 2e952f72deef8801b5397733640c50fc88662e88 Mon Sep 17 00:00:00 2001 From: iHsin Date: Sat, 13 Jun 2026 10:44:27 +0800 Subject: [PATCH 07/10] refactor(wind-tuic): per-stream classification, remove the "first event" logic MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Classification was connection-level + prefetch: race the first uni/bi/datagram, peek 2 bytes, decide TUIC vs h3 once, and hand that first stream forward as a one-off (`FirstEvent`/`TuicFirst`) or as the h3 adapter's prefetched `first_recv`/`first_bidi`. That left two code paths — the special "first" stream and the acceptor-loop "rest." Replace it with uniform per-stream classification: every accepted stream reads its own 2-byte prefix and routes itself — TUIC streams to the existing `handle_uni_stream`/`handle_bi_stream`/`handle_datagram`, non-TUIC streams to the h3 masquerade. One path, no "first" special case. - wind-quic h3 adapter is now **channel-fed**: `H3Conn` pulls accepted streams from two `mpsc` receivers (`server_connection(conn, recv_rx, bidi_rx)`); `poll_accept_recv`/`poll_accept_bidi` just `poll_recv`. Dropped `first_recv`, `first_bidi`, the boxed accept-futures, and `boxed_accept_uni/bi`. `conn` is kept only for opening the server's own control/QPACK streams. - The per-stream router in `serve_connection` reads each stream's prefix, wraps it in `PrefixedRecv`, and either runs the TUIC handler or sends it to the h3 channels. Removed `FirstEvent`, `TuicFirst`, the classification race, and `dispatch_masquerade`. - Lazy h3 start: `run_masquerade` is spawned parked and waits on a `Notify` the router fires on the first non-TUIC stream before building the h3 server (which opens the control stream) — so a pure-TUIC connection never opens h3 streams. The auth-timeout guard skips closing once a stream has classified as h3. Trade-off (documented): a connection no longer commits to a mode up front — each stream routes independently, so one connection could carry both TUIC and h3 streams. Real clients never do; a mixer isn't exploitable (fixed-upstream proxy). Verified: masquerade e2e (reqwest HTTP/3), quinn + quiche TUIC relay tests, masquerade-off build, clippy, nightly fmt. Co-Authored-By: Claude Opus 4.8 --- crates/wind-quic/src/h3_adapter.rs | 114 ++++------ crates/wind-tuic/src/server/masquerade.rs | 28 ++- crates/wind-tuic/src/server/mod.rs | 258 +++++++++++----------- 3 files changed, 192 insertions(+), 208 deletions(-) diff --git a/crates/wind-quic/src/h3_adapter.rs b/crates/wind-quic/src/h3_adapter.rs index 0febba7..79d21d1 100644 --- a/crates/wind-quic/src/h3_adapter.rs +++ b/crates/wind-quic/src/h3_adapter.rs @@ -9,13 +9,13 @@ //! //! Only the **server** surface is implemented: accepting peer-initiated uni //! (control / QPACK) and bidi (request) streams, and opening our own uni -//! streams (control / QPACK). The classifier in `wind-tuic` races the peer's -//! first uni/bi stream and peeks a couple of bytes to decide TUIC vs HTTP/3; -//! whichever stream it consumed for an h3 connection is handed back (with the -//! peeked bytes replayed via [`PrefixedRecv`](crate::PrefixedRecv)) as -//! `first_uni` / `first_bidi`, so the adapter yields it before accepting -//! anything new. Every recv stream is a `PrefixedRecv` (with an empty prefix -//! when none was consumed), so the adapter is generic over the backend's +//! streams (control / QPACK). The per-stream classifier in `wind-tuic` reads +//! each accepted stream's prefix, and feeds the ones it classified as h3 into +//! this adapter over two channels (`recv_rx` / `bidi_rx`) — already accepted +//! off the connection, with their peeked prefix replayed via +//! [`PrefixedRecv`](crate::PrefixedRecv). So the adapter just pulls streams +//! from the channels; there is no "first stream" special case. Every recv +//! stream is a `PrefixedRecv`, so the adapter is generic over the backend's //! concrete stream types — no boxing or dynamic dispatch. //! //! The bridge is mechanical: our streams are `AsyncRead`/`AsyncWrite`, while @@ -36,7 +36,10 @@ use h3::quic::{ BidiStream, Connection, ConnectionErrorIncoming, OpenStreams, RecvStream, SendStream, StreamErrorIncoming, StreamId, WriteBuf, }; -use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; +use tokio::{ + io::{AsyncRead, AsyncWrite, ReadBuf}, + sync::mpsc, +}; use crate::{PrefixedRecv, QuicConnection, QuicError, QuicRecvStream, QuicSendStream}; @@ -45,32 +48,34 @@ const RECV_CHUNK: usize = 16 * 1024; type BoxFut = Pin + Send>>; -/// A boxed in-flight `accept_bi` / `open_bi` future. Aliased because the bidi -/// case returns a `(SendStream, RecvStream)` tuple, which trips clippy's -/// `type_complexity` lint when written inline in every slot/signature. +/// A boxed in-flight `open_bi` future. Aliased because the bidi case returns a +/// `(SendStream, RecvStream)` tuple, which trips clippy's `type_complexity` +/// lint when written inline in every slot/signature. type BoxBiFut = BoxFut::SendStream, ::RecvStream), QuicError>>; +/// Channel of accepted (prefix-replayed) recv streams the `wind-tuic` +/// per-stream router feeds to the masquerade h3 server. +type RecvRx = mpsc::UnboundedReceiver::RecvStream>>; +/// Channel of accepted bidi (request) streams — `(send half, prefix-replayed +/// recv)`. +type BidiRx = mpsc::UnboundedReceiver<( + ::SendStream, + PrefixedRecv<::RecvStream>, +)>; + /// Build an HTTP/3 server connection over `conn`. /// -/// The classifier in `wind-tuic` races the connection's first uni/bi stream to -/// decide TUIC vs HTTP/3, consuming a couple of bytes to peek. Whichever stream -/// it consumed for an h3 connection is handed back here as `first_uni` and/or -/// `first_bidi` (with the peeked bytes replayed via [`PrefixedRecv`]), so the -/// h3 server yields it before accepting anything new — nothing is lost. -pub fn server_connection( - conn: C, - first_uni: Option>, - first_bidi: Option<(C::SendStream, PrefixedRecv)>, -) -> H3Conn { +/// Accepted streams are pulled from `recv_rx` / `bidi_rx`, which the +/// `wind-tuic` per-stream router fills with the streams it classified as h3 +/// (already accepted off the connection, with their peeked prefix replayed via +/// [`PrefixedRecv`]). `conn` is kept only for *opening* the server's own +/// control / QPACK streams. There is no "first stream" special case — every +/// accepted stream arrives the same way. +pub fn server_connection(conn: C, recv_rx: RecvRx, bidi_rx: BidiRx) -> H3Conn { H3Conn { conn, - first_recv: first_uni, - first_bidi: first_bidi.map(|(send, recv)| H3Bidi { - send: H3Send::new(send), - recv, - }), - accept_recv_fut: None, - accept_bi_fut: None, + recv_rx, + bidi_rx, open_uni_fut: None, open_bi_fut: None, } @@ -309,13 +314,12 @@ impl OpenStreams for H3Opener { // Connection // --------------------------------------------------------------------------- -/// `h3::quic::Connection` over a [`QuicConnection`] handle. +/// `h3::quic::Connection` over a [`QuicConnection`] handle. Accepts streams +/// from the router's channels; opens streams directly on `conn`. pub struct H3Conn { conn: C, - first_recv: Option>, - first_bidi: Option>, - accept_recv_fut: Option>>, - accept_bi_fut: Option>, + recv_rx: RecvRx, + bidi_rx: BidiRx, open_uni_fut: Option>>, open_bi_fut: Option>, } @@ -342,37 +346,21 @@ impl Connection for H3Conn { type RecvStream = PrefixedRecv; fn poll_accept_recv(&mut self, cx: &mut Context<'_>) -> Poll> { - if let Some(recv) = self.first_recv.take() { - return Poll::Ready(Ok(recv)); - } - let poll = { - let conn = &self.conn; - let fut = self.accept_recv_fut.get_or_insert_with(|| boxed_accept_uni(conn.clone())); - fut.as_mut().poll(cx) - }; - match poll { - Poll::Ready(res) => { - self.accept_recv_fut = None; - Poll::Ready(res.map(|r| PrefixedRecv::new(Bytes::new(), r)).map_err(conn_err)) - } + match self.recv_rx.poll_recv(cx) { + Poll::Ready(Some(recv)) => Poll::Ready(Ok(recv)), + // Channel closed → the router (and the connection) is gone. + Poll::Ready(None) => Poll::Ready(Err(ConnectionErrorIncoming::Timeout)), Poll::Pending => Poll::Pending, } } fn poll_accept_bidi(&mut self, cx: &mut Context<'_>) -> Poll> { - if let Some(bidi) = self.first_bidi.take() { - return Poll::Ready(Ok(bidi)); - } - let poll = { - let conn = &self.conn; - let fut = self.accept_bi_fut.get_or_insert_with(|| boxed_accept_bi(conn.clone())); - fut.as_mut().poll(cx) - }; - match poll { - Poll::Ready(res) => { - self.accept_bi_fut = None; - Poll::Ready(res.map(into_bidi).map_err(conn_err)) - } + match self.bidi_rx.poll_recv(cx) { + Poll::Ready(Some((send, recv))) => Poll::Ready(Ok(H3Bidi { + send: H3Send::new(send), + recv, + })), + Poll::Ready(None) => Poll::Ready(Err(ConnectionErrorIncoming::Timeout)), Poll::Pending => Poll::Pending, } } @@ -396,14 +384,6 @@ fn stream_id(id: u64) -> StreamId { StreamId::try_from(id).unwrap_or_else(|_| StreamId::try_from(0).expect("0 is a valid stream id")) } -fn boxed_accept_uni(conn: C) -> BoxFut> { - Box::pin(async move { conn.accept_uni().await }) -} - -fn boxed_accept_bi(conn: C) -> BoxBiFut { - Box::pin(async move { conn.accept_bi().await }) -} - fn poll_open_send( conn: &C, slot: &mut Option>>, diff --git a/crates/wind-tuic/src/server/masquerade.rs b/crates/wind-tuic/src/server/masquerade.rs index a87da8a..71c87de 100644 --- a/crates/wind-tuic/src/server/masquerade.rs +++ b/crates/wind-tuic/src/server/masquerade.rs @@ -9,11 +9,15 @@ //! back. To an active prober the server is indistinguishable from a normal //! HTTP/3 web server. -use std::{sync::OnceLock, time::Duration}; +use std::{ + sync::{Arc, OnceLock}, + time::Duration, +}; use bytes::{Buf as _, Bytes}; use http::header::HeaderName; use reqwest::{Client, Url}; +use tokio::sync::{Notify, mpsc}; use tokio_stream::StreamExt as _; use tokio_util::sync::CancellationToken; use tracing::debug; @@ -46,21 +50,29 @@ fn client() -> Client { .clone() } -/// Run the HTTP/3 masquerade server over `conn`. `first_uni` / `first_bidi` are -/// the stream the classifier already consumed to peek (with its bytes -/// replayed), if any, so the h3 server doesn't lose it. Returns when the peer +/// Run the HTTP/3 masquerade server over `conn`. The per-stream router in +/// [`super`] feeds h3 streams to `recv_rx` / `bidi_rx` and fires `go` on the +/// first one. We wait for `go` before building the h3 server (whose setup opens +/// our control stream + SETTINGS), so a pure-TUIC connection — which never +/// fires `go` — never has h3 streams opened on it. Returns when the peer /// disconnects or `cancel` fires. pub async fn run_masquerade( conn: C, - first_uni: Option>, - first_bidi: Option<(C::SendStream, PrefixedRecv)>, - cfg: &MasqueradeConfig, + recv_rx: mpsc::UnboundedReceiver>, + bidi_rx: mpsc::UnboundedReceiver<(C::SendStream, PrefixedRecv)>, + go: Arc, + cfg: MasqueradeConfig, cancel: CancellationToken, ) -> eyre::Result<()> { + tokio::select! { + _ = cancel.cancelled() => return Ok(()), + _ = go.notified() => {} + } + let backend = Url::parse(&cfg.upstream).map_err(|e| eyre::eyre!("invalid masquerade upstream {:?}: {e}", cfg.upstream))?; let client = client(); - let adapter = h3_adapter::server_connection(conn, first_uni, first_bidi); + let adapter = h3_adapter::server_connection(conn, recv_rx, bidi_rx); let mut h3conn = h3::server::Connection::new(adapter) .await .map_err(|e| eyre::eyre!("h3 server setup failed: {e}"))?; diff --git a/crates/wind-tuic/src/server/mod.rs b/crates/wind-tuic/src/server/mod.rs index 7197eb5..5cc9f0f 100644 --- a/crates/wind-tuic/src/server/mod.rs +++ b/crates/wind-tuic/src/server/mod.rs @@ -11,7 +11,15 @@ //! (the reference behavior) and the replacement for the bespoke `wind-tuiche` //! driver. -use std::{collections::HashMap, net::SocketAddr, sync::Arc, time::Duration}; +use std::{ + collections::HashMap, + net::SocketAddr, + sync::{ + Arc, + atomic::{AtomicBool, Ordering}, + }, + time::Duration, +}; use arc_swap::ArcSwapOption; use eyre::Context as _; @@ -147,23 +155,21 @@ impl Clone for UdpSession { /// would let one authenticated peer pin a large amount of background work. const MAX_UDP_SESSIONS_PER_CONN: u64 = 1024; -/// The first thing a freshly-handshaked peer sends, raced across all three -/// transports to classify the connection — the TUIC `Auth` is **not** -/// guaranteed to arrive first (a `Connect` bidi, a heartbeat datagram, or QUIC -/// reordering can beat it), so we cannot assume the first event is a uni `Auth` -/// stream. -enum FirstEvent { - Uni(C::RecvStream), - Bi(C::SendStream, C::RecvStream), - Datagram(bytes::Bytes), +/// Per-connection senders for the lazily-started HTTP/3 masquerade. The +/// per-stream router pushes streams it classified as h3 here; `run_masquerade` +/// (spawned parked) pulls them after `go` fires on the first one. `None` +/// everywhere when the masquerade is disabled or not compiled in. +#[cfg_attr(not(feature = "masquerade"), allow(dead_code))] +struct H3Senders { + uni_tx: mpsc::UnboundedSender>, + bidi_tx: mpsc::UnboundedSender<(C::SendStream, wind_quic::PrefixedRecv)>, + go: Arc, } -/// A first event classified as TUIC, with the peeked header bytes replayed for -/// the stream variants, handed to the matching handler. -enum TuicFirst { +/// A non-TUIC stream handed to the h3 masquerade by the per-stream router. +enum H3Stream { Uni(wind_quic::PrefixedRecv), Bi(C::SendStream, wind_quic::PrefixedRecv), - Datagram(bytes::Bytes), } /// Whether a 2-byte prefix is TUIC framing: `[VER, CmdType]` with `CmdType` in @@ -180,31 +186,58 @@ async fn read_prefix(recv: &mut R) -> Option<[u8; 2]> { recv.read_exact(&mut prefix).await.ok().map(|_| prefix) } -/// Hand a non-TUIC connection to the HTTP/3 masquerade (when enabled), passing -/// any first stream the classifier already consumed so the h3 server can replay -/// it. Closes the connection when the masquerade is disabled or compiled out. -async fn dispatch_masquerade( +/// Build this connection's HTTP/3 masquerade router: two channels feeding a +/// **parked** `run_masquerade` task (it only builds the h3 server once the +/// router wakes it on the first h3 stream). Returns `None` (and spawns nothing) +/// when the masquerade is disabled. +#[cfg(feature = "masquerade")] +fn spawn_h3_router( conn: C, - masq: Option<&MasqueradeConfig>, - first_uni: Option>, - first_bidi: Option<(C::SendStream, wind_quic::PrefixedRecv)>, - remote_addr: SocketAddr, + masq: Option, cancel: CancellationToken, +) -> Option>> { + let cfg = masq?; + let (uni_tx, uni_rx) = mpsc::unbounded_channel(); + let (bidi_tx, bidi_rx) = mpsc::unbounded_channel(); + let go = Arc::new(Notify::new()); + tokio::spawn(masquerade::run_masquerade(conn, uni_rx, bidi_rx, go.clone(), cfg, cancel)); + Some(Arc::new(H3Senders { uni_tx, bidi_tx, go })) +} + +/// No router when the masquerade isn't compiled in. +#[cfg(not(feature = "masquerade"))] +fn spawn_h3_router( + _conn: C, + _masq: Option, + _cancel: CancellationToken, +) -> Option>> { + None +} + +/// Route a stream the classifier decided is **not** TUIC: hand it to the h3 +/// masquerade (waking the parked server), or close the connection if the +/// masquerade is off. +fn route_non_tuic( + ctx: &InboundCtx, + h3: Option<&Arc>>, + active: &AtomicBool, + stream: H3Stream, ) { - #[cfg(feature = "masquerade")] - if let Some(cfg) = masq { - info!("connection from {} is not TUIC; serving HTTP/3 masquerade", remote_addr); - if let Err(e) = masquerade::run_masquerade(conn, first_uni, first_bidi, cfg, cancel).await { - tracing::debug!("masquerade for {} ended: {e:?}", remote_addr); + if let Some(s) = h3 { + active.store(true, Ordering::Relaxed); + match stream { + H3Stream::Uni(recv) => { + let _ = s.uni_tx.send(recv); + } + H3Stream::Bi(send, recv) => { + let _ = s.bidi_tx.send((send, recv)); + } } - return; + s.go.notify_one(); + } else { + drop(stream); + ctx.conn.close(0, b""); } - let _ = (masq, first_uni, first_bidi, cancel); - tracing::debug!( - "connection from {} is not TUIC and masquerade is disabled; closing", - remote_addr - ); - conn.close(0, b""); } /// Drive an established TUIC connection: spawn the auth-timeout guard and the @@ -223,66 +256,6 @@ pub async fn serve_connection( C: QuicConnection, CB: InboundCallback, { - // --- Classify the connection: real TUIC vs HTTP/3 masquerade --- - // - // Both negotiate the `h3` ALPN, so we inspect the first thing the peer sends. - // The first event may be a uni stream, a bidi stream, or a datagram (the TUIC - // `Auth` is not guaranteed to arrive first), so race all three. We then peek - // the first two bytes: TUIC framing is `[VER, CmdType]` (see `is_tuic_prefix`), - // while an HTTP/3 client's streams begin with an h3 stream-type / frame-type - // byte. The peeked bytes are replayed via `PrefixedRecv` so neither the TUIC - // parser nor the h3 adapter loses any data. - let first = tokio::select! { - _ = cancel.cancelled() => return, - r = tokio::time::timeout(auth_timeout, async { - tokio::select! { - res = conn.accept_uni() => res.map(FirstEvent::::Uni), - res = conn.accept_bi() => res.map(|(s, r)| FirstEvent::::Bi(s, r)), - res = conn.read_datagram() => res.map(FirstEvent::::Datagram), - } - }) => r, - }; - let first = match first { - Ok(Ok(ev)) => ev, - Ok(Err(e)) => { - tracing::debug!("connection from {} closed before first event: {e:?}", remote_addr); - return; - } - Err(_) => { - // Nothing within the window: classification is impossible, so fall back - // to the masquerade (a silent prober still sees a web server) or close. - return dispatch_masquerade(conn, masq.as_ref(), None, None, remote_addr, cancel).await; - } - }; - - let tuic_first: TuicFirst = match first { - FirstEvent::Uni(mut recv) => { - let Some(prefix) = read_prefix(&mut recv).await else { return }; - let recv = wind_quic::PrefixedRecv::new(bytes::Bytes::copy_from_slice(&prefix), recv); - if is_tuic_prefix(prefix) { - TuicFirst::Uni(recv) - } else { - return dispatch_masquerade(conn, masq.as_ref(), Some(recv), None, remote_addr, cancel).await; - } - } - FirstEvent::Bi(send, mut recv) => { - let Some(prefix) = read_prefix(&mut recv).await else { return }; - let recv = wind_quic::PrefixedRecv::new(bytes::Bytes::copy_from_slice(&prefix), recv); - if is_tuic_prefix(prefix) { - TuicFirst::Bi(send, recv) - } else { - return dispatch_masquerade(conn, masq.as_ref(), None, Some((send, recv)), remote_addr, cancel).await; - } - } - FirstEvent::Datagram(dg) => { - if dg.len() >= 2 && is_tuic_prefix([dg[0], dg[1]]) { - TuicFirst::Datagram(dg) - } else { - return dispatch_masquerade(conn, masq.as_ref(), None, None, remote_addr, cancel).await; - } - } - }; - let udp_root_cancel = cancel.child_token(); // Eviction listener fires for both explicit `remove()` (via Dissociate) and @@ -308,15 +281,24 @@ pub async fn serve_connection( udp_root_cancel, }); - // Authentication timeout: close the connection if no UUID is set in time. + // Per-connection HTTP/3 masquerade router: a parked `run_masquerade` task plus + // two channels the acceptor loops feed. `None` when masquerade is disabled. + // `h3_active` flips true once any stream classifies as h3 so the auth-timeout + // guard knows not to close what is actually an HTTP/3 connection. + let h3 = spawn_h3_router(connection.conn.clone(), masq, cancel.clone()); + let h3_active = Arc::new(AtomicBool::new(false)); + + // Authentication timeout: close the connection if it never authenticated AND + // never turned out to be an HTTP/3 (masquerade) connection. { let conn_auth = connection.clone(); let auth_cancel = cancel.clone(); + let active = h3_active.clone(); tokio::spawn( async move { tokio::select! { _ = tokio::time::sleep(auth_timeout) => { - if conn_auth.uuid.load().is_none() { + if conn_auth.uuid.load().is_none() && !active.load(Ordering::Relaxed) { warn!("Connection from {} authentication timeout", remote_addr); conn_auth.conn.close(0, b"auth timeout"); } @@ -334,10 +316,11 @@ pub async fn serve_connection( // cache) is dropped instead of leaking until server shutdown. let acceptor_cancel = cancel.child_token(); - // Datagram acceptor. Pre-auth datagrams are handled inline (serially) so an - // unauthenticated peer can't spawn unbounded tasks parked on `auth_notify`; - // once authed, each datagram is dispatched in parallel so a slow outbound - // queue can't block the read loop. + // Datagram acceptor. Classify each datagram by its first two bytes: TUIC + // datagrams (heartbeat / native-mode UDP) are handled inline pre-auth (so an + // unauthenticated peer can't spawn unbounded tasks parked on `auth_notify`) + // and spawned post-auth; non-TUIC datagrams are dropped — the masquerade + // serves no QUIC datagrams. { let conn = connection.clone(); let cb = callback.clone(); @@ -352,6 +335,9 @@ pub async fn serve_connection( let conn = conn.clone(); let cb = cb.clone(); async move { + if datagram.len() < 2 || !is_tuic_prefix([datagram[0], datagram[1]]) { + return; + } if conn.uuid.load().is_some() { tokio::spawn( spawn_logged("Datagram", handle_datagram(conn, datagram, cb)) @@ -369,11 +355,16 @@ pub async fn serve_connection( ); } - // Uni stream acceptor. + // Uni stream acceptor. Every accepted stream reads its 2-byte prefix and + // routes itself: TUIC (`is_tuic_prefix`) → `handle_uni_stream`, otherwise → + // the h3 masquerade (or close). The peeked bytes are replayed via + // `PrefixedRecv` so the chosen handler reads from byte 0. { let conn = connection.clone(); let cb = callback.clone(); let uni_cancel = acceptor_cancel.clone(); + let h3 = h3.clone(); + let active = h3_active.clone(); tokio::spawn( async move { acceptor_loop( @@ -383,10 +374,23 @@ pub async fn serve_connection( |recv| { let conn = conn.clone(); let cb = cb.clone(); + let h3 = h3.clone(); + let active = active.clone(); async move { tokio::spawn( - spawn_logged("Uni stream", handle_uni_stream(conn, recv, cb)) - .instrument(tracing::debug_span!("uni_stream")), + async move { + let mut recv = recv; + let Some(prefix) = read_prefix(&mut recv).await else { return }; + let recv = wind_quic::PrefixedRecv::new(bytes::Bytes::copy_from_slice(&prefix), recv); + if is_tuic_prefix(prefix) { + if let Err(e) = handle_uni_stream(conn, recv, cb).await { + error!("Uni stream error: {e:?}"); + } + } else { + route_non_tuic(&conn, h3.as_ref(), &active, H3Stream::Uni(recv)); + } + } + .instrument(tracing::debug_span!("uni_stream")), ); } }, @@ -397,11 +401,13 @@ pub async fn serve_connection( ); } - // Bi stream acceptor. + // Bi stream acceptor — same per-stream classify + route. { let conn = connection.clone(); let cb = callback.clone(); let bi_cancel = acceptor_cancel.clone(); + let h3 = h3.clone(); + let active = h3_active.clone(); tokio::spawn( async move { acceptor_loop( @@ -411,10 +417,23 @@ pub async fn serve_connection( |(send, recv)| { let conn = conn.clone(); let cb = cb.clone(); + let h3 = h3.clone(); + let active = active.clone(); async move { tokio::spawn( - spawn_logged("Bi stream", handle_bi_stream(conn, send, recv, cb)) - .instrument(tracing::debug_span!("bi_stream")), + async move { + let mut recv = recv; + let Some(prefix) = read_prefix(&mut recv).await else { return }; + let recv = wind_quic::PrefixedRecv::new(bytes::Bytes::copy_from_slice(&prefix), recv); + if is_tuic_prefix(prefix) { + if let Err(e) = handle_bi_stream(conn, send, recv, cb).await { + error!("Bi stream error: {e:?}"); + } + } else { + route_non_tuic(&conn, h3.as_ref(), &active, H3Stream::Bi(send, recv)); + } + } + .instrument(tracing::debug_span!("bi_stream")), ); } }, @@ -425,33 +444,6 @@ pub async fn serve_connection( ); } - // Process the first event already consumed during classification (with any - // peeked header bytes replayed). Subsequent events are handled by the acceptor - // loops above. - { - let conn = connection.clone(); - let cb = callback.clone(); - match tuic_first { - TuicFirst::Uni(recv) => { - tokio::spawn( - spawn_logged("Uni stream", handle_uni_stream(conn, recv, cb)) - .instrument(tracing::debug_span!("uni_stream")), - ); - } - TuicFirst::Bi(send, recv) => { - tokio::spawn( - spawn_logged("Bi stream", handle_bi_stream(conn, send, recv, cb)) - .instrument(tracing::debug_span!("bi_stream")), - ); - } - TuicFirst::Datagram(dg) => { - tokio::spawn( - spawn_logged("Datagram", handle_datagram(conn, dg, cb)).instrument(tracing::debug_span!("datagram")), - ); - } - } - } - // Exit on either server shutdown or peer disconnect. tokio::select! { _ = cancel.cancelled() => { From 99fe58687cc54baf50eeee5bf7c244b58e682bb5 Mon Sep 17 00:00:00 2001 From: iHsin Date: Sat, 13 Jun 2026 20:21:16 +0800 Subject: [PATCH 08/10] chore: track .cargo/config.toml so reqwest_unstable is the repo default MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `.cargo/config.toml` was gitignored, so its `--cfg reqwest_unstable` flag (and `RUSTC_BOOTSTRAP=1`, needed for the repo's nightly rustfmt.toml options) only existed locally. Un-ignore and commit it so the flag is the workspace default: the `h3-masquerade-test` now runs with just cargo test -p tuic-tests --features h3-masquerade-test (no manual RUSTFLAGS). The cfg stays inert for normal builds — reqwest's http3 code needs both the cfg and the `http3` feature, and only the opt-in test feature enables that. Updated the test docs accordingly. Co-Authored-By: Claude Opus 4.8 --- .cargo/config.toml | 10 ++++++++++ .gitignore | 4 ++++ crates/tuic-tests/Cargo.toml | 15 +++++++-------- crates/tuic-tests/tests/masquerade.rs | 6 +++--- 4 files changed, 24 insertions(+), 11 deletions(-) create mode 100644 .cargo/config.toml diff --git a/.cargo/config.toml b/.cargo/config.toml new file mode 100644 index 0000000..634eba2 --- /dev/null +++ b/.cargo/config.toml @@ -0,0 +1,10 @@ +[build] +#rustc-wrapper = "sccache" +# Unlocks reqwest's experimental HTTP/3 client, used only by tuic-tests' +# `h3-masquerade-test` feature. Inert unless reqwest is built with its `http3` +# feature (which only that opt-in test feature enables), so default builds are +# unaffected — no extra deps, no http3 code. +rustflags = ["--cfg", "reqwest_unstable"] + +[env] +RUSTC_BOOTSTRAP = { value = "1" } diff --git a/.gitignore b/.gitignore index 2f66e01..5100169 100644 --- a/.gitignore +++ b/.gitignore @@ -3,4 +3,8 @@ target/ **/*.rs.bk /target config.toml +# ...but the workspace's `.cargo/config.toml` IS tracked: it carries +# `RUSTC_BOOTSTRAP=1` (needed for the repo's nightly rustfmt.toml options) and the +# `--cfg reqwest_unstable` flag that the `h3-masquerade-test` feature relies on. +!.cargo/config.toml .claude \ No newline at end of file diff --git a/crates/tuic-tests/Cargo.toml b/crates/tuic-tests/Cargo.toml index 453b095..d84a0a9 100644 --- a/crates/tuic-tests/Cargo.toml +++ b/crates/tuic-tests/Cargo.toml @@ -14,11 +14,10 @@ ring = ["wind-tuic/ring", "rustls/ring", "tuic-client/ring", "tuic-server/ring"] aws-lc-rs = ["wind-tuic/aws-lc-rs", "dep:aws-lc-rs", "rustls/aws-lc-rs", "tuic-client/aws-lc-rs", "tuic-server/aws-lc-rs"] # End-to-end HTTP/3 masquerade test (tests/masquerade.rs). Opt-in because it -# pulls reqwest's experimental HTTP/3 client (a second quinn/h3 stack) and needs -# the `--cfg reqwest_unstable` rustc flag. Run with: -# RUSTFLAGS="--cfg reqwest_unstable" cargo test -p tuic-tests --features h3-masquerade-test -# (without the flag the test is cfg'd out — it simply doesn't run, never breaks -# the build.) +# pulls reqwest's experimental HTTP/3 client (a second quinn/h3 stack). The +# `--cfg reqwest_unstable` flag it needs is set by the workspace `.cargo/config.toml`, +# so just run: +# cargo test -p tuic-tests --features h3-masquerade-test h3-masquerade-test = ["wind-tuic/masquerade", "dep:reqwest", "dep:http"] [dependencies] @@ -46,9 +45,9 @@ tokio-util = { version = "0.7", features = ["codec"] } reqwest = { version = "0.12", default-features = false, features = ["http3", "rustls-tls"], optional = true } http = { version = "1", optional = true } -# `reqwest_unstable` is a rustc cfg (passed via RUSTFLAGS for the -# `h3-masquerade-test`) that gates reqwest's HTTP/3 client and the masquerade -# test; declare it so it isn't flagged as an unknown cfg. +# `reqwest_unstable` is a rustc cfg (set in the workspace `.cargo/config.toml`) +# that gates reqwest's HTTP/3 client and the masquerade test; declare it so it +# isn't flagged as an unknown cfg. [lints.rust] unexpected_cfgs = { level = "warn", check-cfg = ['cfg(reqwest_unstable)'] } diff --git a/crates/tuic-tests/tests/masquerade.rs b/crates/tuic-tests/tests/masquerade.rs index 121f3fc..120cf84 100644 --- a/crates/tuic-tests/tests/masquerade.rs +++ b/crates/tuic-tests/tests/masquerade.rs @@ -7,9 +7,9 @@ //! handshake, first-byte classification (`0x05` vs not), the `h3::quic` //! adapter, the `h3` server, and the reqwest reverse proxy to the upstream. //! -//! Opt-in (pulls reqwest's experimental HTTP/3 stack + needs the `--cfg -//! reqwest_unstable` rustc flag; without it the test is cfg'd out): -//! RUSTFLAGS="--cfg reqwest_unstable" cargo test -p tuic-tests --features +//! Opt-in (pulls reqwest's experimental HTTP/3 stack; the `--cfg +//! reqwest_unstable` flag it needs is set by the workspace +//! `.cargo/config.toml`): cargo test -p tuic-tests --features //! h3-masquerade-test #![cfg(all(feature = "h3-masquerade-test", reqwest_unstable, target_pointer_width = "64"))] From 40b7eda4721fcbffa8eea14ca58fab7dbd9d8823 Mon Sep 17 00:00:00 2001 From: iHsin Date: Sat, 13 Jun 2026 20:26:05 +0800 Subject: [PATCH 09/10] ci: set rustflags --cfg reqwest_unstable MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The reusable build workflow exports its `rustflags` input as the `RUSTFLAGS` env var, which overrides `.cargo/config.toml [build] rustflags` — so the committed `--cfg reqwest_unstable` was being dropped in CI. Mirror it in the workflow input so the cfg stays enabled in CI (inert unless a crate also enables reqwest's `http3` feature). Co-Authored-By: Claude Opus 4.8 --- .github/workflows/ci.yml | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 04788d8..de796df 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -29,7 +29,11 @@ jobs: rust-toolchain: "stable" packages: "wind,tuic-server,tuic-client" target-config-file: ".github/target.toml" - rustflags: "" + # The reusable workflow exports this as the `RUSTFLAGS` env var, which + # overrides `.cargo/config.toml [build] rustflags`; mirror the committed + # `--cfg reqwest_unstable` here so it stays enabled in CI (inert unless a + # crate also turns on reqwest's `http3` feature). + rustflags: "--cfg reqwest_unstable" enable-tmate: false only-clippy-tests-on-pr: false # GitHub caps the per-repo Actions cache at 10 GB and each matrix target From cf5d8bbab3d02e2e5275e5b6c0f116394daefc5c Mon Sep 17 00:00:00 2001 From: iHsin Date: Sat, 13 Jun 2026 20:49:47 +0800 Subject: [PATCH 10/10] fix(wind-tuic): harden masquerade error paths and the e2e test MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three review findings: - The parked `run_masquerade` task was spawned with its `Result` dropped. On failure (invalid upstream URL, h3 setup error) the error vanished AND the connection leaked: a non-TUIC stream had already flipped `h3_active`, so the auth-timeout guard would never reap it. Now log the error and close the connection, mirroring that guard's cleanup. - The response body size cap was checked only *after* the response head was sent, so an over-cap body produced a truncated-but-"200" reply (the 502 fallback can't fire once headers are out). Now reject over-cap responses by `Content-Length` before committing the head (clean 502), and for streamed / under-reported bodies that overflow mid-relay, reset the h3 stream (`stop_stream`) instead of finishing it — the prober sees an aborted response, not a silent truncation. Any post-head failure now resets rather than attempting a second response. - The e2e test ignored `tuic_server::run`'s result and slept 1s on a fixed port, so a startup failure (e.g. port in use) surfaced only as an opaque 10s client timeout. Now wait for readiness via `select!` on the server JoinHandle and surface the real startup error immediately. Co-Authored-By: Claude Opus 4.8 --- crates/tuic-tests/tests/masquerade.rs | 22 ++++++++++++++--- crates/wind-tuic/src/server/masquerade.rs | 30 +++++++++++++++++++++++ crates/wind-tuic/src/server/mod.rs | 13 +++++++++- 3 files changed, 61 insertions(+), 4 deletions(-) diff --git a/crates/tuic-tests/tests/masquerade.rs b/crates/tuic-tests/tests/masquerade.rs index 120cf84..61ec379 100644 --- a/crates/tuic-tests/tests/masquerade.rs +++ b/crates/tuic-tests/tests/masquerade.rs @@ -89,10 +89,26 @@ async fn masquerade_reverse_proxies_http3_probes() -> eyre::Result<()> { ..Default::default() }; - tokio::spawn(async move { - let _ = timeout(Duration::from_secs(20), tuic_server::run(cfg)).await; + // `run` blocks forever on success; bound it so a hung server can't wedge the + // test runner, and treat the safety-timeout as "still serving". + let mut server = tokio::spawn(async move { + match timeout(Duration::from_secs(20), tuic_server::run(cfg)).await { + Ok(res) => res, + Err(_) => Ok(()), + } }); - tokio::time::sleep(Duration::from_secs(1)).await; + + // Wait for the server to bind. If it instead exits early (e.g. the fixed port + // is already in use), surface that real error now rather than letting the + // HTTP/3 request below fail with an opaque 10s timeout. + tokio::select! { + joined = &mut server => match joined { + Ok(Ok(())) => eyre::bail!("tuic-server exited before serving any request"), + Ok(Err(e)) => eyre::bail!("tuic-server failed to start: {e:?}"), + Err(e) => eyre::bail!("tuic-server task panicked: {e}"), + }, + _ = tokio::time::sleep(Duration::from_secs(1)) => {} + } // reqwest as a real HTTP/3 prober. `danger_accept_invalid_certs` because the // server uses a self-signed cert; `http3_prior_knowledge` forces h3. diff --git a/crates/wind-tuic/src/server/masquerade.rs b/crates/wind-tuic/src/server/masquerade.rs index 71c87de..faa1915 100644 --- a/crates/wind-tuic/src/server/masquerade.rs +++ b/crates/wind-tuic/src/server/masquerade.rs @@ -149,6 +149,36 @@ where let resp = req.send().await?; + // Reject an over-cap response *before* committing the response head, so the + // `502` fallback in `handle_request` can still fire cleanly. (A streamed + // response that under-reports or omits `Content-Length` is still bounded + // mid-relay below.) + if let Some(len) = resp.content_length() + && len > MAX_RESPONSE_BODY_SIZE as u64 + { + return Err(eyre::eyre!( + "upstream Content-Length {len} exceeds {MAX_RESPONSE_BODY_SIZE} bytes" + )); + } + + // From here the response head is committed to the h3 stream. A later failure + // (over-cap body, send error) must NOT become a second `502` response on a + // stream that already sent `200` — that yields a truncated-but-"successful" + // reply. Reset the stream instead, so the prober sees an aborted response. + if let Err(e) = relay_response(stream, resp).await { + debug!("masquerade response failed after the head was sent; resetting h3 stream: {e}"); + stream.stop_stream(h3::error::Code::H3_INTERNAL_ERROR); + } + Ok(()) +} + +/// Send the upstream response head, stream its body back (size-capped), and +/// finish the stream. Any error leaves the already-committed response +/// incomplete; the caller resets the stream rather than sending a fresh status. +async fn relay_response(stream: &mut h3::server::RequestStream, resp: reqwest::Response) -> eyre::Result<()> +where + S: h3::quic::BidiStream, +{ let mut builder = http::Response::builder().status(resp.status()); for (name, value) in resp.headers() { if is_forwardable(name) { diff --git a/crates/wind-tuic/src/server/mod.rs b/crates/wind-tuic/src/server/mod.rs index 5cc9f0f..1428951 100644 --- a/crates/wind-tuic/src/server/mod.rs +++ b/crates/wind-tuic/src/server/mod.rs @@ -200,7 +200,18 @@ fn spawn_h3_router( let (uni_tx, uni_rx) = mpsc::unbounded_channel(); let (bidi_tx, bidi_rx) = mpsc::unbounded_channel(); let go = Arc::new(Notify::new()); - tokio::spawn(masquerade::run_masquerade(conn, uni_rx, bidi_rx, go.clone(), cfg, cancel)); + // Run the masquerade parked. If it fails (invalid upstream URL, h3 setup + // error) the connection would otherwise leak: a non-TUIC stream has already + // flipped `h3_active`, so the auth-timeout guard won't reap it. Log the error + // and close the connection ourselves, mirroring that guard's cleanup. + let close_conn = conn.clone(); + let go_task = go.clone(); + tokio::spawn(async move { + if let Err(e) = masquerade::run_masquerade(conn, uni_rx, bidi_rx, go_task, cfg, cancel).await { + warn!("HTTP/3 masquerade task failed; closing connection: {e:?}"); + close_conn.close(0, b""); + } + }); Some(Arc::new(H3Senders { uni_tx, bidi_tx, go })) }