Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions devolutions-agent/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ mod service;
use std::env;
use std::io::{self, BufRead};
use std::sync::mpsc;
use std::time::Duration;

use anyhow::{Context as _, Result, bail};
use ceviche::Service;
Expand Down Expand Up @@ -285,6 +286,35 @@ fn main() {
std::process::exit(1);
}
}
"probe-tunnel" => {
    // Optional second CLI argument: the probe timeout in whole seconds.
    // Falls back to 15 seconds when the argument is absent.
    let timeout_secs: u64 = match env::args().nth(2) {
        Some(value) => match value.parse() {
            Ok(timeout_secs) => timeout_secs,
            Err(error) => {
                eprintln!("[ERROR] Invalid probe timeout: {error}");
                std::process::exit(1);
            }
        },
        None => 15,
    };

    let conf_handle = match ConfHandle::init() {
        Ok(conf_handle) => conf_handle,
        Err(error) => {
            eprintln!("[ERROR] Failed to load agent configuration: {error:#}");
            std::process::exit(1);
        }
    };
    let conf = conf_handle.get_conf();

    // Runtime creation can fail for environmental reasons, so report it the same
    // way as every other failure in this arm (message on stderr + exit code 1)
    // instead of panicking.
    let rt = match tokio::runtime::Runtime::new() {
        Ok(rt) => rt,
        Err(error) => {
            eprintln!("[ERROR] Failed to create tokio runtime: {error}");
            std::process::exit(1);
        }
    };

    // A non-zero exit code is the failure signal for whoever launched the probe.
    if let Err(error) = rt.block_on(devolutions_agent::tunnel::probe_tunnel_connectivity(
        &conf.tunnel,
        Duration::from_secs(timeout_secs),
    )) {
        eprintln!("[ERROR] Tunnel connectivity probe failed: {error:#}");
        std::process::exit(1);
    }
}
_ => {
eprintln!("[ERROR] Invalid command: {cmd}");
}
Expand Down
198 changes: 176 additions & 22 deletions devolutions-agent/src/tunnel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,25 +190,10 @@ enum ConnectionOutcome {
CertRenewed,
}

/// Run a single QUIC tunnel connection lifetime: config → connect → event loop.
///
/// - `Ok(Shutdown)`: graceful shutdown, exit the task.
/// - `Ok(CertRenewed)`: certificate renewed; caller should reconnect immediately.
/// - `Err(...)`: connection lost or handshake failed — caller should retry with backoff.
async fn run_single_connection(
conf_handle: &ConfHandle,
shutdown_signal: &mut ShutdownSignal,
) -> anyhow::Result<ConnectionOutcome> {
// Ensure rustls crypto provider is installed (ring).
let _ = rustls::crypto::ring::default_provider().install_default();

let agent_conf = conf_handle.get_conf();
let tunnel_conf = &agent_conf.tunnel;

let cert_path = &tunnel_conf.client_cert_path;
let key_path = &tunnel_conf.client_key_path;
let ca_path = &tunnel_conf.gateway_ca_cert_path;

/// Build the route advertisement payload from the current tunnel configuration.
fn route_advertisements(
tunnel_conf: &crate::config::TunnelConf,
) -> anyhow::Result<(Vec<Ipv4Network>, Vec<agent_tunnel_proto::DomainAdvertisement>)> {
let advertise_subnets: Vec<Ipv4Network> = tunnel_conf
.advertise_subnets
.iter()
Expand Down Expand Up @@ -260,23 +245,33 @@ async fn run_single_connection(
"Advertising subnets and domains"
);

Ok((advertise_subnets, advertise_domains))
}

/// Build the mTLS client config, resolve the gateway endpoint, and perform the
/// QUIC handshake, returning the live endpoint and connection.
async fn connect_to_gateway(
tunnel_conf: &crate::config::TunnelConf,
) -> anyhow::Result<(quinn::Endpoint, quinn::Connection)> {
// Ensure rustls crypto provider is installed (ring).
let _ = rustls::crypto::ring::default_provider().install_default();
// -- Build rustls ClientConfig --

let certs: Vec<rustls_pki_types::CertificateDer<'static>> = rustls_pemfile::certs(&mut std::io::BufReader::new(
std::fs::File::open(cert_path.as_str()).context("open client cert file")?,
std::fs::File::open(tunnel_conf.client_cert_path.as_str()).context("open client cert file")?,
))
.collect::<Result<Vec<_>, _>>()
.context("parse client certificates")?;

