From 4482790a7ccc5b2915157a80298b0d8754528538 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 13 Mar 2026 11:02:41 -0600 Subject: [PATCH 1/2] fix: improve Chrome trace event format correctness and metric naming - Write memory counter values in bytes instead of MB, with `_bytes` suffix on the args key, so trace viewers display correct values - Include the process ID in the trace filename (`comet-event-trace-{pid}.json`) so each executor writes its own file instead of corrupting a shared one - Use the actual process ID in trace events instead of hardcoded `1` - Replace `.expect()` panics with graceful error handling: the writer becomes `None` on failure so tracing silently disables itself - Add `Drop` impl to flush the `BufWriter` on shutdown so buffered events are not lost at process exit - Replace fragile `ThreadId` debug-string parsing with `ThreadId::as_u64().get()` (NOTE: `ThreadId::as_u64` is currently unstable behind the `thread_id_value` feature — confirm the project toolchain supports it before merging) - Rename `CometUnsafeShuffleWriter` trace event to snake_case `comet_unsafe_shuffle_writer` for consistency - Fix `"comet_shuffle_"` metric name (trailing underscore, no thread ID) in `CometBypassMergeSortShuffleWriter` to match the pattern used in `CometUnsafeShuffleWriter` - Rename `jvm_heapUsed` metric to `jvm_heap_used` (snake_case) - Emit `shuffle_spilled_bytes` counter event after each shuffle spill --- docs/source/contributor-guide/tracing.md | 36 +++++---- .../shuffle/partitioners/multi_partition.rs | 5 +- native/core/src/execution/tracing.rs | 80 ++++++++++++------- .../CometBypassMergeSortShuffleWriter.java | 5 +- .../shuffle/CometUnsafeShuffleWriter.java | 4 +- .../org/apache/comet/CometExecIterator.scala | 2 +- 6 files changed, 79 insertions(+), 53 deletions(-) diff --git a/docs/source/contributor-guide/tracing.md b/docs/source/contributor-guide/tracing.md index b9b4fe0dcc..c5bd666b5e 100644 --- a/docs/source/contributor-guide/tracing.md +++ b/docs/source/contributor-guide/tracing.md @@ -23,7 +23,8 @@ Tracing can be enabled by setting 
`spark.comet.tracing.enabled=true`. With this feature enabled, each Spark executor will write a JSON event log file in Chrome's [Trace Event Format]. The file will be written to the executor's current working -directory with the filename `comet-event-trace.json`. +directory with the filename `comet-event-trace-{pid}.json`, where `{pid}` is the executor +process ID. [Trace Event Format]: https://docs.google.com/document/d/1CvAClvFfyA5R-PhYUmn5OOQtYMH4h6I0nSsKchNAySU/preview?tab=t.0#heading=h.yr4qxyxotyw @@ -36,18 +37,18 @@ make release COMET_FEATURES="jemalloc" Example output: ```json -{ "name": "decodeShuffleBlock", "cat": "PERF", "ph": "B", "pid": 1, "tid": 5, "ts": 10109225730 }, -{ "name": "decodeShuffleBlock", "cat": "PERF", "ph": "E", "pid": 1, "tid": 5, "ts": 10109228835 }, -{ "name": "decodeShuffleBlock", "cat": "PERF", "ph": "B", "pid": 1, "tid": 5, "ts": 10109245928 }, -{ "name": "decodeShuffleBlock", "cat": "PERF", "ph": "E", "pid": 1, "tid": 5, "ts": 10109248843 }, -{ "name": "execute_plan", "cat": "PERF", "ph": "E", "pid": 1, "tid": 5, "ts": 10109350935 }, -{ "name": "CometExecIterator_getNextBatch", "cat": "PERF", "ph": "E", "pid": 1, "tid": 5, "ts": 10109367116 }, -{ "name": "CometExecIterator_getNextBatch", "cat": "PERF", "ph": "B", "pid": 1, "tid": 5, "ts": 10109479156 }, +{ "name": "decodeShuffleBlock", "cat": "PERF", "ph": "B", "pid": 12345, "tid": 5, "ts": 10109225730 }, +{ "name": "decodeShuffleBlock", "cat": "PERF", "ph": "E", "pid": 12345, "tid": 5, "ts": 10109228835 }, +{ "name": "decodeShuffleBlock", "cat": "PERF", "ph": "B", "pid": 12345, "tid": 5, "ts": 10109245928 }, +{ "name": "decodeShuffleBlock", "cat": "PERF", "ph": "E", "pid": 12345, "tid": 5, "ts": 10109248843 }, +{ "name": "execute_plan", "cat": "PERF", "ph": "E", "pid": 12345, "tid": 5, "ts": 10109350935 }, +{ "name": "CometExecIterator_getNextBatch", "cat": "PERF", "ph": "E", "pid": 12345, "tid": 5, "ts": 10109367116 }, +{ "name": "CometExecIterator_getNextBatch", "cat": "PERF", 
"ph": "B", "pid": 12345, "tid": 5, "ts": 10109479156 }, ``` -Traces can be viewed with [Trace Viewer]. +Traces can be viewed with [Perfetto UI]. -[Trace Viewer]: https://github.com/catapult-project/catapult/blob/main/tracing/README.md +[Perfetto UI]: https://ui.perfetto.dev Example trace visualization: @@ -55,10 +56,11 @@ Example trace visualization: ## Definition of Labels -| Label | Meaning | -| --------------------- | -------------------------------------------------------------- | -| jvm_heapUsed | JVM heap memory usage of live objects for the executor process | -| jemalloc_allocated | Native memory usage for the executor process | -| task_memory_comet_NNN | Off-heap memory allocated by Comet for query execution | -| task_memory_spark_NNN | On-heap & Off-heap memory allocated by Spark | -| comet_shuffle_NNN | Off-heap memory allocated by Comet for columnar shuffle | +| Label | Meaning | +| ------------------------ | -------------------------------------------------------------- | +| jvm_heap_used | JVM heap memory usage of live objects for the executor process | +| jemalloc_allocated | Native memory usage for the executor process | +| task_memory_comet_NNN | Off-heap memory allocated by Comet for query execution | +| task_memory_spark_NNN | On-heap & Off-heap memory allocated by Spark | +| comet_shuffle_NNN | Off-heap memory allocated by Comet for columnar shuffle | +| shuffle_spilled_bytes | Bytes written to disk in a single shuffle spill operation | diff --git a/native/core/src/execution/shuffle/partitioners/multi_partition.rs b/native/core/src/execution/shuffle/partitioners/multi_partition.rs index 9c366ad462..e6c9ce37ae 100644 --- a/native/core/src/execution/shuffle/partitioners/multi_partition.rs +++ b/native/core/src/execution/shuffle/partitioners/multi_partition.rs @@ -24,7 +24,7 @@ use crate::execution::shuffle::writers::{BufBatchWriter, PartitionWriter}; use crate::execution::shuffle::{ comet_partitioning, CometPartitioning, CompressionCodec, 
ShuffleBlockWriter, }; -use crate::execution::tracing::{with_trace, with_trace_async}; +use crate::execution::tracing::{log_memory_usage, with_trace, with_trace_async}; use arrow::array::{ArrayRef, RecordBatch}; use arrow::datatypes::SchemaRef; use datafusion::common::utils::proxy::VecAllocExt; @@ -520,6 +520,9 @@ impl MultiPartitionShuffleRepartitioner { self.reservation.free(); self.metrics.spill_count.add(1); self.metrics.spilled_bytes.add(spilled_bytes); + if self.tracing_enabled { + log_memory_usage("shuffle_spilled_bytes", spilled_bytes as u64); + } Ok(()) }) } diff --git a/native/core/src/execution/tracing.rs b/native/core/src/execution/tracing.rs index 01351565f5..ca87fce58b 100644 --- a/native/core/src/execution/tracing.rs +++ b/native/core/src/execution/tracing.rs @@ -19,7 +19,7 @@ use datafusion::common::instant::Instant; use once_cell::sync::Lazy; use std::fs::{File, OpenOptions}; use std::io::{BufWriter, Write}; -use std::sync::{Arc, Mutex}; +use std::sync::Mutex; pub(crate) static RECORDER: Lazy = Lazy::new(Recorder::new); @@ -27,29 +27,37 @@ pub(crate) static RECORDER: Lazy = Lazy::new(Recorder::new); /// https://github.com/catapult-project/catapult/blob/main/tracing/README.md pub struct Recorder { now: Instant, - writer: Arc>>, + pid: u32, + /// None if the trace file could not be opened or a write error has occurred. + writer: Mutex>>, } impl Recorder { pub fn new() -> Self { - let file = OpenOptions::new() + let pid = std::process::id(); + // Include the PID in the filename so that each executor process writes to + // its own file, avoiding interleaved output and data corruption. + let path = format!("comet-event-trace-{pid}.json"); + let writer = OpenOptions::new() .create(true) .append(true) - .open("comet-event-trace.json") - .expect("Error writing tracing"); + .open(&path) + .ok() + .and_then(|file| { + let mut w = BufWriter::new(file); + // Write start of JSON array. Note that there is no requirement to + // write the closing ']'. 
+ w.write_all(b"[ ").ok()?; + Some(w) + }); - let mut writer = BufWriter::new(file); - - // Write start of JSON array. Note that there is no requirement to write - // the closing ']'. - writer - .write_all("[ ".as_bytes()) - .expect("Error writing tracing"); Self { now: Instant::now(), - writer: Arc::new(Mutex::new(writer)), + pid, + writer: Mutex::new(writer), } } + pub fn begin_task(&self, name: &str) { self.log_event(name, "B") } @@ -59,38 +67,50 @@ impl Recorder { } pub fn log_memory_usage(&self, name: &str, usage_bytes: u64) { - let usage_mb = (usage_bytes as f64 / 1024.0 / 1024.0) as usize; + let key = format!("{name}_bytes"); let json = format!( - "{{ \"name\": \"{name}\", \"cat\": \"PERF\", \"ph\": \"C\", \"pid\": 1, \"tid\": {}, \"ts\": {}, \"args\": {{ \"{name}\": {usage_mb} }} }},\n", + "{{ \"name\": \"{name}\", \"cat\": \"PERF\", \"ph\": \"C\", \"pid\": {}, \"tid\": {}, \"ts\": {}, \"args\": {{ \"{key}\": {usage_bytes} }} }},\n", + self.pid, Self::get_thread_id(), self.now.elapsed().as_micros() ); - let mut writer = self.writer.lock().unwrap(); - writer - .write_all(json.as_bytes()) - .expect("Error writing tracing"); + self.write(&json); } fn log_event(&self, name: &str, ph: &str) { let json = format!( - "{{ \"name\": \"{}\", \"cat\": \"PERF\", \"ph\": \"{ph}\", \"pid\": 1, \"tid\": {}, \"ts\": {} }},\n", + "{{ \"name\": \"{}\", \"cat\": \"PERF\", \"ph\": \"{ph}\", \"pid\": {}, \"tid\": {}, \"ts\": {} }},\n", name, + self.pid, Self::get_thread_id(), self.now.elapsed().as_micros() ); - let mut writer = self.writer.lock().unwrap(); - writer - .write_all(json.as_bytes()) - .expect("Error writing tracing"); + self.write(&json); + } + + fn write(&self, json: &str) { + if let Ok(mut guard) = self.writer.lock() { + if let Some(ref mut w) = *guard { + if w.write_all(json.as_bytes()).is_err() { + // Disable tracing on write failure to avoid repeated errors. 
+ *guard = None; + } + } + } } fn get_thread_id() -> u64 { - let thread_id = std::thread::current().id(); - format!("{thread_id:?}") - .trim_start_matches("ThreadId(") - .trim_end_matches(")") - .parse() - .expect("Error parsing thread id") + std::thread::current().id().as_u64().get() + } +} + +impl Drop for Recorder { + fn drop(&mut self) { + if let Ok(mut guard) = self.writer.lock() { + if let Some(ref mut w) = *guard { + let _ = w.flush(); + } + } } } diff --git a/spark/src/main/java/org/apache/spark/sql/comet/execution/shuffle/CometBypassMergeSortShuffleWriter.java b/spark/src/main/java/org/apache/spark/sql/comet/execution/shuffle/CometBypassMergeSortShuffleWriter.java index a58ec7851b..5d5649efab 100644 --- a/spark/src/main/java/org/apache/spark/sql/comet/execution/shuffle/CometBypassMergeSortShuffleWriter.java +++ b/spark/src/main/java/org/apache/spark/sql/comet/execution/shuffle/CometBypassMergeSortShuffleWriter.java @@ -233,8 +233,9 @@ public void write(Iterator> records) throws IOException { } Native _native = new Native(); + String shuffleMemKey = "comet_shuffle_" + Thread.currentThread().getId(); if (tracingEnabled) { - _native.logMemoryUsage("comet_shuffle_", allocator.getUsed()); + _native.logMemoryUsage(shuffleMemKey, allocator.getUsed()); } long spillRecords = 0; @@ -247,7 +248,7 @@ public void write(Iterator> records) throws IOException { } if (tracingEnabled) { - _native.logMemoryUsage("comet_shuffle_", allocator.getUsed()); + _native.logMemoryUsage(shuffleMemKey, allocator.getUsed()); } if (outputRows != spillRecords) { diff --git a/spark/src/main/java/org/apache/spark/sql/comet/execution/shuffle/CometUnsafeShuffleWriter.java b/spark/src/main/java/org/apache/spark/sql/comet/execution/shuffle/CometUnsafeShuffleWriter.java index 736c42aafa..47b0976623 100644 --- a/spark/src/main/java/org/apache/spark/sql/comet/execution/shuffle/CometUnsafeShuffleWriter.java +++ 
b/spark/src/main/java/org/apache/spark/sql/comet/execution/shuffle/CometUnsafeShuffleWriter.java @@ -212,7 +212,7 @@ public void write(scala.collection.Iterator> records) throws IOEx // generic throwables. boolean success = false; if (tracingEnabled) { - nativeLib.traceBegin("CometUnsafeShuffleWriter"); + nativeLib.traceBegin("comet_unsafe_shuffle_writer"); } String offheapMemKey = "comet_shuffle_" + Thread.currentThread().getId(); try { @@ -226,7 +226,7 @@ public void write(scala.collection.Iterator> records) throws IOEx success = true; } finally { if (tracingEnabled) { - nativeLib.traceEnd("CometUnsafeShuffleWriter"); + nativeLib.traceEnd("comet_unsafe_shuffle_writer"); } if (sorter != null) { try { diff --git a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala index 28c1645718..8eb0a3a1db 100644 --- a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala +++ b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala @@ -258,7 +258,7 @@ class CometExecIterator( } private def traceMemoryUsage(): Unit = { - nativeLib.logMemoryUsage("jvm_heapUsed", memoryMXBean.getHeapMemoryUsage.getUsed) + nativeLib.logMemoryUsage("jvm_heap_used", memoryMXBean.getHeapMemoryUsage.getUsed) val totalTaskMemory = cometTaskMemoryManager.internal.getMemoryConsumptionForThisTask val cometTaskMemory = cometTaskMemoryManager.getUsed val sparkTaskMemory = totalTaskMemory - cometTaskMemory From 963456e7c041c23c1579f3965f74fae15fa2e111 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 18 Mar 2026 06:47:03 -0600 Subject: [PATCH 2/2] style: format tracing.md with prettier --- docs/source/contributor-guide/tracing.md | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/docs/source/contributor-guide/tracing.md b/docs/source/contributor-guide/tracing.md index c5bd666b5e..3f74ac2894 100644 --- a/docs/source/contributor-guide/tracing.md +++ 
b/docs/source/contributor-guide/tracing.md @@ -56,11 +56,11 @@ Example trace visualization: ## Definition of Labels -| Label | Meaning | -| ------------------------ | -------------------------------------------------------------- | -| jvm_heap_used | JVM heap memory usage of live objects for the executor process | -| jemalloc_allocated | Native memory usage for the executor process | -| task_memory_comet_NNN | Off-heap memory allocated by Comet for query execution | -| task_memory_spark_NNN | On-heap & Off-heap memory allocated by Spark | -| comet_shuffle_NNN | Off-heap memory allocated by Comet for columnar shuffle | -| shuffle_spilled_bytes | Bytes written to disk in a single shuffle spill operation | +| Label | Meaning | +| --------------------- | -------------------------------------------------------------- | +| jvm_heap_used | JVM heap memory usage of live objects for the executor process | +| jemalloc_allocated | Native memory usage for the executor process | +| task_memory_comet_NNN | Off-heap memory allocated by Comet for query execution | +| task_memory_spark_NNN | On-heap & Off-heap memory allocated by Spark | +| comet_shuffle_NNN | Off-heap memory allocated by Comet for columnar shuffle | +| shuffle_spilled_bytes | Bytes written to disk in a single shuffle spill operation |