Skip to content

feat: support native Comet scan of plain Delta Lake tables#4669

Open
adityavaish wants to merge 2 commits into
apache:mainfrom
adityavaish:feat/native-delta-scan-plain-tables
Open

feat: support native Comet scan of plain Delta Lake tables#4669
adityavaish wants to merge 2 commits into
apache:mainfrom
adityavaish:feat/native-delta-scan-plain-tables

Conversation

@adityavaish

@adityavaish adityavaish commented Jun 17, 2026

Copy link
Copy Markdown

Which issue does this PR close?

Part of #174 (Explore integration with Delta Lake). It does not close #174 — that issue tracks broader Delta integration (writes, deletion vectors, column mapping, full native scan); this PR adds the minimal read-only piece.

Prior art / related work:

Rationale for this change

A Delta table that uses neither deletion vectors nor column mapping is just plain Parquet on disk, read through Spark's FileSourceScanExec with DeltaParquetFileFormat (a subclass of ParquetFileFormat). Today Comet rejects it because CometScanExec.isFileFormatSupported requires the exact ParquetFileFormat class, so these scans run entirely in Spark even though Comet's native Parquet reader could read the files unchanged.

On a scan-heavy micro-benchmark (20M-row plain Delta table, filter(...).agg(sum, sum, sum), 5 iterations after warmup, local[5]), enabling the native scan was ~12.5x faster than Spark (~600 ms vs ~7.5 s end-to-end), and ~41x faster than the existing spark.comet.convert.parquet.enabled path (which adds per-row Arrow conversion and is actually slower than Spark for scan-bound queries). Numbers are from a single dev box and a favorable query shape — real-world gains will be more modest.

What changes are included in this PR?

  • New config spark.comet.scan.delta.enabled (default false, experimental).
  • CometScanExec.isFileFormatSupported accepts DeltaParquetFileFormat (matched by class name, no compile-time dependency on delta-spark) when the flag is enabled.
  • CometScanRule routes a Delta FileSourceScanExec through the existing native Parquet scan path, with conservative fallback guards:
    • Column mapping — detected via DeltaParquetFileFormat.columnMappingMode (reflection). Delta strips column-mapping metadata from the schema it exposes to the scan, so the file format object is the only reliable signal.
    • Deletion vectors — detected via Delta's synthetic __delta_internal_* scan columns.
    • On any reflection failure the table is treated as unsupported (fall back, never guess).
  • CometDeltaReadSuite + a per-Spark-profile delta test dependency wired through delta.version / delta.artifact.name properties (delta-spark for Delta ≥ 3.0, delta-core for Delta < 3.0).

Deletion vectors, column mapping, and native Delta writes are intentionally out of scope and continue to run in Spark.

How are these changes tested?

New CometDeltaReadSuite, validated on the default build (Spark 4.1.2 + Delta 4.1.0), all green:

CometDeltaReadSuite:
  + tier0 plain plan nodes:        CometNativeColumnarToRowExec, CometNativeScanExec
  + tier0 partitioned plan nodes:  CometNativeColumnarToRowExec, CometProjectExec, CometNativeScanExec
  + tier0 deletion-vectors nodes:  ... FileSourceScanExec   (falls back to Spark)
  + tier0 column-mapping nodes:    ... FileSourceScanExec   (falls back to Spark)
Tests: succeeded 9, failed 0, canceled 0, ignored 0, pending 0

Coverage:

  • Plain and partitioned reads now use CometNativeScanExec; results verified equal to Spark via checkSparkAnswer.
  • Deletion-vector and column-mapping tables correctly fall back to Spark (asserted) and still return correct results.
  • Read-analysis cases (convert off/on, downstream aggregate) documenting how a Delta scan flows through Comet.
  • Feature-specific tests self-cancel on Delta versions that lack columnMappingMode / deletion vectors, so the suite stays green across the Spark/Delta build matrix.

No impact on non-Delta scans — the new branch is gated on a default-false flag and only runs for Delta FileSourceScanExec. Regression checks:

  • CometScanRuleSuite + CometExecRuleSuite: 16 passed
  • CometNativeReaderSuite: 51 passed (1 canceled)
  • cargo test (native): 634 passed