let key = rustls_pemfile::private_key(&mut std::io::BufReader::new(
std::fs::File::open(key_path.as_str()).context("open client key file")?,
std::fs::File::open(tunnel_conf.client_key_path.as_str()).context("open client key file")?,
))
.context("parse private key file")?
.context("no private key found in file")?;

let mut roots = rustls::RootCertStore::empty();
let ca_certs: Vec<rustls_pki_types::CertificateDer<'static>> = rustls_pemfile::certs(&mut std::io::BufReader::new(
std::fs::File::open(ca_path.as_str()).context("open CA cert file")?,
std::fs::File::open(tunnel_conf.gateway_ca_cert_path.as_str()).context("open CA cert file")?,
))
.collect::<Result<Vec<_>, _>>()
.context("parse CA certificates")?;
Expand Down Expand Up @@ -363,6 +358,84 @@ async fn run_single_connection(

info!("QUIC connection established");

Ok((endpoint, connection))
}

/// Probe end-to-end QUIC tunnel connectivity to the Gateway, bounded by `timeout`.
///
/// Connects (mTLS over QUIC) and performs a single `Heartbeat` → `HeartbeatAck`
/// round-trip, proving UDP reachability, the QUIC + mTLS handshake, and a working
/// bidirectional control stream. Returns an error if the tunnel is disabled, the
/// handshake or round-trip fails, or `timeout` elapses first.
pub async fn probe_tunnel_connectivity(
tunnel_conf: &crate::config::TunnelConf,
timeout: Duration,
) -> anyhow::Result<()> {
if !tunnel_conf.enabled {
bail!("agent tunnel is not enabled");
}

// Bound only the connectivity check itself — connect plus the Heartbeat round-trip.
// Once the HeartbeatAck proves the tunnel works, teardown must never be able to turn
// that proven success into a timeout failure.
let (endpoint, connection) = tokio::time::timeout(timeout, async {
let (endpoint, connection) = connect_to_gateway(tunnel_conf).await?;

let mut ctrl: ControlStream<_, _> = connection.open_bi().await.context("open control stream")?.into();

// A bare Heartbeat is enough to prove connectivity: the gateway acks any
// Heartbeat unconditionally with the echoed timestamp. We deliberately do NOT
// send a RouteAdvertise here — that would mutate the gateway's route registry,
// and a connectivity probe must be side-effect-free.
let timestamp_ms = current_time_millis();
ctrl.send(&ControlMessage::heartbeat(timestamp_ms, 0))
.await
.context("send probe Heartbeat")?;

match ctrl.recv().await.context("receive probe HeartbeatAck")? {
ControlMessage::HeartbeatAck {
timestamp_ms: echoed_timestamp_ms,
..
} if echoed_timestamp_ms == timestamp_ms => Ok((endpoint, connection)),
ControlMessage::HeartbeatAck { .. } => bail!("probe HeartbeatAck timestamp mismatch"),
unexpected => bail!("unexpected probe response: {unexpected:?}"),
}
})
.await
.context("agent tunnel connectivity probe timed out")??;

info!("Agent tunnel connectivity probe succeeded");

// Connectivity is already proven, so teardown is best-effort and must never fail the
// probe. Drain (bounded, result ignored) so the queued CONNECTION_CLOSE is flushed
// before the runtime and endpoint are dropped — otherwise the gateway keeps this
// probe's same-agent_id connection registered until its idle timeout, and that late
// cleanup can unregister the real agent.
connection.close(0u32.into(), b"probe-complete");
let _ = tokio::time::timeout(Duration::from_secs(3), endpoint.wait_idle()).await;

Ok(())
}

