diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index aa051cd14..7ec518fca 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -188,6 +188,7 @@ set(ICEBERG_DATA_SOURCES data/data_writer.cc data/delete_filter.cc data/delete_loader.cc + data/deletion_vector_writer.cc data/equality_delete_writer.cc data/file_scan_task_reader.cc data/position_delete_writer.cc @@ -195,6 +196,7 @@ set(ICEBERG_DATA_SOURCES deletes/position_delete_index.cc deletes/position_delete_range_consumer.cc deletes/roaring_position_bitmap.cc + puffin/deletion_vector.cc puffin/file_metadata.cc puffin/json_serde.cc puffin/puffin_format.cc diff --git a/src/iceberg/data/delete_loader.cc b/src/iceberg/data/delete_loader.cc index 922173401..4b7ea8458 100644 --- a/src/iceberg/data/delete_loader.cc +++ b/src/iceberg/data/delete_loader.cc @@ -19,9 +19,12 @@ #include "iceberg/data/delete_loader.h" +#include #include +#include #include #include +#include #include #include @@ -30,9 +33,12 @@ #include "iceberg/arrow_c_data_guard_internal.h" #include "iceberg/deletes/position_delete_index.h" #include "iceberg/deletes/position_delete_range_consumer.h" +#include "iceberg/deletes/roaring_position_bitmap.h" +#include "iceberg/file_io.h" #include "iceberg/file_reader.h" #include "iceberg/manifest/manifest_entry.h" #include "iceberg/metadata_columns.h" +#include "iceberg/puffin/deletion_vector.h" #include "iceberg/result.h" #include "iceberg/row/arrow_array_wrapper.h" #include "iceberg/schema.h" @@ -171,7 +177,44 @@ Status DeleteLoader::LoadPositionDelete(const DataFile& file, PositionDeleteInde } Status DeleteLoader::LoadDV(const DataFile& file, PositionDeleteIndex& index) const { - return NotSupported("Loading deletion vectors is not yet supported"); + // A deletion vector must reference exactly one data file; without it the + // caller cannot know which data file the positions apply to. + // Only the presence of the field is checked here; this function does not + // compare it against any particular data file path. 
+ ICEBERG_PRECHECK(file.referenced_data_file.has_value(), + "Deletion vector requires referenced_data_file: {}", file.file_path); + + // For deletion vectors, content_offset and content_size_in_bytes point directly + // at the DV blob bytes within the Puffin file and are required by the spec. + ICEBERG_PRECHECK( + file.content_offset.has_value() && file.content_size_in_bytes.has_value(), + "Deletion vector requires content_offset and content_size_in_bytes: {}", + file.file_path); + + const int64_t offset = file.content_offset.value(); + const int64_t length = file.content_size_in_bytes.value(); + ICEBERG_PRECHECK(offset >= 0 && length >= 0, + "Invalid deletion vector offset/length: offset={}, length={}", offset, + length); + // Enforce the size cap stated in the error message (2GB) before the read + // buffer of `length` bytes is allocated below. + ICEBERG_PRECHECK(length <= std::numeric_limits::max(), + "Cannot read deletion vector larger than 2GB: {}", length); + + // Read the blob: a buffer of `length` bytes is filled from position `offset` + // of the Puffin file, then the stream is closed. + ICEBERG_ASSIGN_OR_RAISE(auto input_file, io_->NewInputFile(file.file_path)); + ICEBERG_ASSIGN_OR_RAISE(auto stream, input_file->Open()); + + std::vector bytes(static_cast(length)); + ICEBERG_RETURN_UNEXPECTED(stream->ReadFully(offset, bytes)); + ICEBERG_RETURN_UNEXPECTED(stream->Close()); + + // Decode the framed blob into a position bitmap; framing problems (size, + // magic, CRC) are reported by DeserializeDeletionVectorBlob. + std::span blob(reinterpret_cast(bytes.data()), + bytes.size()); + ICEBERG_ASSIGN_OR_RAISE(auto bitmap, puffin::DeserializeDeletionVectorBlob(blob)); + + // The bitmap cardinality must match the record count recorded in metadata; + // the positions are copied into `index` only after this check passes. 
+ ICEBERG_PRECHECK(std::cmp_equal(bitmap.Cardinality(), file.record_count), + "Deletion vector cardinality {} does not match record count {}: {}", + bitmap.Cardinality(), file.record_count, file.file_path); + + bitmap.ForEach([&index](int64_t pos) { index.Delete(pos); }); + return {}; } Result DeleteLoader::LoadPositionDeletes( diff --git a/src/iceberg/data/deletion_vector_writer.cc b/src/iceberg/data/deletion_vector_writer.cc new file mode 100644 index 000000000..f3200d559 --- /dev/null +++ b/src/iceberg/data/deletion_vector_writer.cc @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +#include "iceberg/data/deletion_vector_writer.h" + +#include +#include +#include +#include +#include +#include + +#include "iceberg/deletes/roaring_position_bitmap.h" +#include "iceberg/file_io.h" +#include "iceberg/manifest/manifest_entry.h" +#include "iceberg/partition_spec.h" +#include "iceberg/puffin/deletion_vector.h" +#include "iceberg/puffin/file_metadata.h" +#include "iceberg/puffin/puffin_writer.h" +#include "iceberg/util/macros.h" +#include "iceberg/version.h" + +namespace iceberg { + +class DeletionVectorWriter::Impl { + public: + explicit Impl(DeletionVectorWriterOptions options) : options_(std::move(options)) {} + + Status Delete(std::string_view referenced_data_file, int64_t pos) { + ICEBERG_CHECK(!closed_, "Cannot delete after the writer is closed"); + ICEBERG_PRECHECK(!referenced_data_file.empty(), + "Deletion vector requires a non-empty referenced data file"); + ICEBERG_PRECHECK(pos >= 0 && pos <= RoaringPositionBitmap::kMaxPosition, + "Invalid deletion vector position: {}", pos); + bitmaps_[std::string(referenced_data_file)].Add(pos); + return {}; + } + + Status Close() { + if (closed_) { + return {}; + } + + // No deletes: skip creating an orphan Puffin file that no metadata + // references. + if (bitmaps_.empty()) { + closed_ = true; + return {}; + } + + // Fill in a default creator property (kCreatedBy) with this library's + // version; try_emplace keeps a value already present in options_.properties. 
+ auto properties = options_.properties; + properties.try_emplace(std::string(puffin::StandardPuffinProperties::kCreatedBy), + std::format("iceberg-cpp/{}.{}.{}", ICEBERG_VERSION_MAJOR, + ICEBERG_VERSION_MINOR, ICEBERG_VERSION_PATCH)); + + // NOTE(review): closed_ is only set at the end of Close(); if one of the + // error macros below returns early it presumably stays false -- confirm + // that a retried Close() (which reopens options_.path) is intended. + ICEBERG_ASSIGN_OR_RAISE(auto output_file, options_.io->NewOutputFile(options_.path)); + ICEBERG_ASSIGN_OR_RAISE( + auto writer, + puffin::PuffinWriter::Make(std::move(output_file), std::move(properties))); + + // One blob per referenced data file, in deterministic (sorted) order. 
+ struct Entry { + std::string referenced_data_file; + int64_t offset; + int64_t length; + int64_t cardinality; + }; + std::vector entries; + entries.reserve(bitmaps_.size()); + for (auto& [referenced_data_file, bitmap] : bitmaps_) { + bitmap.Optimize(); // run-length encode before serializing + ICEBERG_ASSIGN_OR_RAISE( + auto blob, puffin::MakeDeletionVectorBlob(bitmap, referenced_data_file)); + ICEBERG_ASSIGN_OR_RAISE(auto blob_metadata, writer->Write(blob)); + entries.push_back(Entry{ + .referenced_data_file = referenced_data_file, + .offset = blob_metadata.offset, + .length = blob_metadata.length, + .cardinality = static_cast(bitmap.Cardinality()), + }); + } + + // DataFiles are built only after the Puffin file is finished, because each + // one records the final file size. + ICEBERG_RETURN_UNEXPECTED(writer->Finish()); + ICEBERG_ASSIGN_OR_RAISE(file_size_, writer->FileSize()); + + // No partition spec supplied -> partition_spec_id stays unset. + const auto spec_id = + options_.spec ? std::make_optional(options_.spec->spec_id()) : std::nullopt; + + // One DataFile per blob: same Puffin path and file size, but its own + // referenced data file, blob offset/length and record count. + for (auto& entry : entries) { + data_files_.push_back(std::make_shared(DataFile{ + .content = DataFile::Content::kPositionDeletes, + .file_path = options_.path, + .file_format = FileFormatType::kPuffin, + .partition = options_.partition, + .record_count = entry.cardinality, + .file_size_in_bytes = file_size_, + .referenced_data_file = std::move(entry.referenced_data_file), + .content_offset = entry.offset, + .content_size_in_bytes = entry.length, + .partition_spec_id = spec_id, + })); + } + + closed_ = true; + return {}; + } + + // Returns the DataFiles produced by Close(); it is an error to call this + // before the writer has been closed. 
+ Result Metadata() { + ICEBERG_CHECK(closed_, "Cannot get metadata before closing the writer"); + FileWriter::WriteResult result; + result.data_files = data_files_; + return result; + } + + private: + DeletionVectorWriterOptions options_; + // Ordered by referenced data file path for deterministic blob layout. 
+ std::map bitmaps_; + std::vector> data_files_; + int64_t file_size_ = -1; + bool closed_ = false; +}; + +DeletionVectorWriter::DeletionVectorWriter(std::unique_ptr impl) + : impl_(std::move(impl)) {} + +DeletionVectorWriter::~DeletionVectorWriter() = default; + +Result> DeletionVectorWriter::Make( + DeletionVectorWriterOptions options) { + ICEBERG_PRECHECK(options.io != nullptr, "DeletionVectorWriter requires a FileIO"); + ICEBERG_PRECHECK(!options.path.empty(), "DeletionVectorWriter requires a path"); + return std::unique_ptr( + new DeletionVectorWriter(std::make_unique(std::move(options)))); +} + +Status DeletionVectorWriter::Delete(std::string_view referenced_data_file, int64_t pos) { + return impl_->Delete(referenced_data_file, pos); +} + +Status DeletionVectorWriter::Close() { return impl_->Close(); } + +Result DeletionVectorWriter::Metadata() { + return impl_->Metadata(); +} + +} // namespace iceberg diff --git a/src/iceberg/data/deletion_vector_writer.h b/src/iceberg/data/deletion_vector_writer.h new file mode 100644 index 000000000..9823ad5c1 --- /dev/null +++ b/src/iceberg/data/deletion_vector_writer.h @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +#pragma once + +/// \file iceberg/data/deletion_vector_writer.h +/// Writer that emits deletion vectors as `deletion-vector-v1` blobs in a Puffin file. + +#include +#include +#include +#include +#include + +#include "iceberg/data/writer.h" +#include "iceberg/iceberg_data_export.h" +#include "iceberg/result.h" +#include "iceberg/row/partition_values.h" +#include "iceberg/type_fwd.h" + +namespace iceberg { + +/// \brief Options for creating a DeletionVectorWriter. +struct ICEBERG_DATA_EXPORT DeletionVectorWriterOptions { + /// Output Puffin file location. Must be non-empty. + std::string path; + /// FileIO used to create the Puffin file. Must not be null. + std::shared_ptr io; + /// Partition spec the referenced data files belong to (optional). + /// When null, the produced DataFiles carry no partition_spec_id. + std::shared_ptr spec; + /// Partition the referenced data files belong to. + PartitionValues partition; + /// File-level Puffin properties (e.g. "created-by"). + /// If the creator property is not set here, Close() adds an + /// `iceberg-cpp/MAJOR.MINOR.PATCH` value. + std::unordered_map properties; +}; + +/// \brief Writes one or more deletion vectors into a single Puffin file. +/// +/// Each referenced data file gets its own `deletion-vector-v1` blob. After +/// Close(), Metadata() returns one DataFile per blob, each carrying the +/// content_offset/content_size_in_bytes and referenced_data_file required to +/// register the deletion vector in a manifest. +/// +/// \note All referenced data files are assumed to belong to the single +/// partition supplied in the options. +/// \note If no position was added before Close(), no Puffin file is created +/// and Metadata() returns no DataFiles. 
+class ICEBERG_DATA_EXPORT DeletionVectorWriter { + public: + ~DeletionVectorWriter(); + + DeletionVectorWriter(const DeletionVectorWriter&) = delete; + DeletionVectorWriter& operator=(const DeletionVectorWriter&) = delete; + + /// \brief Create a new DeletionVectorWriter. + /// + /// Fails if `options.io` is null or `options.path` is empty. + static Result> Make( + DeletionVectorWriterOptions options); + + /// \brief Mark a row position as deleted for the given data file. + /// + /// Positions are accumulated per data file and serialized on Close(). + /// Fails if the writer is already closed, if `referenced_data_file` is + /// empty, or if `pos` is outside [0, RoaringPositionBitmap::kMaxPosition]. 
+ Status Delete(std::string_view referenced_data_file, int64_t pos); + + /// \brief Write all accumulated deletion vectors to the Puffin file and close it. + /// + /// Calling Close() again after a successful close is a no-op. + Status Close(); + + /// \brief Metadata for the DataFiles produced (one per referenced data file). + /// \note Must be called after Close(); returns an error otherwise. + Result Metadata(); + + private: + class Impl; + std::unique_ptr impl_; + + explicit DeletionVectorWriter(std::unique_ptr impl); +}; + +} // namespace iceberg diff --git a/src/iceberg/data/meson.build b/src/iceberg/data/meson.build index bbb26db27..eaae8a2b4 100644 --- a/src/iceberg/data/meson.build +++ b/src/iceberg/data/meson.build @@ -20,6 +20,7 @@ install_headers( 'data_writer.h', 'delete_filter.h', 'delete_loader.h', + 'deletion_vector_writer.h', 'equality_delete_writer.h', 'file_scan_task_reader.h', 'position_delete_writer.h', diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build index 58d79a402..3dba09fec 100644 --- a/src/iceberg/meson.build +++ b/src/iceberg/meson.build @@ -170,6 +170,7 @@ iceberg_data_sources = files( 'data/data_writer.cc', 'data/delete_filter.cc', 'data/delete_loader.cc', + 'data/deletion_vector_writer.cc', 'data/equality_delete_writer.cc', 'data/file_scan_task_reader.cc', 'data/position_delete_writer.cc', @@ -177,6 +178,7 @@ iceberg_data_sources = files( 'deletes/position_delete_index.cc', 'deletes/position_delete_range_consumer.cc', 'deletes/roaring_position_bitmap.cc', + 'puffin/deletion_vector.cc', 'puffin/file_metadata.cc', 'puffin/json_serde.cc', 'puffin/puffin_format.cc', diff --git a/src/iceberg/puffin/deletion_vector.cc b/src/iceberg/puffin/deletion_vector.cc new file mode 100644 index 000000000..709ca87f1 --- /dev/null +++ b/src/iceberg/puffin/deletion_vector.cc @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/puffin/deletion_vector.h" + +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "iceberg/metadata_columns.h" +#include "iceberg/puffin/file_metadata.h" +#include "iceberg/util/endian.h" +#include "iceberg/util/macros.h" + +namespace iceberg::puffin { + +namespace { + +// Computes a CRC-32 checksum (zlib/IEEE polynomial) over the given bytes. +uint32_t ComputeCrc32(std::span bytes) { + uLong crc = crc32(0L, Z_NULL, 0); + crc = crc32(crc, reinterpret_cast(bytes.data()), + static_cast(bytes.size())); + return static_cast(crc); +} + +// Writes `value` to the buffer in big-endian byte order; sizeof(T) bytes are +// copied (used in this file for the 4-byte length prefix and CRC fields). +template +void WriteBigEndian(T value, uint8_t* buf) { + T be = ToBigEndian(value); + std::memcpy(buf, &be, sizeof(be)); +} + +// Reads a big-endian value of type T from the buffer; sizeof(T) bytes are +// copied (used in this file for the 4-byte length prefix and CRC fields). 
+template +T ReadBigEndian(const uint8_t* buf) { + T value; + std::memcpy(&value, buf, sizeof(value)); + return FromBigEndian(value); +} + +} // namespace + +// Blob layout: [length prefix][magic][serialized bitmap][CRC-32]. The +// big-endian length prefix holds the size of magic + bitmap, and the +// big-endian CRC-32 is computed over those same bytes. +Result> SerializeDeletionVectorBlob( + const RoaringPositionBitmap& bitmap) { + ICEBERG_ASSIGN_OR_RAISE(auto serialized, bitmap.Serialize()); + + // The length prefix and CRC both cover the magic sequence plus the vector. 
+ const size_t magic_and_vector_size = + static_cast(DeletionVectorBlob::kMagicBytes) + serialized.size(); + ICEBERG_PRECHECK( + magic_and_vector_size <= static_cast(std::numeric_limits::max()), + "Deletion vector is too large to serialize: {} bytes", magic_and_vector_size); + + std::vector blob(static_cast(DeletionVectorBlob::kLengthPrefixBytes) + + magic_and_vector_size + + static_cast(DeletionVectorBlob::kCrcBytes)); + uint8_t* buf = blob.data(); + + WriteBigEndian(static_cast(magic_and_vector_size), buf); + buf += DeletionVectorBlob::kLengthPrefixBytes; + + uint8_t* checksum_begin = buf; + std::memcpy(buf, DeletionVectorBlob::kMagic.data(), DeletionVectorBlob::kMagicBytes); + buf += DeletionVectorBlob::kMagicBytes; + std::memcpy(buf, serialized.data(), serialized.size()); + buf += serialized.size(); + + WriteBigEndian( + ComputeCrc32(std::span(checksum_begin, magic_and_vector_size)), buf); + return blob; +} + +Result DeserializeDeletionVectorBlob( + std::span blob) { + constexpr size_t kMinSize = + static_cast(DeletionVectorBlob::kLengthPrefixBytes) + + static_cast(DeletionVectorBlob::kMagicBytes) + + static_cast(DeletionVectorBlob::kCrcBytes); + ICEBERG_PRECHECK(blob.size() >= kMinSize, + "Deletion vector blob too small: {} bytes, need at least {}", + blob.size(), kMinSize); + + const uint8_t* buf = blob.data(); + + const auto length = ReadBigEndian(buf); + buf += DeletionVectorBlob::kLengthPrefixBytes; + + ICEBERG_PRECHECK(length >= DeletionVectorBlob::kMagicBytes, + "Invalid deletion vector length prefix: {}", length); + + const size_t expected_total = + static_cast(DeletionVectorBlob::kLengthPrefixBytes) + + static_cast(length) + static_cast(DeletionVectorBlob::kCrcBytes); + ICEBERG_PRECHECK(blob.size() == expected_total, + "Deletion vector blob size mismatch: {} bytes, expected {}", + blob.size(), expected_total); + + // Magic and vector are checksummed together by the trailing CRC. 
+ const uint8_t* checksum_begin = buf; + + for (size_t i = 0; i < DeletionVectorBlob::kMagic.size(); ++i) { + ICEBERG_PRECHECK(buf[i] == DeletionVectorBlob::kMagic[i], + "Invalid deletion vector magic byte at offset {}: got {:#04x}", i, + buf[i]); + } + buf += DeletionVectorBlob::kMagicBytes; + + const auto stored_crc = + ReadBigEndian(checksum_begin + static_cast(length)); + const uint32_t actual_crc = + ComputeCrc32(std::span(checksum_begin, static_cast(length))); + ICEBERG_PRECHECK(stored_crc == actual_crc, + "Deletion vector CRC mismatch: stored {:#010x}, computed {:#010x}", + stored_crc, actual_crc); + + const auto vector_size = static_cast(length) - DeletionVectorBlob::kMagicBytes; + std::string_view vector_bytes(reinterpret_cast(buf), vector_size); + return RoaringPositionBitmap::Deserialize(vector_bytes); +} + +Result MakeDeletionVectorBlob(const RoaringPositionBitmap& bitmap, + std::string referenced_data_file) { + ICEBERG_PRECHECK(!referenced_data_file.empty(), + "Deletion vector requires a non-empty referenced data file"); + + ICEBERG_ASSIGN_OR_RAISE(auto data, SerializeDeletionVectorBlob(bitmap)); + + Blob blob{ + .type = std::string(StandardBlobTypes::kDeletionVectorV1), + // The deletion vector is computed over row positions, matching the Java + // implementation which records the row-position metadata column field id. + .input_fields = {MetadataColumns::kFilePositionColumnId}, + // Snapshot ID and sequence number are not known when the Puffin file is + // created; the spec requires -1 for Puffin v1. 
+ .snapshot_id = -1, + .sequence_number = -1, + .data = std::move(data), + .requested_compression = PuffinCompressionCodec::kNone, + }; + blob.properties.emplace( + std::string(StandardDeletionVectorProperties::kReferencedDataFile), + std::move(referenced_data_file)); + blob.properties.emplace(std::string(StandardDeletionVectorProperties::kCardinality), + std::format("{}", bitmap.Cardinality())); + + return blob; +} + +} // namespace iceberg::puffin diff --git a/src/iceberg/puffin/deletion_vector.h b/src/iceberg/puffin/deletion_vector.h new file mode 100644 index 000000000..edb6a541b --- /dev/null +++ b/src/iceberg/puffin/deletion_vector.h @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +/// \file iceberg/puffin/deletion_vector.h +/// Serialization helpers for the `deletion-vector-v1` Puffin blob type. + +#include +#include +#include +#include +#include +#include + +#include "iceberg/deletes/roaring_position_bitmap.h" +#include "iceberg/iceberg_data_export.h" +#include "iceberg/puffin/file_metadata.h" +#include "iceberg/result.h" + +namespace iceberg::puffin { + +/// \brief Required blob properties for the `deletion-vector-v1` blob type. 
+struct StandardDeletionVectorProperties { + /// Location of the data file the deletion vector applies to. + static constexpr std::string_view kReferencedDataFile = "referenced-data-file"; + /// Number of deleted rows (set positions) in the deletion vector. + static constexpr std::string_view kCardinality = "cardinality"; +}; + +/// \brief Constants describing the `deletion-vector-v1` blob framing. +/// +/// See the Puffin spec for the byte layout: +/// https://iceberg.apache.org/puffin-spec/#deletion-vector-v1-blob-type +struct ICEBERG_DATA_EXPORT DeletionVectorBlob { + /// Magic sequence preceding the serialized bitmap: 0xD1 0xD3 0x39 0x64. + static constexpr std::array kMagic = {0xD1, 0xD3, 0x39, 0x64}; + + /// Length of the big-endian length prefix, in bytes. The prefix value itself + /// counts the magic sequence plus the serialized bitmap. + static constexpr int32_t kLengthPrefixBytes = 4; + /// Length of the magic sequence, in bytes. + static constexpr int32_t kMagicBytes = 4; + /// Length of the trailing big-endian CRC-32 checksum, in bytes. The checksum + /// covers the magic sequence plus the serialized bitmap. + static constexpr int32_t kCrcBytes = 4; +}; + +/// \brief Serialize a position bitmap into a `deletion-vector-v1` blob. +/// +/// The returned bytes include the length prefix, magic sequence, the Roaring +/// "portable" serialization of the bitmap, and the trailing CRC-32 checksum. +ICEBERG_DATA_EXPORT Result> SerializeDeletionVectorBlob( + const RoaringPositionBitmap& bitmap); + +/// \brief Deserialize a `deletion-vector-v1` blob into a position bitmap. +/// +/// Validates the length prefix, magic sequence, and CRC-32 checksum before +/// decoding the bitmap. `blob` must be exactly one framed blob: its size has to +/// equal the length prefix value plus the prefix and CRC field sizes. 
+ICEBERG_DATA_EXPORT Result DeserializeDeletionVectorBlob( + std::span blob); + +/// \brief Build a Puffin `Blob` of type `deletion-vector-v1` from a position +/// bitmap. +/// +/// Sets `snapshot-id` and `sequence-number` to -1 (required for Puffin v1), +/// requests no compression, and populates the required `referenced-data-file` +/// and `cardinality` properties. The `cardinality` property is taken from +/// `bitmap.Cardinality()`. +/// +/// \param bitmap The positions to delete. 
+/// \param referenced_data_file Location of the data file the vector applies to. +/// Must be non-empty, otherwise an error is returned. +ICEBERG_DATA_EXPORT Result MakeDeletionVectorBlob( + const RoaringPositionBitmap& bitmap, std::string referenced_data_file); + +} // namespace iceberg::puffin diff --git a/src/iceberg/puffin/meson.build b/src/iceberg/puffin/meson.build index 7f30468db..fbd75c6b1 100644 --- a/src/iceberg/puffin/meson.build +++ b/src/iceberg/puffin/meson.build @@ -17,6 +17,7 @@ install_headers( [ + 'deletion_vector.h', 'file_metadata.h', 'puffin_format.h', 'puffin_reader.h', diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt index 0756c1eef..d3d3768a3 100644 --- a/src/iceberg/test/CMakeLists.txt +++ b/src/iceberg/test/CMakeLists.txt @@ -157,10 +157,14 @@ add_iceberg_test(util_test add_iceberg_test(puffin_test USE_DATA SOURCES + puffin_deletion_vector_test.cc puffin_format_test.cc puffin_json_test.cc puffin_reader_writer_test.cc) +add_iceberg_test(deletion_vector_writer_test USE_DATA SOURCES + deletion_vector_writer_test.cc) + if(ICEBERG_BUILD_BUNDLE) add_iceberg_test(avro_test USE_BUNDLE diff --git a/src/iceberg/test/delete_filter_test.cc b/src/iceberg/test/delete_filter_test.cc index 89d1b6b85..a54aeec05 100644 --- a/src/iceberg/test/delete_filter_test.cc +++ b/src/iceberg/test/delete_filter_test.cc @@ -34,6 +34,7 @@ #include #include "iceberg/arrow/arrow_io_internal.h" +#include "iceberg/data/deletion_vector_writer.h" #include "iceberg/data/equality_delete_writer.h" #include "iceberg/data/position_delete_writer.h" #include "iceberg/file_format.h" @@ -249,6 +250,24 @@ class DeleteFilterTest : public ::testing::Test { static constexpr std::string_view kDataPath = "data.parquet"; + // Writes `positions` as a deletion vector for `data_path` and returns the + // first DataFile reported by the writer. 
`positions` must be non-empty: with + // no deletes the writer reports no DataFile and data_files[0] is out of range. + Result> DeletionVectorFile( + const std::string& path, const std::vector& positions, + const std::string& data_path = std::string(kDataPath)) { + DeletionVectorWriterOptions options{ + .path = path, + .io = file_io_, + .spec = partition_spec_, + .partition = PartitionValues{}, + }; + 
 ICEBERG_ASSIGN_OR_RAISE(auto writer, DeletionVectorWriter::Make(std::move(options))); + for (int64_t pos : positions) { + ICEBERG_RETURN_UNEXPECTED(writer->Delete(data_path, pos)); + } + ICEBERG_RETURN_UNEXPECTED(writer->Close()); + ICEBERG_ASSIGN_OR_RAISE(auto metadata, writer->Metadata()); + return metadata.data_files[0]; + } + std::shared_ptr file_io_; std::shared_ptr table_schema_; std::shared_ptr partition_spec_; @@ -1156,7 +1175,7 @@ TEST_F(DeleteFilterTest, DeletionVectorErrorPropagatesFromCompute) { ICEBERG_UNWRAP_OR_FAIL(auto batch, MakeBatch(*filter.value()->RequiredSchema(), R"([[1, 0]])")); auto alive = filter.value()->ComputeAliveRows(batch.schema, batch.array); - ASSERT_THAT(alive, IsError(ErrorKind::kNotSupported)); + ASSERT_THAT(alive, IsError(ErrorKind::kInvalidArgument)); } TEST_F(DeleteFilterTest, EmptyBatchPropagatesDeleteLoadErrors) { @@ -1175,7 +1194,27 @@ TEST_F(DeleteFilterTest, EmptyBatchPropagatesDeleteLoadErrors) { auto alive = filter.value()->ComputeAliveRows(batch.schema, batch.array); - ASSERT_THAT(alive, IsError(ErrorKind::kNotSupported)); + ASSERT_THAT(alive, IsError(ErrorKind::kInvalidArgument)); +} + +TEST_F(DeleteFilterTest, DeletionVectorComputeAliveRows) { + // Write a real deletion vector with DeletionVectorWriter, then load it through + // DeleteFilter (DeleteLoader::LoadDV) and verify deleted positions are filtered. + // Positions 1 and 3 are deleted, so rows 0, 2 and 4 are expected to survive. 
+ ICEBERG_UNWRAP_OR_FAIL(auto dv, DeletionVectorFile("dv-alive.puffin", {1, 3})); + std::vector> delete_files = {dv}; + auto requested_schema = Project({1}); + auto filter = DeleteFilter::Make(std::string(kDataPath), delete_files, table_schema_, + requested_schema, file_io_, + /*need_row_pos_col=*/true); + ASSERT_THAT(filter, IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto batch, + MakeBatch(*filter.value()->RequiredSchema(), + R"([[10, 0], [20, 1], [30, 2], [40, 3], [50, 4]])")); + auto alive = filter.value()->ComputeAliveRows(batch.schema, batch.array); + + ASSERT_THAT(alive, IsOk()); + ExpectAliveRows(alive.value(), {0, 2, 4}); } TEST_F(DeleteFilterTest, CounterAccumulatesAcrossBatches) { diff --git a/src/iceberg/test/delete_loader_test.cc b/src/iceberg/test/delete_loader_test.cc index b392065cf..294bc787a 100644 --- a/src/iceberg/test/delete_loader_test.cc +++ b/src/iceberg/test/delete_loader_test.cc @@ -29,10 +29,14 @@ #include "iceberg/data/equality_delete_writer.h" #include "iceberg/data/position_delete_writer.h" #include "iceberg/deletes/position_delete_index.h" +#include "iceberg/deletes/roaring_position_bitmap.h" #include "iceberg/file_format.h" +#include "iceberg/file_io.h" #include "iceberg/manifest/manifest_entry.h" #include "iceberg/parquet/parquet_register.h" #include "iceberg/partition_spec.h" +#include "iceberg/puffin/deletion_vector.h" +#include "iceberg/puffin/puffin_writer.h" #include "iceberg/row/partition_values.h" #include "iceberg/schema.h" #include "iceberg/schema_field.h" @@ -110,6 +114,32 @@ class DeleteLoaderTest : public ::testing::Test { return writer->Metadata().value().data_files[0]; } + /// Write a deletion vector to a Puffin file and return the DataFile metadata. + /// The blob is written with PuffinWriter and MakeDeletionVectorBlob directly, + /// and the DataFile is assembled by hand from the returned blob metadata. 
+ std::shared_ptr WriteDeletionVector(const std::string& path, + const std::string& referenced_data_file, + const std::vector& positions) { + RoaringPositionBitmap bitmap; + for (int64_t pos : positions) { + bitmap.Add(pos); + } + + auto output_file = file_io_->NewOutputFile(path).value(); + auto writer = puffin::PuffinWriter::Make(std::move(output_file)).value(); + auto blob = puffin::MakeDeletionVectorBlob(bitmap, referenced_data_file).value(); + auto blob_metadata = writer->Write(blob).value(); + ICEBERG_THROW_NOT_OK(writer->Finish()); + + return std::make_shared(DataFile{ + .content = DataFile::Content::kPositionDeletes, + .file_path = path, + .file_format = FileFormatType::kPuffin, + .record_count = static_cast(bitmap.Cardinality()), + .referenced_data_file = referenced_data_file, + .content_offset = blob_metadata.offset, + .content_size_in_bytes = blob_metadata.length, + }); + } + std::shared_ptr file_io_; std::shared_ptr schema_; std::shared_ptr partition_spec_; @@ -239,15 +269,73 @@ TEST_F(DeleteLoaderTest, LoadPositionDeletesFastPathHonorsReferencedDataFile) { ASSERT_FALSE(index.IsDeleted(kRowCount)); } -TEST_F(DeleteLoaderTest, LoadPositionDeletesRejectsDV) { +TEST_F(DeleteLoaderTest, LoadDeletionVector) { + auto dv_file = + WriteDeletionVector("dv-a.puffin", "data.parquet", {0, 5, 10, 4'000'000'000LL}); + + std::vector> files = {dv_file}; + auto result = loader_->LoadPositionDeletes(files, "data.parquet"); + ASSERT_THAT(result, IsOk()); + + auto& index = result.value(); + ASSERT_EQ(index.Cardinality(), 4); + ASSERT_TRUE(index.IsDeleted(0)); + ASSERT_TRUE(index.IsDeleted(5)); + ASSERT_TRUE(index.IsDeleted(10)); + ASSERT_TRUE(index.IsDeleted(4'000'000'000LL)); + ASSERT_FALSE(index.IsDeleted(1)); +} + +TEST_F(DeleteLoaderTest, LoadDeletionVectorSkipsMismatchedReferencedDataFile) { + auto dv_file = WriteDeletionVector("dv-b.puffin", "other-data.parquet", {1, 2, 3}); + + std::vector> files = {dv_file}; + auto result = loader_->LoadPositionDeletes(files, 
"data.parquet"); + ASSERT_THAT(result, IsOk()); + ASSERT_TRUE(result.value().IsEmpty()); +} + +TEST_F(DeleteLoaderTest, LoadDeletionVectorRequiresContentOffsetAndSize) { auto dv_file = std::make_shared(DataFile{ .content = DataFile::Content::kPositionDeletes, .file_path = "dv.puffin", .file_format = FileFormatType::kPuffin, + .referenced_data_file = "data.parquet", }); std::vector> files = {dv_file}; auto result = loader_->LoadPositionDeletes(files, "data.parquet"); - ASSERT_THAT(result, IsError(ErrorKind::kNotSupported)); + ASSERT_THAT(result, IsError(ErrorKind::kInvalidArgument)); +} + +TEST_F(DeleteLoaderTest, LoadDeletionVectorRejectsCardinalityMismatch) { + auto dv_file = WriteDeletionVector("dv-card.puffin", "data.parquet", {0, 1, 2}); + // Corrupt the recorded cardinality (3 positions were written) so it no longer + // matches the bitmap. + dv_file->record_count = 99; + + std::vector> files = {dv_file}; + auto result = loader_->LoadPositionDeletes(files, "data.parquet"); + ASSERT_THAT(result, IsError(ErrorKind::kInvalidArgument)); +} + +// Iceberg uses either a deletion vector or position deletes for a data file, not +// both. This exercises the loader's robustness: a mixed list is merged into one +// index, with each source filtered by the target data file path. 
+TEST_F(DeleteLoaderTest, LoadMixedDeletionVectorAndPositionDeletes) { + auto dv = WriteDeletionVector("dv-mixed.puffin", "data.parquet", {0, 5}); + auto pos = WritePositionDeletes("pos-mixed.parquet", + {{"data.parquet", 10}, {"data.parquet", 20}}); + + std::vector> files = {dv, pos}; + auto result = loader_->LoadPositionDeletes(files, "data.parquet"); + ASSERT_THAT(result, IsOk()); + + auto& index = result.value(); + EXPECT_EQ(index.Cardinality(), 4); + EXPECT_TRUE(index.IsDeleted(0)); + EXPECT_TRUE(index.IsDeleted(5)); + EXPECT_TRUE(index.IsDeleted(10)); + EXPECT_TRUE(index.IsDeleted(20)); + EXPECT_FALSE(index.IsDeleted(1)); } TEST_F(DeleteLoaderTest, LoadPositionDeletesRejectsWrongContent) { diff --git a/src/iceberg/test/deletion_vector_writer_test.cc b/src/iceberg/test/deletion_vector_writer_test.cc new file mode 100644 index 000000000..b25418d2b --- /dev/null +++ b/src/iceberg/test/deletion_vector_writer_test.cc @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +#include "iceberg/data/deletion_vector_writer.h" + +#include +#include +#include + +#include + +#include "iceberg/data/delete_loader.h" +#include "iceberg/deletes/position_delete_index.h" +#include "iceberg/deletes/roaring_position_bitmap.h" +#include "iceberg/file_format.h" +#include "iceberg/manifest/manifest_entry.h" +#include "iceberg/test/matchers.h" +#include "iceberg/test/mock_io.h" + +namespace iceberg { + +namespace { + +std::shared_ptr FindByReferencedFile( + const std::vector>& files, const std::string& ref) { + for (const auto& file : files) { + if (file->referenced_data_file == ref) { + return file; + } + } + return nullptr; +} + +} // namespace + +// Full write -> read round trip: write deletion vectors with the writer, then +// load them back through DeleteLoader using the produced DataFile metadata. +TEST(DeletionVectorWriterTest, WriteThenLoadEndToEnd) { + auto io = std::make_shared(); + + std::vector> data_files; + { + ICEBERG_UNWRAP_OR_FAIL(auto writer, + DeletionVectorWriter::Make(DeletionVectorWriterOptions{ + .path = "memory://deletes.puffin", + .io = io, + .properties = {{"created-by", "iceberg-cpp-test"}}, + })); + + ASSERT_THAT(writer->Delete("data-a.parquet", 0), IsOk()); + ASSERT_THAT(writer->Delete("data-a.parquet", 5), IsOk()); + ASSERT_THAT(writer->Delete("data-a.parquet", 10), IsOk()); + ASSERT_THAT(writer->Delete("data-b.parquet", 1), IsOk()); + ASSERT_THAT(writer->Delete("data-b.parquet", 2), IsOk()); + ASSERT_THAT(writer->Close(), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto result, writer->Metadata()); + data_files = result.data_files; + } + + // One DataFile per referenced data file. + ASSERT_EQ(data_files.size(), 2u); + + auto dv_a = FindByReferencedFile(data_files, "data-a.parquet"); + auto dv_b = FindByReferencedFile(data_files, "data-b.parquet"); + ASSERT_NE(dv_a, nullptr); + ASSERT_NE(dv_b, nullptr); + + // Metadata is spec-compliant for a deletion vector. 
+ EXPECT_EQ(dv_a->content, DataFile::Content::kPositionDeletes); + EXPECT_EQ(dv_a->file_format, FileFormatType::kPuffin); + EXPECT_TRUE(dv_a->IsDeletionVector()); + EXPECT_EQ(dv_a->file_path, "memory://deletes.puffin"); + EXPECT_EQ(dv_a->record_count, 3); + EXPECT_TRUE(dv_a->content_offset.has_value()); + EXPECT_TRUE(dv_a->content_size_in_bytes.has_value()); + EXPECT_GT(dv_a->file_size_in_bytes, 0); + EXPECT_EQ(dv_b->record_count, 2); + + // Both blobs live in the same Puffin file but at different offsets. + EXPECT_EQ(dv_a->file_path, dv_b->file_path); + EXPECT_NE(dv_a->content_offset.value(), dv_b->content_offset.value()); + + // Read back through DeleteLoader for data-a.parquet. + DeleteLoader loader(io); + { + auto result = loader.LoadPositionDeletes(data_files, "data-a.parquet"); + ASSERT_THAT(result, IsOk()); + auto& index = result.value(); + EXPECT_EQ(index.Cardinality(), 3); + EXPECT_TRUE(index.IsDeleted(0)); + EXPECT_TRUE(index.IsDeleted(5)); + EXPECT_TRUE(index.IsDeleted(10)); + EXPECT_FALSE(index.IsDeleted(1)); + } + + // And for data-b.parquet (the loader filters by referenced_data_file). + { + auto result = loader.LoadPositionDeletes(data_files, "data-b.parquet"); + ASSERT_THAT(result, IsOk()); + auto& index = result.value(); + EXPECT_EQ(index.Cardinality(), 2); + EXPECT_TRUE(index.IsDeleted(1)); + EXPECT_TRUE(index.IsDeleted(2)); + EXPECT_FALSE(index.IsDeleted(0)); + } +} + +TEST(DeletionVectorWriterTest, EmptyWriterProducesNoDataFiles) { + auto io = std::make_shared(); + ICEBERG_UNWRAP_OR_FAIL(auto writer, + DeletionVectorWriter::Make(DeletionVectorWriterOptions{ + .path = "memory://empty.puffin", .io = io})); + ASSERT_THAT(writer->Close(), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto result, writer->Metadata()); + EXPECT_TRUE(result.data_files.empty()); + + // No blobs were written, so no (orphan) Puffin file should have been created. 
+ EXPECT_THAT(io->NewInputFile("memory://empty.puffin"), IsError(ErrorKind::kNotFound)); +} + +TEST(DeletionVectorWriterTest, DeleteRejectsInvalidPosition) { + auto io = std::make_shared(); + ICEBERG_UNWRAP_OR_FAIL(auto writer, + DeletionVectorWriter::Make(DeletionVectorWriterOptions{ + .path = "memory://invalid.puffin", .io = io})); + EXPECT_THAT(writer->Delete("data-a.parquet", -1), IsError(ErrorKind::kInvalidArgument)); + EXPECT_THAT(writer->Delete("data-a.parquet", RoaringPositionBitmap::kMaxPosition + 1), + IsError(ErrorKind::kInvalidArgument)); +} + +TEST(DeletionVectorWriterTest, MakeRejectsNullIo) { + EXPECT_THAT(DeletionVectorWriter::Make(DeletionVectorWriterOptions{.path = "x.puffin"}), + IsError(ErrorKind::kInvalidArgument)); +} + +TEST(DeletionVectorWriterTest, MakeRejectsEmptyPath) { + auto io = std::make_shared(); + EXPECT_THAT(DeletionVectorWriter::Make(DeletionVectorWriterOptions{.io = io}), + IsError(ErrorKind::kInvalidArgument)); +} + +TEST(DeletionVectorWriterTest, DeleteAfterCloseFails) { + auto io = std::make_shared(); + ICEBERG_UNWRAP_OR_FAIL(auto writer, + DeletionVectorWriter::Make(DeletionVectorWriterOptions{ + .path = "memory://closed.puffin", .io = io})); + ASSERT_THAT(writer->Close(), IsOk()); + EXPECT_THAT(writer->Delete("data-a.parquet", 0), IsError(ErrorKind::kValidationFailed)); +} + +} // namespace iceberg diff --git a/src/iceberg/test/file_scan_task_reader_test.cc b/src/iceberg/test/file_scan_task_reader_test.cc index 1630a108a..1ad577c8f 100644 --- a/src/iceberg/test/file_scan_task_reader_test.cc +++ b/src/iceberg/test/file_scan_task_reader_test.cc @@ -38,6 +38,7 @@ #include "iceberg/arrow/arrow_register.h" #include "iceberg/arrow_c_data_guard_internal.h" #include "iceberg/arrow_c_data_util_internal.h" +#include "iceberg/data/deletion_vector_writer.h" #include "iceberg/data/equality_delete_writer.h" #include "iceberg/data/position_delete_writer.h" #include "iceberg/file_format.h" @@ -298,6 +299,24 @@ class FileScanTaskReaderTest : 
public TempFileTestBase { return metadata.data_files[0]; } + Result> MakeDeletionVectorFile( + const std::string& path, const std::vector& positions, + const std::string& data_path) { + DeletionVectorWriterOptions options{ + .path = path, + .io = file_io_, + .spec = partition_spec_, + .partition = PartitionValues{}, + }; + ICEBERG_ASSIGN_OR_RAISE(auto writer, DeletionVectorWriter::Make(std::move(options))); + for (int64_t pos : positions) { + ICEBERG_RETURN_UNEXPECTED(writer->Delete(data_path, pos)); + } + ICEBERG_RETURN_UNEXPECTED(writer->Close()); + ICEBERG_ASSIGN_OR_RAISE(auto metadata, writer->Metadata()); + return metadata.data_files[0]; + } + void VerifyStream(struct ArrowArrayStream* stream, std::string_view expected_json) { auto record_batch_reader = ::arrow::ImportRecordBatchReader(stream).ValueOrDie(); @@ -401,6 +420,30 @@ TEST_F(FileScanTaskReaderTest, OpenWithPositionDeletesFiltersRowsAndPrunesPos) { ASSERT_NO_FATAL_FAILURE(VerifyStream(&stream, R"([[1, "Foo"], [3, "Baz"]])")); } +TEST_F(FileScanTaskReaderTest, OpenWithDeletionVectorFiltersRows) { + ICEBERG_UNWRAP_OR_FAIL( + auto data_file, + MakeDataFile(table_schema_, + R"([[1, "Foo", "blue"], [2, "Bar", "red"], [3, "Baz", "green"]])")); + ICEBERG_UNWRAP_OR_FAIL( + auto dv, MakeDeletionVectorFile(CreateNewTempFilePathWithSuffix(".puffin"), {1}, + data_file->file_path)); + FileScanTask task(data_file, {dv}); + + FileScanTaskReader::Options options{ + .io = file_io_, + .table_schema = table_schema_, + .schemas = {table_schema_}, + .projected_schema = projected_schema_, + }; + ICEBERG_UNWRAP_OR_FAIL(auto reader, FileScanTaskReader::Make(std::move(options))); + auto stream_result = reader->Open(task); + ASSERT_THAT(stream_result, IsOk()); + auto stream = std::move(stream_result.value()); + + ASSERT_NO_FATAL_FAILURE(VerifyStream(&stream, R"([[1, "Foo"], [3, "Baz"]])")); +} + TEST_F(FileScanTaskReaderTest, OpenWithEqualityDeletesAddsAndPrunesDeleteOnlyColumns) { ICEBERG_UNWRAP_OR_FAIL( auto data_file, diff 
--git a/src/iceberg/test/meson.build b/src/iceberg/test/meson.build index b01a61904..8c7565053 100644 --- a/src/iceberg/test/meson.build +++ b/src/iceberg/test/meson.build @@ -117,12 +117,17 @@ iceberg_tests = { }, 'puffin_test': { 'sources': files( + 'puffin_deletion_vector_test.cc', 'puffin_format_test.cc', 'puffin_json_test.cc', 'puffin_reader_writer_test.cc', ), 'use_data': true, }, + 'deletion_vector_writer_test': { + 'sources': files('deletion_vector_writer_test.cc'), + 'use_data': true, + }, } if get_option('rest').enabled() diff --git a/src/iceberg/test/puffin_deletion_vector_test.cc b/src/iceberg/test/puffin_deletion_vector_test.cc new file mode 100644 index 000000000..57c17fd02 --- /dev/null +++ b/src/iceberg/test/puffin_deletion_vector_test.cc @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +#include +#include +#include +#include + +#include + +#include "iceberg/deletes/roaring_position_bitmap.h" +#include "iceberg/metadata_columns.h" +#include "iceberg/puffin/deletion_vector.h" +#include "iceberg/puffin/file_metadata.h" +#include "iceberg/puffin/puffin_reader.h" +#include "iceberg/puffin/puffin_writer.h" +#include "iceberg/test/matchers.h" +#include "iceberg/test/mock_io.h" + +namespace iceberg::puffin { + +namespace { + +RoaringPositionBitmap MakeBitmap(const std::vector& positions) { + RoaringPositionBitmap bitmap; + for (int64_t pos : positions) { + bitmap.Add(pos); + } + return bitmap; +} + +void ExpectContainsExactly(const RoaringPositionBitmap& bitmap, + const std::vector& positions) { + EXPECT_EQ(bitmap.Cardinality(), positions.size()); + for (int64_t pos : positions) { + EXPECT_TRUE(bitmap.Contains(pos)) << "missing position " << pos; + } +} + +} // namespace + +// ==================== Blob framing (length + magic + vector + CRC) ==================== + +TEST(DeletionVectorBlobTest, RoundTrip) { + const std::vector positions = {0, 1, 5, 100, 4'000'000'000LL}; + auto bitmap = MakeBitmap(positions); + + ICEBERG_UNWRAP_OR_FAIL(auto blob, SerializeDeletionVectorBlob(bitmap)); + ICEBERG_UNWRAP_OR_FAIL(auto restored, DeserializeDeletionVectorBlob(blob)); + + ExpectContainsExactly(restored, positions); +} + +// Mirrors Java's TestBitmapPositionDeleteIndex#testAllContainerTypesIndexSerialization: +// spans two high-32-bit keys and exercises all Roaring container types (sparse +// "array", dense "bitset", and run containers after Optimize). 
+TEST(DeletionVectorBlobTest, RoundTripAllContainerTypesAcrossKeys) { + constexpr int64_t kKeyStride = 0x100000000LL; // 2^32: high-32-bit key + constexpr int64_t kContainerStride = 1 << 16; // 2^16: Roaring container + auto pos = [](int64_t key, int64_t container, int64_t value) { + return key * kKeyStride + container * kContainerStride + value; + }; + + RoaringPositionBitmap bitmap; + int64_t expected = 0; + auto add = [&](int64_t p) { + bitmap.Add(p); + ++expected; + }; + auto add_range = [&](int64_t begin, int64_t end) { + bitmap.AddRange(begin, end); + expected += end - begin; + }; + + for (int64_t key : {int64_t{0}, int64_t{1}}) { + add(pos(key, 0, 5)); // sparse -> array + add(pos(key, 0, 7)); + add_range(pos(key, 1, 1), pos(key, 1, 1000)); // medium run + add_range(pos(key, 2, 1), pos(key, 2, kContainerStride)); // dense -> bitset + } + + bitmap.Optimize(); // run-length encode, as the DV writer does + + ICEBERG_UNWRAP_OR_FAIL(auto blob, SerializeDeletionVectorBlob(bitmap)); + ICEBERG_UNWRAP_OR_FAIL(auto restored, DeserializeDeletionVectorBlob(blob)); + + EXPECT_EQ(restored.Cardinality(), static_cast(expected)); + EXPECT_TRUE(restored.Contains(pos(0, 0, 5))); + EXPECT_TRUE(restored.Contains(pos(1, 2, kContainerStride - 1))); + EXPECT_TRUE(restored.Contains(pos(0, 1, 999))); + EXPECT_FALSE(restored.Contains(pos(0, 0, 6))); + EXPECT_FALSE(restored.Contains(pos(1, 1, 1000))); // range end is exclusive +} + +TEST(DeletionVectorBlobTest, RoundTripEmpty) { + auto bitmap = MakeBitmap({}); + + ICEBERG_UNWRAP_OR_FAIL(auto blob, SerializeDeletionVectorBlob(bitmap)); + ICEBERG_UNWRAP_OR_FAIL(auto restored, DeserializeDeletionVectorBlob(blob)); + + EXPECT_TRUE(restored.IsEmpty()); +} + +TEST(DeletionVectorBlobTest, BlobLayout) { + auto bitmap = MakeBitmap({1, 2, 3}); + ICEBERG_UNWRAP_OR_FAIL(auto blob, SerializeDeletionVectorBlob(bitmap)); + + // length(4) + magic(4) + vector + crc(4) + ASSERT_GE(blob.size(), 12u); + + // Magic sequence: 0xD1 0xD3 0x39 0x64 immediately 
follows the length prefix. + EXPECT_EQ(blob[4], 0xD1); + EXPECT_EQ(blob[5], 0xD3); + EXPECT_EQ(blob[6], 0x39); + EXPECT_EQ(blob[7], 0x64); + + // Length prefix (big-endian) equals magic + vector size. + const uint32_t length = + (static_cast(blob[0]) << 24) | (static_cast(blob[1]) << 16) | + (static_cast(blob[2]) << 8) | static_cast(blob[3]); + EXPECT_EQ(length, blob.size() - 8u); +} + +TEST(DeletionVectorBlobTest, RejectsCorruptedCrc) { + auto bitmap = MakeBitmap({1, 2, 3}); + ICEBERG_UNWRAP_OR_FAIL(auto blob, SerializeDeletionVectorBlob(bitmap)); + + blob.back() ^= 0xFF; // flip a bit in the trailing CRC + EXPECT_THAT(DeserializeDeletionVectorBlob(blob), IsError(ErrorKind::kInvalidArgument)); +} + +TEST(DeletionVectorBlobTest, RejectsBadMagic) { + auto bitmap = MakeBitmap({1, 2, 3}); + ICEBERG_UNWRAP_OR_FAIL(auto blob, SerializeDeletionVectorBlob(bitmap)); + + blob[4] = 0x00; + EXPECT_THAT(DeserializeDeletionVectorBlob(blob), IsError(ErrorKind::kInvalidArgument)); +} + +TEST(DeletionVectorBlobTest, RejectsTruncatedBlob) { + std::vector blob = {0x00, 0x00}; + EXPECT_THAT(DeserializeDeletionVectorBlob(blob), IsError(ErrorKind::kInvalidArgument)); +} + +TEST(DeletionVectorBlobTest, MakeBlobSetsSpecFields) { + ICEBERG_UNWRAP_OR_FAIL( + auto blob, MakeDeletionVectorBlob(MakeBitmap({1, 2, 3}), "/data/file-a.parquet")); + + EXPECT_EQ(blob.type, StandardBlobTypes::kDeletionVectorV1); + EXPECT_EQ(blob.input_fields, + std::vector{MetadataColumns::kFilePositionColumnId}); + EXPECT_EQ(blob.snapshot_id, -1); + EXPECT_EQ(blob.sequence_number, -1); + EXPECT_EQ(blob.requested_compression, PuffinCompressionCodec::kNone); + EXPECT_EQ(blob.properties.at( + std::string(StandardDeletionVectorProperties::kReferencedDataFile)), + "/data/file-a.parquet"); + EXPECT_EQ( + blob.properties.at(std::string(StandardDeletionVectorProperties::kCardinality)), + "3"); +} + +TEST(DeletionVectorBlobTest, MakeBlobRejectsEmptyReferencedFile) { + EXPECT_THAT(MakeDeletionVectorBlob(MakeBitmap({1}), ""), 
+ IsError(ErrorKind::kInvalidArgument)); +} + +// ==================== End-to-end through a Puffin file ==================== + +TEST(DeletionVectorBlobTest, EndToEndThroughPuffinFile) { + const std::vector positions = {0, 3, 7, 1'000, 5'000'000'000LL}; + MockFileIO io; + const std::string location = "memory://dv.puffin"; + + { + ICEBERG_UNWRAP_OR_FAIL(auto output, io.NewOutputFile(location)); + ICEBERG_UNWRAP_OR_FAIL(auto writer, PuffinWriter::Make(std::move(output))); + ICEBERG_UNWRAP_OR_FAIL( + auto blob, MakeDeletionVectorBlob(MakeBitmap(positions), "/data/file-a.parquet")); + ICEBERG_UNWRAP_OR_FAIL(auto meta, writer->Write(blob)); + EXPECT_EQ(meta.type, StandardBlobTypes::kDeletionVectorV1); + EXPECT_TRUE(meta.compression_codec.empty()); + ASSERT_THAT(writer->Finish(), IsOk()); + } + + ICEBERG_UNWRAP_OR_FAIL(auto input, io.NewInputFile(location)); + ICEBERG_UNWRAP_OR_FAIL(auto reader, PuffinReader::Make(std::move(input))); + ICEBERG_UNWRAP_OR_FAIL(auto file_metadata, reader->ReadFileMetadata()); + ASSERT_EQ(file_metadata.blobs.size(), 1u); + EXPECT_EQ(file_metadata.blobs[0].properties.at( + std::string(StandardDeletionVectorProperties::kReferencedDataFile)), + "/data/file-a.parquet"); + + ICEBERG_UNWRAP_OR_FAIL(auto blob_result, reader->ReadBlob(file_metadata.blobs[0])); + const auto& bytes = blob_result.second; + std::span blob_bytes(reinterpret_cast(bytes.data()), + bytes.size()); + ICEBERG_UNWRAP_OR_FAIL(auto restored, DeserializeDeletionVectorBlob(blob_bytes)); + + ExpectContainsExactly(restored, positions); +} + +} // namespace iceberg::puffin