Performance

Micro-benchmark of a scan-heavy query against a plain (no DV / no column mapping) Delta table, to compare the scan source while holding everything else constant.

  • Table: 20,000,000 rows, columns id long, g long, k int, v double, v2 double, v3 double, written once as Delta.
  • Query: spark.read.format("delta").load(path).filter("k > 10").agg(sum("v"), sum("v2"), sum("v3"))
  • Method: 1 warm-up iteration, then 5 measured iterations (avg + min); local[5], release native build, AQE off. No file/stat pruning is triggered (the filter is on a high-cardinality-within-file column), so all 20M rows are scanned in every configuration — an apples-to-apples comparison.
Configuration Scan path Avg Min
Spark (spark.comet.enabled=false) Spark Delta FileSourceScanExec → Spark agg 7,488 ms 7,416 ms
Comet convert-to-Arrow (spark.comet.convert.parquet.enabled=true) Spark Delta scan → CometSparkToColumnarExec → Comet agg 24,531 ms 23,499 ms
Comet native (this PR, spark.comet.scan.delta.enabled=true) CometNativeScanExec → Comet agg 598 ms 580 ms

Speedups:

  • Native vs Spark: ~12.5×
  • Native vs convert-to-Arrow: ~41× — the cleanest scan-isolation number, since both use the identical CometHashAggregate downstream, so only the scan source differs.
  • Convert-to-Arrow vs Spark: 0.31× (i.e. ~3× slower) — converting every scanned row to Arrow is pure overhead for a scan-bound query, which is why a dedicated native scan (rather than the existing convert path) is worthwhile.

Caveats: this is a single-box micro-benchmark on a favorable, scan-dominated query with a warm OS cache and a release build that retains debug symbols. It overstates the gap versus real workloads — Comet's headline is ~2× end-to-end on TPC-DS. Treat these as directional, not official numbers.

AI Disclosure

Disclosed for transparency, per Apache Software Foundation generative-AI guidance.

  • Parts of this pull request were generated or assisted by an AI tool.
    • AI tool used: GitHub Copilot CLI (Anthropic Claude model).
    • Areas assisted: the code change (CometConf, CometScanExec, CometScanRule), the CometDeltaReadSuite tests, the per–Spark-profile pom.xml wiring, and this PR description.
    • Human oversight & responsibility: All changes were reviewed by the contributor, who takes responsibility for this PR and confirms the right to submit it under the Apache License 2.0. The implementation deliberately reuses Comet's existing native Parquet scan and mirrors established in-repo patterns (e.g. reflection-based file-format detection, as in IcebergReflection), and introduces no third-party copyrighted material. The full suite was built and run locally with the results reported above.

Delta tables that store plain Parquet (no deletion vectors and no column
mapping) are read through Spark's built-in Parquet machinery, so Comet's
existing native Parquet reader can scan them directly with no new native
code or dependencies.

