Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 26 additions & 9 deletions pingora-core/src/apps/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use async_trait::async_trait;
use log::{debug, error};
use std::any::Any;
use std::sync::Arc;
use std::time::Duration;

use crate::protocols::http::v2::server;
use crate::protocols::http::ServerSession;
Expand Down Expand Up @@ -81,6 +82,12 @@ pub struct HttpServerOptions {
///
/// Unlike nginx, the default behavior here is _no limit_.
pub keepalive_request_limit: Option<u32>,

/// If set, close a downstream HTTP/2 connection that has been idle
/// for this duration.
///
/// Default: `None`
pub h2_idle_timeout: Option<Duration>,
}

/// Settings persisted across HTTP/1.x keepalive requests on the same downstream connection.
Expand Down Expand Up @@ -262,15 +269,25 @@ where
// the same code path is exercised by tests in `protocols::http::v2`.
let app = self.clone();
let shutdown_for_session = shutdown.clone();
server::accept_downstream_sessions(h2_conn, digest, shutdown.clone(), |h2_stream| {
let app = app.clone();
let shutdown = shutdown_for_session.clone();
pingora_runtime::current_handle().spawn(async move {
// Note, `PersistentSettings` not currently relevant for h2
app.process_new_http(ServerSession::new_http2(h2_stream), &shutdown)
.await;
});
})
let h2_idle_timeout = self.server_options().and_then(|o| o.h2_idle_timeout);
server::accept_downstream_sessions(
h2_conn,
digest,
shutdown.clone(),
h2_idle_timeout,
|h2_stream, guard| {
let app = app.clone();
let shutdown = shutdown_for_session.clone();
pingora_runtime::current_handle().spawn(async move {
// hold `guard` for the session's lifetime so the accept
// loop's idle timeout sees this connection as busy.
let _guard = guard;
// Note, `PersistentSettings` not currently relevant for h2
app.process_new_http(ServerSession::new_http2(h2_stream), &shutdown)
.await;
});
},
)
.await;
} else if custom || matches!(stream.selected_alpn_proto(), Some(ALPN::Custom(_))) {
return self.clone().process_custom_session(stream, shutdown).await;
Expand Down
113 changes: 90 additions & 23 deletions pingora-core/src/protocols/http/v2/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -349,14 +349,20 @@ mod test {
});

let mut session_handles = vec![];
server::accept_downstream_sessions(connection, digest, shutdown_rx, |mut session| {
session_handles.push(tokio::spawn(async move {
let req = session.req_header();
assert_eq!(req.method, Method::GET);
let resp = Box::new(ResponseHeader::build(200, None).unwrap());
session.write_response_header(resp, true).unwrap();
}));
})
server::accept_downstream_sessions(
connection,
digest,
shutdown_rx,
None,
|mut session, _guard| {
session_handles.push(tokio::spawn(async move {
let req = session.req_header();
assert_eq!(req.method, Method::GET);
let resp = Box::new(ResponseHeader::build(200, None).unwrap());
session.write_response_header(resp, true).unwrap();
}));
},
)
.await;

trigger.await.unwrap();
Expand Down Expand Up @@ -443,12 +449,18 @@ mod test {
});

let mut session_handles = vec![];
server::accept_downstream_sessions(connection, digest, shutdown_rx, |mut session| {
session_handles.push(tokio::spawn(async move {
let resp = Box::new(ResponseHeader::build(200, None).unwrap());
session.write_response_header(resp, true).unwrap();
}));
})
server::accept_downstream_sessions(
connection,
digest,
shutdown_rx,
None,
|mut session, _guard| {
session_handles.push(tokio::spawn(async move {
let resp = Box::new(ResponseHeader::build(200, None).unwrap());
session.write_response_header(resp, true).unwrap();
}));
},
)
.await;

