Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions common/src/main/scala/org/apache/comet/CometConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,17 @@ object CometConf extends ShimCometConf {
.booleanConf
.createWithDefault(true)

// Controls whether native operators read shuffle output directly from
// compressed blocks in native code instead of crossing the Arrow FFI boundary.
// NOTE(review): categorized as CATEGORY_SHUFFLE for consistency with the
// other shuffle configs (e.g. COMET_SHUFFLE_MODE below) — this entry governs
// shuffle-read behavior, not a general exec feature.
val COMET_SHUFFLE_DIRECT_READ_ENABLED: ConfigEntry[Boolean] =
  conf("spark.comet.shuffle.directRead.enabled")
    .category(CATEGORY_SHUFFLE)
    .doc(
      "When enabled, native operators that consume shuffle output will read " +
        "compressed shuffle blocks directly in native code, bypassing Arrow FFI. " +
        "Only applies to native shuffle (not JVM columnar shuffle). " +
        "Requires spark.comet.exec.shuffle.enabled to be true.")
    .booleanConf
    .createWithDefault(true)

val COMET_SHUFFLE_MODE: ConfigEntry[String] = conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.mode")
.category(CATEGORY_SHUFFLE)
.doc(
Expand Down
14 changes: 11 additions & 3 deletions native/core/src/execution/jni_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ use tokio::sync::mpsc;
use crate::execution::memory_pools::{
create_memory_pool, handle_task_shared_pool_release, parse_memory_pool_config, MemoryPoolConfig,
};
use crate::execution::operators::ScanExec;
use crate::execution::operators::{ScanExec, ShuffleScanExec};
use crate::execution::shuffle::{read_ipc_compressed, CompressionCodec};
use crate::execution::spark_plan::SparkPlan;

Expand Down Expand Up @@ -151,6 +151,8 @@ struct ExecutionContext {
pub root_op: Option<Arc<SparkPlan>>,
/// The input sources for the DataFusion plan
pub scans: Vec<ScanExec>,
/// The shuffle scan input sources for the DataFusion plan
pub shuffle_scans: Vec<ShuffleScanExec>,
/// The global reference of input sources for the DataFusion plan
pub input_sources: Vec<Arc<GlobalRef>>,
/// The record batch stream to pull results from
Expand Down Expand Up @@ -311,6 +313,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
partition_count: partition_count as usize,
root_op: None,
scans: vec![],
shuffle_scans: vec![],
input_sources,
stream: None,
batch_receiver: None,
Expand Down Expand Up @@ -491,6 +494,10 @@ fn pull_input_batches(exec_context: &mut ExecutionContext) -> Result<(), CometEr
exec_context.scans.iter_mut().try_for_each(|scan| {
scan.get_next_batch()?;
Ok::<(), CometError>(())
})?;
exec_context.shuffle_scans.iter_mut().try_for_each(|scan| {
scan.get_next_batch()?;
Ok::<(), CometError>(())
})
}

Expand Down Expand Up @@ -539,7 +546,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan(
let planner =
PhysicalPlanner::new(Arc::clone(&exec_context.session_ctx), partition)
.with_exec_id(exec_context_id);
let (scans, root_op) = planner.create_plan(
let (scans, shuffle_scans, root_op) = planner.create_plan(
&exec_context.spark_plan,
&mut exec_context.input_sources.clone(),
exec_context.partition_count,
Expand All @@ -548,6 +555,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan(

exec_context.plan_creation_time += physical_plan_time;
exec_context.scans = scans;
exec_context.shuffle_scans = shuffle_scans;

if exec_context.explain_native {
let formatted_plan_str =
Expand All @@ -560,7 +568,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan(
// so we should always execute partition 0.
let stream = root_op.native_plan.execute(0, task_ctx)?;

if exec_context.scans.is_empty() {
if exec_context.scans.is_empty() && exec_context.shuffle_scans.is_empty() {
// No JVM data sources — spawn onto tokio so the executor
// thread parks in blocking_recv instead of busy-polling.
//
Expand Down
2 changes: 2 additions & 0 deletions native/core/src/execution/operators/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ pub use parquet_writer::ParquetWriterExec;
// Operator submodules of the native execution engine.
mod csv_scan;
pub mod projection;
mod scan;
// Scan operator for shuffle input; per the accompanying config doc, direct
// shuffle reads decompress blocks natively, bypassing Arrow FFI — confirm
// details in shuffle_scan.rs.
mod shuffle_scan;
// Public re-exports so callers don't need to reach into private submodules.
pub use csv_scan::init_csv_datasource_exec;
pub use shuffle_scan::ShuffleScanExec;

/// Error returned during executing operators.
#[derive(thiserror::Error, Debug)]
Expand Down
9 changes: 5 additions & 4 deletions native/core/src/execution/operators/projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@ use jni::objects::GlobalRef;

use crate::{
execution::{
operators::{ExecutionError, ScanExec},
planner::{operator_registry::OperatorBuilder, PhysicalPlanner},
planner::{operator_registry::OperatorBuilder, PhysicalPlanner, PlanCreationResult},
spark_plan::SparkPlan,
},
extract_op,
Expand All @@ -42,12 +41,13 @@ impl OperatorBuilder for ProjectionBuilder {
inputs: &mut Vec<Arc<GlobalRef>>,
partition_count: usize,
planner: &PhysicalPlanner,
) -> Result<(Vec<ScanExec>, Arc<SparkPlan>), ExecutionError> {
) -> PlanCreationResult {
let project = extract_op!(spark_plan, Projection);
let children = &spark_plan.children;

assert_eq!(children.len(), 1);
let (scans, child) = planner.create_plan(&children[0], inputs, partition_count)?;
let (scans, shuffle_scans, child) =
planner.create_plan(&children[0], inputs, partition_count)?;

// Create projection expressions
let exprs: Result<Vec<_>, _> = project
Expand All @@ -68,6 +68,7 @@ impl OperatorBuilder for ProjectionBuilder {

Ok((
scans,
shuffle_scans,
Arc::new(SparkPlan::new(spark_plan.plan_id, projection, vec![child])),
))
}
Expand Down
Loading
Loading