/// Run a single QUIC tunnel connection lifetime: config → connect → event loop.
///
/// - `Ok(Shutdown)`: graceful shutdown, exit the task.
/// - `Ok(CertRenewed)`: certificate renewed; caller should reconnect immediately.
/// - `Err(_)`: connection lost or handshake failed — caller should retry with backoff.
async fn run_single_connection(
conf_handle: &ConfHandle,
shutdown_signal: &mut ShutdownSignal,
) -> anyhow::Result<ConnectionOutcome> {
let agent_conf = conf_handle.get_conf();
let tunnel_conf = &agent_conf.tunnel;

let cert_path = &tunnel_conf.client_cert_path;
let key_path = &tunnel_conf.client_key_path;
let ca_path = &tunnel_conf.gateway_ca_cert_path;

let (advertise_subnets, advertise_domains) = route_advertisements(tunnel_conf)?;
let (_endpoint, connection) = connect_to_gateway(tunnel_conf).await?;

// -- Open control stream --

let mut ctrl: ControlStream<_, _> = connection.open_bi().await.context("open control stream")?.into();
Expand Down Expand Up @@ -638,3 +711,84 @@ async fn run_session_proxy(advertise_subnets: Vec<Ipv4Network>, send: quinn::Sen
.await
.inspect_err(|e| error!(%e, "Session proxy failed"));
}

#[cfg(test)]
mod tests {
    use std::time::{Duration, Instant};

    use camino::Utf8PathBuf;

    use super::probe_tunnel_connectivity;
    use crate::config::TunnelConf;

    /// Baseline tunnel configuration with the tunnel enabled and every path and
    /// endpoint left empty; each test fills in only the fields it exercises.
    fn tunnel_conf_template() -> TunnelConf {
        TunnelConf {
            enabled: true,
            gateway_endpoint: String::new(),
            client_cert_path: Utf8PathBuf::new(),
            client_key_path: Utf8PathBuf::new(),
            gateway_ca_cert_path: Utf8PathBuf::new(),
            advertise_subnets: Vec::new(),
            advertise_domains: Vec::new(),
            auto_detect_domain: false,
            heartbeat_interval_secs: 15,
            route_advertise_interval_secs: 60,
            server_spki_sha256: None,
        }
    }

    /// A disabled tunnel must be rejected up front with a "not enabled" error.
    #[tokio::test]
    async fn probe_fails_fast_when_tunnel_disabled() {
        let disabled_conf = TunnelConf {
            enabled: false,
            ..tunnel_conf_template()
        };

        let probe = probe_tunnel_connectivity(&disabled_conf, Duration::from_secs(5)).await;
        let error = probe.expect_err("probe must fail when the tunnel is disabled");

        assert!(
            format!("{error:#}").contains("not enabled"),
            "unexpected error: {error:#}"
        );
    }

    /// With valid PEM files but nothing listening on the target port, the probe
    /// must give up on its own timeout instead of hanging.
    #[tokio::test]
    async fn probe_times_out_when_gateway_unreachable() {
        // Throwaway self-signed material so the pre-connect file reads succeed; the
        // same certificate doubles as the CA bundle.
        let generated =
            rcgen::generate_simple_self_signed(vec!["localhost".to_owned()]).expect("generate self-signed cert");
        let certificate_pem = generated.cert.pem();
        let private_key_pem = generated.key_pair.serialize_pem();

        let scratch_dir = std::env::temp_dir().join(format!("dgw-probe-test-{}", uuid::Uuid::new_v4()));
        std::fs::create_dir_all(&scratch_dir).expect("create temp dir");

        let client_cert_file = scratch_dir.join("client.crt");
        let client_key_file = scratch_dir.join("client.key");
        let ca_cert_file = scratch_dir.join("ca.crt");
        std::fs::write(&client_cert_file, &certificate_pem).expect("write client cert");
        std::fs::write(&client_key_file, &private_key_pem).expect("write client key");
        std::fs::write(&ca_cert_file, &certificate_pem).expect("write ca cert");

        let unreachable_conf = TunnelConf {
            // 127.0.0.1:1 is reserved and unbound; the QUIC handshake cannot complete.
            gateway_endpoint: "127.0.0.1:1".to_owned(),
            client_cert_path: Utf8PathBuf::from_path_buf(client_cert_file).expect("utf8 cert path"),
            client_key_path: Utf8PathBuf::from_path_buf(client_key_file).expect("utf8 key path"),
            gateway_ca_cert_path: Utf8PathBuf::from_path_buf(ca_cert_file).expect("utf8 ca path"),
            ..tunnel_conf_template()
        };

        let started = Instant::now();
        let result = probe_tunnel_connectivity(&unreachable_conf, Duration::from_secs(2)).await;
        let elapsed = started.elapsed();

        // Clean up before asserting so a failed assertion does not leak the directory.
        let _ = std::fs::remove_dir_all(&scratch_dir);

        assert!(result.is_err(), "probe must fail when the gateway is unreachable");
        assert!(
            elapsed < Duration::from_secs(15),
            "probe must fail fast, took {elapsed:?}"
        );
    }
}
58 changes: 58 additions & 0 deletions package/AgentWindowsManaged/Actions/CustomActions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -612,6 +612,64 @@ ActionResult Fail(string msg)
WriteTunnelAdvertisementsToConfig(session, subnetsArg, domainsArg);
}

