From 23f6e9d5378204441ee371357b8e2af0767f39e3 Mon Sep 17 00:00:00 2001 From: not-matthias Date: Fri, 16 Jan 2026 17:11:06 +0100 Subject: [PATCH 1/2] feat(memtrack): require minimum protocol version for memtrack --- src/executor/memory/executor.rs | 24 ++++++++-------- src/executor/shared/fifo.rs | 45 ++++++++++++++++++------------ src/executor/wall_time/perf/mod.rs | 14 +++++----- 3 files changed, 47 insertions(+), 36 deletions(-) diff --git a/src/executor/memory/executor.rs b/src/executor/memory/executor.rs index cd41241f..a7b98efc 100644 --- a/src/executor/memory/executor.rs +++ b/src/executor/memory/executor.rs @@ -154,14 +154,19 @@ impl MemoryExecutor { let on_cmd = async move |cmd: &FifoCommand| { match cmd { - FifoCommand::CurrentBenchmark { pid, uri } => { - debug!("Current benchmark: {pid}, {uri}"); + FifoCommand::SetVersion(protocol_version) => { + if *protocol_version < 2 { + bail!( + "Memory profiling requires protocol version 2 or higher, but the integration is using version {protocol_version}. \ + This integration doesn't support memory profiling. Please update your integration to a version that supports memory profiling.", + ); + } } FifoCommand::StartBenchmark => { debug!("Enabling memtrack via IPC"); if let Err(e) = ipc_client.enable() { error!("Failed to enable memtrack: {e}"); - return Ok(FifoCommand::Err); + return Ok(Some(FifoCommand::Err)); } } FifoCommand::StopBenchmark => { @@ -169,21 +174,18 @@ impl MemoryExecutor { if let Err(e) = ipc_client.disable() { // There's a chance that memtrack has already exited here, so just log as debug debug!("Failed to disable memtrack: {e}"); - return Ok(FifoCommand::Err); + return Ok(Some(FifoCommand::Err)); } } FifoCommand::GetIntegrationMode => { - return Ok(FifoCommand::IntegrationModeResponse( + return Ok(Some(FifoCommand::IntegrationModeResponse( IntegrationMode::Analysis, - )); - } - _ => { - warn!("Unhandled FIFO command: {cmd:?}"); - return Ok(FifoCommand::Err); + ))); } + _ => {} } - Ok(FifoCommand::Ack) + Ok(None) }; let (marker_result, _) = runner_fifo diff --git a/src/executor/shared/fifo.rs b/src/executor/shared/fifo.rs index 60b5825b..dc559f67 100644 --- a/src/executor/shared/fifo.rs +++ b/src/executor/shared/fifo.rs @@ -123,10 +123,14 @@ impl RunnerFifo { /// Handles all incoming FIFO messages until it's closed, or until the health check closure /// returns `false` or an error. + /// + /// The `handle_cmd` callback is invoked first for each command. If it returns `Some(response)`, + /// that response is sent and the shared implementation is skipped. If it returns `None`, + /// the command falls through to the shared implementation for standard handling. pub async fn handle_fifo_messages( &mut self, mut health_check: impl AsyncFnMut() -> anyhow::Result, - mut handle_cmd: impl AsyncFnMut(&FifoCommand) -> anyhow::Result, + mut handle_cmd: impl AsyncFnMut(&FifoCommand) -> anyhow::Result>, ) -> anyhow::Result<(ExecutionTimestamps, FifoBenchmarkData)> { let mut bench_order_by_timestamp = Vec::<(u64, String)>::new(); let mut bench_pids = HashSet::::new(); @@ -158,38 +162,44 @@ impl RunnerFifo { }; trace!("Received command: {cmd:?}"); + // Try executor-specific handler first + if let Some(ack) = handle_cmd(&cmd).await? { + self.send_cmd(ack).await?; + continue; + } + + // Fall through to shared implementation for standard commands match &cmd { FifoCommand::CurrentBenchmark { pid, uri } => { bench_order_by_timestamp.push((current_time(), uri.to_string())); bench_pids.insert(*pid); + self.send_cmd(FifoCommand::Ack).await?; } FifoCommand::StartBenchmark => { - if benchmark_started { + if !benchmark_started { + benchmark_started = true; + markers.push(MarkerType::SampleStart(current_time())); + } else { warn!("Received duplicate StartBenchmark command, ignoring"); - self.send_cmd(FifoCommand::Ack).await?; - continue; } - benchmark_started = true; - markers.push(MarkerType::SampleStart(current_time())); + self.send_cmd(FifoCommand::Ack).await?; } FifoCommand::StopBenchmark => { - if !benchmark_started { + if benchmark_started { + benchmark_started = false; + markers.push(MarkerType::SampleEnd(current_time())); + } else { warn!("Received StopBenchmark command before StartBenchmark, ignoring"); - self.send_cmd(FifoCommand::Ack).await?; - continue; } - benchmark_started = false; - markers.push(MarkerType::SampleEnd(current_time())); + self.send_cmd(FifoCommand::Ack).await?; } FifoCommand::SetIntegration { name, version } => { integration = Some((name.into(), version.into())); self.send_cmd(FifoCommand::Ack).await?; - continue; } FifoCommand::AddMarker { marker, .. } => { markers.push(*marker); self.send_cmd(FifoCommand::Ack).await?; - continue; } FifoCommand::SetVersion(protocol_version) => { match protocol_version.cmp(&runner_shared::fifo::CURRENT_PROTOCOL_VERSION) { @@ -204,7 +214,6 @@ impl RunnerFifo { ); } self.send_cmd(FifoCommand::Ack).await?; - continue; } Ordering::Greater => bail!( "Runner is using an incompatible protocol version ({} < {protocol_version}). Please update the runner to the latest version.", @@ -212,14 +221,14 @@ impl RunnerFifo { ), Ordering::Equal => { self.send_cmd(FifoCommand::Ack).await?; - continue; } } } - _ => {} + _ => { + warn!("Unhandled FIFO command: {cmd:?}"); + self.send_cmd(FifoCommand::Err).await?; + } } - - self.send_cmd(handle_cmd(&cmd).await?).await?; } let marker_result = ExecutionTimestamps::new(&bench_order_by_timestamp, &markers); diff --git a/src/executor/wall_time/perf/mod.rs b/src/executor/wall_time/perf/mod.rs index ebab89ad..3ea55d5d 100644 --- a/src/executor/wall_time/perf/mod.rs +++ b/src/executor/wall_time/perf/mod.rs @@ -295,19 +295,19 @@ impl PerfRunner { } FifoCommand::PingPerf => { if perf_fifo.lock().await.ping().await.is_err() { - return Ok(FifoCommand::Err); + return Ok(Some(FifoCommand::Err)); } + return Ok(Some(FifoCommand::Ack)); } FifoCommand::GetIntegrationMode => { - return Ok(FifoCommand::IntegrationModeResponse(IntegrationMode::Perf)); - } - _ => { - warn!("Unhandled FIFO command: {cmd:?}"); - return Ok(FifoCommand::Err); + return Ok(Some(FifoCommand::IntegrationModeResponse( + IntegrationMode::Perf, + ))); } + _ => {} } - Ok(FifoCommand::Ack) + Ok(None) }; let (marker_result, fifo_data) = runner_fifo From e0600b17cb4824c8a629265d772fa9f766cf9430 Mon Sep 17 00:00:00 2001 From: not-matthias Date: Fri, 16 Jan 2026 17:38:55 +0100 Subject: [PATCH 2/2] chore(memtrack): bail if no events were written --- crates/memtrack/src/main.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/crates/memtrack/src/main.rs b/crates/memtrack/src/main.rs index 02019900..4d6a8046 100644 --- a/crates/memtrack/src/main.rs +++ b/crates/memtrack/src/main.rs @@ -177,6 +177,11 @@ fn track_command( } writer.finish()?; + if i == 0 { + bail!( + "No memtrack events were written to disk, does the integration support memory profiling?" + ) + } info!("Wrote {i} memtrack events to disk"); Ok(())