Optionally split the Iceberg write/commit operator into separate writer and committer operations#4658
Open
jordepic wants to merge 5 commits into
Let me know what you think, @mbutrovich, @comphead!
added 5 commits
June 16, 2026 16:13
Iceberg's V2 writes run as a single `V2ExistingTableWriteExec` command that both
writes files and commits. Spark's `InsertAdaptiveSparkPlan` treats it as a leaf,
so AQE never sees the data sub-query inside it and Comet's columnar rules can't
convert the scans / shuffles / sorts that feed the write.
This injects `IcebergWriteStrategy` ahead of Spark's `DataSourceV2Strategy` (via
`experimentalMethods.extraStrategies`, which is prepended to the planner's
strategy list). For Iceberg `AppendData`, `OverwriteByExpression`, and
`OverwritePartitionsDynamic` it rewrites the single command into two operators:
    IcebergCommitExec(batchWrite, refreshCache)     <- committer (driver-side)
    └── AdaptiveSparkPlanExec                       <- AQE bubble
        └── IcebergWriteExec(batchWrite)            <- writer (UnaryExecNode)
            └── <data sub-query>                    <- now visible to AQE / Comet
The writer emits one row per Spark task carrying the Java-serialised
`WriterCommitMessage`; the committer collects them and calls
`BatchWrite.commit(messages)` -- the same call Iceberg-Java makes internally --
then runs Spark's post-write cache refresh.
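
Why prepending the strategy matters can be sketched without Spark. This is a simplified model, not the planner's real code: all types below (`Logical`, `Physical`, `Strategy`, and the two strategies) are hypothetical stand-ins, and it assumes a first-match rule, where the first strategy that returns a plan for a node wins.

```scala
// Hypothetical stand-ins for Spark's logical and physical plan nodes.
sealed trait Logical
case class AppendData(table: String) extends Logical
case class Other(name: String) extends Logical

sealed trait Physical
case class SingleCommand(table: String) extends Physical // write + commit in one node
case class Writer(table: String) extends Physical
case class Committer(child: Physical) extends Physical
case class Generic(name: String) extends Physical

object StrategyOrderSketch {
  // A strategy returns Nil when it does not apply to the node.
  type Strategy = Logical => Seq[Physical]

  // Stand-in for the built-in V2 strategy: one combined write/commit command.
  val builtIn: Strategy = {
    case AppendData(t) => Seq(SingleCommand(t))
    case Other(n)      => Seq(Generic(n))
  }

  // Stand-in for the injected strategy: it only claims the write it knows about.
  val split: Strategy = {
    case AppendData(t) => Seq(Committer(Writer(t)))
    case _             => Nil
  }

  // Assumed first-match rule: the first strategy that yields a plan wins.
  def plan(strategies: Seq[Strategy], node: Logical): Physical =
    strategies.iterator
      .map(s => s(node))
      .collectFirst { case p +: _ => p }
      .getOrElse(sys.error(s"no plan for $node"))
}
```

With `Seq(split, builtIn)` the append is planned as committer over writer, while nodes the injected strategy does not claim still fall through to the built-in one.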
Two design points keep this stable under AQE, which re-runs the planner on each
materialised stage's `logicalLink`:
- The writer's child is wrapped in `IcebergWriteLogical`, a stable logical
  anchor, so each re-plan re-emits only the writer rather than dropping it or
  re-firing the surrounding logical write and duplicating the commit.
- The `BatchWrite` is materialised once (`Write.toBatch()` mints a fresh
  instance per call) and shared between committer and writer, so commit-time
  validation sees the same scan / emitted-file state the writer used.
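
The second point can be illustrated with a minimal sketch. `FakeWrite`, `FakeBatchWrite`, `WriterOp`, and `CommitterOp` are hypothetical stand-ins rather than the classes in this PR; the only behaviour assumed is the one stated above, that each `toBatch()` call returns a new instance.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-ins for the DataSource V2 `Write` / `BatchWrite` pair.
final class FakeBatchWrite(val id: Int)

final class FakeWrite {
  private val counter = new AtomicInteger(0)
  // Mirrors the behaviour described above: every call returns a new instance.
  def toBatch(): FakeBatchWrite = new FakeBatchWrite(counter.incrementAndGet())
}

// Both operators hold a reference to the same already-built instance.
final case class WriterOp(batchWrite: FakeBatchWrite)
final case class CommitterOp(batchWrite: FakeBatchWrite, child: WriterOp)

object SharedBatchWriteSketch {
  def planSplit(write: FakeWrite): CommitterOp = {
    val batchWrite = write.toBatch() // called exactly once per planned write
    CommitterOp(batchWrite, WriterOp(batchWrite))
  }
}
```

Calling `toBatch()` separately for each operator would hand the committer an instance that never saw the writer's state, which is the failure mode the shared reference avoids.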
`WriteDelta` (merge-on-read) is intentionally not intercepted: its per-task
`DeltaWriter` is row-dispatched and the native writer can't emit position-delete
files, so the split plan would add planning complexity for no acceleration.
Copy-on-write DELETE / UPDATE / MERGE (`ReplaceData`) is handled in a follow-up
commit.
Guarded by `spark.comet.write.iceberg.splitOperator.enabled` (off by default);
when off, writes go straight through Iceberg-Java unchanged. File writing still
runs through Iceberg's JVM writer -- native Parquet write lands in a later
commit.
Adds copy-on-write row-level DML to the split-operator plan. Spark lowers a CoW `DELETE` / `UPDATE` / `MERGE` on a V2 table into a `ReplaceData` logical node; `IcebergWriteStrategy` now matches it (refreshing `originalTable`'s cache) and routes it through the same committer + writer pair as appends/overwrites.
The writer gains a per-row dispatch path. On Spark 4.x a `ReplaceData` rewrite prefixes each row with an operation code and carries a `ReplaceDataProjections`; `IcebergWriteExec` applies the row / metadata projections per code (WRITE / WRITE_WITH_METADATA) before handing rows to the `DataWriter`. The two-arg `DataWriter.write(metadata, row)` is looked up reflectively since it is 4.x-only. On Spark 3.4 / 3.5 the rewritten stream is already post-projection, so the shim returns `None` and the writer keeps its plain `write(row)` loop.
`ReplaceDataProjections` does not exist on 3.4 / 3.5, so `ReplaceDataDispatchInfo` mirrors it as a version-neutral carrier, populated by the per-version `IcebergReplaceDataShim`.
Iceberg 1.5.2 (Spark 3.4) lacks native row-level operations and instead routes UPDATE / MERGE through its own `ReplaceIcebergData` logical node. It has the same field shape as Spark's `ReplaceData` and is matched by FQCN via reflection, so the main module takes no compile dependency on iceberg-spark-extensions.
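
The per-row dispatch described above could look roughly like the sketch below. Everything in it is an assumption made for illustration: the `Row` model, the numeric operation codes, `DispatchInfo`, and `RecordingWriter` are hypothetical, and the reflective lookup of the two-arg `write` is not modelled.

```scala
import scala.collection.mutable.ArrayBuffer

object DispatchSketch {
  // Hypothetical row model: a vector of values; slot 0 holds the operation code.
  type Row = Vector[Any]

  // Hypothetical operation codes; the real values come from Spark.
  val Write = 1
  val WriteWithMetadata = 2

  // Illustrative version-neutral carrier of the two projections.
  final case class DispatchInfo(rowProjection: Row => Row, metadataProjection: Row => Row)

  // Hypothetical writer that records what it was asked to write.
  final class RecordingWriter {
    val written = ArrayBuffer.empty[(Option[Row], Row)]
    def write(row: Row): Unit = written += ((None, row))
    def write(metadata: Row, row: Row): Unit = written += ((Some(metadata), row))
  }

  def writeAll(rows: Iterator[Row], writer: RecordingWriter, info: Option[DispatchInfo]): Unit =
    info match {
      // No dispatch info (the 3.4 / 3.5 case): rows arrive already projected.
      case None =>
        rows.foreach(r => writer.write(r))
      // Dispatch info present (the 4.x case): project per operation code.
      case Some(d) =>
        rows.foreach { row =>
          row.head match {
            case Write =>
              writer.write(d.rowProjection(row))
            case WriteWithMetadata =>
              writer.write(d.metadataProjection(row), d.rowProjection(row))
            case other =>
              throw new IllegalArgumentException(s"unexpected operation code: $other")
          }
        }
    }
}
```

Passing the dispatch info as an `Option` keeps a single writer loop for both version families, with the shim deciding which branch is taken.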
9a32b67 to
77030ec
Which issue does this PR close?
Closes #4322.
Rationale for this change
Iceberg Spark writes are V2 operators that bundle three jobs: writing data files, writing metadata files, and committing to the catalog. Comet is only well positioned to accelerate data file writing (assuming the files are Parquet). It is also crucial that the data-file-writing part of the Spark plan for an Iceberg write sits inside the plan's AQE block, so that writes are re-planned in response to runtime decisions about their upstream operators.
Our split is fairly simple: the "writer" operator writes the data files as usual and serializes its output, which is passed back to the "committer" operator. In the future, we'll target just the "writer" operator for speedup with iceberg-rust.
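
As a rough, Spark-free illustration of that hand-off, the sketch below round-trips a message through Java serialization and commits once over everything collected. `FakeCommitMessage`, `CommitMessageCodec`, and `SplitWriteSketch` are hypothetical names, and the message fields are invented for the example; the actual operators carry Spark's `WriterCommitMessage`.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.io.{ObjectInputStream, ObjectOutputStream, ObjectStreamClass}

// Hypothetical stand-in for one task's commit message; the fields are illustrative.
final case class FakeCommitMessage(taskId: Int, dataFiles: List[String])

object CommitMessageCodec {
  // Writer side: turn one task's message into the bytes carried by its output row.
  def serialize(msg: AnyRef): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(msg) finally out.close()
    bytes.toByteArray
  }

  // Committer side: recover the message from a collected row.
  def deserialize[T](data: Array[Byte]): T = {
    val in = new ObjectInputStream(new ByteArrayInputStream(data)) {
      // Try this code's own class loader first, then fall back to the default lookup.
      override def resolveClass(desc: ObjectStreamClass): Class[_] =
        try Class.forName(desc.getName, false, CommitMessageCodec.getClass.getClassLoader)
        catch { case _: ClassNotFoundException => super.resolveClass(desc) }
    }
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}

object SplitWriteSketch {
  // One serialized "row" per task, then a single commit over all messages.
  def run(tasks: Seq[FakeCommitMessage], commit: Seq[FakeCommitMessage] => Unit): Unit = {
    val rows: Seq[Array[Byte]] = tasks.map(m => CommitMessageCodec.serialize(m))
    commit(rows.map(r => CommitMessageCodec.deserialize[FakeCommitMessage](r)))
  }
}
```

The point of the shape is that the commit callback runs exactly once, no matter how many tasks produced messages.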
What changes are included in this PR?
This PR contains 5 commits.
How are these changes tested?
We have unit tests for each operator we replace. They check that the plan shape is correct, that we commit to the Iceberg table the expected number of times, and that the table's end state is correct when scanned after a write operation. I've also been running with these changes locally, and they are performing as expected.