Optionally split the Iceberg write/commit operator into separate writer and committer operations #4658

Open
jordepic wants to merge 5 commits into apache:main from jordepic:iceberg-writes-split-c1

@jordepic

Contributor

Which issue does this PR close?

Closes #4322.

Rationale for this change

Iceberg Spark writes are V2 operators that bundle three responsibilities: writing data files, writing metadata files, and committing to the catalog. Comet is only well positioned to accelerate data file writing (assuming the files are Parquet). It is also crucial that the data-file-writing part of the Spark plan sits inside the AQE block, so that writes are re-planned in response to runtime decisions about their upstream operators.

Our split is fairly simple: the "writer" operator writes the data files as usual and serializes its output, which is then passed to the "committer" operator. In the future, we'll target just the "writer" operator for speedup with iceberg-rust.

What changes are included in this PR?

This PR contains 5 commits.

  1. Docs outlining the WHOLE Iceberg write acceleration feature, not just these changes (I'm happy to modify/remove as needed).
  2. Planning rules to move Iceberg append and overwrite operations to our "split operator" design.
  3. Planning rules to move Iceberg delete, update, and merge operations to our "split operator" design.
  4. Tests for commit 2.
  5. Tests for commit 3.

How are these changes tested?

We have unit tests for each operator that we're replacing. They ensure that the plan shape is correct, that we commit to our Iceberg table the proper number of times, and that the table's end state is correct when we scan it after a write operation. I've also been running with these changes locally, and they are performing as expected.

@jordepic

Contributor Author

Let me know what you think, @mbutrovich, @comphead!

Jordan Epstein added 5 commits June 16, 2026 16:13
Iceberg's V2 writes run as a single `V2ExistingTableWriteExec` command that both
writes files and commits. Spark's `InsertAdaptiveSparkPlan` treats it as a leaf,
so AQE never sees the data sub-query inside it and Comet's columnar rules can't
convert the scans / shuffles / sorts that feed the write.

This injects `IcebergWriteStrategy` ahead of Spark's `DataSourceV2Strategy` (via
`experimentalMethods.extraStrategies`, which is prepended to the planner's
strategy list). For Iceberg `AppendData`, `OverwriteByExpression`, and
`OverwritePartitionsDynamic` it rewrites the single command into two operators:

  IcebergCommitExec(batchWrite, refreshCache)   <- committer (driver-side)
  └── AdaptiveSparkPlanExec                       <- AQE bubble
      └── IcebergWriteExec(batchWrite)            <- writer (UnaryExecNode)
          └── <data sub-query>                    <- now visible to AQE / Comet
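
The interception above depends on strategy ordering: a strategy placed ahead of the built-in one gets the first chance to plan a node. Here is a minimal sketch of that first-match behaviour in plain Java, with no Spark dependency. `ToyPlanner`, the string plan labels, and the `splitEnabled` parameter are all hypothetical stand-ins for illustration, not Comet's or Spark's actual code.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

// Toy model of a planner that tries strategies in order and takes the first match.
class ToyPlanner {
    static String plan(String logicalNode,
                       List<Function<String, Optional<String>>> strategies) {
        for (Function<String, Optional<String>> strategy : strategies) {
            Optional<String> physical = strategy.apply(logicalNode);
            if (physical.isPresent()) {
                return physical.get();
            }
        }
        throw new IllegalStateException("No plan for " + logicalNode);
    }

    static String planWrite(String logicalNode, boolean splitEnabled) {
        // Hypothetical split strategy: only claims appends, and only when enabled.
        Function<String, Optional<String>> split = node ->
            (splitEnabled && node.equals("AppendData"))
                ? Optional.of("IcebergCommitExec -> IcebergWriteExec")
                : Optional.<String>empty();
        // Stand-in for the built-in strategy: always produces the single command.
        Function<String, Optional<String>> builtIn = node ->
            Optional.of("single write command");
        // The split strategy is listed first, mirroring "prepended".
        return plan(logicalNode, Arrays.asList(split, builtIn));
    }
}
```

In this toy, a node the split strategy declines (for example `WriteDelta`) simply falls through to the built-in plan, which is the behaviour the flag-off path relies on as well.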

The writer emits one row per Spark task carrying the Java-serialised
`WriterCommitMessage`; the committer collects them and calls
`BatchWrite.commit(messages)` -- the same call Iceberg-Java makes internally --
then runs Spark's post-write cache refresh.
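
As a rough illustration of the writer-to-committer handoff, the sketch below round-trips a commit message through standard Java serialization. `ToyCommitMessage` is a hypothetical stand-in for a `WriterCommitMessage` implementation, and the "commit" here just gathers file names; the real committer calls `BatchWrite.commit(messages)`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a per-task commit message.
class ToyCommitMessage implements Serializable {
    private static final long serialVersionUID = 1L;
    final int taskId;
    final ArrayList<String> dataFiles;

    ToyCommitMessage(int taskId, List<String> dataFiles) {
        this.taskId = taskId;
        this.dataFiles = new ArrayList<>(dataFiles);
    }
}

class CommitMessageRoundTrip {
    // Writer side: each task emits one row holding these bytes.
    static byte[] serialize(ToyCommitMessage message) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(message);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static ToyCommitMessage deserialize(byte[] payload) {
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(payload))) {
            return (ToyCommitMessage) in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    // Committer side: collect every row, decode it, and commit once.
    static List<String> commit(List<byte[]> rows) {
        List<String> committedFiles = new ArrayList<>();
        for (byte[] row : rows) {
            committedFiles.addAll(deserialize(row).dataFiles);
        }
        return committedFiles;
    }
}
```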

Two design points keep this stable under AQE, which re-runs the planner on each
materialised stage's `logicalLink`:

- The writer's child is wrapped in `IcebergWriteLogical`, a stable logical
  anchor, so each re-plan re-emits only the writer rather than vanishing it or
  re-firing the surrounding logical write and duplicating the commit.
- The `BatchWrite` is materialised once (`Write.toBatch()` mints a fresh
  instance per call) and shared between committer and writer, so commit-time
  validation sees the same scan / emitted-file state the writer used.
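
The second point can be shown with a toy model. This is only a sketch: `ToyWrite` and `ToyBatchWrite` are hypothetical, and the `emittedFiles` list stands in for whatever state the real `BatchWrite` tracks. The only behaviour taken from the description above is that each `toBatch()` call returns a fresh instance.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a BatchWrite that accumulates state during a write.
class ToyBatchWrite {
    final List<String> emittedFiles = new ArrayList<>();
}

// Hypothetical stand-in for Write: every toBatch() call mints a new instance.
class ToyWrite {
    ToyBatchWrite toBatch() {
        return new ToyBatchWrite();
    }
}

class SharedBatchWriteSketch {
    // Materialise once and hand the same instance to writer and committer.
    static int filesVisibleAtCommitWhenShared() {
        ToyBatchWrite shared = new ToyWrite().toBatch();
        ToyBatchWrite writerSide = shared;
        ToyBatchWrite committerSide = shared;
        writerSide.emittedFiles.add("part-0.parquet");
        return committerSide.emittedFiles.size();
    }

    // Calling toBatch() twice gives the committer an instance with no writer state.
    static int filesVisibleAtCommitWhenReminted() {
        ToyWrite write = new ToyWrite();
        ToyBatchWrite writerSide = write.toBatch();
        ToyBatchWrite committerSide = write.toBatch();
        writerSide.emittedFiles.add("part-0.parquet");
        return committerSide.emittedFiles.size();
    }
}
```

In the toy, the shared variant sees the writer's file at commit time and the re-minted variant sees nothing, which is the mismatch that sharing one instance avoids.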

`WriteDelta` (merge-on-read) is intentionally not intercepted: its per-task
`DeltaWriter` is row-dispatched and the native writer can't emit position-delete
files, so the split plan would add planning complexity for no acceleration.
Copy-on-write DELETE / UPDATE / MERGE (`ReplaceData`) is handled in a follow-up
commit.

Guarded by `spark.comet.write.iceberg.splitOperator.enabled` (off by default);
when off, writes go straight through Iceberg-Java unchanged. File writing still
runs through Iceberg's JVM writer -- native Parquet write lands in a later
commit.

Adds copy-on-write row-level DML to the split-operator plan. Spark lowers a CoW
`DELETE` / `UPDATE` / `MERGE` on a V2 table into a `ReplaceData` logical node;
`IcebergWriteStrategy` now matches it (refreshing `originalTable`'s cache) and
routes it through the same committer + writer pair as appends/overwrites.

The writer gains a per-row dispatch path. On Spark 4.x a `ReplaceData` rewrite
prefixes each row with an operation code and carries a `ReplaceDataProjections`;
`IcebergWriteExec` applies the row / metadata projections per code (WRITE /
WRITE_WITH_METADATA) before handing rows to the `DataWriter`. The two-arg
`DataWriter.write(metadata, row)` is looked up reflectively since it is 4.x-only.
On Spark 3.4 / 3.5 the rewritten stream is already post-projection, so the shim
returns `None` and the writer keeps its plain `write(row)` loop.
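
A minimal sketch of the reflective lookup with a plain-write fallback, in self-contained Java. The toy writer classes, the `ToyOp` enum, and the string rows are hypothetical. Falling back to `write(row)` when the two-arg method is missing is an assumption made for this example, not necessarily how the shim behaves.

```java
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

interface ToyDataWriter {
    void write(String row);
}

// Toy writer exposing only the one-arg write, as on older versions.
class ToyWriter3x implements ToyDataWriter {
    public final List<String> out = new ArrayList<>();
    public void write(String row) { out.add(row); }
}

// Toy writer that also exposes a two-arg write(metadata, row).
class ToyWriter4x implements ToyDataWriter {
    public final List<String> out = new ArrayList<>();
    public void write(String row) { out.add(row); }
    public void write(String metadata, String row) { out.add(metadata + "|" + row); }
}

enum ToyOp { WRITE, WRITE_WITH_METADATA }

class ReflectiveWriteDispatch {
    // Look the two-arg method up by name so this code compiles without it.
    static Optional<Method> findTwoArgWrite(Class<?> writerClass) {
        try {
            return Optional.of(writerClass.getMethod("write", String.class, String.class));
        } catch (NoSuchMethodException e) {
            return Optional.empty();
        }
    }

    static void dispatch(ToyDataWriter writer, ToyOp op, String metadata, String row) {
        Optional<Method> twoArg = findTwoArgWrite(writer.getClass());
        if (op == ToyOp.WRITE_WITH_METADATA && twoArg.isPresent()) {
            try {
                twoArg.get().invoke(writer, metadata, row);
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException(e);
            }
        } else {
            // Assumed fallback: plain write when the two-arg method is absent.
            writer.write(row);
        }
    }
}
```

A real implementation would likely resolve the `Method` once per writer rather than per row; the lookup is repeated here only to keep the sketch short.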

`ReplaceDataProjections` does not exist on 3.4 / 3.5, so `ReplaceDataDispatchInfo`
mirrors it as a version-neutral carrier, populated by the per-version
`IcebergReplaceDataShim`.

Iceberg 1.5.2 (Spark 3.4) lacks native row-level operations and instead routes
UPDATE / MERGE through its own `ReplaceIcebergData` logical node. It has the same
field shape as Spark's `ReplaceData` and is matched by FQCN via reflection, so
the main module takes no compile dependency on iceberg-spark-extensions.
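
Matching by class name can be sketched as follows. `ToyReplaceNode` and its `table()` accessor are hypothetical placeholders for a node type that lives in an optional module; the point is only that the matching code compares names and reads fields reflectively, so it never references the class at compile time.

```java
import java.lang.reflect.Method;
import java.util.Optional;

// Hypothetical stand-in for a logical node defined in an optional module.
class ToyReplaceNode {
    public String table() { return "db.events"; }
}

class FqcnMatchSketch {
    // Returns the node's table if its class name matches, otherwise empty.
    static Optional<String> tableIfMatches(Object node, String expectedClassName) {
        if (!node.getClass().getName().equals(expectedClassName)) {
            return Optional.empty();
        }
        try {
            // Read the field through its accessor by name, not by static type.
            Method accessor = node.getClass().getMethod("table");
            return Optional.of((String) accessor.invoke(node));
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }
}
```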