Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 13 additions & 11 deletions docs/source/contributor-guide/tracing.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ Tracing can be enabled by setting `spark.comet.tracing.enabled=true`.

With this feature enabled, each Spark executor will write a JSON event log file in
Chrome's [Trace Event Format]. The file will be written to the executor's current working
directory with the filename `comet-event-trace.json`.
directory with the filename `comet-event-trace-{pid}.json`, where `{pid}` is the executor
process ID.

[Trace Event Format]: https://docs.google.com/document/d/1CvAClvFfyA5R-PhYUmn5OOQtYMH4h6I0nSsKchNAySU/preview?tab=t.0#heading=h.yr4qxyxotyw

Expand All @@ -36,18 +37,18 @@ make release COMET_FEATURES="jemalloc"
Example output:

```json
{ "name": "decodeShuffleBlock", "cat": "PERF", "ph": "B", "pid": 1, "tid": 5, "ts": 10109225730 },
{ "name": "decodeShuffleBlock", "cat": "PERF", "ph": "E", "pid": 1, "tid": 5, "ts": 10109228835 },
{ "name": "decodeShuffleBlock", "cat": "PERF", "ph": "B", "pid": 1, "tid": 5, "ts": 10109245928 },
{ "name": "decodeShuffleBlock", "cat": "PERF", "ph": "E", "pid": 1, "tid": 5, "ts": 10109248843 },
{ "name": "execute_plan", "cat": "PERF", "ph": "E", "pid": 1, "tid": 5, "ts": 10109350935 },
{ "name": "CometExecIterator_getNextBatch", "cat": "PERF", "ph": "E", "pid": 1, "tid": 5, "ts": 10109367116 },
{ "name": "CometExecIterator_getNextBatch", "cat": "PERF", "ph": "B", "pid": 1, "tid": 5, "ts": 10109479156 },
{ "name": "decodeShuffleBlock", "cat": "PERF", "ph": "B", "pid": 12345, "tid": 5, "ts": 10109225730 },
{ "name": "decodeShuffleBlock", "cat": "PERF", "ph": "E", "pid": 12345, "tid": 5, "ts": 10109228835 },
{ "name": "decodeShuffleBlock", "cat": "PERF", "ph": "B", "pid": 12345, "tid": 5, "ts": 10109245928 },
{ "name": "decodeShuffleBlock", "cat": "PERF", "ph": "E", "pid": 12345, "tid": 5, "ts": 10109248843 },
{ "name": "execute_plan", "cat": "PERF", "ph": "E", "pid": 12345, "tid": 5, "ts": 10109350935 },
{ "name": "CometExecIterator_getNextBatch", "cat": "PERF", "ph": "E", "pid": 12345, "tid": 5, "ts": 10109367116 },
{ "name": "CometExecIterator_getNextBatch", "cat": "PERF", "ph": "B", "pid": 12345, "tid": 5, "ts": 10109479156 },
```

Traces can be viewed with [Trace Viewer].
Traces can be viewed with [Perfetto UI].

[Trace Viewer]: https://github.com/catapult-project/catapult/blob/main/tracing/README.md
[Perfetto UI]: https://ui.perfetto.dev

Example trace visualization:

Expand All @@ -57,8 +58,9 @@ Example trace visualization:

| Label | Meaning |
| --------------------- | -------------------------------------------------------------- |
| jvm_heapUsed | JVM heap memory usage of live objects for the executor process |
| jvm_heap_used | JVM heap memory usage of live objects for the executor process |
| jemalloc_allocated | Native memory usage for the executor process |
| task_memory_comet_NNN | Off-heap memory allocated by Comet for query execution |
| task_memory_spark_NNN | On-heap & Off-heap memory allocated by Spark |
| comet_shuffle_NNN | Off-heap memory allocated by Comet for columnar shuffle |
| shuffle_spilled_bytes | Bytes written to disk in a single shuffle spill operation |
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use crate::execution::shuffle::writers::{BufBatchWriter, PartitionWriter};
use crate::execution::shuffle::{
comet_partitioning, CometPartitioning, CompressionCodec, ShuffleBlockWriter,
};
use crate::execution::tracing::{with_trace, with_trace_async};
use crate::execution::tracing::{log_memory_usage, with_trace, with_trace_async};
use arrow::array::{ArrayRef, RecordBatch};
use arrow::datatypes::SchemaRef;
use datafusion::common::utils::proxy::VecAllocExt;
Expand Down Expand Up @@ -520,6 +520,9 @@ impl MultiPartitionShuffleRepartitioner {
self.reservation.free();
self.metrics.spill_count.add(1);
self.metrics.spilled_bytes.add(spilled_bytes);
if self.tracing_enabled {
log_memory_usage("shuffle_spilled_bytes", spilled_bytes as u64);
}
Ok(())
})
}
Expand Down
80 changes: 50 additions & 30 deletions native/core/src/execution/tracing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,37 +19,45 @@ use datafusion::common::instant::Instant;
use once_cell::sync::Lazy;
use std::fs::{File, OpenOptions};
use std::io::{BufWriter, Write};
use std::sync::{Arc, Mutex};
use std::sync::Mutex;

pub(crate) static RECORDER: Lazy<Recorder> = Lazy::new(Recorder::new);

/// Log events using Chrome trace format JSON
/// https://github.com/catapult-project/catapult/blob/main/tracing/README.md
pub struct Recorder {
now: Instant,
writer: Arc<Mutex<BufWriter<File>>>,
pid: u32,
/// None if the trace file could not be opened or a write error has occurred.
writer: Mutex<Option<BufWriter<File>>>,
}

