From 543b9970a4282c6ef72512fac20564622b8920a5 Mon Sep 17 00:00:00 2001 From: Junwang Zhao Date: Tue, 23 Jun 2026 23:44:45 +0800 Subject: [PATCH] feat: add rewrite manifests --- src/iceberg/CMakeLists.txt | 1 + .../manifest/rolling_manifest_writer.cc | 7 + .../manifest/rolling_manifest_writer.h | 3 + src/iceberg/meson.build | 1 + src/iceberg/table.cc | 11 + src/iceberg/table.h | 5 + src/iceberg/test/CMakeLists.txt | 1 + src/iceberg/test/rewrite_manifests_test.cc | 649 ++++++++++++++++++ src/iceberg/transaction.cc | 8 + src/iceberg/transaction.h | 3 + src/iceberg/type_fwd.h | 1 + src/iceberg/update/meson.build | 1 + src/iceberg/update/pending_update.cc | 12 +- src/iceberg/update/pending_update.h | 3 +- src/iceberg/update/rewrite_manifests.cc | 533 ++++++++++++++ src/iceberg/update/rewrite_manifests.h | 153 +++++ 16 files changed, 1389 insertions(+), 3 deletions(-) create mode 100644 src/iceberg/test/rewrite_manifests_test.cc create mode 100644 src/iceberg/update/rewrite_manifests.cc create mode 100644 src/iceberg/update/rewrite_manifests.h diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index 1bf1f10db..f862c8974 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -108,6 +108,7 @@ set(ICEBERG_SOURCES update/merging_snapshot_update.cc update/overwrite_files.cc update/pending_update.cc + update/rewrite_manifests.cc update/row_delta.cc update/set_snapshot.cc update/snapshot_manager.cc diff --git a/src/iceberg/manifest/rolling_manifest_writer.cc b/src/iceberg/manifest/rolling_manifest_writer.cc index 1648ca86a..a2d651e74 100644 --- a/src/iceberg/manifest/rolling_manifest_writer.cc +++ b/src/iceberg/manifest/rolling_manifest_writer.cc @@ -54,6 +54,13 @@ Status RollingManifestWriter::WriteExistingEntry( return {}; } +Status RollingManifestWriter::WriteExistingEntry(const ManifestEntry& entry) { + ICEBERG_ASSIGN_OR_RAISE(auto* writer, CurrentWriter()); + ICEBERG_RETURN_UNEXPECTED(writer->WriteExistingEntry(entry)); + 
current_file_rows_++; + return {}; +} + Status RollingManifestWriter::WriteDeletedEntry( std::shared_ptr file, int64_t data_sequence_number, std::optional file_sequence_number) { diff --git a/src/iceberg/manifest/rolling_manifest_writer.h b/src/iceberg/manifest/rolling_manifest_writer.h index 6e0211f55..ed0331832 100644 --- a/src/iceberg/manifest/rolling_manifest_writer.h +++ b/src/iceberg/manifest/rolling_manifest_writer.h @@ -76,6 +76,9 @@ class ICEBERG_EXPORT RollingManifestWriter { int64_t data_sequence_number, std::optional file_sequence_number = std::nullopt); + /// \brief Add an existing entry while preserving snapshot and sequence fields. + Status WriteExistingEntry(const ManifestEntry& entry); + /// \brief Add a delete entry for a file. /// /// \param file a deleted data file diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build index 71a4498f1..8325b670e 100644 --- a/src/iceberg/meson.build +++ b/src/iceberg/meson.build @@ -133,6 +133,7 @@ iceberg_sources = files( 'update/merging_snapshot_update.cc', 'update/overwrite_files.cc', 'update/pending_update.cc', + 'update/rewrite_manifests.cc', 'update/row_delta.cc', 'update/set_snapshot.cc', 'update/snapshot_manager.cc', diff --git a/src/iceberg/table.cc b/src/iceberg/table.cc index 9dbc5acf7..2b2440f2f 100644 --- a/src/iceberg/table.cc +++ b/src/iceberg/table.cc @@ -36,6 +36,7 @@ #include "iceberg/update/fast_append.h" #include "iceberg/update/merge_append.h" #include "iceberg/update/overwrite_files.h" +#include "iceberg/update/rewrite_manifests.h" #include "iceberg/update/row_delta.h" #include "iceberg/update/set_snapshot.h" #include "iceberg/update/snapshot_manager.h" @@ -245,6 +246,12 @@ Result> Table::NewOverwrite() { return OverwriteFiles::Make(name().name, std::move(ctx)); } +Result> Table::NewRewriteManifests() { + ICEBERG_ASSIGN_OR_RAISE( + auto ctx, TransactionContext::Make(shared_from_this(), TransactionKind::kUpdate)); + return RewriteManifests::Make(name().name, std::move(ctx)); +} + 
Result> Table::NewUpdateStatistics() { ICEBERG_ASSIGN_OR_RAISE( auto ctx, TransactionContext::Make(shared_from_this(), TransactionKind::kUpdate)); @@ -360,6 +367,10 @@ Result> StaticTable::NewOverwrite() { return NotSupported("Cannot create an overwrite for a static table"); } +Result> StaticTable::NewRewriteManifests() { + return NotSupported("Cannot create a rewrite manifests for a static table"); +} + Result> StaticTable::NewSnapshotManager() { return NotSupported("Cannot create a snapshot manager for a static table"); } diff --git a/src/iceberg/table.h b/src/iceberg/table.h index 64ed21ef8..e777e4ca8 100644 --- a/src/iceberg/table.h +++ b/src/iceberg/table.h @@ -188,6 +188,9 @@ class ICEBERG_EXPORT Table : public std::enable_shared_from_this { /// \brief Create a new OverwriteFiles to overwrite data files and commit the changes. virtual Result> NewOverwrite(); + /// \brief Create a new RewriteManifests to rewrite manifest layout. + virtual Result> NewRewriteManifests(); + /// \brief Create a new SnapshotManager to manage snapshots and snapshot references. 
virtual Result> NewSnapshotManager(); @@ -263,6 +266,8 @@ class ICEBERG_EXPORT StaticTable : public Table { Result> NewOverwrite() override; + Result> NewRewriteManifests() override; + Result> NewSnapshotManager() override; private: diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt index fcbc22126..8bd47476c 100644 --- a/src/iceberg/test/CMakeLists.txt +++ b/src/iceberg/test/CMakeLists.txt @@ -234,6 +234,7 @@ if(ICEBERG_BUILD_BUNDLE) merge_append_test.cc merging_snapshot_update_test.cc name_mapping_update_test.cc + rewrite_manifests_test.cc row_delta_test.cc snapshot_manager_test.cc transaction_test.cc diff --git a/src/iceberg/test/rewrite_manifests_test.cc b/src/iceberg/test/rewrite_manifests_test.cc new file mode 100644 index 000000000..a9a219f63 --- /dev/null +++ b/src/iceberg/test/rewrite_manifests_test.cc @@ -0,0 +1,649 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +#include "iceberg/update/rewrite_manifests.h" + +#include +#include +#include +#include +#include +#include + +#include +#include + +#include "iceberg/avro/avro_register.h" +#include "iceberg/constants.h" +#include "iceberg/manifest/manifest_entry.h" +#include "iceberg/manifest/manifest_reader.h" +#include "iceberg/manifest/manifest_writer.h" +#include "iceberg/partition_spec.h" +#include "iceberg/schema.h" +#include "iceberg/snapshot.h" +#include "iceberg/table.h" +#include "iceberg/table_metadata.h" +#include "iceberg/table_properties.h" +#include "iceberg/test/matchers.h" +#include "iceberg/test/update_test_base.h" +#include "iceberg/update/fast_append.h" +#include "iceberg/util/macros.h" + +namespace iceberg { + +class RewriteManifestsTest : public UpdateTestBase { + protected: + static void SetUpTestSuite() { avro::RegisterAll(); } + + std::string MetadataResource() const override { + return "TableMetadataV2ValidMinimal.json"; + } + + void SetUp() override { + UpdateTestBase::SetUp(); + ICEBERG_UNWRAP_OR_FAIL(spec_, table_->spec()); + ICEBERG_UNWRAP_OR_FAIL(schema_, table_->schema()); + file_a_ = MakeDataFile("a", 10, 100, 1); + file_b_ = MakeDataFile("b", 20, 200, 2); + file_c_ = MakeDataFile("c", 30, 300, 3); + file_d_ = MakeDataFile("d", 40, 400, 4); + } + + std::shared_ptr MakeDataFile(const std::string& name, int64_t record_count, + int64_t size, int64_t partition_value) { + auto file = std::make_shared(); + file->content = DataFile::Content::kData; + file->file_path = std::format("{}/data/{}.parquet", table_location_, name); + file->file_format = FileFormatType::kParquet; + file->partition = + PartitionValues(std::vector{Literal::Long(partition_value)}); + file->record_count = record_count; + file->file_size_in_bytes = size; + file->partition_spec_id = spec_->spec_id(); + return file; + } + + Status AppendFiles(std::vector> files) { + ICEBERG_ASSIGN_OR_RAISE(auto append, table_->NewFastAppend()); + for (const auto& file : files) { + 
append->AppendFile(file); + } + ICEBERG_RETURN_UNEXPECTED(append->Commit()); + return table_->Refresh(); + } + + Result> CurrentManifests() { + ICEBERG_RETURN_UNEXPECTED(table_->Refresh()); + ICEBERG_ASSIGN_OR_RAISE(auto snapshot, table_->current_snapshot()); + SnapshotCache cache(snapshot.get()); + ICEBERG_ASSIGN_OR_RAISE(auto manifests, cache.DataManifests(file_io_)); + return std::vector(manifests.begin(), manifests.end()); + } + + Result> Entries(const ManifestFile& manifest) { + ICEBERG_ASSIGN_OR_RAISE(auto reader, + ManifestReader::Make(manifest, file_io_, schema_, spec_)); + return reader->Entries(); + } + + Result WriteExistingManifest( + const std::string& name, int64_t snapshot_id, + const std::vector>& files) { + auto path = std::format("{}/metadata/{}.avro", table_location_, name); + ICEBERG_ASSIGN_OR_RAISE( + auto writer, ManifestWriter::MakeWriter(table_->metadata()->format_version, + kInvalidSnapshotId, path, file_io_, spec_, + schema_, ManifestContent::kData)); + for (const auto& file : files) { + ICEBERG_RETURN_UNEXPECTED(writer->WriteExistingEntry( + file, snapshot_id, TableMetadata::kInitialSequenceNumber)); + } + ICEBERG_RETURN_UNEXPECTED(writer->Close()); + return writer->ToManifestFile(); + } + + Result WriteAddedManifest( + const std::string& name, const std::vector>& files) { + auto path = std::format("{}/metadata/{}.avro", table_location_, name); + ICEBERG_ASSIGN_OR_RAISE( + auto writer, ManifestWriter::MakeWriter(table_->metadata()->format_version, + kInvalidSnapshotId, path, file_io_, spec_, + schema_, ManifestContent::kData)); + for (const auto& file : files) { + ICEBERG_RETURN_UNEXPECTED(writer->WriteAddedEntry(file)); + } + ICEBERG_RETURN_UNEXPECTED(writer->Close()); + return writer->ToManifestFile(); + } + + Result WriteDeletedManifest( + const std::string& name, const std::vector>& files) { + auto path = std::format("{}/metadata/{}.avro", table_location_, name); + ICEBERG_ASSIGN_OR_RAISE( + auto writer, 
ManifestWriter::MakeWriter(table_->metadata()->format_version, + kInvalidSnapshotId, path, file_io_, spec_, + schema_, ManifestContent::kData)); + for (const auto& file : files) { + ICEBERG_RETURN_UNEXPECTED( + writer->WriteDeletedEntry(file, TableMetadata::kInitialSequenceNumber)); + } + ICEBERG_RETURN_UNEXPECTED(writer->Close()); + return writer->ToManifestFile(); + } + + void ExpectEntryPathsAndStatuses(const ManifestFile& manifest, + std::vector expected_paths, + std::vector expected_statuses) { + ICEBERG_UNWRAP_OR_FAIL(auto entries, Entries(manifest)); + ASSERT_EQ(entries.size(), expected_paths.size()); + std::vector> actual; + for (const auto& entry : entries) { + ASSERT_NE(entry.data_file, nullptr); + actual.emplace_back(entry.data_file->file_path, entry.status); + } + std::vector> expected; + for (size_t i = 0; i < expected_paths.size(); ++i) { + expected.emplace_back(std::move(expected_paths[i]), expected_statuses[i]); + } + std::ranges::sort(actual); + std::ranges::sort(expected); + EXPECT_EQ(actual, expected); + } + + std::shared_ptr spec_; + std::shared_ptr schema_; + std::shared_ptr file_a_; + std::shared_ptr file_b_; + std::shared_ptr file_c_; + std::shared_ptr file_d_; +}; + +TEST_F(RewriteManifestsTest, RewriteManifestsAppendedDirectly) { + ICEBERG_UNWRAP_OR_FAIL(auto new_manifest, + WriteAddedManifest("manifest-file-1", {file_a_})); + + ICEBERG_UNWRAP_OR_FAIL(auto append, table_->NewFastAppend()); + append->AppendManifest(new_manifest); + EXPECT_THAT(append->Commit(), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto append_snapshot, table_->current_snapshot()); + const int64_t append_snapshot_id = append_snapshot->snapshot_id; + + ICEBERG_UNWRAP_OR_FAIL(auto before, CurrentManifests()); + ASSERT_EQ(before.size(), 1U); + + ICEBERG_UNWRAP_OR_FAIL(auto rewrite, table_->NewRewriteManifests()); + rewrite->ClusterBy([](const DataFile&) { return ""; }); + EXPECT_THAT(rewrite->Commit(), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto manifests, CurrentManifests()); + 
ASSERT_EQ(manifests.size(), 1U); + ICEBERG_UNWRAP_OR_FAIL(auto entries, Entries(manifests[0])); + ASSERT_EQ(entries.size(), 1U); + EXPECT_EQ(entries[0].snapshot_id, append_snapshot_id); + ASSERT_NE(entries[0].data_file, nullptr); + EXPECT_EQ(entries[0].data_file->file_path, file_a_->file_path); + EXPECT_EQ(entries[0].status, ManifestStatus::kExisting); +} + +TEST_F(RewriteManifestsTest, RewriteManifestsGeneratedAndAppendedDirectly) { + ICEBERG_UNWRAP_OR_FAIL(auto new_manifest, + WriteAddedManifest("manifest-file-1", {file_a_})); + + ICEBERG_UNWRAP_OR_FAIL(auto append_manifest, table_->NewFastAppend()); + append_manifest->AppendManifest(new_manifest); + EXPECT_THAT(append_manifest->Commit(), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto manifest_append_snapshot, table_->current_snapshot()); + const int64_t manifest_append_id = manifest_append_snapshot->snapshot_id; + + ASSERT_THAT(AppendFiles({file_b_}), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto file_append_snapshot, table_->current_snapshot()); + const int64_t file_append_id = file_append_snapshot->snapshot_id; + + ICEBERG_UNWRAP_OR_FAIL(auto before, CurrentManifests()); + ASSERT_EQ(before.size(), 2U); + + ICEBERG_UNWRAP_OR_FAIL(auto rewrite, table_->NewRewriteManifests()); + rewrite->ClusterBy([](const DataFile&) { return ""; }); + EXPECT_THAT(rewrite->Commit(), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto manifests, CurrentManifests()); + ASSERT_EQ(manifests.size(), 1U); + + ICEBERG_UNWRAP_OR_FAIL(auto entries, Entries(manifests[0])); + ASSERT_EQ(entries.size(), 2U); + std::vector> actual; + for (const auto& entry : entries) { + ASSERT_NE(entry.data_file, nullptr); + EXPECT_EQ(entry.status, ManifestStatus::kExisting); + actual.emplace_back(entry.data_file->file_path, entry.snapshot_id); + } + std::ranges::sort(actual); + EXPECT_EQ(actual, (std::vector>{ + {file_a_->file_path, manifest_append_id}, + {file_b_->file_path, file_append_id}})); +} + +TEST_F(RewriteManifestsTest, ReplaceManifestsConsolidate) { + 
ASSERT_THAT(AppendFiles({file_a_}), IsOk()); + ASSERT_THAT(AppendFiles({file_b_}), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto before, CurrentManifests()); + ASSERT_EQ(before.size(), 2U); + + ICEBERG_UNWRAP_OR_FAIL(auto rewrite, table_->NewRewriteManifests()); + rewrite->ClusterBy([](const DataFile&) { return "all"; }); + EXPECT_THAT(rewrite->Commit(), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto manifests, CurrentManifests()); + ASSERT_EQ(manifests.size(), 1U); + ExpectEntryPathsAndStatuses(manifests[0], {file_a_->file_path, file_b_->file_path}, + {ManifestStatus::kExisting, ManifestStatus::kExisting}); + + ICEBERG_UNWRAP_OR_FAIL(auto snapshot, table_->current_snapshot()); + EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kManifestsCreated), "1"); + EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kManifestsKept), "0"); + EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kManifestsReplaced), "2"); + EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kEntriesProcessed), "2"); +} + +TEST_F(RewriteManifestsTest, ReplaceManifestsSeparate) { + ASSERT_THAT(AppendFiles({file_a_, file_b_}), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto before, CurrentManifests()); + ASSERT_EQ(before.size(), 1U); + + ICEBERG_UNWRAP_OR_FAIL(auto rewrite, table_->NewRewriteManifests()); + rewrite->ClusterBy([](const DataFile& file) { return file.file_path; }); + EXPECT_THAT(rewrite->Commit(), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto manifests, CurrentManifests()); + ASSERT_EQ(manifests.size(), 2U); + std::vector paths; + for (const auto& manifest : manifests) { + ICEBERG_UNWRAP_OR_FAIL(auto entries, Entries(manifest)); + ASSERT_EQ(entries.size(), 1U); + EXPECT_EQ(entries[0].status, ManifestStatus::kExisting); + paths.push_back(entries[0].data_file->file_path); + } + std::ranges::sort(paths); + EXPECT_EQ(paths, (std::vector{file_a_->file_path, file_b_->file_path})); +} + +TEST_F(RewriteManifestsTest, ReplaceManifestsMaxSize) { + ASSERT_THAT(AppendFiles({file_a_, file_b_}), IsOk()); + 
ICEBERG_UNWRAP_OR_FAIL(auto append_snapshot, table_->current_snapshot()); + const int64_t append_snapshot_id = append_snapshot->snapshot_id; + + ICEBERG_UNWRAP_OR_FAIL(auto before, CurrentManifests()); + ASSERT_EQ(before.size(), 1U); + + ICEBERG_UNWRAP_OR_FAIL(auto props, table_->NewUpdateProperties()); + props->Set(std::string(TableProperties::kManifestTargetSizeBytes.key()), "1"); + EXPECT_THAT(props->Commit(), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto rewrite, table_->NewRewriteManifests()); + rewrite->ClusterBy([](const DataFile&) { return "file"; }); + EXPECT_THAT(rewrite->Commit(), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto manifests, CurrentManifests()); + ASSERT_EQ(manifests.size(), 2U); + + std::vector> actual; + for (const auto& manifest : manifests) { + ICEBERG_UNWRAP_OR_FAIL(auto entries, Entries(manifest)); + ASSERT_EQ(entries.size(), 1U); + EXPECT_EQ(entries[0].status, ManifestStatus::kExisting); + ASSERT_NE(entries[0].data_file, nullptr); + actual.emplace_back(entries[0].data_file->file_path, entries[0].snapshot_id); + } + std::ranges::sort(actual); + EXPECT_EQ(actual, (std::vector>{ + {file_a_->file_path, append_snapshot_id}, + {file_b_->file_path, append_snapshot_id}})); +} + +TEST_F(RewriteManifestsTest, ReplaceManifestsWithFilter) { + ASSERT_THAT(AppendFiles({file_a_}), IsOk()); + ASSERT_THAT(AppendFiles({file_b_}), IsOk()); + ASSERT_THAT(AppendFiles({file_c_}), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto rewrite, table_->NewRewriteManifests()); + rewrite->ClusterBy([](const DataFile&) { return "bc"; }); + rewrite->RewriteIf([&](const ManifestFile& manifest) { + auto entries_result = Entries(manifest); + if (!entries_result.has_value() || entries_result.value().empty()) { + return false; + } + return entries_result.value()[0].data_file->file_path != file_a_->file_path; + }); + EXPECT_THAT(rewrite->Commit(), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto manifests, CurrentManifests()); + ASSERT_EQ(manifests.size(), 2U); + 
ExpectEntryPathsAndStatuses(manifests[0], {file_b_->file_path, file_c_->file_path}, + {ManifestStatus::kExisting, ManifestStatus::kExisting}); + ExpectEntryPathsAndStatuses(manifests[1], {file_a_->file_path}, + {ManifestStatus::kAdded}); +} + +TEST_F(RewriteManifestsTest, BasicManifestReplacement) { + ASSERT_THAT(AppendFiles({file_a_, file_b_}), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto first_snapshot, table_->current_snapshot()); + const int64_t first_snapshot_id = first_snapshot->snapshot_id; + ICEBERG_UNWRAP_OR_FAIL(auto first_snapshot_manifests, CurrentManifests()); + ASSERT_EQ(first_snapshot_manifests.size(), 1U); + + ASSERT_THAT(AppendFiles({file_c_, file_d_}), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL( + auto manifest_a, WriteExistingManifest("rewrite-a", first_snapshot_id, {file_a_})); + ICEBERG_UNWRAP_OR_FAIL( + auto manifest_b, WriteExistingManifest("rewrite-b", first_snapshot_id, {file_b_})); + + ICEBERG_UNWRAP_OR_FAIL(auto rewrite, table_->NewRewriteManifests()); + rewrite->DeleteManifest(first_snapshot_manifests[0]) + .AddManifest(manifest_a) + .AddManifest(manifest_b); + EXPECT_THAT(rewrite->Commit(), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto manifests, CurrentManifests()); + ASSERT_EQ(manifests.size(), 3U); + ExpectEntryPathsAndStatuses(manifests[0], {file_a_->file_path}, + {ManifestStatus::kExisting}); + ExpectEntryPathsAndStatuses(manifests[1], {file_b_->file_path}, + {ManifestStatus::kExisting}); + ExpectEntryPathsAndStatuses(manifests[2], {file_c_->file_path, file_d_->file_path}, + {ManifestStatus::kAdded, ManifestStatus::kAdded}); + + ICEBERG_UNWRAP_OR_FAIL(auto after, table_->current_snapshot()); + EXPECT_EQ(after->summary.at(SnapshotSummaryFields::kManifestsCreated), "2"); + EXPECT_EQ(after->summary.at(SnapshotSummaryFields::kManifestsKept), "1"); + EXPECT_EQ(after->summary.at(SnapshotSummaryFields::kManifestsReplaced), "1"); + EXPECT_EQ(after->summary.at(SnapshotSummaryFields::kEntriesProcessed), "0"); +} + +TEST_F(RewriteManifestsTest, 
RejectsReplacementWithDifferentActiveFileCount) { + ASSERT_THAT(AppendFiles({file_a_, file_b_}), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto old_manifests, CurrentManifests()); + ASSERT_EQ(old_manifests.size(), 1U); + ICEBERG_UNWRAP_OR_FAIL(auto snapshot, table_->current_snapshot()); + ICEBERG_UNWRAP_OR_FAIL( + auto manifest_a, + WriteExistingManifest("rewrite-only-a", snapshot->snapshot_id, {file_a_})); + + ICEBERG_UNWRAP_OR_FAIL(auto rewrite, table_->NewRewriteManifests()); + rewrite->DeleteManifest(old_manifests[0]).AddManifest(manifest_a); + auto result = rewrite->Commit(); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(result, + HasErrorMessage("Replaced and created manifests must have the same")); +} + +TEST_F(RewriteManifestsTest, RejectsReplacementWithDifferentActiveFiles) { + ASSERT_THAT(AppendFiles({file_a_, file_b_}), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto old_manifests, CurrentManifests()); + ASSERT_EQ(old_manifests.size(), 1U); + ICEBERG_UNWRAP_OR_FAIL(auto snapshot, table_->current_snapshot()); + ICEBERG_UNWRAP_OR_FAIL( + auto manifest_a, + WriteExistingManifest("rewrite-same-count-a", snapshot->snapshot_id, {file_a_})); + ICEBERG_UNWRAP_OR_FAIL( + auto manifest_c, + WriteExistingManifest("rewrite-same-count-c", snapshot->snapshot_id, {file_c_})); + + ICEBERG_UNWRAP_OR_FAIL(auto rewrite, table_->NewRewriteManifests()); + rewrite->DeleteManifest(old_manifests[0]) + .AddManifest(manifest_a) + .AddManifest(manifest_c); + auto result = rewrite->Commit(); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(result, HasErrorMessage("same active files")); +} + +TEST_F(RewriteManifestsTest, ReplacementAllowsMissingFileCounts) { + ASSERT_THAT(AppendFiles({file_a_, file_b_}), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto old_manifests, CurrentManifests()); + ASSERT_EQ(old_manifests.size(), 1U); + ICEBERG_UNWRAP_OR_FAIL(auto snapshot, table_->current_snapshot()); + + auto old_manifest_without_counts = old_manifests[0]; + 
old_manifest_without_counts.added_files_count = std::nullopt; + old_manifest_without_counts.existing_files_count = std::nullopt; + + ICEBERG_UNWRAP_OR_FAIL( + auto manifest_a, + WriteExistingManifest("rewrite-missing-count-a", snapshot->snapshot_id, {file_a_})); + ICEBERG_UNWRAP_OR_FAIL( + auto manifest_b, + WriteExistingManifest("rewrite-missing-count-b", snapshot->snapshot_id, {file_b_})); + manifest_a.added_files_count = std::nullopt; + manifest_a.existing_files_count = std::nullopt; + manifest_b.added_files_count = std::nullopt; + manifest_b.existing_files_count = std::nullopt; + + ICEBERG_UNWRAP_OR_FAIL(auto rewrite, table_->NewRewriteManifests()); + rewrite->DeleteManifest(old_manifest_without_counts) + .AddManifest(manifest_a) + .AddManifest(manifest_b); + EXPECT_THAT(rewrite->Commit(), IsOk()); +} + +TEST_F(RewriteManifestsTest, AppendDuringRewriteManifest) { + ASSERT_THAT(AppendFiles({file_a_}), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto rewrite, table_->NewRewriteManifests()); + rewrite->ClusterBy([](const DataFile&) { return "all"; }); + EXPECT_THAT(static_cast(*rewrite).Apply(), IsOk()); + + ASSERT_THAT(AppendFiles({file_b_}), IsOk()); + EXPECT_THAT(rewrite->Commit(), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto manifests, CurrentManifests()); + ASSERT_EQ(manifests.size(), 2U); + std::vector> actual; + for (const auto& manifest : manifests) { + ICEBERG_UNWRAP_OR_FAIL(auto entries, Entries(manifest)); + for (const auto& entry : entries) { + ASSERT_NE(entry.data_file, nullptr); + actual.emplace_back(entry.data_file->file_path, entry.status); + } + } + std::ranges::sort(actual); + EXPECT_EQ(actual, (std::vector>{ + {file_a_->file_path, ManifestStatus::kExisting}, + {file_b_->file_path, ManifestStatus::kAdded}})); +} + +TEST_F(RewriteManifestsTest, RewriteManifestDuringAppend) { + ASSERT_THAT(AppendFiles({file_a_}), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto append_snapshot, table_->current_snapshot()); + const int64_t append_snapshot_id = 
append_snapshot->snapshot_id; + + ICEBERG_UNWRAP_OR_FAIL(auto append, table_->NewFastAppend()); + append->AppendFile(file_b_); + EXPECT_THAT(static_cast(*append).Apply(), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto rewrite, table_->NewRewriteManifests()); + rewrite->ClusterBy([](const DataFile&) { return "file"; }); + EXPECT_THAT(rewrite->Commit(), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto rewritten_manifests, CurrentManifests()); + ASSERT_EQ(rewritten_manifests.size(), 1U); + + EXPECT_THAT(append->Commit(), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto file_append_snapshot, table_->current_snapshot()); + const int64_t file_append_id = file_append_snapshot->snapshot_id; + + ICEBERG_UNWRAP_OR_FAIL(auto manifests, CurrentManifests()); + ASSERT_EQ(manifests.size(), 2U); + + std::vector>> actual; + for (const auto& manifest : manifests) { + ICEBERG_UNWRAP_OR_FAIL(auto entries, Entries(manifest)); + for (const auto& entry : entries) { + ASSERT_NE(entry.data_file, nullptr); + actual.emplace_back(entry.data_file->file_path, + std::make_pair(entry.status, entry.snapshot_id)); + } + } + std::ranges::sort(actual); + EXPECT_EQ(actual, + (std::vector>>{ + {file_a_->file_path, {ManifestStatus::kExisting, append_snapshot_id}}, + {file_b_->file_path, {ManifestStatus::kAdded, file_append_id}}})); +} + +TEST_F(RewriteManifestsTest, RejectsAddManifestWithAddedFiles) { + ASSERT_THAT(AppendFiles({file_a_}), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto old_manifests, CurrentManifests()); + ASSERT_EQ(old_manifests.size(), 1U); + + ICEBERG_UNWRAP_OR_FAIL(auto invalid_added_manifest, + WriteAddedManifest("invalid-added", {file_a_})); + invalid_added_manifest.added_files_count = std::nullopt; + + ICEBERG_UNWRAP_OR_FAIL(auto rewrite, table_->NewRewriteManifests()); + rewrite->DeleteManifest(old_manifests[0]).AddManifest(invalid_added_manifest); + auto result = rewrite->Commit(); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(result, HasErrorMessage("Cannot add manifest with 
added files")); +} + +TEST_F(RewriteManifestsTest, RejectsAddManifestWithDeletedFiles) { + ASSERT_THAT(AppendFiles({file_a_}), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto old_manifests, CurrentManifests()); + ASSERT_EQ(old_manifests.size(), 1U); + + ICEBERG_UNWRAP_OR_FAIL(auto invalid_deleted_manifest, + WriteDeletedManifest("invalid-deleted", {file_a_})); + invalid_deleted_manifest.deleted_files_count = std::nullopt; + + ICEBERG_UNWRAP_OR_FAIL(auto rewrite, table_->NewRewriteManifests()); + rewrite->DeleteManifest(old_manifests[0]).AddManifest(invalid_deleted_manifest); + auto result = rewrite->Commit(); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(result, HasErrorMessage("Cannot add manifest with deleted files")); +} + +TEST_F(RewriteManifestsTest, FailsWhenDeletedManifestMissingFromSnapshot) { + ASSERT_THAT(AppendFiles({file_a_, file_b_}), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto snapshot, table_->current_snapshot()); + + // A manifest that was never part of the table's current snapshot. + ICEBERG_UNWRAP_OR_FAIL( + auto orphan, WriteExistingManifest("orphan", snapshot->snapshot_id, {file_a_})); + + ICEBERG_UNWRAP_OR_FAIL(auto rewrite, table_->NewRewriteManifests()); + rewrite->DeleteManifest(orphan); + auto result = rewrite->Commit(); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(result, HasErrorMessage("could not be found in the latest snapshot")); +} + +TEST_F(RewriteManifestsTest, ConcurrentRewriteManifest) { + ASSERT_THAT(AppendFiles({file_a_}), IsOk()); + ASSERT_THAT(AppendFiles({file_b_}), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto before, CurrentManifests()); + ASSERT_EQ(before.size(), 2U); + + // Start a rewrite that processes both manifests, but only apply (stage) it. 
+ ICEBERG_UNWRAP_OR_FAIL(auto rewrite, table_->NewRewriteManifests()); + rewrite->ClusterBy([](const DataFile&) { return "file"; }); + ICEBERG_UNWRAP_OR_FAIL(auto staged, static_cast(*rewrite).Apply()); + SnapshotCache staged_cache(staged.snapshot.get()); + ICEBERG_UNWRAP_OR_FAIL(auto staged_manifests, staged_cache.DataManifests(file_io_)); + ASSERT_EQ(staged_manifests.size(), 1U); + const std::string staged_manifest_path = staged_manifests[0].manifest_path; + + // Concurrently rewrite only the manifest that does not contain file_a. + ICEBERG_UNWRAP_OR_FAIL(auto concurrent, table_->NewRewriteManifests()); + concurrent->ClusterBy([](const DataFile&) { return "file"; }); + concurrent->RewriteIf([&](const ManifestFile& manifest) { + auto entries_result = Entries(manifest); + if (!entries_result.has_value() || entries_result.value().empty()) { + return false; + } + return entries_result.value()[0].data_file->file_path != file_a_->file_path; + }); + EXPECT_THAT(concurrent->Commit(), IsOk()); + ASSERT_THAT(table_->Refresh(), IsOk()); + + // Committing the in-progress rewrite must perform a full rewrite because the + // manifest with file_b is no longer part of the current snapshot. 
+ EXPECT_THAT(rewrite->Commit(), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto manifests, CurrentManifests()); + ASSERT_EQ(manifests.size(), 1U); + EXPECT_NE(manifests[0].manifest_path, staged_manifest_path); + ExpectEntryPathsAndStatuses(manifests[0], {file_a_->file_path, file_b_->file_path}, + {ManifestStatus::kExisting, ManifestStatus::kExisting}); +} + +TEST_F(RewriteManifestsTest, CombinesManifestReplacementWithRewrite) { + ASSERT_THAT(AppendFiles({file_a_}), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto first_snapshot, table_->current_snapshot()); + const int64_t first_snapshot_id = first_snapshot->snapshot_id; + ICEBERG_UNWRAP_OR_FAIL(auto first_manifests, CurrentManifests()); + ASSERT_EQ(first_manifests.size(), 1U); + + ASSERT_THAT(AppendFiles({file_b_}), IsOk()); + ASSERT_THAT(AppendFiles({file_c_}), IsOk()); + + // A manifest that re-adds file_a as an existing file. + ICEBERG_UNWRAP_OR_FAIL( + auto new_manifest, + WriteExistingManifest("combined-a", first_snapshot_id, {file_a_})); + + ICEBERG_UNWRAP_OR_FAIL(auto rewrite, table_->NewRewriteManifests()); + rewrite->DeleteManifest(first_manifests[0]) + .AddManifest(new_manifest) + .ClusterBy([](const DataFile&) { return "const-value"; }) + .RewriteIf([&](const ManifestFile& manifest) { + auto entries_result = Entries(manifest); + if (!entries_result.has_value() || entries_result.value().empty()) { + return false; + } + return entries_result.value()[0].data_file->file_path != file_b_->file_path; + }); + EXPECT_THAT(rewrite->Commit(), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto manifests, CurrentManifests()); + ASSERT_EQ(manifests.size(), 3U); + + // Collect the file path / status pairs across all resulting manifests. 
+ std::vector> actual; + for (const auto& manifest : manifests) { + ICEBERG_UNWRAP_OR_FAIL(auto entries, Entries(manifest)); + for (const auto& entry : entries) { + ASSERT_NE(entry.data_file, nullptr); + actual.emplace_back(entry.data_file->file_path, entry.status); + } + } + std::ranges::sort(actual); + EXPECT_EQ(actual, (std::vector>{ + {file_a_->file_path, ManifestStatus::kExisting}, + {file_b_->file_path, ManifestStatus::kAdded}, + {file_c_->file_path, ManifestStatus::kExisting}})); + + ICEBERG_UNWRAP_OR_FAIL(auto snapshot, table_->current_snapshot()); + EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kManifestsCreated), "2"); + EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kManifestsKept), "1"); + EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kManifestsReplaced), "2"); + EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kEntriesProcessed), "1"); +} + +} // namespace iceberg diff --git a/src/iceberg/transaction.cc b/src/iceberg/transaction.cc index 169e7ec90..df4501e0f 100644 --- a/src/iceberg/transaction.cc +++ b/src/iceberg/transaction.cc @@ -38,6 +38,7 @@ #include "iceberg/update/merge_append.h" #include "iceberg/update/overwrite_files.h" #include "iceberg/update/pending_update.h" +#include "iceberg/update/rewrite_manifests.h" #include "iceberg/update/row_delta.h" #include "iceberg/update/set_snapshot.h" #include "iceberg/update/snapshot_manager.h" @@ -521,6 +522,13 @@ Result> Transaction::NewOverwrite() { return overwrite; } +Result> Transaction::NewRewriteManifests() { + ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr rewrite_manifests, + RewriteManifests::Make(ctx_->table->name().name, ctx_)); + ICEBERG_RETURN_UNEXPECTED(AddUpdate(rewrite_manifests)); + return rewrite_manifests; +} + Result> Transaction::NewUpdateStatistics() { ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr update_statistics, UpdateStatistics::Make(ctx_)); diff --git a/src/iceberg/transaction.h b/src/iceberg/transaction.h index 49b607d60..655be3e16 100644 --- 
a/src/iceberg/transaction.h +++ b/src/iceberg/transaction.h @@ -118,6 +118,9 @@ class ICEBERG_EXPORT Transaction : public std::enable_shared_from_this> NewOverwrite(); + /// \brief Create a new RewriteManifests to rewrite manifest layout. + Result> NewRewriteManifests(); + /// \brief Create a new SnapshotManager to manage snapshots. Result> NewSnapshotManager(); diff --git a/src/iceberg/type_fwd.h b/src/iceberg/type_fwd.h index 784b3e03b..3f0b3c271 100644 --- a/src/iceberg/type_fwd.h +++ b/src/iceberg/type_fwd.h @@ -244,6 +244,7 @@ class FastAppend; class MergeAppend; class OverwriteFiles; class PendingUpdate; +class RewriteManifests; class RowDelta; class SetSnapshot; class SnapshotManager; diff --git a/src/iceberg/update/meson.build b/src/iceberg/update/meson.build index 4ba4168d4..6a455dbd7 100644 --- a/src/iceberg/update/meson.build +++ b/src/iceberg/update/meson.build @@ -24,6 +24,7 @@ install_headers( 'merging_snapshot_update.h', 'overwrite_files.h', 'pending_update.h', + 'rewrite_manifests.h', 'row_delta.h', 'set_snapshot.h', 'snapshot_manager.h', diff --git a/src/iceberg/update/pending_update.cc b/src/iceberg/update/pending_update.cc index 4b3000652..eaccbc596 100644 --- a/src/iceberg/update/pending_update.cc +++ b/src/iceberg/update/pending_update.cc @@ -35,6 +35,10 @@ Status PendingUpdate::Commit() { if (!ctx_->transaction) { // Table-created path: no transaction exists yet, create a temporary one. 
ICEBERG_ASSIGN_OR_RAISE(auto txn, Transaction::Make(ctx_)); + auto self = weak_from_this().lock(); + if (self) { + ICEBERG_RETURN_UNEXPECTED(txn->AddUpdate(self)); + } auto apply_status = txn->Apply(*this); if (!apply_status.has_value()) { std::ignore = Finalize(std::unexpected(apply_status.error())); @@ -43,11 +47,15 @@ Status PendingUpdate::Commit() { auto commit_result = txn->Commit(); if (!commit_result.has_value()) { - std::ignore = Finalize(std::unexpected(commit_result.error())); + if (!self) { + std::ignore = Finalize(std::unexpected(commit_result.error())); + } return std::unexpected(commit_result.error()); } - std::ignore = Finalize(commit_result.value()->metadata().get()); + if (!self) { + std::ignore = Finalize(commit_result.value()->metadata().get()); + } return {}; } auto txn = ctx_->transaction->lock(); diff --git a/src/iceberg/update/pending_update.h b/src/iceberg/update/pending_update.h index 19998ddb3..5e3415160 100644 --- a/src/iceberg/update/pending_update.h +++ b/src/iceberg/update/pending_update.h @@ -38,7 +38,8 @@ namespace iceberg { /// /// \note Implementations are expected to use builder pattern and errors /// should be handled by the ErrorCollector base class. -class ICEBERG_EXPORT PendingUpdate : public ErrorCollector { +class ICEBERG_EXPORT PendingUpdate : public ErrorCollector, + public std::enable_shared_from_this { public: enum class Kind : uint8_t { kExpireSnapshots, diff --git a/src/iceberg/update/rewrite_manifests.cc b/src/iceberg/update/rewrite_manifests.cc new file mode 100644 index 000000000..b68ac476e --- /dev/null +++ b/src/iceberg/update/rewrite_manifests.cc @@ -0,0 +1,533 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/update/rewrite_manifests.h"
+
+// NOTE(review): the header names on the following bare #include lines are
+// missing in this copy of the patch - TODO confirm against the original.
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include "iceberg/constants.h"
+#include "iceberg/inheritable_metadata.h"
+#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/manifest/manifest_list.h"
+#include "iceberg/manifest/manifest_reader.h"
+#include "iceberg/manifest/manifest_writer.h"
+#include "iceberg/manifest/rolling_manifest_writer.h"
+#include "iceberg/snapshot.h"
+#include "iceberg/table.h"  // IWYU pragma: keep
+#include "iceberg/table_metadata.h"
+#include "iceberg/table_properties.h"
+#include "iceberg/transaction.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg {
+
+namespace {
+
+// Records `snapshot_id` as the snapshot that added `manifest`.
+void SetSnapshotId(ManifestFile& manifest, int64_t snapshot_id) {
+  manifest.added_snapshot_id = snapshot_id;
+}
+
+}  // namespace
+
+// Factory for a RewriteManifests update on `table_name` bound to `ctx`.
+// An empty table name or a null context is rejected through ICEBERG_PRECHECK
+// before anything is constructed.
+Result> RewriteManifests::Make(
+    std::string table_name, std::shared_ptr ctx) {
+  ICEBERG_PRECHECK(!table_name.empty(), "Table name cannot be empty");
+  ICEBERG_PRECHECK(ctx != nullptr, "Cannot create RewriteManifests without a context");
+  return std::unique_ptr(
+      new RewriteManifests(std::move(table_name), std::move(ctx)));
+}
+
+// Hands the context to the SnapshotUpdate base and keeps the table name.
+RewriteManifests::RewriteManifests(std::string table_name,
+                                   std::shared_ptr ctx)
+    : SnapshotUpdate(std::move(ctx)), table_name_(std::move(table_name)) {}
+
+RewriteManifests& 
RewriteManifests::ClusterBy(ClusterByFunc func) {
+  // A callable that converts to false (no target) is refused by the builder
+  // check; otherwise it replaces any previously set cluster function.
+  ICEBERG_BUILDER_CHECK(static_cast(func), "Cluster function cannot be null");
+  cluster_by_func_ = std::move(func);
+  return *this;
+}
+
+// Stores the predicate that selects which manifests are rewritten. A callable
+// that converts to false is refused by the builder check.
+RewriteManifests& RewriteManifests::RewriteIf(RewritePredicate predicate) {
+  ICEBERG_BUILDER_CHECK(static_cast(predicate), "Rewrite predicate cannot be null");
+  predicate_ = std::move(predicate);
+  return *this;
+}
+
+// Schedules `manifest` for removal. Paths are tracked in a set so that the
+// same manifest is queued only once even if this is called repeatedly.
+RewriteManifests& RewriteManifests::DeleteManifest(const ManifestFile& manifest) {
+  auto [_, inserted] = deleted_manifest_paths_.insert(manifest.manifest_path);
+  if (inserted) {
+    deleted_manifests_.push_back(manifest);
+  }
+  return *this;
+}
+
+// Schedules `manifest` to be added to the table.
+//
+// The manifest is refused when its recorded counts report added or deleted
+// files (each count is only checked when it is present), or when it already
+// carries a snapshot id or a sequence number. Depending on
+// can_inherit_snapshot_id() it is then either queued as-is or copied.
+RewriteManifests& RewriteManifests::AddManifest(const ManifestFile& manifest) {
+  if (manifest.added_files_count.has_value()) {
+    ICEBERG_BUILDER_CHECK(!manifest.has_added_files(),
+                          "Cannot add manifest with added files");
+  }
+  if (manifest.deleted_files_count.has_value()) {
+    ICEBERG_BUILDER_CHECK(!manifest.has_deleted_files(),
+                          "Cannot add manifest with deleted files");
+  }
+  ICEBERG_BUILDER_CHECK(manifest.added_snapshot_id == kInvalidSnapshotId,
+                        "Snapshot id must be assigned during commit");
+  ICEBERG_BUILDER_CHECK(manifest.sequence_number == kInvalidSequenceNumber,
+                        "Sequence number must be assigned during commit");
+
+  if (can_inherit_snapshot_id()) {
+    added_manifests_.push_back(manifest);
+  } else {
+    // The manifest must be rewritten with this update's snapshot ID. CopyManifest
+    // also validates that the manifest only contains existing entries.
+    ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto copied_manifest, CopyManifest(manifest));
+    rewritten_added_manifests_.push_back(std::move(copied_manifest));
+  }
+  return *this;
+}
+
+// Snapshots produced by this update are recorded as a replace operation.
+std::string RewriteManifests::operation() { return DataOperation::kReplace; }
+
+// Computes the manifest list of the new snapshot on top of `snapshot`.
+//
+// The steps are: load the manifests of `snapshot`, check that every manifest
+// scheduled for deletion is still part of it, run the clustering rewrite (or
+// reuse the result of an earlier attempt), validate the added manifests and
+// the set of active files, and finally assemble the output list together with
+// the manifest counters used by Summary().
+Result> RewriteManifests::Apply(
+    const TableMetadata& metadata_to_update, const std::shared_ptr& snapshot) {
+  ICEBERG_PRECHECK(snapshot != nullptr,
+                   "Cannot rewrite manifests without a current snapshot");
+
+  SnapshotCache cached_snapshot(snapshot.get());
+  ICEBERG_ASSIGN_OR_RAISE(auto current_manifests,
+                          cached_snapshot.Manifests(ctx_->table->io()));
+
+  // Path index of the current manifests, used by the validation and by the
+  // decision whether a rewrite is needed.
+  std::unordered_set current_manifest_paths;
+  current_manifest_paths.reserve(current_manifests.size());
+  for (const auto& manifest : current_manifests) {
+    current_manifest_paths.insert(manifest.manifest_path);
+  }
+
+  ICEBERG_RETURN_UNEXPECTED(
+      ValidateDeletedManifests(current_manifest_paths, snapshot->snapshot_id));
+
+  if (RequiresRewrite(current_manifest_paths)) {
+    ICEBERG_ASSIGN_OR_RAISE(auto rewritten,
+                            Rewrite(metadata_to_update, current_manifests));
+    new_manifests_ = std::move(rewritten);
+  } else {
+    // Keep any existing manifests as-is that were not processed. Previously
+    // created manifests in new_manifests_ are reused across commit retries.
+    kept_manifests_.clear();
+    for (const auto& manifest : current_manifests) {
+      if (!rewritten_manifest_paths_.contains(manifest.manifest_path) &&
+          !deleted_manifest_paths_.contains(manifest.manifest_path)) {
+        kept_manifests_.push_back(manifest);
+      }
+    }
+  }
+
+  ICEBERG_RETURN_UNEXPECTED(ValidateAddedManifests());
+  ICEBERG_RETURN_UNEXPECTED(ValidateActiveFiles());
+
+  // Output order: rewrite results, directly added manifests, copied added
+  // manifests, then kept manifests. Everything except the kept manifests is
+  // stamped with this update's snapshot id; all of them get missing counts
+  // filled in.
+  std::vector manifests;
+  manifests.reserve(new_manifests_.size() + added_manifests_.size() +
+                    rewritten_added_manifests_.size() + kept_manifests_.size());
+
+  const int64_t snapshot_id = SnapshotId();
+  for (auto& manifest : new_manifests_) {
+    SetSnapshotId(manifest, snapshot_id);
+    ICEBERG_ASSIGN_OR_RAISE(auto manifest_with_counts, FillMissingCounts(manifest));
+    manifests.push_back(std::move(manifest_with_counts));
+  }
+  for (auto& manifest : added_manifests_) {
+    SetSnapshotId(manifest, snapshot_id);
+    ICEBERG_ASSIGN_OR_RAISE(auto manifest_with_counts, FillMissingCounts(manifest));
+    manifests.push_back(std::move(manifest_with_counts));
+  }
+  for (auto& manifest : rewritten_added_manifests_) {
+    SetSnapshotId(manifest, snapshot_id);
+    ICEBERG_ASSIGN_OR_RAISE(auto manifest_with_counts, FillMissingCounts(manifest));
+    manifests.push_back(std::move(manifest_with_counts));
+  }
+  for (const auto& manifest : kept_manifests_) {
+    ICEBERG_ASSIGN_OR_RAISE(auto manifest_with_counts, FillMissingCounts(manifest));
+    manifests.push_back(std::move(manifest_with_counts));
+  }
+
+  // The replaced count is the number of rewritten plus directly deleted
+  // manifests.
+  manifest_count_summary_ = BuildManifestCountSummary(
+      manifests,
+      static_cast(rewritten_manifests_.size() + deleted_manifests_.size()));
+  return manifests;
+}
+
+// Rebuilds the snapshot summary from scratch: the custom properties recorded
+// by SetSummaryProperty(), the manifest counters computed by Apply() and the
+// number of entries written by the rewrite.
+std::unordered_map RewriteManifests::Summary() {
+  summary_.Clear();
+  summary_.SetPartitionSummaryLimit(0);
+  for (const auto& [property, value] : custom_summary_properties_) {
+    summary_.Set(property, value);
+  }
+  summary_.Merge(manifest_count_summary_);
+  summary_.Set(SnapshotSummaryFields::kEntriesProcessed, std::to_string(entry_count_));
+  return summary_.Build();
+}
+
+// Keeps a local copy of the property, because Summary() clears the summary
+// builder and re-applies the custom properties, and forwards it to the base.
+void RewriteManifests::SetSummaryProperty(const std::string& property,
+                                          const std::string& value) {
+  custom_summary_properties_[property] = value;
+  SnapshotUpdate::SetSummaryProperty(property, value);
+}
+
+// Deletes the manifests written by this update (rewrite results and copied
+// added manifests) whose paths are not in `committed`. With an empty
+// `committed` set nothing is deleted unless cleanup_all_ is set.
+Status RewriteManifests::CleanUncommitted(
+    const std::unordered_set& committed) {
+  if (committed.empty() && !cleanup_all_) {
+    return {};
+  }
+  ICEBERG_RETURN_UNEXPECTED(DeleteUncommitted(new_manifests_, committed,
+                                              /*clear=*/false));
+  ICEBERG_RETURN_UNEXPECTED(DeleteUncommitted(rewritten_added_manifests_, committed,
+                                              /*clear=*/false));
+  return {};
+}
+
+// Wraps SnapshotUpdate::Finalize. For a failed commit whose error kind is not
+// kCommitStateUnknown, cleanup_all_ is raised for the duration of the base
+// call, which lets CleanUncommitted() delete files even for an empty committed
+// set. NOTE(review): presumably the base Finalize is what invokes
+// CleanUncommitted() - confirm.
+Status RewriteManifests::Finalize(Result commit_result) {
+  if (!commit_result.has_value() &&
+      commit_result.error().kind != ErrorKind::kCommitStateUnknown) {
+    cleanup_all_ = true;
+  }
+  auto status = SnapshotUpdate::Finalize(std::move(commit_result));
+  cleanup_all_ = false;
+  return status;
+}
+
+// Decides whether Apply() has to run the clustering rewrite (again).
+bool RewriteManifests::RequiresRewrite(
+    const std::unordered_set& current_manifest_paths) const {
+  if (!cluster_by_func_) {
+    // manifests are deleted and added directly so don't perform a rewrite
+    return false;
+  }
+  if (rewritten_manifests_.empty()) {
+    // nothing yet processed so perform a full rewrite
+    return true;
+  }
+
+  // if any processed manifest is not in the current manifest list, perform a full rewrite
+  return std::ranges::any_of(rewritten_manifests_, [&](const ManifestFile& manifest) {
+    return !current_manifest_paths.contains(manifest.manifest_path);
+  });
+}
+
+// Without a predicate every manifest matches.
+bool RewriteManifests::MatchesPredicate(const ManifestFile& manifest) const {
+  return !predicate_ || predicate_(manifest);
+}
+
+// Fails with ValidationFailed when a manifest scheduled for deletion is not
+// part of the current manifest list; `current_snapshot_id` is only used in the
+// error message.
+Status RewriteManifests::ValidateDeletedManifests(
+    const std::unordered_set& current_manifest_paths,
+    int64_t current_snapshot_id) const {
+  for (const auto& manifest : deleted_manifests_) {
+    if (!current_manifest_paths.contains(manifest.manifest_path)) {
+      return ValidationFailed(
+          "Deleted manifest {} could not be found in the latest snapshot {}",
+          manifest.manifest_path, 
current_snapshot_id);
+    }
+  }
+  return {};
+}
+
+// Rejects directly added manifests that contain added or deleted entries.
+Status RewriteManifests::ValidateAddedManifests() const {
+  // Manifests added via the inherit path are kept as-is, so their entries must be
+  // verified here because ManifestFile counts may be absent. Manifests on the copy
+  // path are already validated while being copied in CopyManifest.
+  for (const auto& manifest : added_manifests_) {
+    ICEBERG_ASSIGN_OR_RAISE(auto entries, Entries(manifest));
+    for (const auto& entry : entries) {
+      if (entry.status == ManifestStatus::kAdded) {
+        return ValidationFailed("Cannot add manifest with added files");
+      }
+      if (entry.status == ManifestStatus::kDeleted) {
+        return ValidationFailed("Cannot add manifest with deleted files");
+      }
+    }
+  }
+  return {};
+}
+
+// Checks that the live files of all created manifests (rewrite results, added
+// and copied manifests) are the same as the live files of all replaced
+// manifests (rewritten and deleted ones). The counts are compared first, then
+// every replaced file must find an equal created file.
+Status RewriteManifests::ValidateActiveFiles() const {
+  std::vector created;
+  created.reserve(new_manifests_.size() + added_manifests_.size() +
+                  rewritten_added_manifests_.size());
+  created.insert(created.end(), new_manifests_.begin(), new_manifests_.end());
+  created.insert(created.end(), added_manifests_.begin(), added_manifests_.end());
+  created.insert(created.end(), rewritten_added_manifests_.begin(),
+                 rewritten_added_manifests_.end());
+
+  std::vector replaced;
+  replaced.reserve(rewritten_manifests_.size() + deleted_manifests_.size());
+  replaced.insert(replaced.end(), rewritten_manifests_.begin(),
+                  rewritten_manifests_.end());
+  replaced.insert(replaced.end(), deleted_manifests_.begin(), deleted_manifests_.end());
+
+  ICEBERG_ASSIGN_OR_RAISE(auto created_files, ActiveFiles(created));
+  ICEBERG_ASSIGN_OR_RAISE(auto replaced_files, ActiveFiles(replaced));
+  if (created_files.size() != replaced_files.size()) {
+    return ValidationFailed(
+        "Replaced and created manifests must have the same number of active files: {} "
+        "(new), {} (old)",
+        created_files.size(), replaced_files.size());
+  }
+
+  // Group created files by path so each replaced file can be matched in amortized
+  // constant time instead of scanning 
the whole list.
+  std::unordered_multimap created_by_path;
+  created_by_path.reserve(created_files.size());
+  for (auto& file : created_files) {
+    // The key is copied from file.file_path before `file` is moved into the
+    // mapped value, because std::pair initialises `first` before `second`.
+    created_by_path.emplace(file.file_path, std::move(file));
+  }
+
+  for (const auto& file : replaced_files) {
+    auto [begin, end] = created_by_path.equal_range(file.file_path);
+    auto match =
+        std::find_if(begin, end, [&](const auto& entry) { return entry.second == file; });
+    if (match == end) {
+      return ValidationFailed(
+          "Replaced and created manifests must have the same active files");
+    }
+    // Each created file can satisfy only one replaced file.
+    created_by_path.erase(match);
+  }
+  return {};
+}
+
+// Reads all entries of `manifest` using the base metadata's schema and the
+// partition spec referenced by the manifest.
+//
+// A manifest that already has a snapshot id is read through the regular
+// reader. A manifest without one is read with copy-style inheritable metadata
+// and is_committed=false. NOTE(review): the snapshot id 0 passed to ForCopy
+// looks like a placeholder - confirm.
+Result> RewriteManifests::Entries(
+    const ManifestFile& manifest) const {
+  ICEBERG_ASSIGN_OR_RAISE(auto schema, base().Schema());
+  ICEBERG_ASSIGN_OR_RAISE(auto spec,
+                          base().PartitionSpecById(manifest.partition_spec_id));
+
+  if (manifest.added_snapshot_id != kInvalidSnapshotId) {
+    ICEBERG_ASSIGN_OR_RAISE(
+        auto reader, ManifestReader::Make(manifest, ctx_->table->io(), schema, spec));
+    return reader->Entries();
+  }
+
+  ICEBERG_ASSIGN_OR_RAISE(auto inheritable_metadata,
+                          InheritableMetadataFactory::ForCopy(/*snapshot_id=*/0));
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto reader,
+      ManifestReader::Make(manifest.manifest_path, manifest.manifest_length,
+                           ctx_->table->io(), schema, spec,
+                           std::move(inheritable_metadata), manifest.first_row_id,
+                           /*is_committed=*/false));
+  return reader->Entries();
+}
+
+// Returns copies of the data files of all alive entries in `manifests`. Each
+// copy gets its partition_spec_id overwritten with the spec id of the manifest
+// it was read from.
+Result> RewriteManifests::ActiveFiles(
+    const std::vector& manifests) const {
+  std::vector active_files;
+  for (const auto& manifest : manifests) {
+    ICEBERG_ASSIGN_OR_RAISE(auto entries, Entries(manifest));
+    active_files.reserve(active_files.size() + entries.size());
+    for (const auto& entry : entries) {
+      if (!entry.IsAlive()) {
+        continue;
+      }
+      ICEBERG_PRECHECK(entry.data_file != nullptr,
+                       "Manifest entry in {} is missing data_file",
+                       manifest.manifest_path);
+      auto file = *entry.data_file;
+      file.partition_spec_id = manifest.partition_spec_id;
+      
active_files.push_back(std::move(file));
+    }
+  }
+  return active_files;
+}
+
+// Returns `manifest` with all six file/row counters populated.
+//
+// When every counter is already present the manifest is returned unchanged.
+// Otherwise the manifest is read and all six counters are recomputed from its
+// entries, replacing any counters that were present.
+Result RewriteManifests::FillMissingCounts(
+    const ManifestFile& manifest) const {
+  if (manifest.added_files_count.has_value() &&
+      manifest.existing_files_count.has_value() &&
+      manifest.deleted_files_count.has_value() && manifest.added_rows_count.has_value() &&
+      manifest.existing_rows_count.has_value() &&
+      manifest.deleted_rows_count.has_value()) {
+    return manifest;
+  }
+
+  ICEBERG_ASSIGN_OR_RAISE(auto entries, Entries(manifest));
+  ManifestFile manifest_with_counts = manifest;
+  int32_t added_files = 0;
+  int64_t added_rows = 0;
+  int32_t existing_files = 0;
+  int64_t existing_rows = 0;
+  int32_t deleted_files = 0;
+  int64_t deleted_rows = 0;
+
+  // Tally files and rows per entry status.
+  for (const auto& entry : entries) {
+    ICEBERG_PRECHECK(entry.data_file != nullptr,
+                     "Manifest entry in {} is missing data_file", manifest.manifest_path);
+    switch (entry.status) {
+      case ManifestStatus::kAdded:
+        ++added_files;
+        added_rows += entry.data_file->record_count;
+        break;
+      case ManifestStatus::kExisting:
+        ++existing_files;
+        existing_rows += entry.data_file->record_count;
+        break;
+      case ManifestStatus::kDeleted:
+        ++deleted_files;
+        deleted_rows += entry.data_file->record_count;
+        break;
+    }
+  }
+
+  manifest_with_counts.added_files_count = added_files;
+  manifest_with_counts.added_rows_count = added_rows;
+  manifest_with_counts.existing_files_count = existing_files;
+  manifest_with_counts.existing_rows_count = existing_rows;
+  manifest_with_counts.deleted_files_count = deleted_files;
+  manifest_with_counts.deleted_rows_count = deleted_rows;
+  return manifest_with_counts;
+}
+
+// Copies `manifest` into a new manifest file written under this update's
+// snapshot id and returns the new ManifestFile. The source is opened with
+// is_committed=false.
+Result RewriteManifests::CopyManifest(const ManifestFile& manifest) {
+  ICEBERG_ASSIGN_OR_RAISE(auto schema, base().Schema());
+  ICEBERG_ASSIGN_OR_RAISE(auto spec,
+                          base().PartitionSpecById(manifest.partition_spec_id));
+  ICEBERG_ASSIGN_OR_RAISE(auto reader,
+                          ManifestReader::Make(manifest, ctx_->table->io(), schema, spec,
+                                               /*is_committed=*/false));
+  ICEBERG_ASSIGN_OR_RAISE(auto entries, reader->Entries());
+
+  // The copy keeps the content type and first_row_id of the source manifest.
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto writer,
+      ManifestWriter::MakeWriter(base().format_version, SnapshotId(), ManifestPath(),
+                                 ctx_->table->io(), std::move(spec), std::move(schema),
+                                 manifest.content, manifest.first_row_id));
+  // NOTE(review): the early returns below happen after the writer was created
+  // and nothing here removes its output - confirm whether a partially written
+  // file can be left behind.
+  for (const auto& entry : entries) {
+    // A rewritten added manifest may only contain existing entries.
+    if (entry.status == ManifestStatus::kAdded) {
+      return ValidationFailed("Cannot add manifest with added files");
+    }
+    if (entry.status == ManifestStatus::kDeleted) {
+      return ValidationFailed("Cannot add manifest with deleted files");
+    }
+    ICEBERG_RETURN_UNEXPECTED(writer->WriteExistingEntry(entry));
+  }
+  ICEBERG_RETURN_UNEXPECTED(writer->Close());
+  return writer->ToManifestFile();
+}
+
+// Rewrites the selected manifests of `current_manifests` into new manifests
+// grouped by (cluster key, partition spec id) and returns the new manifests.
+//
+// State from a previous attempt is discarded first. Manifests scheduled for
+// deletion are skipped; delete manifests and manifests rejected by the
+// predicate are recorded as kept; everything else is recorded as rewritten
+// and its live entries are written as existing entries.
+Result> RewriteManifests::Rewrite(
+    const TableMetadata& metadata_to_update,
+    std::span current_manifests) {
+  ResetRewriteState();
+
+  using WriterKey = std::pair;
+  // Combines the hashes of both key parts.
+  struct WriterKeyHash {
+    size_t operator()(const WriterKey& key) const {
+      size_t seed = std::hash{}(key.first);
+      seed ^= std::hash{}(key.second) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
+      return seed;
+    }
+  };
+
+  ICEBERG_ASSIGN_OR_RAISE(auto schema, metadata_to_update.Schema());
+  std::unordered_map, WriterKeyHash>
+      writers;
+
+  for (const auto& manifest : current_manifests) {
+    if (deleted_manifest_paths_.contains(manifest.manifest_path)) {
+      continue;
+    }
+    if (manifest.content == ManifestContent::kDeletes || !MatchesPredicate(manifest)) {
+      kept_manifests_.push_back(manifest);
+      continue;
+    }
+
+    rewritten_manifests_.push_back(manifest);
+    rewritten_manifest_paths_.insert(manifest.manifest_path);
+    ICEBERG_ASSIGN_OR_RAISE(
+        auto spec, metadata_to_update.PartitionSpecById(manifest.partition_spec_id));
+    ICEBERG_ASSIGN_OR_RAISE(
+        auto reader, ManifestReader::Make(manifest, ctx_->table->io(), schema, spec));
+    ICEBERG_ASSIGN_OR_RAISE(auto entries, reader->LiveEntries());
+    for (const auto& entry : entries) {
+      
ICEBERG_PRECHECK(entry.data_file != nullptr,
+                       "Manifest entry in {} is missing data_file",
+                       manifest.manifest_path);
+      auto key =
+          WriterKey{cluster_by_func_(*entry.data_file), manifest.partition_spec_id};
+      ICEBERG_PRECHECK(!key.first.empty(), "Cluster key cannot be empty");
+
+      // Lazily create one rolling writer per (cluster key, spec id) pair. The
+      // factory lambda holds its own copies of the spec and schema.
+      auto writer_it = writers.find(key);
+      if (writer_it == writers.end()) {
+        auto writer_spec = spec;
+        auto writer_schema = schema;
+        auto [inserted_it, _] = writers.emplace(
+            key, std::make_unique(
+                     [this, writer_spec, writer_schema, content = manifest.content]()
+                         -> Result> {
+                       return ManifestWriter::MakeWriter(
+                           base().format_version, SnapshotId(), ManifestPath(),
+                           ctx_->table->io(), writer_spec, writer_schema, content);
+                     },
+                     target_manifest_size_bytes()));
+        writer_it = inserted_it;
+      }
+
+      ICEBERG_RETURN_UNEXPECTED(writer_it->second->WriteExistingEntry(entry));
+      ++entry_count_;
+    }
+  }
+
+  // Close every writer and collect the manifests it produced. `writers` is an
+  // unordered_map, so the order of the result is unspecified.
+  std::vector result;
+  for (auto& [_, writer] : writers) {
+    ICEBERG_RETURN_UNEXPECTED(writer->Close());
+    ICEBERG_ASSIGN_OR_RAISE(auto manifests, writer->ToManifestFiles());
+    result.insert(result.end(), std::make_move_iterator(manifests.begin()),
+                  std::make_move_iterator(manifests.end()));
+  }
+  return result;
+}
+
+// Deletes every manifest in `manifests` whose path is not in `committed` and
+// optionally clears the list. Deletion is best effort: the result of
+// DeleteFile is ignored and the function always returns success.
+Status RewriteManifests::DeleteUncommitted(
+    std::vector& manifests,
+    const std::unordered_set& committed, bool clear) {
+  for (const auto& manifest : manifests) {
+    if (!committed.contains(manifest.manifest_path)) {
+      std::ignore = DeleteFile(manifest.manifest_path);
+    }
+  }
+  if (clear) {
+    manifests.clear();
+  }
+  return {};
+}
+
+// Discards the result of a previous rewrite attempt: the manifests it wrote
+// are deleted and forgotten, and the entry counter and the kept/rewritten
+// bookkeeping are reset.
+void RewriteManifests::ResetRewriteState() {
+  std::ignore = DeleteUncommitted(new_manifests_, {}, /*clear=*/true);
+  entry_count_ = 0;
+  kept_manifests_.clear();
+  rewritten_manifests_.clear();
+  rewritten_manifest_paths_.clear();
+}
+
+}  // namespace iceberg
diff --git a/src/iceberg/update/rewrite_manifests.h b/src/iceberg/update/rewrite_manifests.h
new file mode 100644
index 000000000..f4f9dfa79
--- /dev/null
+++ 
b/src/iceberg/update/rewrite_manifests.h @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +/// \file iceberg/update/rewrite_manifests.h + +#include +#include +#include +#include +#include +#include +#include + +#include "iceberg/iceberg_export.h" +#include "iceberg/result.h" +#include "iceberg/type_fwd.h" +#include "iceberg/update/snapshot_update.h" + +namespace iceberg { + +/// \brief API for rewriting manifests for a table. +/// +/// This API accumulates manifest files, produces a new snapshot of the table +/// described only by the manifest files that were added, and commits that snapshot +/// as the current. +/// +/// This API can be used to rewrite matching manifests according to a clustering +/// function as well as to replace specific manifests. Manifests that are deleted +/// or added directly are ignored during the rewrite process. The set of active +/// files in replaced manifests must be the same as in new manifests. +/// +/// When committing, these changes will be applied to the latest table snapshot. +/// Commit conflicts will be resolved by applying the changes to the new latest +/// snapshot and reattempting the commit. 
+class ICEBERG_EXPORT RewriteManifests : public SnapshotUpdate {
+ public:
+  /// \brief Callback that maps a data file to its cluster key. The key must
+  /// not be empty.
+  using ClusterByFunc = std::function;
+  /// \brief Callback that decides whether a manifest should be rewritten.
+  using RewritePredicate = std::function;
+
+  /// \brief Create a RewriteManifests update for the given table.
+  ///
+  /// \param table_name name of the table, must not be empty
+  /// \param ctx transaction context, must not be null
+  static Result> Make(
+      std::string table_name, std::shared_ptr ctx);
+
+  /// \brief Group an existing data file by a cluster key.
+  ///
+  /// The cluster key determines which data file will be associated with a
+  /// particular manifest. All data files with the same cluster key will be written
+  /// to the same manifest unless the manifest is large and split into multiple
+  /// files. Manifests deleted via DeleteManifest or added via AddManifest are
+  /// ignored during the rewrite process.
+  RewriteManifests& ClusterBy(ClusterByFunc func);
+
+  /// \brief Determine which existing manifest files should be rewritten.
+  ///
+  /// Manifests that do not match the predicate are kept as-is. If this is not
+  /// called and no predicate is set, all data manifests will be rewritten.
+  /// Delete manifests are always kept, and a rewrite only takes place when a
+  /// cluster function has been set via ClusterBy.
+  RewriteManifests& RewriteIf(RewritePredicate predicate);
+
+  /// \brief Delete a manifest file from the table.
+  RewriteManifests& DeleteManifest(const ManifestFile& manifest);
+
+  /// \brief Add a manifest file to the table.
+  ///
+  /// The added manifest cannot contain new or deleted files.
+  ///
+  /// By default, the manifest will be rewritten to ensure all entries have
+  /// explicit snapshot IDs. In that case, it is always the responsibility of the
+  /// caller to manage the lifecycle of the original manifest.
+  ///
+  /// If manifest entries are allowed to inherit the snapshot ID assigned on
+  /// commit, the manifest should never be deleted manually if the commit succeeds
+  /// as it will become part of the table metadata and will be cleaned up on
+  /// expiry. If the manifest gets merged with others while preparing a new
+  /// snapshot, it will be deleted automatically if this operation is successful.
+  /// If the commit fails, the manifest will never be deleted and it is up to the
+  /// caller whether to delete or reuse it.
+  RewriteManifests& AddManifest(const ManifestFile& manifest);
+
+  /// \brief The snapshot operation recorded for this update (replace).
+  std::string operation() override;
+
+  /// \brief Compute the manifest list of the new snapshot.
+  ///
+  /// \param metadata_to_update table metadata the rewrite is applied to
+  /// \param snapshot current snapshot, must not be null
+  Result> Apply(
+      const TableMetadata& metadata_to_update,
+      const std::shared_ptr& snapshot) override;
+  /// \brief Build the snapshot summary, including the manifest counters and
+  /// the number of entries processed by the rewrite.
+  std::unordered_map Summary() override;
+  /// \brief Set a custom summary property that is kept across Summary() calls.
+  void SetSummaryProperty(const std::string& property, const std::string& value) override;
+  /// \brief Delete manifests written by this update that are not in `committed`.
+  Status CleanUncommitted(const std::unordered_set& committed) override;
+  /// \brief Finalize the update; a failed commit whose state is known enables
+  /// cleanup of all manifests written by this update.
+  Status Finalize(Result commit_result) override;
+
+ private:
+  explicit RewriteManifests(std::string table_name,
+                            std::shared_ptr ctx);
+
+  /// \brief Whether Apply has to run the clustering rewrite (again).
+  bool RequiresRewrite(
+      const std::unordered_set& current_manifest_paths) const;
+  /// \brief True if no predicate is set or the predicate accepts the manifest.
+  bool MatchesPredicate(const ManifestFile& manifest) const;
+  /// \brief Check that all manifests to delete are in the current snapshot.
+  Status ValidateDeletedManifests(
+      const std::unordered_set& current_manifest_paths,
+      int64_t current_snapshot_id) const;
+  /// \brief Check that directly added manifests only contain existing entries.
+  Status ValidateAddedManifests() const;
+  /// \brief Check that created and replaced manifests have the same live files.
+  Status ValidateActiveFiles() const;
+  /// \brief Read all entries of a manifest.
+  Result> Entries(const ManifestFile& manifest) const;
+  /// \brief Collect the data files of all alive entries in the manifests.
+  Result> ActiveFiles(
+      const std::vector& manifests) const;
+  /// \brief Return the manifest with file and row counts computed if any is absent.
+  Result FillMissingCounts(const ManifestFile& manifest) const;
+
+  /// \brief Copy a manifest into a new file written under this update's snapshot id.
+  Result CopyManifest(const ManifestFile& manifest);
+  /// \brief Rewrite the selected manifests grouped by cluster key and spec id.
+  Result> Rewrite(
+      const TableMetadata& metadata_to_update,
+      std::span current_manifests);
+
+  /// \brief Best-effort deletion of manifests whose paths are not in `committed`.
+  Status DeleteUncommitted(std::vector& manifests,
+                           const std::unordered_set& committed, bool clear);
+  /// \brief Drop the results and bookkeeping of a previous rewrite attempt.
+  void ResetRewriteState();
+
+ private:
+  std::string table_name_;
+  ClusterByFunc cluster_by_func_;
+  RewritePredicate predicate_;
+
+  // Manifests scheduled via DeleteManifest, plus their paths for de-duplication.
+  std::vector deleted_manifests_;
+  std::unordered_set deleted_manifest_paths_;
+  // Manifests passed to AddManifest: kept as-is, or copied to a new file.
+  std::vector added_manifests_;
+  std::vector rewritten_added_manifests_;
+
+  // State of the clustering rewrite: untouched manifests, newly written
+  // manifests, the manifests they replace, and the number of entries written.
+  std::vector kept_manifests_;
+  std::vector new_manifests_;
+  std::vector rewritten_manifests_;
+  std::unordered_set rewritten_manifest_paths_;
+  int64_t entry_count_{0};
+
+  std::unordered_map custom_summary_properties_;
+  SnapshotSummaryBuilder manifest_count_summary_;
+  // Set only while Finalize handles a failed commit; see CleanUncommitted.
+  bool cleanup_all_{false};
+};
+
+}  // namespace iceberg