Skip to content
Closed

[bench] #20764

Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 25 additions & 15 deletions datafusion/physical-plan/src/repartition/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ use crate::sort_pushdown::SortOrderPushdownResult;
use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
use datafusion_physical_expr_common::utils::evaluate_expressions_to_arrays;
use futures::stream::Stream;
use futures::future::join_all;
use futures::{FutureExt, StreamExt, TryStreamExt, ready};
use log::trace;
use parking_lot::Mutex;
Expand Down Expand Up @@ -1469,31 +1470,40 @@ impl RepartitionExec {
Err(e) => {
let e = Arc::new(e);

for (_, tx) in txs {
let err = Err(DataFusionError::Context(
"Join Error".to_string(),
Box::new(DataFusionError::External(Box::new(Arc::clone(&e)))),
));
tx.send(Some(err)).await.ok();
}
join_all(txs.into_values().map(|tx| {
let e = Arc::clone(&e);
async move {
let err = Err(DataFusionError::Context(
"Join Error".to_string(),
Box::new(DataFusionError::External(Box::new(e))),
));
tx.send(Some(err)).await.ok();
}
}))
.await;
}
// Error from running input task
Ok(Err(e)) => {
// send the same Arc'd error to all output partitions
let e = Arc::new(e);

for (_, tx) in txs {
// wrap it because need to send error to all output partitions
let err = Err(DataFusionError::from(&e));
tx.send(Some(err)).await.ok();
}
join_all(txs.into_values().map(|tx| {
let e = Arc::clone(&e);
async move {
let err = Err(DataFusionError::from(&e));
tx.send(Some(err)).await.ok();
}
}))
.await;
}
// Input task completed successfully
Ok(Ok(())) => {
// notify each output partition that this input partition has no more data
for (_partition, tx) in txs {
tx.send(None).await.ok();
}
join_all(
txs.into_values()
.map(|tx| async move { tx.send(None).await.ok() }),
)
.await;
}
}
}
Expand Down
Loading