impl Recorder {
pub fn new() -> Self {
let file = OpenOptions::new()
let pid = std::process::id();
// Include the PID in the filename so that each executor process writes to
// its own file, avoiding interleaved output and data corruption.
let path = format!("comet-event-trace-{pid}.json");
let writer = OpenOptions::new()
.create(true)
.append(true)
.open("comet-event-trace.json")
.expect("Error writing tracing");
.open(&path)
.ok()
.and_then(|file| {
let mut w = BufWriter::new(file);
// Write start of JSON array. Note that there is no requirement to
// write the closing ']'.
w.write_all(b"[ ").ok()?;
Some(w)
});

let mut writer = BufWriter::new(file);

// Write start of JSON array. Note that there is no requirement to write
// the closing ']'.
writer
.write_all("[ ".as_bytes())
.expect("Error writing tracing");
Self {
now: Instant::now(),
writer: Arc::new(Mutex::new(writer)),
pid,
writer: Mutex::new(writer),
}
}

pub fn begin_task(&self, name: &str) {
self.log_event(name, "B")
}
Expand All @@ -59,38 +67,50 @@ impl Recorder {
}

pub fn log_memory_usage(&self, name: &str, usage_bytes: u64) {
let usage_mb = (usage_bytes as f64 / 1024.0 / 1024.0) as usize;
let key = format!("{name}_bytes");
let json = format!(
"{{ \"name\": \"{name}\", \"cat\": \"PERF\", \"ph\": \"C\", \"pid\": 1, \"tid\": {}, \"ts\": {}, \"args\": {{ \"{name}\": {usage_mb} }} }},\n",
"{{ \"name\": \"{name}\", \"cat\": \"PERF\", \"ph\": \"C\", \"pid\": {}, \"tid\": {}, \"ts\": {}, \"args\": {{ \"{key}\": {usage_bytes} }} }},\n",
self.pid,
Self::get_thread_id(),
self.now.elapsed().as_micros()
);
let mut writer = self.writer.lock().unwrap();
writer
.write_all(json.as_bytes())
.expect("Error writing tracing");
self.write(&json);
}

fn log_event(&self, name: &str, ph: &str) {
let json = format!(
"{{ \"name\": \"{}\", \"cat\": \"PERF\", \"ph\": \"{ph}\", \"pid\": 1, \"tid\": {}, \"ts\": {} }},\n",
"{{ \"name\": \"{}\", \"cat\": \"PERF\", \"ph\": \"{ph}\", \"pid\": {}, \"tid\": {}, \"ts\": {} }},\n",
name,
self.pid,
Self::get_thread_id(),
self.now.elapsed().as_micros()
);
let mut writer = self.writer.lock().unwrap();
writer
.write_all(json.as_bytes())
.expect("Error writing tracing");
self.write(&json);
}

fn write(&self, json: &str) {
if let Ok(mut guard) = self.writer.lock() {
if let Some(ref mut w) = *guard {
if w.write_all(json.as_bytes()).is_err() {
// Disable tracing on write failure to avoid repeated errors.
*guard = None;
}
}
}
}

fn get_thread_id() -> u64 {
let thread_id = std::thread::current().id();
format!("{thread_id:?}")
.trim_start_matches("ThreadId(")
.trim_end_matches(")")
.parse()
.expect("Error parsing thread id")
std::thread::current().id().as_u64().get()
}
}

impl Drop for Recorder {
fn drop(&mut self) {
if let Ok(mut guard) = self.writer.lock() {
if let Some(ref mut w) = *guard {
let _ = w.flush();
}
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,8 +233,9 @@ public void write(Iterator<Product2<K, V>> records) throws IOException {
}

Native _native = new Native();
String shuffleMemKey = "comet_shuffle_" + Thread.currentThread().getId();
if (tracingEnabled) {
_native.logMemoryUsage("comet_shuffle_", allocator.getUsed());
_native.logMemoryUsage(shuffleMemKey, allocator.getUsed());
}

long spillRecords = 0;
Expand All @@ -247,7 +248,7 @@ public void write(Iterator<Product2<K, V>> records) throws IOException {
}

if (tracingEnabled) {
_native.logMemoryUsage("comet_shuffle_", allocator.getUsed());
_native.logMemoryUsage(shuffleMemKey, allocator.getUsed());
}

if (outputRows != spillRecords) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ public void write(scala.collection.Iterator<Product2<K, V>> records) throws IOEx
// generic throwables.
boolean success = false;
if (tracingEnabled) {
nativeLib.traceBegin("CometUnsafeShuffleWriter");
nativeLib.traceBegin("comet_unsafe_shuffle_writer");
}
String offheapMemKey = "comet_shuffle_" + Thread.currentThread().getId();
try {
Expand All @@ -226,7 +226,7 @@ public void write(scala.collection.Iterator<Product2<K, V>> records) throws IOEx
success = true;
} finally {
if (tracingEnabled) {
nativeLib.traceEnd("CometUnsafeShuffleWriter");
nativeLib.traceEnd("comet_unsafe_shuffle_writer");
}
if (sorter != null) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ class CometExecIterator(
}

private def traceMemoryUsage(): Unit = {
nativeLib.logMemoryUsage("jvm_heapUsed", memoryMXBean.getHeapMemoryUsage.getUsed)
nativeLib.logMemoryUsage("jvm_heap_used", memoryMXBean.getHeapMemoryUsage.getUsed)
val totalTaskMemory = cometTaskMemoryManager.internal.getMemoryConsumptionForThisTask
val cometTaskMemory = cometTaskMemoryManager.getUsed
val sparkTaskMemory = totalTaskMemory - cometTaskMemory
Expand Down
Loading