Behind a new `spark.comet.scan.delta.enabled` flag (default false),
CometScanRule now routes such Delta scans to the native Parquet scan, and
conservatively falls back to Spark when a table uses column mapping
(detected via DeltaParquetFileFormat.columnMappingMode) or deletion vectors
(detected via Delta's synthetic __delta_internal_* columns) to avoid
returning incorrect results.

Part of apache#174.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
@adityavaish

Copy link
Copy Markdown
Author

Friendly review request for the repo maintainers — this is a small, flag-gated (spark.comet.scan.delta.enabled, default off) change that reuses Comet's existing native Parquet reader for plain Delta tables, with conservative fallback for deletion vectors / column mapping. It revives the approach from #3035 and is part of #174.

@andygrove @comphead @mbutrovich — would one of you be able to take a look (or suggest a more appropriate reviewer)? Thanks!

(Note: I don't have permission to set the Reviewers field as an external contributor, hence this comment.)

@coderfender

Copy link
Copy Markdown
Contributor

Thank you for the PR @adityavaish . @schenksj has been the primary driver behind delta support . Probably worth taking out their input/ review

The Preflight 'Check missing suites' guard (dev/ci/check-suites.py) requires every *Suite.scala to be listed in both pr_build_linux.yml and pr_build_macos.yml. Add CometDeltaReadSuite to the 'scans' group in both.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
@adityavaish

Copy link
Copy Markdown
Author

@schenksj would you be willing to review this and share your thoughts? You have the most context on native Delta in Comet (the delta-kernel-rs scan in #3932 and the contrib restructure in #4366), so your perspective would be very valuable.

This PR is deliberately a much smaller, complementary approach to yours: behind spark.comet.scan.delta.enabled (default off), it just reuses Comet's existing native Parquet reader for plain Delta tables (no deletion vectors, no column mapping), and conservatively falls back to Spark otherwise. It's essentially a revival of #3035. It is not meant to replace the full delta-kernel integration — rather to give the common case a native path today while the contrib module matures.

A few specific things I'd love your opinion on:

  1. Does an in-core, flag-gated Tier-0 like this make sense alongside the contrib delta-kernel work, or would you prefer all Delta support live in contrib/?
  2. The fallback guards: column mapping is detected via DeltaParquetFileFormat.columnMappingMode (reflection) and deletion vectors via the synthetic __delta_internal_* scan columns. Are there other Delta features that would make a plain native Parquet read incorrect that I should also guard against?
  3. The Delta test dependency is wired per Spark profile (delta-core 2.4.0 for 3.4; delta-spark 3.3.2/4.0.0/4.1.0/4.2.0 for 3.5/4.0/4.1/4.2). Does that match how you handle versioning in the contrib suite?

Thanks for taking a look!

@andygrove

Copy link
Copy Markdown
Member

Thanks for the careful writeup, the explicit framing of how this complements (rather than replaces) @schenksj's contrib work in #4366, and the AI-disclosure block. I appreciate the discipline of disclosing tooling per the ASF guidance and including the human-oversight statement.

I'll defer to @schenksj on the strategic question of whether an in-core Tier 0 path is the right shape alongside the contrib delta-kernel-rs work, since they have the most context. A few specific suggestions on the change itself:

  1. Fragile DV detection. The deletion-vector guard relies on the __delta_internal column-name prefix, which is a Delta implementation detail rather than a public contract. The Tier 0 DV test cancels itself when DELETE materializes as a copy-on-write rewrite instead of a deletion vector, so on some build-matrix slots there's no guaranteed coverage of the DV fallback path. Could you add a unit-level test that constructs a synthetic FileSourceScanExec with a __delta_internal_row_index column in its required schema and asserts the rule rejects it? That would pin the column-name contract independent of Delta version.

  2. Type widening (Delta 3.2+). A Delta table that has had a column widened stores the older type in Parquet and widens it on read. A native Parquet scan would return the narrower type, which would silently disagree with Spark. Worth either adding a guard or filing a tracking issue and noting it in the docstring's unsupported list. Same question for row tracking and the variant type in Delta 4.0+.

  3. Reflection catch is too wide. In deltaColumnMappingEnabled, case _: Throwable => true also swallows Error subclasses like OutOfMemoryError and LinkageError. The conservative-fallback policy is right, but case _: Exception (or ReflectiveOperationException then Exception) keeps that policy without masking VM-level errors.

  4. Both flags on. There's no test that pins down behavior when spark.comet.scan.delta.enabled=true and spark.comet.convert.parquet.enabled=true are both set. From reading the code the native scan rule runs first so the convert path doesn't kick in, but a one-line assertion would lock it down.

  5. Older Delta versions. The Tier 0 tests assume away builds where columnMappingMode reflection isn't available (Delta 2.x). If isFileFormatSupported refused to widen when that method can't be located, runtime behavior on older Delta would be unchanged and the test matrix wouldn't be doing the load-bearing version gating.

I'll approve the CI workflows so the matrix actually runs across all the Delta versions you've wired in. The cancelIfDeltaDmlSkew carve-out specifically suggests the Spark 4.1.2 + Delta 4.1.0 slot may have an interesting DML-skew interaction worth eyeballing in the logs once they exist.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Explore integration with Delta Lake

4 participants