Skip to content

Commit 5bb1e8e

Browse files
committed
Add initial dhat profile tests
1 parent 00e36e8 commit 5bb1e8e

9 files changed

Lines changed: 721 additions & 66 deletions

Cargo.lock

Lines changed: 146 additions & 66 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/core/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,7 @@ async-trait = { workspace = true }
166166
criterion = { workspace = true, features = ["async_tokio", "async_futures"] }
167167
ctor = { workspace = true }
168168
dashmap = "6.1.0"
169+
dhat = "0.3.3"
169170
datafusion-doc = { workspace = true }
170171
datafusion-functions-window-common = { workspace = true }
171172
datafusion-macros = { workspace = true }
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Heap profiling test for grouped hash aggregation with spilling.
19+
//! Data has many distinct groups to force hash table growth beyond
20+
//! the memory pool, triggering spilling.
21+
22+
#[global_allocator]
23+
static ALLOC: dhat::Alloc = dhat::Alloc;
24+
25+
use std::sync::Arc;
26+
27+
use datafusion::prelude::{SessionConfig, SessionContext};
28+
use datafusion_execution::memory_pool::FairSpillPool;
29+
use datafusion_execution::runtime_env::RuntimeEnvBuilder;
30+
31+
const MEMORY_LIMIT: usize = 10 * 1024 * 1024; // 10MB
32+
33+
#[tokio::test]
34+
async fn heap_profile_hash_aggregate() {
35+
let _profiler = dhat::Profiler::builder().testing().build();
36+
37+
let runtime = RuntimeEnvBuilder::new()
38+
.with_memory_pool(Arc::new(FairSpillPool::new(MEMORY_LIMIT)))
39+
.build_arc()
40+
.unwrap();
41+
let config = SessionConfig::new().with_target_partitions(1);
42+
let ctx = SessionContext::new_with_config_rt(config, runtime);
43+
44+
// 5M distinct groups forces hash table growth and spilling
45+
let df = ctx
46+
.sql(
47+
"SELECT v, COUNT(*) \
48+
FROM generate_series(1, 5000000) AS t(v) \
49+
GROUP BY v",
50+
)
51+
.await
52+
.unwrap();
53+
let batches = df.collect().await.unwrap();
54+
let row_count: usize = batches.iter().map(|b| b.num_rows()).sum();
55+
assert_eq!(row_count, 5_000_000);
56+
57+
let stats = dhat::HeapStats::get();
58+
let limit = (MEMORY_LIMIT as f64 * 1.1) as usize;
59+
println!(
60+
"hash_aggregate: max_bytes={}, memory_limit={}, ratio={:.2}x",
61+
stats.max_bytes,
62+
MEMORY_LIMIT,
63+
stats.max_bytes as f64 / MEMORY_LIMIT as f64
64+
);
65+
// TODO: peak is ~122MB (12.2x pool) because:
66+
// 1. HashTable size() underreports (uses capacity * sizeof instead of allocation_size())
67+
// 2. Hash table doubles capacity atomically inside intern(), before the pool check
68+
// 3. generate_series input data is not tracked by the MemoryPool
69+
// dhat::assert!(stats.max_bytes < limit,
70+
// "Peak heap {} exceeded {}", stats.max_bytes, limit);
71+
let _ = limit;
72+
}
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Heap profiling test for HashJoinExec.
19+
20+
#[global_allocator]
21+
static ALLOC: dhat::Alloc = dhat::Alloc;
22+
23+
use std::sync::Arc;
24+
25+
use datafusion::prelude::{SessionConfig, SessionContext};
26+
use datafusion_execution::memory_pool::FairSpillPool;
27+
use datafusion_execution::runtime_env::RuntimeEnvBuilder;
28+
29+
const MEMORY_LIMIT: usize = 40 * 1024 * 1024; // 40MB
30+
31+
#[tokio::test]
32+
async fn heap_profile_hash_join() {
33+
// HashJoin does not spill, so the pool must fit the build side
34+
// hash table. 1M rows of (i64, i64) ~16MB plus hash table overhead.
35+
let runtime = RuntimeEnvBuilder::new()
36+
.with_memory_pool(Arc::new(FairSpillPool::new(MEMORY_LIMIT)))
37+
.build_arc()
38+
.unwrap();
39+
let config = SessionConfig::new().with_target_partitions(1);
40+
let ctx = SessionContext::new_with_config_rt(config, runtime);
41+
42+
// Create tables before starting the profiler
43+
ctx.sql(
44+
"CREATE TABLE t1 AS \
45+
SELECT v AS id, v * 2 AS val \
46+
FROM generate_series(1, 1000000) AS t(v)",
47+
)
48+
.await
49+
.unwrap();
50+
51+
ctx.sql(
52+
"CREATE TABLE t2 AS \
53+
SELECT v AS id, v * 3 AS val \
54+
FROM generate_series(1, 1000000) AS t(v)",
55+
)
56+
.await
57+
.unwrap();
58+
59+
// Verify HashJoin is used
60+
let explain = ctx
61+
.sql("EXPLAIN SELECT t1.id, t1.val, t2.val FROM t1 JOIN t2 ON t1.id = t2.id")
62+
.await
63+
.unwrap()
64+
.collect()
65+
.await
66+
.unwrap();
67+
let plan_str = format!("{:?}", explain);
68+
assert!(
69+
plan_str.contains("HashJoinExec"),
70+
"Expected HashJoinExec in plan but got: {plan_str}"
71+
);
72+
73+
// Start profiling after table creation
74+
let _profiler = dhat::Profiler::builder().testing().build();
75+
76+
let df = ctx
77+
.sql("SELECT t1.id, t1.val, t2.val FROM t1 JOIN t2 ON t1.id = t2.id")
78+
.await
79+
.unwrap();
80+
let _batches = df.collect().await.unwrap();
81+
82+
let stats = dhat::HeapStats::get();
83+
let limit = (MEMORY_LIMIT as f64 * 1.1) as usize;
84+
println!(
85+
"hash_join: max_bytes={}, memory_limit={}, ratio={:.2}x",
86+
stats.max_bytes,
87+
MEMORY_LIMIT,
88+
stats.max_bytes as f64 / MEMORY_LIMIT as f64
89+
);
90+
dhat::assert!(
91+
stats.max_bytes < limit,
92+
"Peak heap {} exceeded {}",
93+
stats.max_bytes,
94+
limit
95+
);
96+
}
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Heap profiling test for reading parquet files and sorting.
19+
//! This exercises the parquet reader's allocation path alongside
20+
//! the sort operator. Data exceeds memory pool to force spilling.
21+
22+
#[global_allocator]
23+
static ALLOC: dhat::Alloc = dhat::Alloc;
24+
25+
use std::sync::Arc;
26+
27+
use datafusion::dataframe::DataFrameWriteOptions;
28+
use datafusion::prelude::{SessionConfig, SessionContext};
29+
use datafusion_execution::memory_pool::FairSpillPool;
30+
use datafusion_execution::runtime_env::RuntimeEnvBuilder;
31+
32+
const MEMORY_LIMIT: usize = 20 * 1024 * 1024; // 20MB
33+
34+
#[tokio::test]
35+
async fn heap_profile_parquet_sort() {
36+
// Write test data to a parquet file using a separate context
37+
let tmpdir = tempfile::tempdir().unwrap();
38+
let parquet_path = tmpdir.path().join("test_data.parquet");
39+
{
40+
let write_ctx = SessionContext::new();
41+
let df = write_ctx
42+
.sql(
43+
"SELECT v AS id, v * 2 AS val, \
44+
CASE WHEN v % 3 = 0 THEN 'aaa' WHEN v % 3 = 1 THEN 'bbb' ELSE 'ccc' END AS category \
45+
FROM generate_series(1, 2000000) AS t(v)",
46+
)
47+
.await
48+
.unwrap();
49+
df.write_parquet(
50+
parquet_path.to_str().unwrap(),
51+
DataFrameWriteOptions::new().with_single_file_output(true),
52+
None,
53+
)
54+
.await
55+
.unwrap();
56+
}
57+
58+
// Set up the memory-limited context for reading
59+
let runtime = RuntimeEnvBuilder::new()
60+
.with_memory_pool(Arc::new(FairSpillPool::new(MEMORY_LIMIT)))
61+
.build_arc()
62+
.unwrap();
63+
let config = SessionConfig::new()
64+
.with_target_partitions(1)
65+
.with_sort_spill_reservation_bytes(5 * 1024 * 1024);
66+
let ctx = SessionContext::new_with_config_rt(config, runtime);
67+
68+
ctx.register_parquet("t", parquet_path.to_str().unwrap(), Default::default())
69+
.await
70+
.unwrap();
71+
72+
// Start profiling before planning
73+
let _profiler = dhat::Profiler::builder().testing().build();
74+
75+
let df = ctx
76+
.sql("SELECT * FROM t ORDER BY id DESC")
77+
.await
78+
.unwrap();
79+
let _batches = df.collect().await.unwrap();
80+
81+
let stats = dhat::HeapStats::get();
82+
let limit = (MEMORY_LIMIT as f64 * 1.1) as usize;
83+
println!(
84+
"parquet_sort: max_bytes={}, memory_limit={}, ratio={:.2}x",
85+
stats.max_bytes,
86+
MEMORY_LIMIT,
87+
stats.max_bytes as f64 / MEMORY_LIMIT as f64
88+
);
89+
// TODO: peak is ~67MB (3.3x pool) because parquet decoded
90+
// batches and sort output arrays are not tracked by the MemoryPool.
91+
// dhat::assert!(stats.max_bytes < limit,
92+
// "Peak heap {} exceeded {}", stats.max_bytes, limit);
93+
let _ = limit;
94+
}
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Heap profiling test for RepartitionExec with multiple partitions.
19+
//! Uses enough data with a GROUP BY to force repartition buffering
20+
//! under memory pressure.
21+
22+
#[global_allocator]
23+
static ALLOC: dhat::Alloc = dhat::Alloc;
24+
25+
use std::sync::Arc;
26+
27+
use datafusion::prelude::{SessionConfig, SessionContext};
28+
use datafusion_execution::memory_pool::FairSpillPool;
29+
use datafusion_execution::runtime_env::RuntimeEnvBuilder;
30+
31+
const MEMORY_LIMIT: usize = 10 * 1024 * 1024; // 10MB
32+
33+
#[tokio::test]
34+
async fn heap_profile_repartition() {
35+
let _profiler = dhat::Profiler::builder().testing().build();
36+
37+
let runtime = RuntimeEnvBuilder::new()
38+
.with_memory_pool(Arc::new(FairSpillPool::new(MEMORY_LIMIT)))
39+
.build_arc()
40+
.unwrap();
41+
// Use multiple partitions to exercise RepartitionExec
42+
let config = SessionConfig::new().with_target_partitions(4);
43+
let ctx = SessionContext::new_with_config_rt(config, runtime);
44+
45+
// GROUP BY forces repartition by hash + aggregate spilling
46+
let df = ctx
47+
.sql(
48+
"SELECT v % 100000, COUNT(*) \
49+
FROM generate_series(1, 5000000) AS t(v) \
50+
GROUP BY v % 100000",
51+
)
52+
.await
53+
.unwrap();
54+
let _batches = df.collect().await.unwrap();
55+
56+
let stats = dhat::HeapStats::get();
57+
let limit = (MEMORY_LIMIT as f64 * 1.1) as usize;
58+
println!(
59+
"repartition: max_bytes={}, memory_limit={}, ratio={:.2}x",
60+
stats.max_bytes,
61+
MEMORY_LIMIT,
62+
stats.max_bytes as f64 / MEMORY_LIMIT as f64
63+
);
64+
dhat::assert!(
65+
stats.max_bytes < limit,
66+
"Peak heap {} exceeded {}",
67+
stats.max_bytes,
68+
limit
69+
);
70+
}

0 commit comments

Comments
 (0)