-
Notifications
You must be signed in to change notification settings - Fork 113
feat(puffin): support deletion-vector-v1 blob read/write #777
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,163 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
|
|
||
| #include "iceberg/data/deletion_vector_writer.h" | ||
|
|
||
| #include <format> | ||
| #include <map> | ||
| #include <optional> | ||
| #include <string> | ||
| #include <utility> | ||
| #include <vector> | ||
|
|
||
| #include "iceberg/deletes/roaring_position_bitmap.h" | ||
| #include "iceberg/file_io.h" | ||
| #include "iceberg/manifest/manifest_entry.h" | ||
| #include "iceberg/partition_spec.h" | ||
| #include "iceberg/puffin/deletion_vector.h" | ||
| #include "iceberg/puffin/file_metadata.h" | ||
| #include "iceberg/puffin/puffin_writer.h" | ||
| #include "iceberg/util/macros.h" | ||
| #include "iceberg/version.h" | ||
|
|
||
| namespace iceberg { | ||
|
|
||
| class DeletionVectorWriter::Impl { | ||
| public: | ||
| explicit Impl(DeletionVectorWriterOptions options) : options_(std::move(options)) {} | ||
|
|
||
| Status Delete(std::string_view referenced_data_file, int64_t pos) { | ||
| ICEBERG_CHECK(!closed_, "Cannot delete after the writer is closed"); | ||
| ICEBERG_PRECHECK(!referenced_data_file.empty(), | ||
| "Deletion vector requires a non-empty referenced data file"); | ||
| ICEBERG_PRECHECK(pos >= 0 && pos <= RoaringPositionBitmap::kMaxPosition, | ||
| "Invalid deletion vector position: {}", pos); | ||
| bitmaps_[std::string(referenced_data_file)].Add(pos); | ||
| return {}; | ||
| } | ||
|
|
||
| Status Close() { | ||
| if (closed_) { | ||
| return {}; | ||
| } | ||
|
|
||
| // No deletes: skip creating an orphan Puffin file that no metadata | ||
| // references. | ||
| if (bitmaps_.empty()) { | ||
| closed_ = true; | ||
| return {}; | ||
| } | ||
|
|
||
| auto properties = options_.properties; | ||
| properties.try_emplace(std::string(puffin::StandardPuffinProperties::kCreatedBy), | ||
| std::format("iceberg-cpp/{}.{}.{}", ICEBERG_VERSION_MAJOR, | ||
| ICEBERG_VERSION_MINOR, ICEBERG_VERSION_PATCH)); | ||
|
|
||
| ICEBERG_ASSIGN_OR_RAISE(auto output_file, options_.io->NewOutputFile(options_.path)); | ||
| ICEBERG_ASSIGN_OR_RAISE( | ||
| auto writer, | ||
| puffin::PuffinWriter::Make(std::move(output_file), std::move(properties))); | ||
|
|
||
| // One blob per referenced data file, in deterministic (sorted) order. | ||
| struct Entry { | ||
| std::string referenced_data_file; | ||
| int64_t offset; | ||
| int64_t length; | ||
| int64_t cardinality; | ||
| }; | ||
| std::vector<Entry> entries; | ||
| entries.reserve(bitmaps_.size()); | ||
| for (auto& [referenced_data_file, bitmap] : bitmaps_) { | ||
| bitmap.Optimize(); // run-length encode before serializing | ||
| ICEBERG_ASSIGN_OR_RAISE( | ||
| auto blob, puffin::MakeDeletionVectorBlob(bitmap, referenced_data_file)); | ||
| ICEBERG_ASSIGN_OR_RAISE(auto blob_metadata, writer->Write(blob)); | ||
| entries.push_back(Entry{ | ||
| .referenced_data_file = referenced_data_file, | ||
| .offset = blob_metadata.offset, | ||
| .length = blob_metadata.length, | ||
| .cardinality = static_cast<int64_t>(bitmap.Cardinality()), | ||
| }); | ||
| } | ||
|
|
||
| ICEBERG_RETURN_UNEXPECTED(writer->Finish()); | ||
| ICEBERG_ASSIGN_OR_RAISE(file_size_, writer->FileSize()); | ||
|
|
||
| const auto spec_id = | ||
| options_.spec ? std::make_optional(options_.spec->spec_id()) : std::nullopt; | ||
|
|
||
| for (auto& entry : entries) { | ||
| data_files_.push_back(std::make_shared<DataFile>(DataFile{ | ||
| .content = DataFile::Content::kPositionDeletes, | ||
| .file_path = options_.path, | ||
| .file_format = FileFormatType::kPuffin, | ||
| .partition = options_.partition, | ||
| .record_count = entry.cardinality, | ||
| .file_size_in_bytes = file_size_, | ||
| .referenced_data_file = std::move(entry.referenced_data_file), | ||
| .content_offset = entry.offset, | ||
| .content_size_in_bytes = entry.length, | ||
| .partition_spec_id = spec_id, | ||
| })); | ||
| } | ||
|
|
||
| closed_ = true; | ||
| return {}; | ||
| } | ||
|
|
||
| Result<FileWriter::WriteResult> Metadata() { | ||
| ICEBERG_CHECK(closed_, "Cannot get metadata before closing the writer"); | ||
| FileWriter::WriteResult result; | ||
| result.data_files = data_files_; | ||
| return result; | ||
| } | ||
|
|
||
| private: | ||
| DeletionVectorWriterOptions options_; | ||
| // Ordered by referenced data file path for deterministic blob layout. | ||
| std::map<std::string, RoaringPositionBitmap> bitmaps_; | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please model the private state like Java Deletes: path, positions, spec, and partition. A map from path to bitmap loses per-file metadata, so the wrong manifest metadata can be produced. |
||
| std::vector<std::shared_ptr<DataFile>> data_files_; | ||
| int64_t file_size_ = -1; | ||
| bool closed_ = false; | ||
| }; | ||
|
|
||
| DeletionVectorWriter::DeletionVectorWriter(std::unique_ptr<Impl> impl) | ||
| : impl_(std::move(impl)) {} | ||
|
|
||
| DeletionVectorWriter::~DeletionVectorWriter() = default; | ||
|
|
||
| Result<std::unique_ptr<DeletionVectorWriter>> DeletionVectorWriter::Make( | ||
| DeletionVectorWriterOptions options) { | ||
| ICEBERG_PRECHECK(options.io != nullptr, "DeletionVectorWriter requires a FileIO"); | ||
| ICEBERG_PRECHECK(!options.path.empty(), "DeletionVectorWriter requires a path"); | ||
| return std::unique_ptr<DeletionVectorWriter>( | ||
| new DeletionVectorWriter(std::make_unique<Impl>(std::move(options)))); | ||
| } | ||
|
|
||
| Status DeletionVectorWriter::Delete(std::string_view referenced_data_file, int64_t pos) { | ||
| return impl_->Delete(referenced_data_file, pos); | ||
| } | ||
|
|
||
| Status DeletionVectorWriter::Close() { return impl_->Close(); } | ||
|
|
||
| Result<FileWriter::WriteResult> DeletionVectorWriter::Metadata() { | ||
| return impl_->Metadata(); | ||
| } | ||
|
|
||
| } // namespace iceberg | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,92 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
|
|
||
| #pragma once | ||
|
|
||
| /// \file iceberg/data/deletion_vector_writer.h | ||
| /// Writer that emits deletion vectors as `deletion-vector-v1` blobs in a Puffin file. | ||
|
|
||
| #include <cstdint> | ||
| #include <memory> | ||
| #include <string> | ||
| #include <string_view> | ||
| #include <unordered_map> | ||
|
|
||
| #include "iceberg/data/writer.h" | ||
| #include "iceberg/iceberg_data_export.h" | ||
| #include "iceberg/result.h" | ||
| #include "iceberg/row/partition_values.h" | ||
| #include "iceberg/type_fwd.h" | ||
|
|
||
| namespace iceberg { | ||
|
|
||
| /// \brief Options for creating a DeletionVectorWriter. | ||
| struct ICEBERG_DATA_EXPORT DeletionVectorWriterOptions { | ||
| /// Output Puffin file location. | ||
| std::string path; | ||
| /// FileIO used to create the Puffin file. | ||
| std::shared_ptr<FileIO> io; | ||
| /// Partition spec the referenced data files belong to (optional). | ||
| std::shared_ptr<PartitionSpec> spec; | ||
| /// Partition the referenced data files belong to. | ||
| PartitionValues partition; | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please store the partition spec and partition per referenced data file, not per writer. Java DVFileWriter passes them on each delete call, and the spec allows one Puffin file to hold DVs for files from different partitions. With this API, one writer can emit the wrong delete-file metadata. |
||
| /// File-level Puffin properties (e.g. "created-by"). | ||
| std::unordered_map<std::string, std::string> properties; | ||
| }; | ||
|
|
||
| /// \brief Writes one or more deletion vectors into a single Puffin file. | ||
| /// | ||
| /// Each referenced data file gets its own `deletion-vector-v1` blob. After | ||
| /// Close(), Metadata() returns one DataFile per blob, each carrying the | ||
| /// content_offset/content_size_in_bytes and referenced_data_file required to | ||
| /// register the deletion vector in a manifest. | ||
| /// | ||
| /// \note All referenced data files are assumed to belong to the single | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please remove this note once the API is fixed. It documents a limitation that Java does not have and that the spec does not require. |
||
| /// partition supplied in the options. | ||
| class ICEBERG_DATA_EXPORT DeletionVectorWriter { | ||
| public: | ||
| ~DeletionVectorWriter(); | ||
|
|
||
| DeletionVectorWriter(const DeletionVectorWriter&) = delete; | ||
| DeletionVectorWriter& operator=(const DeletionVectorWriter&) = delete; | ||
|
|
||
| /// \brief Create a new DeletionVectorWriter. | ||
| static Result<std::unique_ptr<DeletionVectorWriter>> Make( | ||
| DeletionVectorWriterOptions options); | ||
|
|
||
| /// \brief Mark a row position as deleted for the given data file. | ||
| /// | ||
| /// Positions are accumulated per data file and serialized on Close(). | ||
| Status Delete(std::string_view referenced_data_file, int64_t pos); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please make this match Java DVFileWriter more closely. Delete should take the data file spec and partition, and there should be an overload that accepts a PositionDeleteIndex. Otherwise the writer cannot represent per-file metadata or bulk deletes. |
||
|
|
||
| /// \brief Write all accumulated deletion vectors to the Puffin file and close it. | ||
| Status Close(); | ||
|
|
||
| /// \brief Metadata for the DataFiles produced (one per referenced data file). | ||
| /// \note Must be called after Close(). | ||
| Result<FileWriter::WriteResult> Metadata(); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please return a delete-specific result here, not FileWriter::WriteResult. Java returns DeleteWriteResult with delete files, referenced data files, and rewritten delete files. Row delta needs those sets for conflict checks and cleanup. |
||
|
|
||
| private: | ||
| class Impl; | ||
| std::unique_ptr<Impl> impl_; | ||
|
|
||
| explicit DeletionVectorWriter(std::unique_ptr<Impl> impl); | ||
| }; | ||
|
|
||
| } // namespace iceberg | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should merge existing deletes before writing a DV for a path. The spec requires at most one DV per data file, and Java BaseDVFileWriter loads previous DVs/position deletes and reports rewritten delete files. As-is, adding another DV can leave stale delete files in table state.