trigger.await.unwrap();
Expand Down Expand Up @@ -502,9 +514,15 @@ mod test {

let result = pingora_timeout::timeout(
Duration::from_secs(2),
server::accept_downstream_sessions(connection, digest, shutdown_rx, |_session| {
panic!("did not expect any sessions on an idle connection");
}),
server::accept_downstream_sessions(
connection,
digest,
shutdown_rx,
None,
|_session, _guard| {
panic!("did not expect any sessions on an idle connection");
},
),
)
.await;
assert!(result.is_ok(), "accept loop hung after shutdown");
Expand All @@ -513,6 +531,49 @@ mod test {
client_handle.await.unwrap();
}

/// With no shutdown signal and no stream ever opened, a configured idle
/// timeout alone must make the accept loop return.
#[tokio::test]
async fn test_h2_idle_timeout_closes_idle_connection() {
    let (mut client_io, server_io) = duplex(65536);
    // The sender half is deliberately kept alive for the whole test: with it
    // still around `shutdown.changed()` stays pending, so the idle timeout is
    // the only thing that can end the accept loop.
    let (_shutdown_tx, shutdown_rx) = watch::channel(false);

    let client_task = tokio::spawn(async move {
        // Client connection preface, then the settings exchange.
        client_io
            .write_all(b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n")
            .await
            .unwrap();
        let mut codec: h2::Codec<DuplexStream, Bytes> = h2::Codec::new(client_io);
        codec.send(Settings::default().into()).await.unwrap();
        codec.send(Settings::ack().into()).await.unwrap();
        // Never open a stream. Just discard whatever the server sends until
        // the codec yields `None`, i.e. the server dropped the connection
        // after the idle timeout fired.
        while codec.next().await.is_some() {}
    });

    let digest = Arc::new(Digest::default());
    let connection = handshake(Box::new(server_io), None).await.unwrap();

    // A 100ms idle timeout on a connection that carries no streams: the
    // accept loop is expected to finish by itself, well inside the 2s bound
    // enforced below.
    let accept_loop = server::accept_downstream_sessions(
        connection,
        digest,
        shutdown_rx,
        Some(Duration::from_millis(100)),
        |_session, _guard| panic!("did not expect any sessions on an idle connection"),
    );
    let result = pingora_timeout::timeout(Duration::from_secs(2), accept_loop).await;
    assert!(result.is_ok(), "idle timeout did not close the connection");

    client_task.await.unwrap();
}

#[tokio::test]
async fn test_graceful_shutdown_refuses_stream_above_last_stream_id() {
// After the server commits to a final last_stream_id and emits the
Expand Down Expand Up @@ -600,12 +661,18 @@ mod test {
let mut session_handles = vec![];
let result = pingora_timeout::timeout(
Duration::from_secs(5),
server::accept_downstream_sessions(connection, digest, shutdown_rx, |mut session| {
session_handles.push(tokio::spawn(async move {
let resp = Box::new(ResponseHeader::build(200, None).unwrap());
session.write_response_header(resp, true).unwrap();
}));
}),
server::accept_downstream_sessions(
connection,
digest,
shutdown_rx,
None,
|mut session, _guard| {
session_handles.push(tokio::spawn(async move {
let resp = Box::new(ResponseHeader::build(200, None).unwrap());
session.write_response_header(resp, true).unwrap();
}));
},
),
)
.await;
assert!(result.is_ok(), "accept loop hung after shutdown");
Expand Down
47 changes: 40 additions & 7 deletions pingora-core/src/protocols/http/v2/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use http::{header, HeaderMap, Response};
use log::{debug, warn};
use pingora_http::{RequestHeader, ResponseHeader};
use pingora_timeout::timeout;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::ready;
use std::time::Duration;
Expand Down Expand Up @@ -87,23 +88,29 @@ pub async fn handshake(io: Stream, options: Option<H2Options>) -> Result<H2Conne
/// - the codec is driven to completion so the final GOAWAY can be
/// flushed and the connection closed cleanly.
///
/// `on_session` is invoked once per accepted stream. Typical callers spawn a
/// task to process the session so the accept loop is not blocked.
/// `on_session` is invoked once per accepted stream, together with a
/// [`StreamGuard`]. Typical callers spawn a task to process the session so the
/// accept loop is not blocked, and move the guard into that task so its lifetime
/// matches the session.
///
/// Note: this function does not impose its own per-connection drain timeout.
/// The runtime-level `graceful_shutdown_timeout_seconds` is the only ceiling,
/// so a slow client can keep this future alive up to that bound.
/// Note: this function does not impose its own per-connection drain timeout
/// (after a shutdown signal). The runtime-level `graceful_shutdown_timeout_seconds`
/// is the only ceiling there, so a slow client can keep this future alive up to
/// that bound during drain.
// TODO: add a per-connection drain timeout to bound how long a single
// misbehaving client can keep this task alive after GOAWAY.
pub(crate) async fn accept_downstream_sessions<F>(
mut conn: H2Connection<Stream>,
digest: Arc<Digest>,
mut shutdown: ShutdownWatch,
idle_timeout: Option<Duration>,
mut on_session: F,
) where
F: FnMut(HttpSession),
F: FnMut(HttpSession, StreamGuard),
{
let mut shutdown_initiated = false;
// In-flight sessions, decremented by the `StreamGuard` given to `on_session`.
let active = Arc::new(AtomicUsize::new(0));
loop {
let h2_stream = if shutdown_initiated {
HttpSession::from_h2_conn(&mut conn, digest.clone()).await
Expand All @@ -119,6 +126,16 @@ pub(crate) async fn accept_downstream_sessions<F>(
continue;
}
h2_stream = HttpSession::from_h2_conn(&mut conn, digest.clone()) => h2_stream,
// Idle timeout: re-armed each iteration, so any accepted stream resets it
_ = pingora_timeout::sleep(idle_timeout.unwrap_or_default()), if idle_timeout.is_some() => {
if active.load(Ordering::Relaxed) == 0 {
// Idle with nothing in flight: drop `conn` to close the
// socket now (no graceful GOAWAY wait that could hang on
// a dead peer).
return;
}
continue;
}
}
};
match h2_stream {
Expand All @@ -130,11 +147,27 @@ pub(crate) async fn accept_downstream_sessions<F>(
}
// None means the connection is ready to be closed
Ok(None) => return,
Ok(Some(session)) => on_session(session),
Ok(Some(session)) => {
active.fetch_add(1, Ordering::Relaxed);
on_session(session, StreamGuard(active.clone()));
}
}
}
}

/// Drop-based marker for a single in-flight downstream H2 session.
///
/// [`accept_downstream_sessions`] bumps a shared in-flight counter for every
/// accepted stream and passes one of these guards to `on_session` together
/// with the session. Hold on to the guard for as long as the session is being
/// worked on (typically by moving it into the task that processes the
/// session): while any guard is alive the counter is non-zero, which is how
/// the accept loop's idle timeout distinguishes a busy connection from an
/// idle one.
pub(crate) struct StreamGuard(Arc<AtomicUsize>);

impl Drop for StreamGuard {
    /// Marks the session as finished by decrementing the in-flight counter.
    fn drop(&mut self) {
        let Self(in_flight) = self;
        in_flight.fetch_sub(1, Ordering::Relaxed);
    }
}

use futures::task::Context;
use futures::task::Poll;
use std::pin::Pin;
Expand Down
Loading