Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/iceberg/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -188,13 +188,15 @@ set(ICEBERG_DATA_SOURCES
data/data_writer.cc
data/delete_filter.cc
data/delete_loader.cc
data/deletion_vector_writer.cc
data/equality_delete_writer.cc
data/file_scan_task_reader.cc
data/position_delete_writer.cc
data/writer.cc
deletes/position_delete_index.cc
deletes/position_delete_range_consumer.cc
deletes/roaring_position_bitmap.cc
puffin/deletion_vector.cc
puffin/file_metadata.cc
puffin/json_serde.cc
puffin/puffin_format.cc
Expand Down
45 changes: 44 additions & 1 deletion src/iceberg/data/delete_loader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@

#include "iceberg/data/delete_loader.h"

#include <cstdint>
#include <cstring>
#include <limits>
#include <span>
#include <string>
#include <utility>
#include <vector>

#include <nanoarrow/nanoarrow.h>
Expand All @@ -30,9 +33,12 @@
#include "iceberg/arrow_c_data_guard_internal.h"
#include "iceberg/deletes/position_delete_index.h"
#include "iceberg/deletes/position_delete_range_consumer.h"
#include "iceberg/deletes/roaring_position_bitmap.h"
#include "iceberg/file_io.h"
#include "iceberg/file_reader.h"
#include "iceberg/manifest/manifest_entry.h"
#include "iceberg/metadata_columns.h"
#include "iceberg/puffin/deletion_vector.h"
#include "iceberg/result.h"
#include "iceberg/row/arrow_array_wrapper.h"
#include "iceberg/schema.h"
Expand Down Expand Up @@ -171,7 +177,44 @@ Status DeleteLoader::LoadPositionDelete(const DataFile& file, PositionDeleteInde
}

Status DeleteLoader::LoadDV(const DataFile& file, PositionDeleteIndex& index) const {
return NotSupported("Loading deletion vectors is not yet supported");
// A deletion vector must reference exactly one data file; without it the
// caller cannot know which data file the positions apply to.
ICEBERG_PRECHECK(file.referenced_data_file.has_value(),
"Deletion vector requires referenced_data_file: {}", file.file_path);

// For deletion vectors, content_offset and content_size_in_bytes point directly
// at the DV blob bytes within the Puffin file and are required by the spec.
ICEBERG_PRECHECK(
file.content_offset.has_value() && file.content_size_in_bytes.has_value(),
"Deletion vector requires content_offset and content_size_in_bytes: {}",
file.file_path);

const int64_t offset = file.content_offset.value();
const int64_t length = file.content_size_in_bytes.value();
ICEBERG_PRECHECK(offset >= 0 && length >= 0,
"Invalid deletion vector offset/length: offset={}, length={}", offset,
length);
ICEBERG_PRECHECK(length <= std::numeric_limits<int32_t>::max(),
"Cannot read deletion vector larger than 2GB: {}", length);

ICEBERG_ASSIGN_OR_RAISE(auto input_file, io_->NewInputFile(file.file_path));
ICEBERG_ASSIGN_OR_RAISE(auto stream, input_file->Open());

std::vector<std::byte> bytes(static_cast<size_t>(length));
ICEBERG_RETURN_UNEXPECTED(stream->ReadFully(offset, bytes));
ICEBERG_RETURN_UNEXPECTED(stream->Close());

std::span<const uint8_t> blob(reinterpret_cast<const uint8_t*>(bytes.data()),
bytes.size());
ICEBERG_ASSIGN_OR_RAISE(auto bitmap, puffin::DeserializeDeletionVectorBlob(blob));

// The bitmap cardinality must match the record count recorded in metadata.
ICEBERG_PRECHECK(std::cmp_equal(bitmap.Cardinality(), file.record_count),
"Deletion vector cardinality {} does not match record count {}: {}",
bitmap.Cardinality(), file.record_count, file.file_path);

bitmap.ForEach([&index](int64_t pos) { index.Delete(pos); });
return {};
}

Result<PositionDeleteIndex> DeleteLoader::LoadPositionDeletes(
Expand Down
163 changes: 163 additions & 0 deletions src/iceberg/data/deletion_vector_writer.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#include "iceberg/data/deletion_vector_writer.h"

#include <format>
#include <map>
#include <optional>
#include <string>
#include <utility>
#include <vector>

#include "iceberg/deletes/roaring_position_bitmap.h"
#include "iceberg/file_io.h"
#include "iceberg/manifest/manifest_entry.h"
#include "iceberg/partition_spec.h"
#include "iceberg/puffin/deletion_vector.h"
#include "iceberg/puffin/file_metadata.h"
#include "iceberg/puffin/puffin_writer.h"
#include "iceberg/util/macros.h"
#include "iceberg/version.h"

namespace iceberg {

class DeletionVectorWriter::Impl {
public:
explicit Impl(DeletionVectorWriterOptions options) : options_(std::move(options)) {}

Status Delete(std::string_view referenced_data_file, int64_t pos) {
ICEBERG_CHECK(!closed_, "Cannot delete after the writer is closed");
ICEBERG_PRECHECK(!referenced_data_file.empty(),
"Deletion vector requires a non-empty referenced data file");
ICEBERG_PRECHECK(pos >= 0 && pos <= RoaringPositionBitmap::kMaxPosition,
"Invalid deletion vector position: {}", pos);
bitmaps_[std::string(referenced_data_file)].Add(pos);
return {};
}

Status Close() {
if (closed_) {
return {};
}

// No deletes: skip creating an orphan Puffin file that no metadata
// references.
if (bitmaps_.empty()) {
closed_ = true;
return {};
}

auto properties = options_.properties;
properties.try_emplace(std::string(puffin::StandardPuffinProperties::kCreatedBy),
std::format("iceberg-cpp/{}.{}.{}", ICEBERG_VERSION_MAJOR,
ICEBERG_VERSION_MINOR, ICEBERG_VERSION_PATCH));

ICEBERG_ASSIGN_OR_RAISE(auto output_file, options_.io->NewOutputFile(options_.path));
ICEBERG_ASSIGN_OR_RAISE(
auto writer,
puffin::PuffinWriter::Make(std::move(output_file), std::move(properties)));

// One blob per referenced data file, in deterministic (sorted) order.
struct Entry {
std::string referenced_data_file;
int64_t offset;
int64_t length;
int64_t cardinality;
};
std::vector<Entry> entries;
entries.reserve(bitmaps_.size());
for (auto& [referenced_data_file, bitmap] : bitmaps_) {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should merge existing deletes before writing a DV for a path. The spec requires at most one DV per data file, and Java BaseDVFileWriter loads previous DVs/position deletes and reports rewritten delete files. As-is, adding another DV can leave stale delete files in table state.

bitmap.Optimize(); // run-length encode before serializing
ICEBERG_ASSIGN_OR_RAISE(
auto blob, puffin::MakeDeletionVectorBlob(bitmap, referenced_data_file));
ICEBERG_ASSIGN_OR_RAISE(auto blob_metadata, writer->Write(blob));
entries.push_back(Entry{
.referenced_data_file = referenced_data_file,
.offset = blob_metadata.offset,
.length = blob_metadata.length,
.cardinality = static_cast<int64_t>(bitmap.Cardinality()),
});
}

ICEBERG_RETURN_UNEXPECTED(writer->Finish());
ICEBERG_ASSIGN_OR_RAISE(file_size_, writer->FileSize());

const auto spec_id =
options_.spec ? std::make_optional(options_.spec->spec_id()) : std::nullopt;

for (auto& entry : entries) {
data_files_.push_back(std::make_shared<DataFile>(DataFile{
.content = DataFile::Content::kPositionDeletes,
.file_path = options_.path,
.file_format = FileFormatType::kPuffin,
.partition = options_.partition,
.record_count = entry.cardinality,
.file_size_in_bytes = file_size_,
.referenced_data_file = std::move(entry.referenced_data_file),
.content_offset = entry.offset,
.content_size_in_bytes = entry.length,
.partition_spec_id = spec_id,
}));
}

closed_ = true;
return {};
}

Result<FileWriter::WriteResult> Metadata() {
ICEBERG_CHECK(closed_, "Cannot get metadata before closing the writer");
FileWriter::WriteResult result;
result.data_files = data_files_;
return result;
}

private:
DeletionVectorWriterOptions options_;
// Ordered by referenced data file path for deterministic blob layout.
std::map<std::string, RoaringPositionBitmap> bitmaps_;

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please model the private state like Java Deletes: path, positions, spec, and partition. A map from path to bitmap loses per-file metadata, so the wrong manifest metadata can be produced.

std::vector<std::shared_ptr<DataFile>> data_files_;
int64_t file_size_ = -1;
bool closed_ = false;
};

DeletionVectorWriter::DeletionVectorWriter(std::unique_ptr<Impl> impl)
: impl_(std::move(impl)) {}

DeletionVectorWriter::~DeletionVectorWriter() = default;

Result<std::unique_ptr<DeletionVectorWriter>> DeletionVectorWriter::Make(
DeletionVectorWriterOptions options) {
ICEBERG_PRECHECK(options.io != nullptr, "DeletionVectorWriter requires a FileIO");
ICEBERG_PRECHECK(!options.path.empty(), "DeletionVectorWriter requires a path");
return std::unique_ptr<DeletionVectorWriter>(
new DeletionVectorWriter(std::make_unique<Impl>(std::move(options))));
}

Status DeletionVectorWriter::Delete(std::string_view referenced_data_file, int64_t pos) {
return impl_->Delete(referenced_data_file, pos);
}

Status DeletionVectorWriter::Close() { return impl_->Close(); }

Result<FileWriter::WriteResult> DeletionVectorWriter::Metadata() {
return impl_->Metadata();
}

} // namespace iceberg
92 changes: 92 additions & 0 deletions src/iceberg/data/deletion_vector_writer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#pragma once

/// \file iceberg/data/deletion_vector_writer.h
/// Writer that emits deletion vectors as `deletion-vector-v1` blobs in a Puffin file.

#include <cstdint>
#include <memory>
#include <string>
#include <string_view>
#include <unordered_map>

#include "iceberg/data/writer.h"
#include "iceberg/iceberg_data_export.h"
#include "iceberg/result.h"
#include "iceberg/row/partition_values.h"
#include "iceberg/type_fwd.h"

namespace iceberg {

/// \brief Options for creating a DeletionVectorWriter.
struct ICEBERG_DATA_EXPORT DeletionVectorWriterOptions {
/// Output Puffin file location.
std::string path;
/// FileIO used to create the Puffin file.
std::shared_ptr<FileIO> io;
/// Partition spec the referenced data files belong to (optional).
std::shared_ptr<PartitionSpec> spec;
/// Partition the referenced data files belong to.
PartitionValues partition;

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please store the partition spec and partition per referenced data file, not per writer. Java DVFileWriter passes them on each delete call, and the spec allows one Puffin file to hold DVs for files from different partitions. With this API, one writer can emit the wrong delete-file metadata.

/// File-level Puffin properties (e.g. "created-by").
std::unordered_map<std::string, std::string> properties;
};

/// \brief Writes one or more deletion vectors into a single Puffin file.
///
/// Each referenced data file gets its own `deletion-vector-v1` blob. After
/// Close(), Metadata() returns one DataFile per blob, each carrying the
/// content_offset/content_size_in_bytes and referenced_data_file required to
/// register the deletion vector in a manifest.
///
/// \note All referenced data files are assumed to belong to the single

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove this note once the API is fixed. It documents a limitation that Java does not have and that the spec does not require.

/// partition supplied in the options.
class ICEBERG_DATA_EXPORT DeletionVectorWriter {
public:
~DeletionVectorWriter();

DeletionVectorWriter(const DeletionVectorWriter&) = delete;
DeletionVectorWriter& operator=(const DeletionVectorWriter&) = delete;

/// \brief Create a new DeletionVectorWriter.
static Result<std::unique_ptr<DeletionVectorWriter>> Make(
DeletionVectorWriterOptions options);

/// \brief Mark a row position as deleted for the given data file.
///
/// Positions are accumulated per data file and serialized on Close().
Status Delete(std::string_view referenced_data_file, int64_t pos);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please make this match Java DVFileWriter more closely. Delete should take the data file spec and partition, and there should be an overload that accepts a PositionDeleteIndex. Otherwise the writer cannot represent per-file metadata or bulk deletes.


/// \brief Write all accumulated deletion vectors to the Puffin file and close it.
Status Close();

/// \brief Metadata for the DataFiles produced (one per referenced data file).
/// \note Must be called after Close().
Result<FileWriter::WriteResult> Metadata();

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please return a delete-specific result here, not FileWriter::WriteResult. Java returns DeleteWriteResult with delete files, referenced data files, and rewritten delete files. Row delta needs those sets for conflict checks and cleanup.


private:
class Impl;
std::unique_ptr<Impl> impl_;

explicit DeletionVectorWriter(std::unique_ptr<Impl> impl);
};

} // namespace iceberg
1 change: 1 addition & 0 deletions src/iceberg/data/meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ install_headers(
'data_writer.h',
'delete_filter.h',
'delete_loader.h',
'deletion_vector_writer.h',
'equality_delete_writer.h',
'file_scan_task_reader.h',
'position_delete_writer.h',
Expand Down
2 changes: 2 additions & 0 deletions src/iceberg/meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -170,13 +170,15 @@ iceberg_data_sources = files(
'data/data_writer.cc',
'data/delete_filter.cc',
'data/delete_loader.cc',
'data/deletion_vector_writer.cc',
'data/equality_delete_writer.cc',
'data/file_scan_task_reader.cc',
'data/position_delete_writer.cc',
'data/writer.cc',
'deletes/position_delete_index.cc',
'deletes/position_delete_range_consumer.cc',
'deletes/roaring_position_bitmap.cc',
'puffin/deletion_vector.cc',
'puffin/file_metadata.cc',
'puffin/json_serde.cc',
'puffin/puffin_format.cc',
Expand Down
Loading
Loading