@classmethod
def from_positions(cls, referenced_data_file: str, positions: Iterable[int]) -> "DeletionVector":
    """Build a deletion vector from row positions of a single data file.

    Every position is split into a key (the bits above the low 32) that selects a
    bitmap, and a 32-bit value (the low 32 bits) that is stored in that bitmap.
    Duplicate positions collapse because they are stored in bitmaps.

    Args:
        referenced_data_file: Location of the data file the positions refer to.
        positions: Non-negative row positions, in any order.

    Returns:
        A DeletionVector whose bitmap list is indexed by key; keys without any
        position are padded with the shared EMPTY_BITMAP.

    Raises:
        ValueError: If a position is negative, if its key exceeds MAX_JAVA_SIGNED,
            or if no positions are supplied.
    """
    bitmaps_by_key: dict[int, BitMap] = {}
    for position in positions:
        if position < 0:
            raise ValueError(f"Invalid position: {position}, positions must be non-negative")

        key = position >> 32
        # Fail fast: the result is a list padded up to the largest key, so an oversized key
        # must be rejected before that list is materialized rather than at serialization time.
        if key > MAX_JAVA_SIGNED:
            raise ValueError(f"Key {key} is too large, max {MAX_JAVA_SIGNED} to maintain compatibility with Java impl")

        bitmap = bitmaps_by_key.get(key)
        if bitmap is None:
            # Only allocate a bitmap the first time a key is seen
            bitmap = bitmaps_by_key[key] = BitMap()
        bitmap.add(position & 0xFFFFFFFF)

    if not bitmaps_by_key:
        raise ValueError("Deletion vector must contain at least one position")

    # Materialize a list indexed by key, padding gaps with the empty bitmap
    bitmaps: list[BitMap] = [bitmaps_by_key.get(key, EMPTY_BITMAP) for key in range(max(bitmaps_by_key) + 1)]
    return cls(referenced_data_file, bitmaps)
@staticmethod
def _serialize_bitmap(bitmaps: list[BitMap]) -> bytes:
    """Serialize a key-indexed list of bitmaps into the vector byte layout.

    The output starts with the number of non-empty bitmaps (8 bytes, little-endian).
    Each non-empty bitmap follows in ascending key order as its key (4 bytes,
    little-endian) and then its serialized payload. Empty bitmaps are skipped.

    Raises:
        ValueError: If a non-empty bitmap sits at a key larger than MAX_JAVA_SIGNED.
    """
    # The list index is the key; empty entries are padding and are not written
    populated = [(key, bitmap) for key, bitmap in enumerate(bitmaps) if len(bitmap) > 0]

    chunks = [len(populated).to_bytes(8, "little")]
    for key, bitmap in populated:
        if key > MAX_JAVA_SIGNED:
            raise ValueError(f"Key {key} is too large, max {MAX_JAVA_SIGNED} to maintain compatibility with Java impl")
        chunks.append(key.to_bytes(4, "little"))
        chunks.append(bitmap.serialize())

    return b"".join(chunks)
class PuffinWriter:
    """Assembles a Puffin file from blobs and writes it to an output file.

    This writer is format-level and blob-agnostic: callers supply already-serialized blobs
    (for example via DeletionVector.to_blob()). Use it as a context manager; the file is
    written on exit, after which its size is available via len(output_file).

    If the body of the ``with`` statement raises, nothing is written: the writer is only
    marked closed and the exception propagates.
    """

    closed: bool
    _output_file: OutputFile
    _blobs: list[PuffinBlob]
    _created_by: str

    def __init__(self, output_file: OutputFile, created_by: str | None = None) -> None:
        """Create a writer targeting output_file.

        Args:
            output_file: Destination the assembled file is written to on exit.
            created_by: Value of the "created-by" footer property; defaults to
                "PyIceberg version <version>" when None.
        """
        self.closed = False
        self._output_file = output_file
        self._blobs = []
        self._created_by = created_by if created_by is not None else f"PyIceberg version {__version__}"

    def __enter__(self) -> "PuffinWriter":
        """Open the writer."""
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        traceback: TracebackType | None,
    ) -> None:
        """Close the writer and, unless the body raised, write the Puffin file."""
        self.closed = True

        if exc_type is not None:
            # The with-body failed, so the blob list may be incomplete. Do not create or
            # overwrite the output with a partial file; let the exception propagate.
            return

        puffin_bytes = self._assemble()
        with self._output_file.create(overwrite=True) as output_stream:
            output_stream.write(puffin_bytes)

    def add_blob(self, blob: PuffinBlob) -> "PuffinWriter":
        """Queue a blob for writing and return the writer to allow chaining.

        Raises:
            RuntimeError: If the writer has already been closed.
        """
        if self.closed:
            raise RuntimeError("Cannot add blob to closed Puffin writer")
        self._blobs.append(blob)
        return self

    def _assemble(self) -> bytes:
        """Return the complete file: magic, blob payloads, then the footer."""
        with io.BytesIO() as out:
            out.write(MAGIC_BYTES)

            blobs_metadata: list[PuffinBlobMetadata] = []
            for blob in self._blobs:
                # offset and length are placeholders on the blob's metadata until the file is assembled here
                blobs_metadata.append(blob.metadata.model_copy(update={"offset": out.tell(), "length": len(blob.payload)}))
                out.write(blob.payload)

            footer = Footer(blobs=blobs_metadata, properties={"created-by": self._created_by})
            footer_payload_bytes = footer.model_dump_json(by_alias=True, exclude_none=True).encode("utf-8")

            # Footer: magic, JSON payload, payload size (4 bytes, little-endian), flags (4 bytes), magic
            out.write(MAGIC_BYTES)
            out.write(footer_payload_bytes)
            out.write(len(footer_payload_bytes).to_bytes(4, "little"))
            out.write((0).to_bytes(4, "little"))  # flags: none set
            out.write(MAGIC_BYTES)

            return out.getvalue()
