Skip to content

Commit 854e3b2

Browse files
authored
Core, Spark 4.1: Fix distributed planning for CoW operations (#15246)
1 parent ee97581 commit 854e3b2

4 files changed

Lines changed: 13 additions & 9 deletions

File tree

core/src/main/java/org/apache/iceberg/BaseDistributedDataScan.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -299,6 +299,10 @@ private DeleteFileIndex planDeletesLocally(List<ManifestFile> deleteManifests) {
299299
builder.planWith(planExecutor());
300300
}
301301

302+
if (shouldIgnoreResiduals()) {
303+
builder.ignoreResiduals();
304+
}
305+
302306
return builder
303307
.specsById(specs())
304308
.filterData(filter())

spark/v4.1/spark/src/main/java/org/apache/iceberg/SparkDistributedDataScan.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.util.stream.Collectors;
2727
import java.util.stream.IntStream;
2828
import org.apache.iceberg.expressions.Expression;
29+
import org.apache.iceberg.expressions.Expressions;
2930
import org.apache.iceberg.io.CloseableIterable;
3031
import org.apache.iceberg.io.ClosingIterator;
3132
import org.apache.iceberg.io.FileIO;
@@ -253,7 +254,7 @@ private static class ReadDeleteManifest implements FlatMapFunction<ManifestFileB
253254

254255
ReadDeleteManifest(Broadcast<Table> table, TableScanContext context) {
255256
this.table = table;
256-
this.filter = context.rowFilter();
257+
this.filter = context.ignoreResiduals() ? Expressions.alwaysTrue() : context.rowFilter();
257258
this.isCaseSensitive = context.caseSensitive();
258259
}
259260

spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -693,8 +693,7 @@ public Scan buildCopyOnWriteScan() {
693693
Schema expectedSchema = schemaWithMetadataColumns();
694694

695695
BatchScan scan =
696-
table
697-
.newBatchScan()
696+
newBatchScan()
698697
.useSnapshot(snapshot.snapshotId())
699698
.ignoreResiduals()
700699
.caseSensitive(caseSensitive)

spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -302,12 +302,12 @@ public void limitPushedDownToSparkScan() {
302302
.isEqualTo(limit);
303303

304304
// verify CoW scan
305-
assertThat(builder.buildCopyOnWriteScan())
306-
.extracting("scan")
307-
.extracting("scan")
308-
.extracting("context")
309-
.extracting("minRowsRequested")
310-
.isEqualTo(limit);
305+
scanAssert = assertThat(builder.buildCopyOnWriteScan()).extracting("scan");
306+
if (LOCAL == planningMode) {
307+
scanAssert = scanAssert.extracting("scan");
308+
}
309+
310+
scanAssert.extracting("context").extracting("minRowsRequested").isEqualTo(limit);
311311

312312
// verify MoR scan
313313
scanAssert = assertThat(builder.buildMergeOnReadScan()).extracting("scan");

0 commit comments

Comments (0)