feat: support native Comet scan of plain Delta Lake tables #4669
adityavaish wants to merge 2 commits into
Delta tables that store plain Parquet (no deletion vectors and no column mapping) are read through Spark's built-in Parquet machinery, so Comet's existing native Parquet reader can scan them directly with no new native code or dependencies. Behind a new `spark.comet.scan.delta.enabled` flag (default false), CometScanRule now routes such Delta scans to the native Parquet scan, and conservatively falls back to Spark when a table uses column mapping (detected via DeltaParquetFileFormat.columnMappingMode) or deletion vectors (detected via Delta's synthetic __delta_internal_* columns) to avoid returning incorrect results. Part of apache#174. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Friendly review request for the repo maintainers — this is a small, flag-gated ( @andygrove @comphead @mbutrovich — would one of you be able to take a look (or suggest a more appropriate reviewer)? Thanks! (Note: I don't have permission to set the Reviewers field as an external contributor, hence this comment.) |
Thank you for the PR, @adityavaish. @schenksj has been the primary driver behind Delta support, so it is probably worth seeking out their input and review.
The Preflight 'Check missing suites' guard (dev/ci/check-suites.py) requires every *Suite.scala to be listed in both pr_build_linux.yml and pr_build_macos.yml. Add CometDeltaReadSuite to the 'scans' group in both. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
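The guard described in this commit can be pictured roughly as follows. This is only a sketch in Scala (the actual check is the Python script `dev/ci/check-suites.py`, whose logic is not shown here); the `SuiteCheck` object, its `missingSuites` helper, and the plain substring match are all assumptions made for illustration.

```scala
object SuiteCheck {
  // For each workflow file, list the suites its text never mentions.
  // Workflows that mention every suite are dropped from the result.
  // Assumption: a simple substring match stands in for however the real script parses the YAML.
  def missingSuites(
      suites: Seq[String],
      workflows: Map[String, String]): Map[String, Seq[String]] =
    workflows
      .map { case (file, text) => file -> suites.filterNot(s => text.contains(s)) }
      .filter { case (_, missing) => missing.nonEmpty }
}
```

An empty result would mean every suite is listed in every workflow, which is the state this commit restores for `CometDeltaReadSuite`.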
@schenksj would you be willing to review this and share your thoughts? You have the most context on native Delta in Comet (the delta-kernel-rs scan in #3932 and the contrib restructure in #4366), so your perspective would be very valuable. This PR is deliberately a much smaller, complementary approach to yours: behind A few specific things I'd love your opinion on:
Thanks for taking a look!
Thanks for the careful writeup, the explicit framing of how this complements (rather than replaces) @schenksj's contrib work in #4366, and the AI-disclosure block. I appreciate the discipline of disclosing tooling per the ASF guidance and including the human-oversight statement. I'll defer to @schenksj on the strategic question of whether an in-core Tier 0 path is the right shape alongside the contrib
I'll approve the CI workflows so the matrix actually runs across all the Delta versions you've wired in. The |
Which issue does this PR close?
Part of #174 (Explore integration with Delta Lake). It does not close #174 — that issue tracks broader Delta integration (writes, deletion vectors, column mapping, full native scan); this PR adds the minimal read-only piece.
Prior art / related work:
- `delta-kernel-rs` native scan explored in "feat: Native Delta Lake scan via delta-kernel-rs" #3932 (closed in favor of a contrib module) and "[Tracking] feat(contrib): Native Delta Lake scan via delta-kernel-rs (Iceberg-style contrib)" #4366 (draft `contrib/` tracking PR). This PR adds no native Rust code and no new runtime dependency; it reuses Comet's existing native Parquet reader.
Rationale for this change
A Delta table that uses neither deletion vectors nor column mapping is just plain Parquet on disk, read through Spark's `FileSourceScanExec` with `DeltaParquetFileFormat` (a subclass of `ParquetFileFormat`). Today Comet rejects it because `CometScanExec.isFileFormatSupported` requires the exact `ParquetFileFormat` class, so these scans run entirely in Spark even though Comet's native Parquet reader could read the files unchanged.
On a scan-heavy micro-benchmark (20M-row plain Delta table, `filter(...).agg(sum, sum, sum)`, 5 iterations after warmup, `local[5]`), enabling the native scan was ~12.5x faster than Spark (~600 ms vs ~7.5 s end-to-end), and ~41x faster than the existing `spark.comet.convert.parquet.enabled` path (which adds per-row Arrow conversion and is actually slower than Spark for scan-bound queries). Numbers are from a single dev box and a favorable query shape; real-world gains will be more modest.
What changes are included in this PR?
- `spark.comet.scan.delta.enabled` (default `false`, experimental).
- `CometScanExec.isFileFormatSupported` accepts `DeltaParquetFileFormat` (matched by class name, no compile-time dependency on delta-spark) when the flag is enabled.
- `CometScanRule` routes a Delta `FileSourceScanExec` through the existing native Parquet scan path, with conservative fallback guards:
  - Column mapping: `DeltaParquetFileFormat.columnMappingMode` (reflection). Delta strips column-mapping metadata from the schema it exposes to the scan, so the file format object is the only reliable signal.
  - Deletion vectors: `__delta_internal_*` scan columns.
- `CometDeltaReadSuite` + a per-Spark-profile `delta` test dependency wired through `delta.version` / `delta.artifact.name` properties (`delta-spark` for Delta ≥ 3.0, `delta-core` for Delta < 3.0).
Deletion vectors, column mapping, and native Delta writes are intentionally out of scope and continue to run in Spark.
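To make the guard logic described above concrete, here is a minimal, dependency-free sketch. It is not the code in this PR: the `DeltaScanGuards` object and its method names are hypothetical, and the `DeltaParquetFileFormat` class below is a stand-in whose `columnMappingMode` returns a plain string. The real Delta method returns a mode object, so the comparison against `"none"` is an assumption that would need adapting.

```scala
// Stand-in for Delta's file format class (hypothetical; the real one lives in delta-spark).
class DeltaParquetFileFormat(mode: String) {
  def columnMappingMode: String = mode
}

object DeltaScanGuards {
  // Match by class name so that no compile-time dependency on Delta is required.
  def isDeltaFormat(fileFormat: AnyRef): Boolean =
    fileFormat.getClass.getName.endsWith("DeltaParquetFileFormat")

  // Read columnMappingMode reflectively. Any failure yields None,
  // which the caller treats as "unknown" and therefore falls back.
  def columnMappingMode(fileFormat: AnyRef): Option[String] =
    scala.util.Try {
      fileFormat.getClass.getMethod("columnMappingMode").invoke(fileFormat).toString
    }.toOption

  // Delta's synthetic scan columns signal that deletion vectors are in play.
  def usesDeletionVectors(scanColumns: Seq[String]): Boolean =
    scanColumns.exists(_.startsWith("__delta_internal_"))

  // Native scan only when the flag is on and every guard passes; otherwise fall back to Spark.
  def canScanNatively(
      flagEnabled: Boolean,
      fileFormat: AnyRef,
      scanColumns: Seq[String]): Boolean =
    flagEnabled &&
      isDeltaFormat(fileFormat) &&
      columnMappingMode(fileFormat).exists(_.equalsIgnoreCase("none")) &&
      !usesDeletionVectors(scanColumns)
}
```

The sketch is deliberately conservative: an unreadable or unexpected mapping mode counts as a reason to fall back, which matches the stated goal of never returning incorrect results.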
How are these changes tested?
New `CometDeltaReadSuite`, validated on the default build (Spark 4.1.2 + Delta 4.1.0), all green.
Coverage:
- … `CometNativeScanExec`; results verified equal to Spark via `checkSparkAnswer`.
- … `columnMappingMode` / deletion vectors, so the suite stays green across the Spark/Delta build matrix.
No impact on non-Delta scans: the new branch is gated on a default-`false` flag and only runs for Delta `FileSourceScanExec`. Regression checks:
- `CometScanRuleSuite` + `CometExecRuleSuite`: 16 passed
- `CometNativeReaderSuite`: 51 passed (1 canceled)
- `cargo test` (native): 634 passed
Performance
Micro-benchmark of a scan-heavy query against a plain (no DV / no column mapping) Delta table, to compare the scan source while holding everything else constant.
- Schema: `id long, g long, k int, v double, v2 double, v3 double`, written once as Delta.
- Query: `spark.read.format("delta").load(path).filter("k > 10").agg(sum("v"), sum("v2"), sum("v3"))`
- Environment: `local[5]`, release native build, AQE off. No file/stat pruning is triggered (the filter is on a high-cardinality-within-file column), so all 20M rows are scanned in every configuration, making this an apples-to-apples comparison.
Configurations compared:
- `spark.comet.enabled=false`: `FileSourceScanExec` → Spark agg
- `spark.comet.convert.parquet.enabled=true`: `CometSparkToColumnarExec` → Comet agg
- `spark.comet.scan.delta.enabled=true`: `CometNativeScanExec` → Comet agg
Speedups:
- … `CometHashAggregate` downstream, so only the scan source differs.
Caveats: this is a single-box micro-benchmark on a favorable, scan-dominated query with a warm OS cache and a release build that retains debug symbols. It overstates the gap versus real workloads; Comet's headline is ~2× end-to-end on TPC-DS. Treat these as directional, not official numbers.
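For readers who want to reproduce the measurement shape (warmup runs followed by timed iterations), a minimal timing harness might look like the sketch below. It is not the harness used for the numbers in this PR; the `ScanBench` object and its helpers are hypothetical, and the code is kept free of Spark so that it runs standalone.

```scala
object ScanBench {
  // Run `body` `warmup` times untimed, then `iters` times timed.
  // Returns the wall-clock time of each timed iteration in milliseconds.
  def timeMillis[A](warmup: Int, iters: Int)(body: => A): Seq[Double] = {
    (1 to warmup).foreach(_ => body)
    (1 to iters).map { _ =>
      val start = System.nanoTime()
      body
      (System.nanoTime() - start) / 1e6
    }
  }

  // Arithmetic mean of the per-iteration timings.
  def mean(xs: Seq[Double]): Double = xs.sum / xs.size

  // Ratio of baseline time to candidate time; values above 1 mean the candidate is faster.
  def speedup(baselineMs: Double, candidateMs: Double): Double =
    baselineMs / candidateMs
}
```

In a Spark session, `body` would be the benchmark query followed by an action such as `.collect()`, run once per configuration. With the rounded figures quoted in the rationale, `ScanBench.speedup(7500.0, 600.0)` gives 12.5, matching the ~12.5x reported.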
AI Disclosure
Disclosed for transparency, per Apache Software Foundation generative-AI guidance.
- … `CometConf`, `CometScanExec`, `CometScanRule`), the `CometDeltaReadSuite` tests, the per-Spark-profile `pom.xml` wiring, and this PR description.
- … `IcebergReflection`), and introduces no third-party copyrighted material. The full suite was built and run locally with the results reported above.