diff --git a/crates/datadog-trace-agent/src/config.rs b/crates/datadog-trace-agent/src/config.rs index e23e2a6..5be9fbe 100644 --- a/crates/datadog-trace-agent/src/config.rs +++ b/crates/datadog-trace-agent/src/config.rs @@ -18,6 +18,7 @@ use libdd_trace_utils::trace_utils; const DEFAULT_APM_RECEIVER_PORT: u16 = 8126; const DEFAULT_DOGSTATSD_PORT: u16 = 8125; +const DSM_PIPELINE_STATS_ROUTE: &str = "/api/v0.1/pipeline_stats"; #[derive(Debug)] pub struct Tags { @@ -99,6 +100,8 @@ pub struct Config { pub trace_flush_interval_secs: u64, pub trace_intake: Endpoint, pub trace_stats_intake: Endpoint, + /// Data Streams Monitoring intake endpoint (for proxying pipeline stats to Datadog) + pub dsm_intake: Endpoint, /// Profiling intake endpoint (for proxying profiling data to Datadog) pub profiling_intake: Endpoint, /// Timeout for each proxy request, in seconds @@ -180,11 +183,17 @@ impl Config { let mut trace_intake_url = trace_intake_url(&dd_site); let mut trace_stats_intake_url = trace_stats_url(&dd_site); + // TODO: Upstream DSM URL in libdatadog and use that instead of constructing here + let mut dsm_intake_url = format!("https://trace.agent.{dd_site}{DSM_PIPELINE_STATS_ROUTE}"); + // DD_APM_DD_URL env var will primarily be used for integration tests // overrides the entire trace/trace stats intake url prefix if let Ok(endpoint_prefix) = env::var("DD_APM_DD_URL") { trace_intake_url = trace_intake_url_prefixed(&endpoint_prefix); trace_stats_intake_url = trace_stats_url_prefixed(&endpoint_prefix); + + // TODO: Upstream DSM URL in libdatadog and use that instead of constructing here + dsm_intake_url = format!("{endpoint_prefix}{DSM_PIPELINE_STATS_ROUTE}"); }; // TODO: Create helper functions for this in libdatadog @@ -236,6 +245,11 @@ impl Config { api_key: Some(api_key.clone()), ..Default::default() }, + dsm_intake: Endpoint { + url: hyper::Uri::from_str(&dsm_intake_url).unwrap(), + api_key: Some(api_key.clone()), + ..Default::default() + }, profiling_intake: Endpoint { url: hyper::Uri::from_str(&profiling_intake_url).unwrap(), api_key: Some(api_key), @@ -310,6 +324,10 @@ mod tests { config.trace_stats_intake.url, "https://trace.agent.datadoghq.com/api/v0.2/stats" ); + assert_eq!( + config.dsm_intake.url, + "https://trace.agent.datadoghq.com/api/v0.1/pipeline_stats" + ); }, ); } @@ -392,6 +410,38 @@ mod tests { config.trace_stats_intake.url, "http://127.0.0.1:3333/api/v0.2/stats" ); + assert_eq!( + config.dsm_intake.url, + "http://127.0.0.1:3333/api/v0.1/pipeline_stats" + ); + }, + ); + } + + #[duplicate_item( + test_name dd_site expected_url; + [test_us1_dsm_intake_url] ["datadoghq.com"] ["https://trace.agent.datadoghq.com/api/v0.1/pipeline_stats"]; + [test_us3_dsm_intake_url] ["us3.datadoghq.com"] ["https://trace.agent.us3.datadoghq.com/api/v0.1/pipeline_stats"]; + [test_us5_dsm_intake_url] ["us5.datadoghq.com"] ["https://trace.agent.us5.datadoghq.com/api/v0.1/pipeline_stats"]; + [test_eu_dsm_intake_url] ["datadoghq.eu"] ["https://trace.agent.datadoghq.eu/api/v0.1/pipeline_stats"]; + [test_ap1_dsm_intake_url] ["ap1.datadoghq.com"] ["https://trace.agent.ap1.datadoghq.com/api/v0.1/pipeline_stats"]; + [test_gov_dsm_intake_url] ["ddog-gov.com"] ["https://trace.agent.ddog-gov.com/api/v0.1/pipeline_stats"]; + )] + #[test] + #[serial] + fn test_name() { + temp_env::with_vars( + [ + ("DD_API_KEY", Some("_not_a_real_key_")), + ("K_SERVICE", Some("function_name")), + ("FUNCTION_TARGET", Some("function_target")), + ("DD_SITE", Some(dd_site)), + ], + || { + let config_res = config::Config::new(); + assert!(config_res.is_ok()); + let config = config_res.unwrap(); + assert_eq!(config.dsm_intake.url, expected_url); }, ); } @@ -723,6 +773,7 @@ pub mod test_helpers { trace_flush_interval_secs: 5, trace_intake: Endpoint::default(), trace_stats_intake: Endpoint::default(), + dsm_intake: Endpoint::default(), profiling_intake: Endpoint::default(), proxy_request_timeout_secs: 30, proxy_request_max_retries: 3, diff --git a/crates/datadog-trace-agent/src/mini_agent.rs b/crates/datadog-trace-agent/src/mini_agent.rs index 9d09c79..19c6df0 100644 --- a/crates/datadog-trace-agent/src/mini_agent.rs +++ b/crates/datadog-trace-agent/src/mini_agent.rs @@ -17,7 +17,7 @@ use tracing::warn; use tracing::{debug, error}; use crate::http_utils::{log_and_create_http_response, verify_request_content_length}; -use crate::proxy_flusher::{ProxyFlusher, ProxyRequest}; +use crate::proxy_flusher::{ProxyFlusher, ProxyRequest, ProxyRequestKind}; #[cfg(all(windows, feature = "windows-pipes"))] use tokio::net::windows::named_pipe::ServerOptions; @@ -31,6 +31,7 @@ use libdd_trace_utils::trace_utils::SendData; const TRACE_ENDPOINT_PATH: &str = "/v0.4/traces"; const STATS_ENDPOINT_PATH: &str = "/v0.6/stats"; const INFO_ENDPOINT_PATH: &str = "/info"; +const DSM_ENDPOINT_PATH: &str = "/v0.1/pipeline_stats"; const PROFILING_ENDPOINT_PATH: &str = "/profiling/v1/input"; const TRACER_PAYLOAD_CHANNEL_BUFFER_SIZE: usize = 10; const STATS_PAYLOAD_CHANNEL_BUFFER_SIZE: usize = 10; @@ -107,11 +108,12 @@ impl MiniAgent { .await; }); - // channels to send processed profiling requests to our proxy flusher + // channel used to forward proxied payloads (for example profiling and + // Data Streams Monitoring) let (proxy_tx, proxy_rx): (Sender, Receiver) = mpsc::channel(PROXY_PAYLOAD_CHANNEL_BUFFER_SIZE); - // start our proxy flusher for profiling requests + // start our proxy flusher for proxied requests let proxy_flusher = self.proxy_flusher.clone(); tokio::spawn(async move { proxy_flusher.start_proxy_flusher(proxy_rx).await; @@ -626,11 +628,29 @@ impl MiniAgent { ), } } + (&Method::POST, DSM_ENDPOINT_PATH) => { + match Self::proxy_handler(config, req, proxy_tx, ProxyRequestKind::DataStreams) + .await + { + Ok(res) => Ok(res), + Err(err) => log_and_create_http_response( + &format!( + "Error processing {}: {err}", + ProxyRequestKind::DataStreams.request_name() + ), + StatusCode::INTERNAL_SERVER_ERROR, + ), + } + } (&Method::POST, PROFILING_ENDPOINT_PATH) => { - match Self::profiling_proxy_handler(config, req, proxy_tx).await { + match Self::proxy_handler(config, req, proxy_tx, ProxyRequestKind::Profiling).await + { Ok(res) => Ok(res), Err(err) => log_and_create_http_response( - &format!("Error processing profiling request: {err}"), + &format!( + "Error processing {}: {err}", + ProxyRequestKind::Profiling.request_name() + ), StatusCode::INTERNAL_SERVER_ERROR, ), } @@ -659,20 +679,23 @@ impl MiniAgent { } } - /// Handles incoming proxy requests for profiling - can be abstracted into a generic proxy handler for other proxy requests in the future - async fn profiling_proxy_handler( + /// Handles incoming proxy requests such as profiling or Data Streams + /// Monitoring payloads. + async fn proxy_handler( config: Arc, request: http_common::HttpRequest, proxy_tx: Sender, + proxy_kind: ProxyRequestKind, ) -> http::Result { - debug!("Received profiling request"); + let request_name = proxy_kind.request_name(); + debug!("Received {request_name}"); // Extract headers and body let (parts, body) = request.into_parts(); if let Some(response) = verify_request_content_length( &parts.headers, config.max_request_content_length, - "Error processing profiling request", + &format!("Error processing {request_name}"), ) { return response; } @@ -681,7 +704,7 @@ impl MiniAgent { Ok(collected) => collected.to_bytes(), Err(e) => { return log_and_create_http_response( - &format!("Error reading profiling request body: {e}"), + &format!("Error reading {request_name} body: {e}"), StatusCode::BAD_REQUEST, ); } @@ -691,22 +714,23 @@ impl MiniAgent { let proxy_request = ProxyRequest { headers: parts.headers, body: body_bytes, - target_url: config.profiling_intake.url.to_string(), + target_url: proxy_kind.intake_url(config.as_ref()), + kind: proxy_kind, }; debug!( - "Sending profiling request to channel, target: {}", + "Sending {request_name} to channel, target: {}", proxy_request.target_url ); // Send to channel match proxy_tx.send(proxy_request).await { Ok(_) => log_and_create_http_response( - "Successfully buffered profiling request to be flushed", + &format!("Successfully buffered {request_name} to be flushed"), StatusCode::OK, ), Err(err) => log_and_create_http_response( - &format!("Error sending profiling request to the proxy flusher: {err}"), + &format!("Error sending {request_name} to the proxy flusher: {err}"), StatusCode::INTERNAL_SERVER_ERROR, ), } @@ -739,6 +763,7 @@ impl MiniAgent { TRACE_ENDPOINT_PATH, STATS_ENDPOINT_PATH, INFO_ENDPOINT_PATH, + DSM_ENDPOINT_PATH, PROFILING_ENDPOINT_PATH ], "client_drop_p0s": client_drop_p0s, diff --git a/crates/datadog-trace-agent/src/proxy_flusher.rs b/crates/datadog-trace-agent/src/proxy_flusher.rs index d78a7ea..14a5b49 100644 --- a/crates/datadog-trace-agent/src/proxy_flusher.rs +++ b/crates/datadog-trace-agent/src/proxy_flusher.rs @@ -28,6 +28,36 @@ pub struct ProxyRequest { pub headers: HeaderMap, pub body: Bytes, pub target_url: String, + pub kind: ProxyRequestKind, +} + +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub enum ProxyRequestKind { + DataStreams, + Profiling, +} + +impl ProxyRequestKind { + pub fn name(self) -> &'static str { + match self { + ProxyRequestKind::DataStreams => "data streams", + ProxyRequestKind::Profiling => "profiling", + } + } + + pub fn request_name(self) -> &'static str { + match self { + ProxyRequestKind::DataStreams => "data streams request", + ProxyRequestKind::Profiling => "profiling request", + } + } + + pub fn intake_url(self, config: &Config) -> String { + match self { + ProxyRequestKind::DataStreams => config.dsm_intake.url.to_string(), + ProxyRequestKind::Profiling => config.profiling_intake.url.to_string(), + } + } } pub struct ProxyFlusher { @@ -37,10 +67,7 @@ pub struct ProxyFlusher { impl ProxyFlusher { pub fn new(config: Arc) -> Self { - debug!( - "Creating new proxy flusher with target URL: {}", - config.profiling_intake.url - ); + debug!("Creating new proxy flusher"); let client = build_client( config.proxy_url.as_deref(), Duration::from_secs(config.proxy_request_timeout_secs), @@ -57,11 +84,6 @@ impl ProxyFlusher { /// Starts the proxy flusher that listens for proxy payloads from the channel and forwards them to Datadog pub async fn start_proxy_flusher(&self, mut rx: Receiver) { - let Some(api_key) = self.config.profiling_intake.api_key.as_ref() else { - error!("No API key configured, cannot start"); - return; - }; - debug!("Started, listening for requests"); while let Some(proxy_payload) = rx.recv().await { @@ -69,43 +91,93 @@ impl ProxyFlusher { "Received request from channel, body size: {} bytes", proxy_payload.body.len() ); - self.send_request(proxy_payload, api_key).await; + self.send_request(proxy_payload).await; + } + } + + fn api_key_for_kind(&self, kind: ProxyRequestKind) -> Option<&str> { + match kind { + ProxyRequestKind::DataStreams => self.config.dsm_intake.api_key.as_deref(), + ProxyRequestKind::Profiling => self.config.profiling_intake.api_key.as_deref(), + } + } + + fn parse_additional_tags(tag_string: &str) -> impl Iterator { + tag_string + .split([',', ';']) + .map(str::trim) + .filter(|tag| !tag.is_empty()) + } + + fn push_unique_tag(tags: &mut Vec, tag: impl Into) { + let tag = tag.into(); + if !tags.iter().any(|existing| existing == &tag) { + tags.push(tag); } } + fn merged_additional_tags(&self, headers: &HeaderMap) -> Result, String> { + let mut tags = Vec::new(); + + if let Some(existing_tags) = headers.get(DD_ADDITIONAL_TAGS_HEADER) { + let existing_tags = existing_tags + .to_str() + .map_err(|e| format!("Failed to parse existing additional tags header: {e}"))?; + for tag in Self::parse_additional_tags(existing_tags) { + Self::push_unique_tag(&mut tags, tag.to_string()); + } + } + + if let Some(function_tags) = self.config.tags.function_tags() { + for tag in Self::parse_additional_tags(function_tags) { + Self::push_unique_tag(&mut tags, tag.to_string()); + } + } + + Ok(tags) + } + async fn create_request( &self, request: &ProxyRequest, - api_key: &str, ) -> Result { let mut headers = request.headers.clone(); + let api_key = self + .api_key_for_kind(request.kind) + .ok_or_else(|| format!("No API key configured for {}", request.kind.name()))?; // Remove headers that are not needed for the proxy request headers.remove("host"); headers.remove("content-length"); - // Add headers to the request - let mut tag_parts = vec![]; - - // Add serverless-specific tags for profiling - tag_parts.push(format!( - "functionname:{}", - self.config.app_name.as_deref().unwrap_or_default() - )); - tag_parts.push(format!( - "_dd.origin:{}", - get_dd_origin(&self.config.env_type) - )); - - let additional_tags = tag_parts.join(","); - match additional_tags.parse() { - Ok(parsed_tags) => { - headers.insert(DD_ADDITIONAL_TAGS_HEADER, parsed_tags); - } - Err(e) => { - return Err(format!("Failed to parse additional tags header: {}", e)); - } - }; + // Preserve any incoming additional tags and augment them with the + // function tags configured for the serverless app. + let mut additional_tags = self.merged_additional_tags(&headers)?; + + // Add the serverless-specific tags that power billing and product + // attribution for proxied requests. + Self::push_unique_tag( + &mut additional_tags, + format!( + "functionname:{}", + self.config.app_name.as_deref().unwrap_or_default() + ), + ); + Self::push_unique_tag( + &mut additional_tags, + format!("_dd.origin:{}", get_dd_origin(&self.config.env_type)), + ); + + if !additional_tags.is_empty() { + match additional_tags.join(",").parse() { + Ok(parsed_tags) => { + headers.insert(DD_ADDITIONAL_TAGS_HEADER, parsed_tags); + } + Err(e) => { + return Err(format!("Failed to parse additional tags header: {}", e)); + } + }; + } match api_key.parse() { Ok(parsed_key) => headers.insert("DD-API-KEY", parsed_key), @@ -122,14 +194,14 @@ impl ProxyFlusher { .body(request.body.clone())) } - async fn send_request(&self, request: ProxyRequest, api_key: &str) { + async fn send_request(&self, request: ProxyRequest) { let max_retries = self.config.proxy_request_max_retries; let mut attempts = 0; loop { attempts += 1; - let request_builder = match self.create_request(&request, api_key).await { + let request_builder = match self.create_request(&request).await { Ok(builder) => builder, Err(e) => { error!("{}", e); @@ -147,14 +219,16 @@ impl ProxyFlusher { Ok(r) => { let url = r.url().to_string(); let status = r.status(); - let body = r.text().await; - if status == 202 { + if status.is_success() { debug!( "Successfully sent request in {} ms to {url}", elapsed.as_millis() ); } else { - error!("Request failed with status {status}: {body:?}"); + error!( + "Request failed with status {status} after {} ms to {url}; response body omitted", + elapsed.as_millis() + ); } return; } diff --git a/crates/datadog-trace-agent/src/trace_processor.rs b/crates/datadog-trace-agent/src/trace_processor.rs index 0d8fc19..041153a 100644 --- a/crates/datadog-trace-agent/src/trace_processor.rs +++ b/crates/datadog-trace-agent/src/trace_processor.rs @@ -259,6 +259,13 @@ mod tests { api_key: Some("dummy_api_key".into()), ..Default::default() }, + dsm_intake: Endpoint { + url: hyper::Uri::from_static( + "https://trace.agent.notdog.com/api/v0.1/pipeline_stats", + ), + api_key: Some("dummy_api_key".into()), + ..Default::default() + }, dd_site: "datadoghq.com".to_string(), dd_apm_receiver_port: 8126, #[cfg(any(all(windows, feature = "windows-pipes"), test))] diff --git a/crates/datadog-trace-agent/tests/integration_test.rs b/crates/datadog-trace-agent/tests/integration_test.rs index d73f8a5..f74f9d6 100644 --- a/crates/datadog-trace-agent/tests/integration_test.rs +++ b/crates/datadog-trace-agent/tests/integration_test.rs @@ -10,7 +10,7 @@ use common::helpers::{ use common::mock_server::MockServer; use common::mocks::{MockEnvVerifier, MockStatsFlusher, MockStatsProcessor, MockTraceFlusher}; use datadog_trace_agent::{ - config::{Config, test_helpers::create_tcp_test_config}, + config::{Config, Tags, test_helpers::create_tcp_test_config}, mini_agent::MiniAgent, peer_tags::peer_tag_keys, proxy_flusher::ProxyFlusher, @@ -53,6 +53,8 @@ async fn wait_for_request_at_path(mock_server: &common::mock_server::MockServer, pub fn configure_mock_endpoints(config: &mut Config, mock_server_url: &str) { let trace_url = format!("{}/api/v0.2/traces", mock_server_url); let stats_url = format!("{}/api/v0.2/stats", mock_server_url); + let dsm_url = format!("{}/api/v0.1/pipeline_stats", mock_server_url); + let profiling_url = format!("{}/api/v2/profile", mock_server_url); config.trace_intake = libdd_common::Endpoint { url: trace_url.parse().unwrap(), @@ -64,6 +66,16 @@ pub fn configure_mock_endpoints(config: &mut Config, mock_server_url: &str) { api_key: Some("test-api-key".into()), ..Default::default() }; + config.dsm_intake = libdd_common::Endpoint { + url: dsm_url.parse().unwrap(), + api_key: Some("test-api-key".into()), + ..Default::default() + }; + config.profiling_intake = libdd_common::Endpoint { + url: profiling_url.parse().unwrap(), + api_key: Some("test-api-key".into()), + ..Default::default() + }; config.trace_flush_interval_secs = 1; config.stats_flush_interval_secs = 1; } @@ -185,6 +197,49 @@ pub fn verify_no_stats_request(mock_server: &common::mock_server::MockServer) { ); } +/// Helper to verify a DSM request sent to the mock server +pub async fn verify_dsm_request( + mock_server: &common::mock_server::MockServer, + expected_body: &[u8], + expected_additional_tags: &[&str], +) { + wait_for_request_at_path(mock_server, "/api/v0.1/pipeline_stats").await; + let dsm_reqs = mock_server.get_requests_for_path("/api/v0.1/pipeline_stats"); + + assert!( + !dsm_reqs.is_empty(), + "Expected at least one DSM request to mock server" + ); + + let dsm_req = &dsm_reqs[0]; + assert_eq!(dsm_req.method, "POST", "Expected POST method"); + assert_eq!( + dsm_req.body, expected_body, + "Expected DSM payload body to be forwarded unchanged" + ); + + let api_key = dsm_req + .headers + .iter() + .find(|(k, _)| k.to_lowercase() == "dd-api-key") + .map(|(_, v)| v.as_str()); + assert_eq!(api_key, Some("test-api-key"), "Expected API key header"); + + let additional_tags = dsm_req + .headers + .iter() + .find(|(k, _)| k.to_lowercase() == "x-datadog-additional-tags"); + let additional_tags = additional_tags + .map(|(_, v)| v.as_str()) + .expect("Expected DSM requests to include additional tags"); + for expected_tag in expected_additional_tags { + assert!( + additional_tags.contains(expected_tag), + "Expected DSM additional tags to contain {expected_tag:?}, got: {additional_tags:?}" + ); + } +} + #[cfg(test)] #[tokio::test] #[serial] @@ -239,6 +294,7 @@ async fn test_mini_agent_tcp_handles_requests() { "/v0.4/traces", "/v0.6/stats", "/info", + "/v0.1/pipeline_stats", "/profiling/v1/input" ]), "Expected endpoints array" @@ -354,6 +410,7 @@ async fn test_mini_agent_named_pipe_handles_requests() { "/v0.4/traces", "/v0.6/stats", "/info", + "/v0.1/pipeline_stats", "/profiling/v1/input" ]), "Expected endpoints array" @@ -466,6 +523,86 @@ async fn test_mini_agent_tcp_with_real_flushers() { verify_stats_request(&mock_server).await; // Stats generator should generate stats from trace payload } +#[cfg(test)] +#[tokio::test] +#[serial] +async fn test_mini_agent_tcp_proxies_dsm_requests() { + let mock_server: MockServer = MockServer::start().await; + tokio::time::sleep(Duration::from_millis(50)).await; + + let mut config = create_tcp_test_config(8133); + configure_mock_endpoints(&mut config, &mock_server.url()); + config.tags = Tags::from_env_string("env:test,service:payments"); + let config = Arc::new(config); + let test_port = config.dd_apm_receiver_port; + + let mini_agent = MiniAgent { + config: config.clone(), + trace_processor: Arc::new(ServerlessTraceProcessor { + stats_concentrator: None, + }), + trace_flusher: Arc::new(MockTraceFlusher), + stats_processor: Arc::new(MockStatsProcessor), + stats_flusher: Arc::new(MockStatsFlusher), + env_verifier: Arc::new(MockEnvVerifier), + proxy_flusher: Arc::new(ProxyFlusher::new(config)), + }; + + let agent_handle = tokio::spawn(async move { + let (_shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false); + let _ = mini_agent.start_mini_agent(shutdown_rx, None).await; + }); + + let mut server_ready = false; + for _ in 0..20 { + tokio::time::sleep(Duration::from_millis(50)).await; + if let Ok(response) = send_tcp_request(test_port, "/info", "GET", None, &[]).await + && response.status().is_success() + { + server_ready = true; + break; + } + } + assert!( + server_ready, + "Mini agent server failed to start within timeout" + ); + + let dsm_payload = br#"{"Stats":[{"EdgeTags":["direction:out","type:kafka"]}]}"#.to_vec(); + let response = send_tcp_request( + test_port, + "/v0.1/pipeline_stats", + "POST", + Some(dsm_payload.clone()), + &[ + ("X-Datadog-Test-Header", "dsm"), + ( + "X-Datadog-Additional-Tags", + "host:worker-1,default_env:prod", + ), + ], + ) + .await + .expect("Failed to send /v0.1/pipeline_stats request"); + assert_eq!(response.status(), StatusCode::OK); + + verify_dsm_request( + &mock_server, + &dsm_payload, + &[ + "host:worker-1", + "default_env:prod", + "env:test", + "service:payments", + "functionname:test-app", + "_dd.origin:azurefunction", + ], + ) + .await; + + agent_handle.abort(); +} + #[cfg(test)] #[tokio::test] #[serial]