Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions crates/datadog-trace-agent/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use libdd_trace_utils::trace_utils;

const DEFAULT_APM_RECEIVER_PORT: u16 = 8126;
const DEFAULT_DOGSTATSD_PORT: u16 = 8125;
const DSM_PIPELINE_STATS_ROUTE: &str = "/api/v0.1/pipeline_stats";

#[derive(Debug)]
pub struct Tags {
Expand Down Expand Up @@ -99,6 +100,8 @@ pub struct Config {
pub trace_flush_interval_secs: u64,
pub trace_intake: Endpoint,
pub trace_stats_intake: Endpoint,
/// Data Streams Monitoring intake endpoint (for proxying pipeline stats to Datadog)
pub dsm_intake: Endpoint,
/// Profiling intake endpoint (for proxying profiling data to Datadog)
pub profiling_intake: Endpoint,
/// Timeout for each proxy request, in seconds
Expand Down Expand Up @@ -180,11 +183,17 @@ impl Config {
let mut trace_intake_url = trace_intake_url(&dd_site);
let mut trace_stats_intake_url = trace_stats_url(&dd_site);

// TODO: Upstream DSM URL in libdatadog and use that instead of constructing here
let mut dsm_intake_url = format!("https://trace.agent.{dd_site}{DSM_PIPELINE_STATS_ROUTE}");

// DD_APM_DD_URL env var will primarily be used for integration tests
// overrides the entire trace/trace stats intake url prefix
if let Ok(endpoint_prefix) = env::var("DD_APM_DD_URL") {
trace_intake_url = trace_intake_url_prefixed(&endpoint_prefix);
trace_stats_intake_url = trace_stats_url_prefixed(&endpoint_prefix);

// TODO: Upstream DSM URL in libdatadog and use that instead of constructing here
dsm_intake_url = format!("{endpoint_prefix}{DSM_PIPELINE_STATS_ROUTE}");
};

// TODO: Create helper functions for this in libdatadog
Expand Down Expand Up @@ -236,6 +245,11 @@ impl Config {
api_key: Some(api_key.clone()),
..Default::default()
},
dsm_intake: Endpoint {
url: hyper::Uri::from_str(&dsm_intake_url).unwrap(),
api_key: Some(api_key.clone()),
..Default::default()
},
profiling_intake: Endpoint {
url: hyper::Uri::from_str(&profiling_intake_url).unwrap(),
api_key: Some(api_key),
Expand Down Expand Up @@ -310,6 +324,10 @@ mod tests {
config.trace_stats_intake.url,
"https://trace.agent.datadoghq.com/api/v0.2/stats"
);
assert_eq!(
config.dsm_intake.url,
"https://trace.agent.datadoghq.com/api/v0.1/pipeline_stats"
);
},
);
}
Expand Down Expand Up @@ -392,6 +410,38 @@ mod tests {
config.trace_stats_intake.url,
"http://127.0.0.1:3333/api/v0.2/stats"
);
assert_eq!(
config.dsm_intake.url,
"http://127.0.0.1:3333/api/v0.1/pipeline_stats"
);
},
);
}

#[duplicate_item(
test_name dd_site expected_url;
[test_us1_dsm_intake_url] ["datadoghq.com"] ["https://trace.agent.datadoghq.com/api/v0.1/pipeline_stats"];
[test_us3_dsm_intake_url] ["us3.datadoghq.com"] ["https://trace.agent.us3.datadoghq.com/api/v0.1/pipeline_stats"];
[test_us5_dsm_intake_url] ["us5.datadoghq.com"] ["https://trace.agent.us5.datadoghq.com/api/v0.1/pipeline_stats"];
[test_eu_dsm_intake_url] ["datadoghq.eu"] ["https://trace.agent.datadoghq.eu/api/v0.1/pipeline_stats"];
[test_ap1_dsm_intake_url] ["ap1.datadoghq.com"] ["https://trace.agent.ap1.datadoghq.com/api/v0.1/pipeline_stats"];
[test_gov_dsm_intake_url] ["ddog-gov.com"] ["https://trace.agent.ddog-gov.com/api/v0.1/pipeline_stats"];
)]
#[test]
#[serial]
fn test_name() {
temp_env::with_vars(
[
("DD_API_KEY", Some("_not_a_real_key_")),
("K_SERVICE", Some("function_name")),
("FUNCTION_TARGET", Some("function_target")),
("DD_SITE", Some(dd_site)),
],
|| {
let config_res = config::Config::new();
assert!(config_res.is_ok());
let config = config_res.unwrap();
assert_eq!(config.dsm_intake.url, expected_url);
},
);
}
Expand Down Expand Up @@ -723,6 +773,7 @@ pub mod test_helpers {
trace_flush_interval_secs: 5,
trace_intake: Endpoint::default(),
trace_stats_intake: Endpoint::default(),
dsm_intake: Endpoint::default(),
profiling_intake: Endpoint::default(),
proxy_request_timeout_secs: 30,
proxy_request_max_retries: 3,
Expand Down
53 changes: 39 additions & 14 deletions crates/datadog-trace-agent/src/mini_agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use tracing::warn;
use tracing::{debug, error};

use crate::http_utils::{log_and_create_http_response, verify_request_content_length};
use crate::proxy_flusher::{ProxyFlusher, ProxyRequest};
use crate::proxy_flusher::{ProxyFlusher, ProxyRequest, ProxyRequestKind};

#[cfg(all(windows, feature = "windows-pipes"))]
use tokio::net::windows::named_pipe::ServerOptions;
Expand All @@ -31,6 +31,7 @@ use libdd_trace_utils::trace_utils::SendData;
const TRACE_ENDPOINT_PATH: &str = "/v0.4/traces";
const STATS_ENDPOINT_PATH: &str = "/v0.6/stats";
const INFO_ENDPOINT_PATH: &str = "/info";
const DSM_ENDPOINT_PATH: &str = "/v0.1/pipeline_stats";
const PROFILING_ENDPOINT_PATH: &str = "/profiling/v1/input";
const TRACER_PAYLOAD_CHANNEL_BUFFER_SIZE: usize = 10;
const STATS_PAYLOAD_CHANNEL_BUFFER_SIZE: usize = 10;
Expand Down Expand Up @@ -107,11 +108,12 @@ impl MiniAgent {
.await;
});

