Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions common/src/main/scala/org/apache/comet/CometConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,28 @@ object CometConf extends ShimCometConf {
// Boolean config entry produced by `createExecEnabledConfig` for the "localTableScan" key;
// the default passed here is `false`.
// NOTE(review): presumably this toggles Comet's native handling of Spark's LocalTableScan
// operator -- confirm against `createExecEnabledConfig`, which is not visible in this hunk.
val COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED: ConfigEntry[Boolean] =
createExecEnabledConfig("localTableScan", defaultValue = false)

/**
 * Number of partitions (buckets) used by Grace Hash Join.
 *
 * Must be strictly positive; defaults to 16. Per the user-facing doc string, more partitions
 * mean smaller partitions at the cost of additional overhead.
 */
val COMET_EXEC_GRACE_HASH_JOIN_NUM_PARTITIONS: ConfigEntry[Int] =
  conf(s"$COMET_EXEC_CONFIG_PREFIX.graceHashJoin.numPartitions")
    .category(CATEGORY_EXEC)
    .doc(
      "The number of partitions (buckets) to use for Grace Hash Join. " +
        "A higher number reduces the size of each partition but increases overhead.")
    .intConf
    // Zero or negative partition counts are rejected when the value is set.
    .checkValue(_ > 0, "The number of partitions must be positive.")
    .createWithDefault(16)

/**
 * Per-task memory budget, in bytes, for the Grace Hash Join fast path.
 *
 * Must be non-negative; `0` disables the fast path. Defaults to 64 MB. See the user-facing
 * doc string below for the trade-off when raising the value.
 */
val COMET_EXEC_GRACE_HASH_JOIN_FAST_PATH_THRESHOLD: ConfigEntry[Long] =
  conf(s"$COMET_EXEC_CONFIG_PREFIX.graceHashJoin.fastPathThreshold")
    .category(CATEGORY_EXEC)
    .doc(
      "Per-task memory budget in bytes for Grace Hash Join fast-path hash tables. " +
        "When a build side fits in memory and is smaller than this threshold, the join " +
        "executes as a single HashJoinExec without partitioning or spilling. " +
        "Set to 0 to disable the fast path. " +
        "Larger values risk OOM because HashJoinExec creates non-spillable hash tables.")
    .longConf
    // Only negative values are rejected; zero is the documented "disabled" setting.
    .checkValue(_ >= 0, "The fast path threshold must be non-negative.")
    .createWithDefault(64L << 20) // 64 MB

val COMET_NATIVE_COLUMNAR_TO_ROW_ENABLED: ConfigEntry[Boolean] =
conf(s"$COMET_EXEC_CONFIG_PREFIX.columnarToRow.native.enabled")
.category(CATEGORY_EXEC)
Expand Down Expand Up @@ -381,6 +403,18 @@ object CometConf extends ShimCometConf {
.booleanConf
.createWithDefault(false)

/**
 * Upper bound, in bytes, on the estimated build-side size for which a SortMergeJoin may be
 * replaced with a ShuffledHashJoin.
 *
 * The documented sentinel `-1` disables the size check so the replacement always happens.
 * Any other negative value has no documented meaning, so it is rejected up front instead of
 * being silently accepted. Defaults to 100 MB.
 */
val COMET_REPLACE_SMJ_MAX_BUILD_SIZE: ConfigEntry[Long] =
  conf(s"$COMET_EXEC_CONFIG_PREFIX.replaceSortMergeJoin.maxBuildSize")
    .category(CATEGORY_EXEC)
    .doc(
      "Maximum estimated size in bytes of the build side for replacing SortMergeJoin " +
        "with ShuffledHashJoin. When the build side's logical plan statistics exceed this " +
        "threshold, the SortMergeJoin is kept because sort-merge join's streaming merge " +
        "on pre-sorted data outperforms hash join's per-task hash table construction " +
        "for large build sides. Set to -1 to disable this check and always replace.")
    .longConf
    // Validate like the sibling Grace Hash Join entries: only -1 (disabled) or a real
    // byte count is meaningful here.
    .checkValue(
      v => v >= -1,
      "The maximum build size must be -1 (to disable the check) or non-negative.")
    .createWithDefault(100L * 1024 * 1024) // 100 MB

val COMET_EXEC_SHUFFLE_WITH_HASH_PARTITIONING_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.native.shuffle.partitioning.hash.enabled")
.category(CATEGORY_SHUFFLE)
Expand Down
293 changes: 293 additions & 0 deletions docs/source/contributor-guide/grace-hash-join-design.md

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions native/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion native/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ edition = "2021"
rust-version = "1.88"

[workspace.dependencies]
arrow = { version = "57.3.0", features = ["prettyprint", "ffi", "chrono-tz"] }
arrow = { version = "57.3.0", features = ["prettyprint", "ffi", "chrono-tz", "ipc_compression"] }
async-trait = { version = "0.1" }
bytes = { version = "1.11.1" }
parquet = { version = "57.3.0", default-features = false, features = ["experimental"] }
Expand Down
1 change: 1 addition & 0 deletions native/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ include = [
publish = false

[dependencies]
ahash = "0.8"
arrow = { workspace = true }
parquet = { workspace = true, default-features = false, features = ["experimental", "arrow"] }
futures = { workspace = true }
Expand Down
6 changes: 5 additions & 1 deletion native/core/src/execution/jni_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,8 @@ struct ExecutionContext {
pub memory_pool_config: MemoryPoolConfig,
/// Whether to log memory usage on each call to execute_plan
pub tracing_enabled: bool,
/// Spark configuration map for comet-specific settings
pub spark_conf: HashMap<String, String>,
}

/// Accept serialized query plan and return the address of the native query plan.
Expand Down Expand Up @@ -322,6 +324,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
explain_native,
memory_pool_config,
tracing_enabled,
spark_conf: spark_config,
});

Ok(Box::into_raw(exec_context) as i64)
Expand Down Expand Up @@ -535,7 +538,8 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan(
let start = Instant::now();
let planner =
PhysicalPlanner::new(Arc::clone(&exec_context.session_ctx), partition)
.with_exec_id(exec_context_id);
.with_exec_id(exec_context_id)
.with_spark_conf(exec_context.spark_conf.clone());
let (scans, root_op) = planner.create_plan(
&exec_context.spark_plan,
&mut exec_context.input_sources.clone(),
Expand Down
Loading