Skip to content

Commit ff6448c

Browse files
chore(telemetry): move telemetry shutdown to worker
1 parent 592c24c commit ff6448c

4 files changed

Lines changed: 34 additions & 24 deletions

File tree

libdd-data-pipeline/src/shared_runtime.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,10 @@ pub struct WorkerHandle {
3535
}
3636

3737
impl WorkerHandle {
38-
/// Stop the worker, call it's shutdown logic and remove it from the worker list.
38+
/// Stop the worker and execute the shutdown logic.
3939
///
4040
/// # Errors
41-
/// Returns an error if the worker does not exist anymore.
41+
/// Returns an error if the worker has already been stopped.
4242
pub async fn stop(self) -> Result<(), SharedRuntimeError> {
4343
let mut workers_lock = self.workers.lock_or_panic();
4444
let Some(position) = workers_lock

libdd-data-pipeline/src/trace_exporter/mod.rs

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -271,16 +271,7 @@ impl TraceExporter {
271271
/// This function should not take ownership of the trace exporter as it will cause the runtime
272272
/// stored in the trace exporter to be dropped in a non-blocking context causing a panic.
273273
async fn shutdown_async(&mut self) {
274-
if let StatsComputationStatus::Enabled {
275-
cancellation_token, ..
276-
} = self.client_side_stats.load().as_ref()
277-
{
278-
cancellation_token.cancel();
279-
}
280-
if let Some(telemetry) = self.telemetry.take() {
281-
telemetry.shutdown().await;
282-
}
283-
let _ = self.shared_runtime.shutdown().await;
274+
self.shared_runtime.shutdown().await;
284275
}
285276

286277
/// Check if agent info state has changed
@@ -314,7 +305,13 @@ impl TraceExporter {
314305
StatsComputationStatus::Enabled {
315306
stats_concentrator, ..
316307
} => {
308+
let ctx = stats::StatsContext {
309+
metadata: &self.metadata,
310+
endpoint_url: &self.endpoint.url,
311+
shared_runtime: &self.shared_runtime,
312+
};
317313
stats::handle_stats_enabled(
314+
&ctx,
318315
&agent_info,
319316
stats_concentrator,
320317
&self.client_side_stats,

libdd-data-pipeline/src/trace_exporter/stats.rs

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,13 @@
88
//! and processing traces for stats collection.
99
1010
use crate::agent_info::schema::AgentInfo;
11-
use crate::shared_runtime::SharedRuntime;
11+
use crate::shared_runtime::{SharedRuntime, WorkerHandle};
1212
use crate::stats_exporter;
1313
use arc_swap::ArcSwap;
1414
use libdd_common::{Endpoint, HttpClient, MutexExt};
1515
use libdd_trace_stats::span_concentrator::SpanConcentrator;
1616
use std::sync::{Arc, Mutex};
1717
use std::time::Duration;
18-
use tokio_util::sync::CancellationToken;
1918
use tracing::{debug, error};
2019

2120
use super::add_path;
@@ -42,7 +41,7 @@ pub(crate) enum StatsComputationStatus {
4241
/// Client-side stats is enabled
4342
Enabled {
4443
stats_concentrator: Arc<Mutex<SpanConcentrator>>,
45-
cancellation_token: CancellationToken,
44+
worker_handle: WorkerHandle,
4645
},
4746
}
4847

@@ -72,12 +71,10 @@ pub(crate) fn start_stats_computation(
7271
span_kinds,
7372
peer_tags,
7473
)));
75-
let cancellation_token = CancellationToken::new();
7674
create_and_start_stats_worker(
7775
ctx,
7876
bucket_size,
7977
&stats_concentrator,
80-
&cancellation_token,
8178
client_side_stats,
8279
client,
8380
)?;
@@ -90,7 +87,6 @@ fn create_and_start_stats_worker(
9087
ctx: &StatsContext,
9188
bucket_size: Duration,
9289
stats_concentrator: &Arc<Mutex<SpanConcentrator>>,
93-
cancellation_token: &CancellationToken,
9490
client_side_stats: &ArcSwap<StatsComputationStatus>,
9591
client: HttpClient,
9692
) -> anyhow::Result<()> {
@@ -101,14 +97,15 @@ fn create_and_start_stats_worker(
10197
Endpoint::from_url(add_path(ctx.endpoint_url, STATS_ENDPOINT)),
10298
client,
10399
);
104-
ctx.shared_runtime
100+
let worker_handle = ctx
101+
.shared_runtime
105102
.spawn_worker(stats_exporter)
106103
.map_err(|e| anyhow::anyhow!(e.to_string()))?;
107104

108105
// Update the stats computation state with the new worker components.
109106
client_side_stats.store(Arc::new(StatsComputationStatus::Enabled {
110107
stats_concentrator: stats_concentrator.clone(),
111-
cancellation_token: cancellation_token.clone(),
108+
worker_handle,
112109
}));
113110

114111
Ok(())
@@ -117,17 +114,22 @@ fn create_and_start_stats_worker(
117114
/// Stops the stats exporter and disable stats computation
118115
///
119116
/// Used when client-side stats is disabled by the agent
120-
pub(crate) fn stop_stats_computation(client_side_stats: &ArcSwap<StatsComputationStatus>) {
117+
pub(crate) fn stop_stats_computation(
118+
ctx: &StatsContext,
119+
client_side_stats: &ArcSwap<StatsComputationStatus>,
120+
) {
121121
if let StatsComputationStatus::Enabled {
122122
stats_concentrator,
123-
cancellation_token,
123+
worker_handle,
124124
} = &**client_side_stats.load()
125125
{
126-
cancellation_token.cancel();
127126
let bucket_size = stats_concentrator.lock_or_panic().get_bucket_size();
128127
client_side_stats.store(Arc::new(StatsComputationStatus::DisabledByAgent {
129128
bucket_size,
130129
}));
130+
ctx.shared_runtime
131+
.runtime()
132+
.block_on(async { worker_handle.clone().stop().await });
131133
}
132134
}
133135

@@ -158,6 +160,7 @@ pub(crate) fn handle_stats_disabled_by_agent(
158160

159161
/// Handle stats computation when it's already enabled
160162
pub(crate) fn handle_stats_enabled(
163+
ctx: &StatsContext,
161164
agent_info: &Arc<AgentInfo>,
162165
stats_concentrator: &Mutex<SpanConcentrator>,
163166
client_side_stats: &ArcSwap<StatsComputationStatus>,
@@ -167,7 +170,7 @@ pub(crate) fn handle_stats_enabled(
167170
concentrator.set_span_kinds(get_span_kinds_for_stats(agent_info));
168171
concentrator.set_peer_tags(agent_info.info.peer_tags.clone().unwrap_or_default());
169172
} else {
170-
stop_stats_computation(client_side_stats);
173+
stop_stats_computation(ctx, client_side_stats);
171174
debug!("Client-side stats computation has been disabled by the agent")
172175
}
173176
}

libdd-telemetry/src/worker/mod.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,16 @@ impl Worker for TelemetryWorker {
187187

188188
// TODO: Handle action result and add support to stop worker from `run`
189189
}
190+
191+
async fn shutdown(&mut self) {
192+
let stop_action = TelemetryActions::Lifecycle(LifecycleAction::Stop);
193+
let _action_result = match self.flavor {
194+
TelemetryWorkerFlavor::Full => self.dispatch_action(stop_action).await,
195+
TelemetryWorkerFlavor::MetricsLogs => {
196+
self.dispatch_metrics_logs_action(stop_action).await
197+
}
198+
};
199+
}
190200
}
191201

192202
#[derive(Debug, Default, Serialize, Deserialize)]

0 commit comments

Comments
 (0)