@pytest.mark.parametrize(
    "positions",
    [
        [1, 2, 3],
        [0],
        [3, 1, 2, 1, 3],  # unordered with duplicates
        [0, 1, 5, (1 << 32) + 7, (2 << 32) + 4],  # spread across multiple bitmap keys
    ],
)
def test_serialize_bitmap_round_trips(positions: list[int]) -> None:
    """Serialized bitmaps survive a deserialize/serialize cycle byte-for-byte."""
    vector = DeletionVector.from_positions("file.parquet", positions)

    encoded = DeletionVector._serialize_bitmap(vector._bitmaps)
    decoded = DeletionVector._deserialize_bitmap(encoded)

    assert DeletionVector._serialize_bitmap(decoded) == encoded
    # The vector exposes each distinct position once, in ascending order
    assert vector.to_vector().to_pylist() == sorted(set(positions))
def test_to_blob_payload_layout() -> None:
    """Check every section of the deletion-vector-v1 blob payload, including the checksum."""
    import zlib  # local import: only needed to recompute the expected CRC-32

    blob = DeletionVector.from_positions("file.parquet", [1, 2, 3]).to_blob()

    # Layout: length (4B big-endian) | DV magic (4B) | vector | CRC-32 (4B big-endian),
    # where the length and CRC-32 both cover the magic bytes plus the vector.
    length_prefix = int.from_bytes(blob.payload[0:4], "big")
    assert blob.payload[4:8] == DELETION_VECTOR_MAGIC
    vector = blob.payload[8 : 4 + length_prefix]
    assert length_prefix == len(DELETION_VECTOR_MAGIC) + len(vector)
    assert len(blob.payload) == 4 + length_prefix + 4

    # The trailing 4 bytes must be the CRC-32 of magic + vector
    checksum = int.from_bytes(blob.payload[4 + length_prefix :], "big")
    assert checksum == zlib.crc32(blob.payload[4 : 4 + length_prefix])

    assert DeletionVector._deserialize_bitmap(vector) == DeletionVector.from_positions("file.parquet", [1, 2, 3])._bitmaps
def _write(tmp_path: Path, *deletion_vectors: DeletionVector, created_by: str | None = None) -> Path:
    """Write the given deletion vectors to tmp_path / "test.puffin" and return that path."""
    target = tmp_path / "test.puffin"
    output_file = PyArrowFileIO().new_output(str(target))

    with PuffinWriter(output_file, created_by=created_by) as writer:
        # One blob per deletion vector, added in the order given
        for blob in (dv.to_blob() for dv in deletion_vectors):
            writer.add_blob(blob)

    return target
def test_add_blob_to_closed_writer_raises(tmp_path: Path) -> None:
    """Adding a blob after the writer's context has exited is rejected."""
    destination = str(tmp_path / "test.puffin")
    writer = PuffinWriter(PyArrowFileIO().new_output(destination))

    with writer:
        writer.add_blob(DeletionVector.from_positions("file.parquet", [1]).to_blob())

    # Leaving the with-block closes the writer, so a further add must fail
    with pytest.raises(RuntimeError, match="Cannot add blob to closed Puffin writer"):
        writer.add_blob(DeletionVector.from_positions("file.parquet", [2]).to_blob())