diff --git a/bottlecap/src/traces/stats_flusher.rs b/bottlecap/src/traces/stats_flusher.rs index 0096df803..1d4b4ae4d 100644 --- a/bottlecap/src/traces/stats_flusher.rs +++ b/bottlecap/src/traces/stats_flusher.rs @@ -6,6 +6,7 @@ use std::sync::Arc; use tokio::sync::Mutex; use tokio::sync::OnceCell; +use crate::FLUSH_RETRY_COUNT; use crate::config; use crate::lifecycle::invocation::processor::S_TO_MS; use crate::traces::http_client::HttpClient; @@ -93,32 +94,36 @@ impl StatsFlusher { let stats_url = trace_stats_url(&self.config.site); - let start = std::time::Instant::now(); - - let resp = stats_utils::send_stats_payload_with_client( - serialized_stats_payload, - endpoint, - api_key.as_str(), - Some(&self.http_client), - ) - .await; - let elapsed = start.elapsed(); - debug!( - "STATS | Stats request to {} took {} ms", - stats_url, - elapsed.as_millis() - ); - match resp { - Ok(()) => { - debug!("STATS | Successfully flushed stats"); - None - } - Err(e) => { - // Network/server errors are temporary - return stats for retry - error!("STATS | Error sending stats: {e:?}"); - Some(stats) + for attempt in 1..=FLUSH_RETRY_COUNT { + let start = std::time::Instant::now(); + let resp = stats_utils::send_stats_payload_with_client( + serialized_stats_payload.clone(), + endpoint, + api_key.as_str(), + Some(&self.http_client), + ) + .await; + let elapsed = start.elapsed(); + + match resp { + Ok(()) => { + debug!( + "STATS | Successfully flushed stats to {stats_url} in {} ms (attempt {attempt}/{FLUSH_RETRY_COUNT})", + elapsed.as_millis() + ); + return None; + } + Err(e) => { + debug!( + "STATS | Failed to send stats to {stats_url} in {} ms (attempt {attempt}/{FLUSH_RETRY_COUNT}): {e:?}", + elapsed.as_millis() + ); + } } } + + error!("STATS | Exhausted all {FLUSH_RETRY_COUNT} attempts, returning stats for redrive"); + Some(stats) } /// Flushes stats from the aggregator. diff --git a/bottlecap/src/traces/trace_flusher.rs b/bottlecap/src/traces/trace_flusher.rs index 7e43eab23..89f40050e 100644 --- a/bottlecap/src/traces/trace_flusher.rs +++ b/bottlecap/src/traces/trace_flusher.rs @@ -6,6 +6,7 @@ use libdd_common::Endpoint; use libdd_trace_utils::{ config_utils::trace_intake_url_prefixed, send_data::SendData, + send_with_retry::{RetryBackoffType, RetryStrategy}, trace_utils::{self}, tracer_payload::TracerPayloadCollection, }; @@ -14,11 +15,25 @@ use std::sync::Arc; use tokio::task::JoinSet; use tracing::{debug, error}; +use crate::FLUSH_RETRY_COUNT; use crate::config::Config; use crate::lifecycle::invocation::processor::S_TO_MS; use crate::traces::http_client::HttpClient; use crate::traces::trace_aggregator_service::AggregatorHandle; +/// Retry strategy for trace flushing using the shared `FLUSH_RETRY_COUNT` +/// with no delay between attempts. In Lambda, every millisecond of wall-clock +/// time matters, and the per-attempt request timeout already bounds how long +/// each retry can take. +fn trace_retry_strategy() -> RetryStrategy { + RetryStrategy::new( + u32::try_from(FLUSH_RETRY_COUNT).unwrap_or(3), + 0, + RetryBackoffType::Constant, + None, + ) +} + pub struct TraceFlusher { pub aggregator_handle: AggregatorHandle, pub config: Arc, @@ -113,7 +128,11 @@ impl TraceFlusher { let traces_with_tags: Vec<_> = trace_builders .into_iter() .map(|info| { - let trace = info.builder.with_api_key(api_key.as_str()).build(); + let trace = info + .builder + .with_api_key(api_key.as_str()) + .with_retry_strategy(trace_retry_strategy()) + .build(); (trace, info.header_tags) }) .collect(); @@ -125,12 +144,16 @@ impl TraceFlusher { let additional_traces: Vec<_> = traces_with_tags .iter() .filter_map(|(trace, tags)| match trace.get_payloads() { - TracerPayloadCollection::V07(payloads) => Some(SendData::new( - trace.len(), - TracerPayloadCollection::V07(payloads.clone()), - tags.to_tracer_header_tags(), - &endpoint, - )), + TracerPayloadCollection::V07(payloads) => { + let mut send_data = SendData::new( + trace.len(), + TracerPayloadCollection::V07(payloads.clone()), + tags.to_tracer_header_tags(), + &endpoint, + ); + send_data.set_retry_strategy(trace_retry_strategy()); + Some(send_data) + } // All payloads in the extension are V07 (produced by // collect_pb_trace_chunks), so this branch is unreachable. _ => None, @@ -174,12 +197,23 @@ impl TraceFlusher { debug!("TRACES | Flushing {} traces", coalesced_traces.len()); for trace in &coalesced_traces { - let send_result = trace.send(&http_client).await.last_result; - - if let Err(e) = send_result { - error!("TRACES | Request failed: {e:?}"); + let result = trace.send(&http_client).await; + + if let Err(e) = &result.last_result { + error!( + "TRACES | Request failed after {} attempts ({} timeouts, {} network errors, {} status code errors): {e:?}", + result.requests_count, + result.errors_timeout, + result.errors_network, + result.errors_status_code, + ); return Some(coalesced_traces); } + + debug!( + "TRACES | Successfully sent trace ({} attempts, {} bytes)", + result.requests_count, result.bytes_sent, + ); } debug!("TRACES | Flushing took {} ms", start.elapsed().as_millis());