From f87654351c350246577472553d348d38df39cbbd Mon Sep 17 00:00:00 2001 From: cetra3 Date: Fri, 6 Mar 2026 07:09:29 +1030 Subject: [PATCH] Add initial dhat profile tests --- Cargo.lock | 212 ++++++++++++------ datafusion/core/Cargo.toml | 1 + .../core/tests/heap_profile_hash_aggregate.rs | 72 ++++++ .../core/tests/heap_profile_hash_join.rs | 96 ++++++++ .../core/tests/heap_profile_parquet_sort.rs | 91 ++++++++ .../core/tests/heap_profile_repartition.rs | 68 ++++++ datafusion/core/tests/heap_profile_sort.rs | 70 ++++++ .../tests/heap_profile_sort_merge_join.rs | 101 +++++++++ datafusion/core/tests/heap_profile_window.rs | 71 ++++++ 9 files changed, 716 insertions(+), 66 deletions(-) create mode 100644 datafusion/core/tests/heap_profile_hash_aggregate.rs create mode 100644 datafusion/core/tests/heap_profile_hash_join.rs create mode 100644 datafusion/core/tests/heap_profile_parquet_sort.rs create mode 100644 datafusion/core/tests/heap_profile_repartition.rs create mode 100644 datafusion/core/tests/heap_profile_sort.rs create mode 100644 datafusion/core/tests/heap_profile_sort_merge_join.rs create mode 100644 datafusion/core/tests/heap_profile_window.rs diff --git a/Cargo.lock b/Cargo.lock index 38fa83dd12119..65f9ac8c5c27d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -50,6 +50,15 @@ dependencies = [ "core_extensions", ] +[[package]] +name = "addr2line" +version = "0.25.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b5d307320b3181d6d7954e663bd7c774a838b8220fe0593c86d9fb09f498b4b" +dependencies = [ + "gimli", +] + [[package]] name = "adler2" version = "2.0.1" @@ -215,7 +224,7 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f0c269894b6fe5e9d7ada0cf69b5bf847ff35bc25fc271f08e1d080fce80339a" dependencies = [ - "object", + "object 0.32.2", ] [[package]] @@ -507,7 +516,7 @@ dependencies = [ "futures-core", "libc", "portable-atomic", - "rustc-hash", + "rustc-hash 2.1.1", "tokio", 
"tokio-stream", "xattr", @@ -515,9 +524,9 @@ dependencies = [ [[package]] name = "async-compression" -version = "0.4.40" +version = "0.4.41" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7d67d43201f4d20c78bcda740c142ca52482d81da80681533d33bf3f0596c8e2" +checksum = "d0f9ee0f6e02ffd7ad5816e9464499fba7b3effd01123b515c41d1697c43dad1" dependencies = [ "compression-codecs", "compression-core", @@ -631,9 +640,9 @@ dependencies = [ [[package]] name = "aws-credential-types" -version = "1.2.13" +version = "1.2.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6d203b0bf2626dcba8665f5cd0871d7c2c0930223d6b6be9097592fea21242d0" +checksum = "8f20799b373a1be121fe3005fba0c2090af9411573878f224df44b42727fcaf7" dependencies = [ "aws-smithy-async", "aws-smithy-runtime-api", @@ -785,9 +794,9 @@ dependencies = [ [[package]] name = "aws-smithy-async" -version = "1.2.13" +version = "1.2.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5cc50d0f63e714784b84223abd7abbc8577de8c35d699e0edd19f0a88a08ae13" +checksum = "2ffcaf626bdda484571968400c326a244598634dc75fd451325a54ad1a59acfc" dependencies = [ "futures-util", "pin-project-lite", @@ -796,9 +805,9 @@ dependencies = [ [[package]] name = "aws-smithy-http" -version = "0.63.4" +version = "0.63.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "af4a8a5fe3e4ac7ee871237c340bbce13e982d37543b65700f4419e039f5d78e" +checksum = "d619373d490ad70966994801bc126846afaa0d1ee920697a031f0cf63f2568e7" dependencies = [ "aws-smithy-runtime-api", "aws-smithy-types", @@ -817,9 +826,9 @@ dependencies = [ [[package]] name = "aws-smithy-http-client" -version = "1.1.10" +version = "1.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0709f0083aa19b704132684bc26d3c868e06bd428ccc4373b0b55c3e8748a58b" +checksum = "00ccbb08c10f6bcf912f398188e42ee2eab5f1767ce215a02a73bc5df1bbdd95" dependencies = [ "aws-smithy-async", 
"aws-smithy-runtime-api", @@ -869,9 +878,9 @@ dependencies = [ [[package]] name = "aws-smithy-runtime" -version = "1.10.1" +version = "1.10.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8fd3dfc18c1ce097cf81fced7192731e63809829c6cbf933c1ec47452d08e1aa" +checksum = "22ccf7f6eba8b2dcf8ce9b74806c6c185659c311665c4bf8d6e71ebd454db6bf" dependencies = [ "aws-smithy-async", "aws-smithy-http", @@ -894,9 +903,9 @@ dependencies = [ [[package]] name = "aws-smithy-runtime-api" -version = "1.11.5" +version = "1.11.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b4af6e5def28be846479bbeac55aa4603d6f7986fc5da4601ba324dd5d377516" +checksum = "876ab3c9c29791ba4ba02b780a3049e21ec63dabda09268b175272c3733a79e6" dependencies = [ "aws-smithy-async", "aws-smithy-types", @@ -911,9 +920,9 @@ dependencies = [ [[package]] name = "aws-smithy-types" -version = "1.4.5" +version = "1.4.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8ca2734c16913a45343b37313605d84e7d8b34a4611598ce1d25b35860a2bed3" +checksum = "d2b1117b3b2bbe166d11199b540ceed0d0f7676e36e7b962b5a437a9971eac75" dependencies = [ "base64-simd", "bytes", @@ -998,6 +1007,21 @@ dependencies = [ "tower-service", ] +[[package]] +name = "backtrace" +version = "0.3.76" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bb531853791a215d7c62a30daf0dde835f381ab5de4589cfe7c649d2cbe92bd6" +dependencies = [ + "addr2line", + "cfg-if", + "libc", + "miniz_oxide", + "object 0.37.3", + "rustc-demangle", + "windows-link", +] + [[package]] name = "base64" version = "0.21.7" @@ -1536,9 +1560,9 @@ dependencies = [ [[package]] name = "criterion" -version = "0.8.1" +version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4d883447757bb0ee46f233e9dc22eb84d93a9508c9b868687b274fc431d886bf" +checksum = "950046b2aa2492f9a536f5f4f9a3de7b9e2476e575e05bd6c333371add4d98f3" dependencies = [ "alloca", 
"anes", @@ -1563,9 +1587,9 @@ dependencies = [ [[package]] name = "criterion-plot" -version = "0.8.1" +version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ed943f81ea2faa8dcecbbfa50164acf95d555afec96a27871663b300e387b2e4" +checksum = "d8d80a2f4f5b554395e47b5d8305bc3d27813bacb73493eb1001e8f76dae29ea" dependencies = [ "cast", "itertools 0.13.0", @@ -1755,6 +1779,7 @@ dependencies = [ "datafusion-physical-plan", "datafusion-session", "datafusion-sql", + "dhat", "doc-comment", "env_logger", "flate2", @@ -2713,6 +2738,22 @@ dependencies = [ "serde_core", ] +[[package]] +name = "dhat" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "98cd11d84628e233de0ce467de10b8633f4ddaecafadefc86e13b84b8739b827" +dependencies = [ + "backtrace", + "lazy_static", + "mintex", + "parking_lot", + "rustc-hash 1.1.0", + "serde", + "serde_json", + "thousands", +] + [[package]] name = "diff" version = "0.1.13" @@ -2748,7 +2789,7 @@ dependencies = [ "libc", "option-ext", "redox_users", - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -2858,9 +2899,9 @@ dependencies = [ [[package]] name = "env_filter" -version = "0.1.4" +version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1bf3c259d255ca70051b30e2e95b5446cdb8949ac4cd22c0d7fd634d89f568e2" +checksum = "7a1c3cc8e57274ec99de65301228b537f1e4eedc1b8e0f9411c6caac8ae7308f" dependencies = [ "log", "regex", @@ -2868,9 +2909,9 @@ dependencies = [ [[package]] name = "env_logger" -version = "0.11.8" +version = "0.11.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "13c863f0904021b108aa8b2f55046443e6b1ebde8fd4a15c399893aae4fa069f" +checksum = "b2daee4ea451f429a58296525ddf28b45a3b64f1acf6587e2067437bb11e218d" dependencies = [ "anstream", "anstyle", @@ -2892,7 +2933,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" dependencies = [ "libc", - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -3238,6 +3279,12 @@ dependencies = [ "wasip3", ] +[[package]] +name = "gimli" +version = "0.32.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e629b9b98ef3dd8afe6ca2bd0f89306cec16d43d907889945bc5d6687f2f13c7" + [[package]] name = "glob" version = "0.3.3" @@ -3695,9 +3742,9 @@ dependencies = [ [[package]] name = "indicatif" -version = "0.18.3" +version = "0.18.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9375e112e4b463ec1b1c6c011953545c65a30164fbab5b581df32b3abf0dcb88" +checksum = "25470f23803092da7d239834776d653104d551bc4d7eacaf31e6837854b8e9eb" dependencies = [ "console 0.16.2", "portable-atomic", @@ -3821,9 +3868,9 @@ dependencies = [ [[package]] name = "js-sys" -version = "0.3.89" +version = "0.3.91" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f4eacb0641a310445a4c513f2a5e23e19952e269c6a38887254d5f837a305506" +checksum = "b49715b7073f385ba4bc528e5747d02e66cb39c6146efb66b781f131f0fb399c" dependencies = [ "once_cell", "wasm-bindgen", @@ -4079,6 +4126,12 @@ dependencies = [ "simd-adler32", ] +[[package]] +name = "mintex" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c505b3e17ed6b70a7ed2e67fbb2c560ee327353556120d6e72f5232b6880d536" + [[package]] name = "mio" version = "1.1.1" @@ -4153,7 +4206,7 @@ version = "0.50.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5" dependencies = [ - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -4265,6 +4318,15 @@ dependencies = [ "memchr", ] +[[package]] +name = "object" +version = "0.37.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"ff76201f031d8863c38aa7f905eca4f53abbfa15f609db4277d44cd8938f33fe" +dependencies = [ + "memchr", +] + [[package]] name = "object_store" version = "0.13.1" @@ -4771,7 +4833,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "343d3bd7056eda839b03204e68deff7d1b13aba7af2b2fd16890697274262ee7" dependencies = [ "heck", - "itertools 0.13.0", + "itertools 0.14.0", "log", "multimap", "petgraph", @@ -4790,7 +4852,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "27c6023962132f4b30eb4c172c91ce92d933da334c59c23cddee82358ddafb0b" dependencies = [ "anyhow", - "itertools 0.13.0", + "itertools 0.14.0", "proc-macro2", "quote", "syn 2.0.117", @@ -4851,7 +4913,7 @@ dependencies = [ "pin-project-lite", "quinn-proto", "quinn-udp", - "rustc-hash", + "rustc-hash 2.1.1", "rustls", "socket2", "thiserror", @@ -4871,7 +4933,7 @@ dependencies = [ "lru-slab", "rand 0.9.2", "ring", - "rustc-hash", + "rustc-hash 2.1.1", "rustls", "rustls-pki-types", "slab", @@ -4892,7 +4954,7 @@ dependencies = [ "once_cell", "socket2", "tracing", - "windows-sys 0.59.0", + "windows-sys 0.60.2", ] [[package]] @@ -5080,9 +5142,9 @@ dependencies = [ [[package]] name = "regex" -version = "1.12.2" +version = "1.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "843bc0191f75f3e22651ae5f1e72939ab2f72a4bc30fa80a066bd66edefc24d4" +checksum = "e10754a14b9137dd7b1e3e5b0493cc9171fdd105e0ab477f51b72e7f3ac0e276" dependencies = [ "aho-corasick", "memchr", @@ -5234,6 +5296,18 @@ dependencies = [ "syn 2.0.117", ] +[[package]] +name = "rustc-demangle" +version = "0.1.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b50b8869d9fc858ce7266cce0194bd74df58b9d0e3f6df3a9fc8eb470d95c09d" + +[[package]] +name = "rustc-hash" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" + [[package]] name = 
"rustc-hash" version = "2.1.1" @@ -5259,7 +5333,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys", - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -5959,15 +6033,15 @@ dependencies = [ [[package]] name = "tempfile" -version = "3.24.0" +version = "3.25.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "655da9c7eb6305c55742045d5a8d2037996d61d8de95806335c7c86ce0f82e9c" +checksum = "0136791f7c95b1f6dd99f9cc786b91bb81c3800b639b3478e561ddb7be95e5f1" dependencies = [ "fastrand", - "getrandom 0.3.4", + "getrandom 0.4.1", "once_cell", "rustix", - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -6041,6 +6115,12 @@ dependencies = [ "syn 2.0.117", ] +[[package]] +name = "thousands" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3bf63baf9f5039dadc247375c29eb13706706cfde997d0330d05aa63a77d8820" + [[package]] name = "thread_local" version = "1.1.9" @@ -6257,9 +6337,9 @@ dependencies = [ [[package]] name = "tonic" -version = "0.14.3" +version = "0.14.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a286e33f82f8a1ee2df63f4fa35c0becf4a85a0cb03091a15fd7bf0b402dc94a" +checksum = "7f32a6f80051a4111560201420c7885d0082ba9efe2ab61875c587bb6b18b9a0" dependencies = [ "async-trait", "axum", @@ -6720,9 +6800,9 @@ dependencies = [ [[package]] name = "wasm-bindgen" -version = "0.2.112" +version = "0.2.114" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "05d7d0fce354c88b7982aec4400b3e7fcf723c32737cef571bd165f7613557ee" +checksum = "6532f9a5c1ece3798cb1c2cfdba640b9b3ba884f5db45973a6f442510a87d38e" dependencies = [ "cfg-if", "once_cell", @@ -6733,9 +6813,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-futures" -version = "0.4.62" +version = "0.4.64" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ee85afca410ac4abba5b584b12e77ea225db6ee5471d0aebaae0861166f9378a" +checksum = 
"e9c5522b3a28661442748e09d40924dfb9ca614b21c00d3fd135720e48b67db8" dependencies = [ "cfg-if", "futures-util", @@ -6747,9 +6827,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro" -version = "0.2.112" +version = "0.2.114" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "55839b71ba921e4f75b674cb16f843f4b1f3b26ddfcb3454de1cf65cc021ec0f" +checksum = "18a2d50fcf105fb33bb15f00e7a77b772945a2ee45dcf454961fd843e74c18e6" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -6757,9 +6837,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.112" +version = "0.2.114" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "caf2e969c2d60ff52e7e98b7392ff1588bffdd1ccd4769eba27222fd3d621571" +checksum = "03ce4caeaac547cdf713d280eda22a730824dd11e6b8c3ca9e42247b25c631e3" dependencies = [ "bumpalo", "proc-macro2", @@ -6770,18 +6850,18 @@ dependencies = [ [[package]] name = "wasm-bindgen-shared" -version = "0.2.112" +version = "0.2.114" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0861f0dcdf46ea819407495634953cdcc8a8c7215ab799a7a7ce366be71c7b30" +checksum = "75a326b8c223ee17883a4251907455a2431acc2791c98c26279376490c378c16" dependencies = [ "unicode-ident", ] [[package]] name = "wasm-bindgen-test" -version = "0.3.62" +version = "0.3.64" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "12430eab93df2be01b6575bf8e05700945dafa62d6fa40faa07b0ea9afd8add1" +checksum = "6311c867385cc7d5602463b31825d454d0837a3aba7cdb5e56d5201792a3f7fe" dependencies = [ "async-trait", "cast", @@ -6801,9 +6881,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-test-macro" -version = "0.3.62" +version = "0.3.64" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ce7d6debc1772c3502c727c8c47180c040c8741f7fcf6e731d6ef57818d59ae2" +checksum = "67008cdde4769831958536b0f11b3bdd0380bde882be17fff9c2f34bb4549abd" dependencies = [ 
"proc-macro2", "quote", @@ -6812,9 +6892,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-test-shared" -version = "0.2.112" +version = "0.2.114" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c4f79c547a8daa04318dac7646f579a016f819452c34bcb14e8dda0e77a4386c" +checksum = "cfe29135b180b72b04c74aa97b2b4a2ef275161eff9a6c7955ea9eaedc7e1d4e" [[package]] name = "wasm-encoder" @@ -6865,9 +6945,9 @@ dependencies = [ [[package]] name = "web-sys" -version = "0.3.89" +version = "0.3.91" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "10053fbf9a374174094915bbce141e87a6bf32ecd9a002980db4b638405e8962" +checksum = "854ba17bb104abfb26ba36da9729addc7ce7f06f5c0f90f3c391f8461cca21f9" dependencies = [ "js-sys", "wasm-bindgen", @@ -6925,7 +7005,7 @@ version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" dependencies = [ - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml index 8965948a0f4e2..4ed720e8f7220 100644 --- a/datafusion/core/Cargo.toml +++ b/datafusion/core/Cargo.toml @@ -166,6 +166,7 @@ async-trait = { workspace = true } criterion = { workspace = true, features = ["async_tokio", "async_futures"] } ctor = { workspace = true } dashmap = "6.1.0" +dhat = "0.3.3" datafusion-doc = { workspace = true } datafusion-functions-window-common = { workspace = true } datafusion-macros = { workspace = true } diff --git a/datafusion/core/tests/heap_profile_hash_aggregate.rs b/datafusion/core/tests/heap_profile_hash_aggregate.rs new file mode 100644 index 0000000000000..c657517dff5cb --- /dev/null +++ b/datafusion/core/tests/heap_profile_hash_aggregate.rs @@ -0,0 +1,72 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Heap profiling test for grouped hash aggregation with spilling. +//! Data has many distinct groups to force hash table growth beyond +//! the memory pool, triggering spilling. + +#[global_allocator] +static ALLOC: dhat::Alloc = dhat::Alloc; + +use std::sync::Arc; + +use datafusion::prelude::{SessionConfig, SessionContext}; +use datafusion_execution::memory_pool::FairSpillPool; +use datafusion_execution::runtime_env::RuntimeEnvBuilder; + +const MEMORY_LIMIT: usize = 10 * 1024 * 1024; // 10MB, passed to FairSpillPool::new below + +#[tokio::test] +async fn heap_profile_hash_aggregate() { + let _profiler = dhat::Profiler::builder().testing().build(); // created before any session setup, so everything below runs under the profiler + + let runtime = RuntimeEnvBuilder::new() + .with_memory_pool(Arc::new(FairSpillPool::new(MEMORY_LIMIT))) + .build_arc() + .unwrap(); + let config = SessionConfig::new().with_target_partitions(1); + let ctx = SessionContext::new_with_config_rt(config, runtime); + + // 5M distinct groups force hash table growth and spilling + let df = ctx + .sql( + "SELECT v, COUNT(*) \ + FROM generate_series(1, 5000000) AS t(v) \ + GROUP BY v", + ) + .await + .unwrap(); + let batches = df.collect().await.unwrap(); + let row_count: usize = batches.iter().map(|b| b.num_rows()).sum(); + assert_eq!(row_count, 5_000_000); // one output row per distinct value of v + + let stats = dhat::HeapStats::get(); + let limit =
(MEMORY_LIMIT as f64 * 1.1) as usize; + println!( + "hash_aggregate: max_bytes={}, memory_limit={}, ratio={:.2}x", + stats.max_bytes, + MEMORY_LIMIT, + stats.max_bytes as f64 / MEMORY_LIMIT as f64 + ); + // TODO: the assertion below is disabled; peak is ~122MB (12.2x pool) because: + // 1. HashTable size() underreports (uses capacity * sizeof instead of allocation_size()) + // 2. Hash table doubles capacity atomically inside intern(), before the pool check + // 3. generate_series input data is not tracked by the MemoryPool + // dhat::assert!(stats.max_bytes < limit, + // "Peak heap {} exceeded {}", stats.max_bytes, limit); + let _ = limit; // keeps `limit` used while the assertion above is commented out +} diff --git a/datafusion/core/tests/heap_profile_hash_join.rs b/datafusion/core/tests/heap_profile_hash_join.rs new file mode 100644 index 0000000000000..9c6e13f7363af --- /dev/null +++ b/datafusion/core/tests/heap_profile_hash_join.rs @@ -0,0 +1,96 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Heap profiling test for HashJoinExec.
+ +#[global_allocator] +static ALLOC: dhat::Alloc = dhat::Alloc; + +use std::sync::Arc; + +use datafusion::prelude::{SessionConfig, SessionContext}; +use datafusion_execution::memory_pool::FairSpillPool; +use datafusion_execution::runtime_env::RuntimeEnvBuilder; + +const MEMORY_LIMIT: usize = 40 * 1024 * 1024; // 40MB, passed to FairSpillPool::new below + +#[tokio::test] +async fn heap_profile_hash_join() { + // HashJoin does not spill, so the pool must fit the build side + // hash table. 1M rows of (i64, i64) ~16MB plus hash table overhead. + let runtime = RuntimeEnvBuilder::new() + .with_memory_pool(Arc::new(FairSpillPool::new(MEMORY_LIMIT))) + .build_arc() + .unwrap(); + let config = SessionConfig::new().with_target_partitions(1); + let ctx = SessionContext::new_with_config_rt(config, runtime); + + // Create tables before starting the profiler + ctx.sql( + "CREATE TABLE t1 AS \ + SELECT v AS id, v * 2 AS val \ + FROM generate_series(1, 1000000) AS t(v)", + ) + .await + .unwrap(); + + ctx.sql( + "CREATE TABLE t2 AS \ + SELECT v AS id, v * 3 AS val \ + FROM generate_series(1, 1000000) AS t(v)", + ) + .await + .unwrap(); + + // Verify HashJoin is used by searching the debug-formatted EXPLAIN output for the operator name + let explain = ctx + .sql("EXPLAIN SELECT t1.id, t1.val, t2.val FROM t1 JOIN t2 ON t1.id = t2.id") + .await + .unwrap() + .collect() + .await + .unwrap(); + let plan_str = format!("{explain:?}"); + assert!( + plan_str.contains("HashJoinExec"), + "Expected HashJoinExec in plan but got: {plan_str}" + ); + + // Start profiling after table creation and the EXPLAIN check; only the join query below runs under the profiler + let _profiler = dhat::Profiler::builder().testing().build(); + + let df = ctx + .sql("SELECT t1.id, t1.val, t2.val FROM t1 JOIN t2 ON t1.id = t2.id") + .await + .unwrap(); + let _batches = df.collect().await.unwrap(); // result batches are held until the end of the test but not inspected + + let stats = dhat::HeapStats::get(); + let limit = (MEMORY_LIMIT as f64 * 1.1) as usize; // pool size plus 10% headroom + println!( + "hash_join: max_bytes={}, memory_limit={}, ratio={:.2}x", + stats.max_bytes, + MEMORY_LIMIT, + stats.max_bytes as f64 / MEMORY_LIMIT as f64 + ); + dhat::assert!( + stats.max_bytes < limit, +
"Peak heap {} exceeded {}", + stats.max_bytes, + limit + ); +} diff --git a/datafusion/core/tests/heap_profile_parquet_sort.rs b/datafusion/core/tests/heap_profile_parquet_sort.rs new file mode 100644 index 0000000000000..4e88e0f7ada8c --- /dev/null +++ b/datafusion/core/tests/heap_profile_parquet_sort.rs @@ -0,0 +1,91 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Heap profiling test for reading parquet files and sorting. +//! This exercises the parquet reader's allocation path alongside +//! the sort operator. Data exceeds memory pool to force spilling.
+ +#[global_allocator] +static ALLOC: dhat::Alloc = dhat::Alloc; + +use std::sync::Arc; + +use datafusion::dataframe::DataFrameWriteOptions; +use datafusion::prelude::{SessionConfig, SessionContext}; +use datafusion_execution::memory_pool::FairSpillPool; +use datafusion_execution::runtime_env::RuntimeEnvBuilder; + +const MEMORY_LIMIT: usize = 20 * 1024 * 1024; // 20MB, passed to FairSpillPool::new below + +#[tokio::test] +async fn heap_profile_parquet_sort() { + // Write test data to a parquet file using a separate context + let tmpdir = tempfile::tempdir().unwrap(); + let parquet_path = tmpdir.path().join("test_data.parquet"); + { + let write_ctx = SessionContext::new(); + let df = write_ctx + .sql( + "SELECT v AS id, v * 2 AS val, \ + CASE WHEN v % 3 = 0 THEN 'aaa' WHEN v % 3 = 1 THEN 'bbb' ELSE 'ccc' END AS category \ + FROM generate_series(1, 2000000) AS t(v)", + ) + .await + .unwrap(); + df.write_parquet( + parquet_path.to_str().unwrap(), + DataFrameWriteOptions::new().with_single_file_output(true), + None, + ) + .await + .unwrap(); + } + + // Set up the memory-limited context for reading + let runtime = RuntimeEnvBuilder::new() + .with_memory_pool(Arc::new(FairSpillPool::new(MEMORY_LIMIT))) + .build_arc() + .unwrap(); + let config = SessionConfig::new() + .with_target_partitions(1) + .with_sort_spill_reservation_bytes(5 * 1024 * 1024); // 5MB + let ctx = SessionContext::new_with_config_rt(config, runtime); + + ctx.register_parquet("t", parquet_path.to_str().unwrap(), Default::default()) + .await + .unwrap(); + + // Start profiling before planning; the parquet file was written and registered above, before the profiler existed + let _profiler = dhat::Profiler::builder().testing().build(); + + let df = ctx.sql("SELECT * FROM t ORDER BY id DESC").await.unwrap(); + let _batches = df.collect().await.unwrap(); // result batches are held until the end of the test but not inspected + + let stats = dhat::HeapStats::get(); + let limit = (MEMORY_LIMIT as f64 * 1.1) as usize; // pool size plus 10% headroom + println!( + "parquet_sort: max_bytes={}, memory_limit={}, ratio={:.2}x", + stats.max_bytes, + MEMORY_LIMIT, + stats.max_bytes as f64 / MEMORY_LIMIT as f64 + ); + // TODO: peak is ~67MB
(3.3x pool) because parquet decoded + // batches and sort output arrays are not tracked by the MemoryPool. + // dhat::assert!(stats.max_bytes < limit, + // "Peak heap {} exceeded {}", stats.max_bytes, limit); + let _ = limit; // keeps `limit` used while the assertion above is commented out +} diff --git a/datafusion/core/tests/heap_profile_repartition.rs b/datafusion/core/tests/heap_profile_repartition.rs new file mode 100644 index 0000000000000..5590ce885fa7f --- /dev/null +++ b/datafusion/core/tests/heap_profile_repartition.rs @@ -0,0 +1,68 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Heap profiling test for RepartitionExec with multiple partitions. +//! Uses enough data with a GROUP BY to force repartition buffering +//! under memory pressure.
+ +#[global_allocator] +static ALLOC: dhat::Alloc = dhat::Alloc; + +use std::sync::Arc; + +use datafusion::prelude::{SessionConfig, SessionContext}; +use datafusion_execution::memory_pool::FairSpillPool; +use datafusion_execution::runtime_env::RuntimeEnvBuilder; + +const MEMORY_LIMIT: usize = 10 * 1024 * 1024; // 10MB, passed to FairSpillPool::new below + +#[tokio::test] +async fn heap_profile_repartition() { + let _profiler = dhat::Profiler::builder().testing().build(); // created before any session setup, so everything below runs under the profiler + + let runtime = RuntimeEnvBuilder::new() + .with_memory_pool(Arc::new(FairSpillPool::new(MEMORY_LIMIT))) + .build_arc() + .unwrap(); + // Use multiple partitions to exercise RepartitionExec (the plan is not inspected here, so the operator's presence is assumed) + let config = SessionConfig::new().with_target_partitions(4); + let ctx = SessionContext::new_with_config_rt(config, runtime); + + // GROUP BY forces repartition by hash + aggregate spilling + let df = ctx + .sql( + "SELECT v % 100000, COUNT(*) \ + FROM generate_series(1, 5000000) AS t(v) \ + GROUP BY v % 100000", + ) + .await + .unwrap(); + let _batches = df.collect().await.unwrap(); // result batches are held until the end of the test but not inspected + + let stats = dhat::HeapStats::get(); + let limit = (MEMORY_LIMIT as f64 * 1.1) as usize; // pool size plus 10% headroom + println!( + "repartition: max_bytes={}, memory_limit={}, ratio={:.2}x", + stats.max_bytes, + MEMORY_LIMIT, + stats.max_bytes as f64 / MEMORY_LIMIT as f64 + ); + // TODO: the assertion below is disabled; peak is ~20MB (1.97x pool) + // dhat::assert!(stats.max_bytes < limit, + // "Peak heap {} exceeded {}", stats.max_bytes, limit); + let _ = limit; // keeps `limit` used while the assertion above is commented out +} diff --git a/datafusion/core/tests/heap_profile_sort.rs b/datafusion/core/tests/heap_profile_sort.rs new file mode 100644 index 0000000000000..b40f0c9ec2673 --- /dev/null +++ b/datafusion/core/tests/heap_profile_sort.rs @@ -0,0 +1,70 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Heap profiling test for SortExec with spilling. +//! +//! Uses dhat-rs to measure and print actual heap usage; the assertion that it stays +//! within expected bounds is currently disabled (see the TODO in the test). Data size exceeds memory pool to force +//! spilling to disk. + +#[global_allocator] +static ALLOC: dhat::Alloc = dhat::Alloc; + +use std::sync::Arc; + +use datafusion::prelude::{SessionConfig, SessionContext}; +use datafusion_execution::memory_pool::FairSpillPool; +use datafusion_execution::runtime_env::RuntimeEnvBuilder; + +const MEMORY_LIMIT: usize = 10 * 1024 * 1024; // 10MB, passed to FairSpillPool::new below + +#[tokio::test] +async fn heap_profile_sort() { + let _profiler = dhat::Profiler::builder().testing().build(); // created before any session setup, so everything below runs under the profiler + + let runtime = RuntimeEnvBuilder::new() + .with_memory_pool(Arc::new(FairSpillPool::new(MEMORY_LIMIT))) + .build_arc() + .unwrap(); + let config = SessionConfig::new() + .with_target_partitions(1) + .with_sort_spill_reservation_bytes(3 * 1024 * 1024); // 3MB + let ctx = SessionContext::new_with_config_rt(config, runtime); + + // 8M rows of Int64 (~61MB) force spilling with 10MB pool + let df = ctx + .sql("SELECT * FROM generate_series(1, 8000000) AS t(v) ORDER BY v") + .await + .unwrap(); + let batches = df.collect().await.unwrap(); + let row_count: usize = batches.iter().map(|b| b.num_rows()).sum(); + assert_eq!(row_count, 8_000_000); // the sort must return every generated row + + let stats = dhat::HeapStats::get(); + let limit = (MEMORY_LIMIT as f64 * 1.1) as usize; // pool size plus 10% headroom + println!( + "sort:
max_bytes={}, memory_limit={}, ratio={:.2}x", + stats.max_bytes, + MEMORY_LIMIT, + stats.max_bytes as f64 / MEMORY_LIMIT as f64 + ); + // TODO: peak is ~66MB (6.6x pool) because generate_series input + // data (~61MB of Int64) is not tracked by the MemoryPool. + // dhat::assert!(stats.max_bytes < limit, + // "Peak heap {} exceeded {}", stats.max_bytes, limit); + let _ = limit; // keeps `limit` used while the assertion above is commented out +} diff --git a/datafusion/core/tests/heap_profile_sort_merge_join.rs b/datafusion/core/tests/heap_profile_sort_merge_join.rs new file mode 100644 index 0000000000000..7184bfc58871a --- /dev/null +++ b/datafusion/core/tests/heap_profile_sort_merge_join.rs @@ -0,0 +1,101 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Heap profiling test for SortMergeJoinExec. +//! Data is larger than the memory pool to exercise sort spilling +//! before the merge join.
+ +#[global_allocator] +static ALLOC: dhat::Alloc = dhat::Alloc; + +use std::sync::Arc; + +use datafusion::prelude::{SessionConfig, SessionContext}; +use datafusion_execution::memory_pool::FairSpillPool; +use datafusion_execution::runtime_env::RuntimeEnvBuilder; + +const MEMORY_LIMIT: usize = 40 * 1024 * 1024; // 40MB + +#[tokio::test] +async fn heap_profile_sort_merge_join() { + let runtime = RuntimeEnvBuilder::new() + .with_memory_pool(Arc::new(FairSpillPool::new(MEMORY_LIMIT))) + .build_arc() + .unwrap(); + // Force sort-merge join: disable hash join preference and use + // multiple partitions so CollectLeft hash join isn't chosen. + let config = SessionConfig::new() + .with_target_partitions(4) + .with_sort_spill_reservation_bytes(2 * 1024 * 1024) + .set_str("datafusion.optimizer.prefer_hash_join", "false"); + let ctx = SessionContext::new_with_config_rt(config, runtime); + + // Create tables before starting the profiler + ctx.sql( + "CREATE TABLE t1 AS \ + SELECT v AS id, v * 2 AS val \ + FROM generate_series(1, 1000000) AS t(v)", + ) + .await + .unwrap(); + + ctx.sql( + "CREATE TABLE t2 AS \ + SELECT v AS id, v * 3 AS val \ + FROM generate_series(1, 1000000) AS t(v)", + ) + .await + .unwrap(); + + // Verify SortMergeJoin is used + let explain = ctx + .sql("EXPLAIN SELECT t1.id, t1.val, t2.val FROM t1 JOIN t2 ON t1.id = t2.id") + .await + .unwrap() + .collect() + .await + .unwrap(); + let plan_str = format!("{explain:?}"); + assert!( + plan_str.contains("SortMergeJoin"), + "Expected SortMergeJoin in plan but got: {plan_str}" + ); + + // Start profiling after table creation + let _profiler = dhat::Profiler::builder().testing().build(); + + let df = ctx + .sql("SELECT t1.id, t1.val, t2.val FROM t1 JOIN t2 ON t1.id = t2.id") + .await + .unwrap(); + let _batches = df.collect().await.unwrap(); + + let stats = dhat::HeapStats::get(); + let limit = (MEMORY_LIMIT as f64 * 1.1) as usize; + println!( + "sort_merge_join: max_bytes={}, memory_limit={}, ratio={:.2}x", + 
stats.max_bytes, + MEMORY_LIMIT, + stats.max_bytes as f64 / MEMORY_LIMIT as f64 + ); + dhat::assert!( + stats.max_bytes < limit, + "Peak heap {} exceeded {}", + stats.max_bytes, + limit + ); +} diff --git a/datafusion/core/tests/heap_profile_window.rs b/datafusion/core/tests/heap_profile_window.rs new file mode 100644 index 0000000000000..ce0178439c562 --- /dev/null +++ b/datafusion/core/tests/heap_profile_window.rs @@ -0,0 +1,71 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Heap profiling test for WindowAggExec. +//! Data exceeds memory pool to exercise the sort spilling +//! that feeds the window function. 
+ +#[global_allocator] +static ALLOC: dhat::Alloc = dhat::Alloc; + +use std::sync::Arc; + +use datafusion::prelude::{SessionConfig, SessionContext}; +use datafusion_execution::memory_pool::FairSpillPool; +use datafusion_execution::runtime_env::RuntimeEnvBuilder; + +const MEMORY_LIMIT: usize = 10 * 1024 * 1024; // 10MB + +#[tokio::test] +async fn heap_profile_window() { + let _profiler = dhat::Profiler::builder().testing().build(); + + let runtime = RuntimeEnvBuilder::new() + .with_memory_pool(Arc::new(FairSpillPool::new(MEMORY_LIMIT))) + .build_arc() + .unwrap(); + let config = SessionConfig::new() + .with_target_partitions(1) + .with_sort_spill_reservation_bytes(3 * 1024 * 1024); + let ctx = SessionContext::new_with_config_rt(config, runtime); + + // 2M rows (~15MB) forces sort to spill before window eval + let df = ctx + .sql( + "SELECT v, SUM(v) OVER (ORDER BY v ROWS BETWEEN 100 PRECEDING AND CURRENT ROW) \ + FROM generate_series(1, 2000000) AS t(v)", + ) + .await + .unwrap(); + let batches = df.collect().await.unwrap(); + let row_count: usize = batches.iter().map(|b| b.num_rows()).sum(); + assert_eq!(row_count, 2_000_000); + + let stats = dhat::HeapStats::get(); + let limit = (MEMORY_LIMIT as f64 * 1.1) as usize; + println!( + "window: max_bytes={}, memory_limit={}, ratio={:.2}x", + stats.max_bytes, + MEMORY_LIMIT, + stats.max_bytes as f64 / MEMORY_LIMIT as f64 + ); + // TODO: peak is ~33MB (3.3x pool) because generate_series input + // data and window output arrays are not tracked by the MemoryPool. + // dhat::assert!(stats.max_bytes < limit, + // "Peak heap {} exceeded {}", stats.max_bytes, limit); + let _ = limit; +}