Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions crates/memtrack/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,11 @@ fn track_command(
}
writer.finish()?;

if i == 0 {
bail!(
"No memtrack events were written to disk, does the integration support memory profiling?"
)
}
info!("Wrote {i} memtrack events to disk");

Ok(())
Expand Down
24 changes: 13 additions & 11 deletions src/executor/memory/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,36 +154,38 @@ impl MemoryExecutor {

let on_cmd = async move |cmd: &FifoCommand| {
match cmd {
FifoCommand::CurrentBenchmark { pid, uri } => {
debug!("Current benchmark: {pid}, {uri}");
FifoCommand::SetVersion(protocol_version) => {
if *protocol_version < 2 {
bail!(
"Memory profiling requires protocol version 2 or higher, but the integration is using version {protocol_version}. \
This integration doesn't support memory profiling. Please update your integration to a version that supports memory profiling.",
);
}
}
FifoCommand::StartBenchmark => {
debug!("Enabling memtrack via IPC");
if let Err(e) = ipc_client.enable() {
error!("Failed to enable memtrack: {e}");
return Ok(FifoCommand::Err);
return Ok(Some(FifoCommand::Err));
}
}
FifoCommand::StopBenchmark => {
debug!("Disabling memtrack via IPC");
if let Err(e) = ipc_client.disable() {
// There's a chance that memtrack has already exited here, so just log as debug
debug!("Failed to disable memtrack: {e}");
return Ok(FifoCommand::Err);
return Ok(Some(FifoCommand::Err));
}
}
FifoCommand::GetIntegrationMode => {
return Ok(FifoCommand::IntegrationModeResponse(
return Ok(Some(FifoCommand::IntegrationModeResponse(
IntegrationMode::Analysis,
));
}
_ => {
warn!("Unhandled FIFO command: {cmd:?}");
return Ok(FifoCommand::Err);
)));
}
_ => {}
}

Ok(FifoCommand::Ack)
Ok(None)
};

let (marker_result, _) = runner_fifo
Expand Down
45 changes: 27 additions & 18 deletions src/executor/shared/fifo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,14 @@ impl RunnerFifo {

/// Handles all incoming FIFO messages until it's closed, or until the health check closure
/// returns `false` or an error.
///
/// The `handle_cmd` callback is invoked first for each command. If it returns `Some(response)`,
/// that response is sent and the shared implementation is skipped. If it returns `None`,
/// the command falls through to the shared implementation for standard handling.
pub async fn handle_fifo_messages(
&mut self,
mut health_check: impl AsyncFnMut() -> anyhow::Result<bool>,
mut handle_cmd: impl AsyncFnMut(&FifoCommand) -> anyhow::Result<FifoCommand>,
mut handle_cmd: impl AsyncFnMut(&FifoCommand) -> anyhow::Result<Option<FifoCommand>>,
) -> anyhow::Result<(ExecutionTimestamps, FifoBenchmarkData)> {
let mut bench_order_by_timestamp = Vec::<(u64, String)>::new();
let mut bench_pids = HashSet::<pid_t>::new();
Expand Down Expand Up @@ -158,38 +162,44 @@ impl RunnerFifo {
};
trace!("Received command: {cmd:?}");

// Try executor-specific handler first
if let Some(ack) = handle_cmd(&cmd).await? {
self.send_cmd(ack).await?;
continue;
}

// Fall through to shared implementation for standard commands
match &cmd {
FifoCommand::CurrentBenchmark { pid, uri } => {
bench_order_by_timestamp.push((current_time(), uri.to_string()));
bench_pids.insert(*pid);
self.send_cmd(FifoCommand::Ack).await?;
}
FifoCommand::StartBenchmark => {
if benchmark_started {
if !benchmark_started {
benchmark_started = true;
markers.push(MarkerType::SampleStart(current_time()));
} else {
warn!("Received duplicate StartBenchmark command, ignoring");
self.send_cmd(FifoCommand::Ack).await?;
continue;
}
benchmark_started = true;
markers.push(MarkerType::SampleStart(current_time()));
self.send_cmd(FifoCommand::Ack).await?;
}
FifoCommand::StopBenchmark => {
if !benchmark_started {
if benchmark_started {
benchmark_started = false;
markers.push(MarkerType::SampleEnd(current_time()));
} else {
warn!("Received StopBenchmark command before StartBenchmark, ignoring");
self.send_cmd(FifoCommand::Ack).await?;
continue;
}
benchmark_started = false;
markers.push(MarkerType::SampleEnd(current_time()));
self.send_cmd(FifoCommand::Ack).await?;
}
FifoCommand::SetIntegration { name, version } => {
integration = Some((name.into(), version.into()));
self.send_cmd(FifoCommand::Ack).await?;
continue;
}
FifoCommand::AddMarker { marker, .. } => {
markers.push(*marker);
self.send_cmd(FifoCommand::Ack).await?;
continue;
}
FifoCommand::SetVersion(protocol_version) => {
match protocol_version.cmp(&runner_shared::fifo::CURRENT_PROTOCOL_VERSION) {
Expand All @@ -204,22 +214,21 @@ impl RunnerFifo {
);
}
self.send_cmd(FifoCommand::Ack).await?;
continue;
}
Ordering::Greater => bail!(
"Runner is using an incompatible protocol version ({} < {protocol_version}). Please update the runner to the latest version.",
runner_shared::fifo::CURRENT_PROTOCOL_VERSION
),
Ordering::Equal => {
self.send_cmd(FifoCommand::Ack).await?;
continue;
}
}
}
_ => {}
_ => {
warn!("Unhandled FIFO command: {cmd:?}");
self.send_cmd(FifoCommand::Err).await?;
}
}

self.send_cmd(handle_cmd(&cmd).await?).await?;
}

let marker_result = ExecutionTimestamps::new(&bench_order_by_timestamp, &markers);
Expand Down
14 changes: 7 additions & 7 deletions src/executor/wall_time/perf/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,19 +295,19 @@ impl PerfRunner {
}
FifoCommand::PingPerf => {
if perf_fifo.lock().await.ping().await.is_err() {
return Ok(FifoCommand::Err);
return Ok(Some(FifoCommand::Err));
}
return Ok(Some(FifoCommand::Ack));
}
FifoCommand::GetIntegrationMode => {
return Ok(FifoCommand::IntegrationModeResponse(IntegrationMode::Perf));
}
_ => {
warn!("Unhandled FIFO command: {cmd:?}");
return Ok(FifoCommand::Err);
return Ok(Some(FifoCommand::IntegrationModeResponse(
IntegrationMode::Perf,
)));
}
_ => {}
}

Ok(FifoCommand::Ack)
Ok(None)
};

let (marker_result, fifo_data) = runner_fifo
Expand Down
Loading