-
Notifications
You must be signed in to change notification settings - Fork 512
Add PuffinWriter for writing deletion vectors #3474
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
moomindani
wants to merge
14
commits into
apache:main
Choose a base branch
from
moomindani:moomindani/dv-write-revival
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
+309
−2
Open
Changes from all commits
Commits
Show all changes
14 commits
Select commit
Hold shift + click to select a range
755793c
deletion vector write
rambleraptor 9b10a4f
test fix
rambleraptor c90ad38
lint fixes
rambleraptor 842d6a5
test: Add Spark interop test for Puffin DV reader
glesperance 9524618
PR comments
rambleraptor e23a67d
lint
rambleraptor 72ebba8
Test: lock in agreed DV field id and blob framing
moomindani 4ecfd18
Address review comments
moomindani eb81422
Accept an OutputFile in PuffinWriter and write the file in finish()
moomindani a6d2f31
Add unit tests for sparse bitmap keys and the Java key range limit
moomindani a979e55
Use pyiceberg.__version__ for PuffinWriter created-by default
moomindani be344a6
Apply ruff format to PuffinWriter created-by line
moomindani 16ce42c
Merge remote-tracking branch 'upstream/main' into moomindani/dv-write…
moomindani 2cee6a3
Rework writer onto #3491: DeletionVector.to_blob + generic PuffinWriter
moomindani File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,114 @@ | ||
| # Licensed to the Apache Software Foundation (ASF) under one | ||
| # or more contributor license agreements. See the NOTICE file | ||
| # distributed with this work for additional information | ||
| # regarding copyright ownership. The ASF licenses this file | ||
| # to you under the Apache License, Version 2.0 (the | ||
| # "License"); you may not use this file except in compliance | ||
| # with the License. You may obtain a copy of the License at | ||
| # | ||
| # http://www.apache.org/licenses/LICENSE-2.0 | ||
| # | ||
| # Unless required by applicable law or agreed to in writing, | ||
| # software distributed under the License is distributed on an | ||
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| # KIND, either express or implied. See the License for the | ||
| # specific language governing permissions and limitations | ||
| # under the License. | ||
| from pathlib import Path | ||
|
|
||
| import pytest | ||
|
|
||
| from pyiceberg import __version__ | ||
| from pyiceberg.io.pyarrow import PyArrowFileIO | ||
| from pyiceberg.table.deletion_vector import DeletionVector, deletion_vectors_from_puffin_file | ||
| from pyiceberg.table.puffin import MAGIC_BYTES, PuffinFile, PuffinWriter | ||
|
|
||
|
|
||
| def _write(tmp_path: Path, *deletion_vectors: DeletionVector, created_by: str | None = None) -> Path: | ||
| puffin_path = tmp_path / "test.puffin" | ||
| with PuffinWriter(PyArrowFileIO().new_output(str(puffin_path)), created_by=created_by) as writer: | ||
| for dv in deletion_vectors: | ||
| writer.add_blob(dv.to_blob()) | ||
| return puffin_path | ||
|
|
||
|
|
||
| def test_puffin_writer_round_trips_single_blob(tmp_path: Path) -> None: | ||
| positions = [0, 1, 5, (1 << 32) + 7] | ||
| puffin_path = _write(tmp_path, DeletionVector.from_positions("file.parquet", positions)) | ||
|
|
||
| reader = PuffinFile(puffin_path.read_bytes()) | ||
| dvs = deletion_vectors_from_puffin_file(reader) | ||
|
|
||
| assert len(dvs) == 1 | ||
| assert dvs[0].referenced_data_file == "file.parquet" | ||
| assert dvs[0].to_vector().to_pylist() == sorted(positions) | ||
|
|
||
|
|
||
| def test_puffin_writer_round_trips_multiple_blobs(tmp_path: Path) -> None: | ||
| puffin_path = _write( | ||
| tmp_path, | ||
| DeletionVector.from_positions("file1.parquet", [1, 2, 3]), | ||
| DeletionVector.from_positions("file2.parquet", [4, 5, 6]), | ||
| ) | ||
|
|
||
| reader = PuffinFile(puffin_path.read_bytes()) | ||
| dvs = deletion_vectors_from_puffin_file(reader) | ||
|
|
||
| assert {dv.referenced_data_file: dv.to_vector().to_pylist() for dv in dvs} == { | ||
| "file1.parquet": [1, 2, 3], | ||
| "file2.parquet": [4, 5, 6], | ||
| } | ||
|
|
||
|
|
||
| def test_puffin_writer_writes_magic_bytes_and_offsets(tmp_path: Path) -> None: | ||
| puffin_path = _write(tmp_path, DeletionVector.from_positions("file.parquet", [1, 2, 3])) | ||
| puffin_bytes = puffin_path.read_bytes() | ||
|
|
||
| assert puffin_bytes[:4] == MAGIC_BYTES | ||
| assert puffin_bytes[-4:] == MAGIC_BYTES | ||
|
|
||
| blob = PuffinFile(puffin_bytes).footer.blobs[0] | ||
| # PuffinWriter fills in the placeholder offset and length while assembling the file | ||
| assert blob.offset > 0 | ||
| assert blob.length > 0 | ||
|
|
||
|
|
||
| def test_puffin_writer_default_created_by(tmp_path: Path) -> None: | ||
| puffin_path = _write(tmp_path, DeletionVector.from_positions("file.parquet", [1])) | ||
|
|
||
| reader = PuffinFile(puffin_path.read_bytes()) | ||
| assert reader.footer.properties["created-by"] == f"PyIceberg version {__version__}" | ||
|
|
||
|
|
||
| def test_puffin_writer_custom_created_by(tmp_path: Path) -> None: | ||
| puffin_path = _write(tmp_path, DeletionVector.from_positions("file.parquet", [1]), created_by="my-test-app") | ||
|
|
||
| reader = PuffinFile(puffin_path.read_bytes()) | ||
| assert reader.footer.properties["created-by"] == "my-test-app" | ||
|
|
||
|
|
||
| def test_puffin_writer_file_size_via_output_file(tmp_path: Path) -> None: | ||
| puffin_path = tmp_path / "test.puffin" | ||
| output_file = PyArrowFileIO().new_output(str(puffin_path)) | ||
| with PuffinWriter(output_file) as writer: | ||
| writer.add_blob(DeletionVector.from_positions("file.parquet", [1, 2, 3]).to_blob()) | ||
|
|
||
| assert len(output_file) == len(puffin_path.read_bytes()) | ||
|
|
||
|
|
||
| def test_puffin_writer_empty(tmp_path: Path) -> None: | ||
| puffin_path = _write(tmp_path) | ||
|
|
||
| reader = PuffinFile(puffin_path.read_bytes()) | ||
| assert reader.footer.blobs == [] | ||
| assert deletion_vectors_from_puffin_file(reader) == [] | ||
|
|
||
|
|
||
| def test_add_blob_to_closed_writer_raises(tmp_path: Path) -> None: | ||
| output_file = PyArrowFileIO().new_output(str(tmp_path / "test.puffin")) | ||
| writer = PuffinWriter(output_file) | ||
| with writer: | ||
| writer.add_blob(DeletionVector.from_positions("file.parquet", [1]).to_blob()) | ||
|
|
||
| with pytest.raises(RuntimeError, match="Cannot add blob to closed Puffin writer"): | ||
| writer.add_blob(DeletionVector.from_positions("file.parquet", [2]).to_blob()) |
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This name looks too generic. We could consider renaming it to DeletionVectorWriter or a similar name. I've opened #3491 to extract DV-specific logic from puffin.py.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 to @ebyhr's point
Once #3491 lands I think it would be worth rebasing this implementation on top of it. WDYT?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds good to me. I'll wait for #3491 to land and then rebase this on top of it, renaming the writer to
DeletionVectorWriteras part of that. Thanks both!There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Following up here: after the discussion in #3491 we decided to keep
PuffinWriteras a generic, format-level writer and put the DV serialization onDeletionVector(to_blob()), rather than renaming. I've reworked this PR onto #3491 accordingly — details in the summary comment above.