string probeArguments = "probe-tunnel 15";
session.Log($"Running tunnel connectivity probe: {exePath} {probeArguments}");

ProcessStartInfo probeStartInfo = new(exePath, probeArguments)
{
UseShellExecute = false,
RedirectStandardOutput = true,
RedirectStandardError = true,
CreateNoWindow = true,
WorkingDirectory = ProgramDataDirectory,
};

using Process probeProcess = Process.Start(probeStartInfo);
Task<string> probeStdoutTask = probeProcess.StandardOutput.ReadToEndAsync();
Task<string> probeStderrTask = probeProcess.StandardError.ReadToEndAsync();

if (!probeProcess.WaitForExit(30_000))
{
try
{
probeProcess.Kill();
}
catch
{
// Already exited between WaitForExit timing out and Kill firing.
}

try
{
Task.WaitAll(new Task[] { probeStdoutTask, probeStderrTask }, 5000);
}
catch
{
// Observed.
}

return Fail("Agent tunnel connectivity probe timed out. Verify UDP connectivity to the Gateway QUIC endpoint (typically UDP 4433) and firewall/NAT rules.");
}

probeProcess.WaitForExit();
string probeStdout = probeStdoutTask.GetAwaiter().GetResult();
string probeStderr = probeStderrTask.GetAwaiter().GetResult();

if (!string.IsNullOrEmpty(probeStdout))
{
session.Log($"tunnel probe stdout: {Redact(probeStdout)}");
}
if (!string.IsNullOrEmpty(probeStderr))
{
session.Log($"tunnel probe stderr: {Redact(probeStderr)}");
}

if (probeProcess.ExitCode != 0)
{
string detail = !string.IsNullOrWhiteSpace(probeStderr) ? Redact(probeStderr).Trim() : $"exit code {probeProcess.ExitCode}";
return Fail($"Agent tunnel connectivity probe failed: {detail}. Verify UDP connectivity to the Gateway QUIC endpoint (typically UDP 4433) and firewall/NAT rules.");
}

session.Log("Agent tunnel enrollment completed successfully");
return ActionResult.Success;
}
Expand Down
Loading