// channels to send processed profiling requests to our proxy flusher
// channel used to forward proxied payloads (for example profiling and
// Data Streams Monitoring)
let (proxy_tx, proxy_rx): (Sender<ProxyRequest>, Receiver<ProxyRequest>) =
mpsc::channel(PROXY_PAYLOAD_CHANNEL_BUFFER_SIZE);

// start our proxy flusher for profiling requests
// start our proxy flusher for proxied requests
let proxy_flusher = self.proxy_flusher.clone();
tokio::spawn(async move {
proxy_flusher.start_proxy_flusher(proxy_rx).await;
Expand Down Expand Up @@ -626,11 +628,29 @@ impl MiniAgent {
),
}
}
(&Method::POST, DSM_ENDPOINT_PATH) => {
match Self::proxy_handler(config, req, proxy_tx, ProxyRequestKind::DataStreams)
.await
{
Ok(res) => Ok(res),
Err(err) => log_and_create_http_response(
&format!(
"Error processing {}: {err}",
ProxyRequestKind::DataStreams.request_name()
),
StatusCode::INTERNAL_SERVER_ERROR,
),
}
}
(&Method::POST, PROFILING_ENDPOINT_PATH) => {
match Self::profiling_proxy_handler(config, req, proxy_tx).await {
match Self::proxy_handler(config, req, proxy_tx, ProxyRequestKind::Profiling).await
{
Ok(res) => Ok(res),
Err(err) => log_and_create_http_response(
&format!("Error processing profiling request: {err}"),
&format!(
"Error processing {}: {err}",
ProxyRequestKind::Profiling.request_name()
),
StatusCode::INTERNAL_SERVER_ERROR,
),
}
Expand Down Expand Up @@ -659,20 +679,23 @@ impl MiniAgent {
}
}

/// Handles incoming proxy requests for profiling - can be abstracted into a generic proxy handler for other proxy requests in the future
async fn profiling_proxy_handler(
/// Handles incoming proxy requests such as profiling or Data Streams
/// Monitoring payloads.
async fn proxy_handler(
config: Arc<config::Config>,
request: http_common::HttpRequest,
proxy_tx: Sender<ProxyRequest>,
proxy_kind: ProxyRequestKind,
) -> http::Result<http_common::HttpResponse> {
debug!("Received profiling request");
let request_name = proxy_kind.request_name();
debug!("Received {request_name}");

// Extract headers and body
let (parts, body) = request.into_parts();
if let Some(response) = verify_request_content_length(
&parts.headers,
config.max_request_content_length,
"Error processing profiling request",
&format!("Error processing {request_name}"),
) {
return response;
}
Expand All @@ -681,7 +704,7 @@ impl MiniAgent {
Ok(collected) => collected.to_bytes(),
Err(e) => {
return log_and_create_http_response(
&format!("Error reading profiling request body: {e}"),
&format!("Error reading {request_name} body: {e}"),
StatusCode::BAD_REQUEST,
);
}
Expand All @@ -691,22 +714,23 @@ impl MiniAgent {
let proxy_request = ProxyRequest {
headers: parts.headers,
body: body_bytes,
target_url: config.profiling_intake.url.to_string(),
target_url: proxy_kind.intake_url(config.as_ref()),
kind: proxy_kind,
};

debug!(
"Sending profiling request to channel, target: {}",
"Sending {request_name} to channel, target: {}",
proxy_request.target_url
);

// Send to channel
match proxy_tx.send(proxy_request).await {
Ok(_) => log_and_create_http_response(
"Successfully buffered profiling request to be flushed",
&format!("Successfully buffered {request_name} to be flushed"),
StatusCode::OK,
),
Err(err) => log_and_create_http_response(
&format!("Error sending profiling request to the proxy flusher: {err}"),
&format!("Error sending {request_name} to the proxy flusher: {err}"),
StatusCode::INTERNAL_SERVER_ERROR,
),
}
Expand Down Expand Up @@ -739,6 +763,7 @@ impl MiniAgent {
TRACE_ENDPOINT_PATH,
STATS_ENDPOINT_PATH,
INFO_ENDPOINT_PATH,
DSM_ENDPOINT_PATH,
PROFILING_ENDPOINT_PATH
],
"client_drop_p0s": client_drop_p0s,
Expand Down
Loading
Loading