diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index 9a0dc68b7..39a4b66c1 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -104,6 +104,7 @@ set(ICEBERG_SOURCES update/expire_snapshots.cc update/fast_append.cc update/merge_append.cc + update/replace_partitions.cc update/merging_snapshot_update.cc update/pending_update.cc update/row_delta.cc diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build index ab514be87..47aa41074 100644 --- a/src/iceberg/meson.build +++ b/src/iceberg/meson.build @@ -131,6 +131,7 @@ iceberg_sources = files( 'update/merge_append.cc', 'update/merging_snapshot_update.cc', 'update/pending_update.cc', + 'update/replace_partitions.cc', 'update/row_delta.cc', 'update/set_snapshot.cc', 'update/snapshot_manager.cc', diff --git a/src/iceberg/snapshot.h b/src/iceberg/snapshot.h index f3e7ffb85..73768f2f6 100644 --- a/src/iceberg/snapshot.h +++ b/src/iceberg/snapshot.h @@ -251,6 +251,8 @@ struct ICEBERG_EXPORT SnapshotSummaryFields { inline static const std::string kEngineName = "engine-name"; /// \brief Version of the engine that created the snapshot inline static const std::string kEngineVersion = "engine-version"; + /// \brief Whether this is a replace-partitions operation + inline static const std::string kReplacePartitions = "replace-partitions"; }; /// \brief Helper class for building snapshot summaries. diff --git a/src/iceberg/type_fwd.h b/src/iceberg/type_fwd.h index f29bc4a1a..0fcb42dd4 100644 --- a/src/iceberg/type_fwd.h +++ b/src/iceberg/type_fwd.h @@ -241,6 +241,7 @@ class TransactionContext; class DeleteFiles; class ExpireSnapshots; class FastAppend; +class ReplacePartitions; class MergeAppend; class PendingUpdate; class RowDelta; diff --git a/src/iceberg/update/merging_snapshot_update.h b/src/iceberg/update/merging_snapshot_update.h index fc3987ee1..0f447c5e5 100644 --- a/src/iceberg/update/merging_snapshot_update.h +++ b/src/iceberg/update/merging_snapshot_update.h @@ -296,6 +296,14 @@ class ICEBERG_EXPORT MergingSnapshotUpdate : public SnapshotUpdate { const std::shared_ptr& parent, std::shared_ptr io) const; + /// \brief Record a caller-supplied summary entry that survives commit retry. + /// + /// MergingSnapshotUpdate clears summary_ at the start of every Apply() and + /// rebuilds it from the cached sub-builders, so subclasses must route any + /// custom property through this hook rather than calling summary_.Set() + /// directly. Stored entries are re-merged in Summary(). + void SetSummaryProperty(const std::string& property, const std::string& value) override; + private: struct PendingDeleteFile { std::shared_ptr file; @@ -334,8 +342,6 @@ class ICEBERG_EXPORT MergingSnapshotUpdate : public SnapshotUpdate { Status ManagersReady() const; - void SetSummaryProperty(const std::string& property, const std::string& value) override; - Result> MergeDVs() const; /// \brief Write new data manifests for staged data files; caches the result. diff --git a/src/iceberg/update/meson.build b/src/iceberg/update/meson.build index 4f594a06e..e00def71b 100644 --- a/src/iceberg/update/meson.build +++ b/src/iceberg/update/meson.build @@ -23,6 +23,7 @@ install_headers( 'merge_append.h', 'merging_snapshot_update.h', 'pending_update.h', + 'replace_partitions.h', 'row_delta.h', 'set_snapshot.h', 'snapshot_manager.h', diff --git a/src/iceberg/update/replace_partitions.cc b/src/iceberg/update/replace_partitions.cc new file mode 100644 index 000000000..d47b21a92 --- /dev/null +++ b/src/iceberg/update/replace_partitions.cc @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/update/replace_partitions.h" + +#include "iceberg/expression/expressions.h" +#include "iceberg/partition_spec.h" +#include "iceberg/snapshot.h" +#include "iceberg/table.h" // IWYU pragma: keep +#include "iceberg/table_metadata.h" +#include "iceberg/transaction.h" +#include "iceberg/util/error_collector.h" +#include "iceberg/util/macros.h" + +namespace iceberg { + +Result> ReplacePartitions::Make( + std::string table_name, std::shared_ptr ctx) { + ICEBERG_PRECHECK(!table_name.empty(), "Table name cannot be empty"); + ICEBERG_PRECHECK(ctx != nullptr, "Cannot create ReplacePartitions without a context"); + return std::unique_ptr( + new ReplacePartitions(std::move(table_name), std::move(ctx))); +} + +ReplacePartitions::ReplacePartitions(std::string table_name, + std::shared_ptr ctx) + : MergingSnapshotUpdate(std::move(table_name), std::move(ctx)) { + SetSummaryProperty(SnapshotSummaryFields::kReplacePartitions, "true"); +} + +ReplacePartitions& ReplacePartitions::AddFile(const std::shared_ptr& file) { + ICEBERG_BUILDER_CHECK(file != nullptr, "Invalid data file: null"); + ICEBERG_BUILDER_CHECK(file->partition_spec_id.has_value(), + "Data file must have partition spec ID"); + + int32_t spec_id = file->partition_spec_id.value(); + ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto spec, base().PartitionSpecById(spec_id)); + + ICEBERG_BUILDER_RETURN_IF_ERROR(AddDataFile(file)); + if (spec->fields().empty()) { + // Unpartitioned spec: Java's BaseReplacePartitions treats this as a + // table-wide replace rather than a spec-scoped DropPartition with empty + // partition values. Mirror that so every existing data file is dropped + // and conflict validation runs against AlwaysTrue. + ICEBERG_BUILDER_RETURN_IF_ERROR(DeleteByRowFilter(Expressions::AlwaysTrue())); + replace_by_row_filter_ = true; + } else { + ICEBERG_BUILDER_RETURN_IF_ERROR(DropPartition(spec_id, file->partition)); + replaced_partitions_.add(spec_id, file->partition); + } + return *this; +} + +ReplacePartitions& ReplacePartitions::ValidateAppendOnly() { + FailAnyDelete(); + return *this; +} + +ReplacePartitions& ReplacePartitions::ValidateFromSnapshot(int64_t snapshot_id) { + starting_snapshot_id_ = snapshot_id; + return *this; +} + +ReplacePartitions& ReplacePartitions::ValidateNoConflictingData() { + validate_conflicting_data_ = true; + return *this; +} + +ReplacePartitions& ReplacePartitions::ValidateNoConflictingDeletes() { + validate_conflicting_deletes_ = true; + return *this; +} + +std::string ReplacePartitions::operation() { return DataOperation::kOverwrite; } + +Status ReplacePartitions::Validate(const TableMetadata& current_metadata, + const std::shared_ptr& snapshot) { + // Match Java BaseReplacePartitions: require at least one staged data file. + // Use the flags AddFile() sets — `DataSpec()` would also error on multi-spec + // stages and is not the guard we want here. + if (!replace_by_row_filter_ && replaced_partitions_.empty()) { + return InvalidArgument( + "ReplacePartitions requires at least one data file; call AddFile() first"); + } + + if (snapshot == nullptr) { + return {}; + } + + auto io = ctx_->table->io(); + if (validate_conflicting_data_) { + if (replace_by_row_filter_) { + ICEBERG_RETURN_UNEXPECTED(ValidateAddedDataFiles( + current_metadata, starting_snapshot_id_, Expressions::AlwaysTrue(), snapshot, + io, IsCaseSensitive())); + } else { + ICEBERG_RETURN_UNEXPECTED(ValidateAddedDataFiles( + current_metadata, starting_snapshot_id_, replaced_partitions_, snapshot, io)); + } + } + if (validate_conflicting_deletes_) { + // Java's BaseReplacePartitions.validate gates both ValidateNoNewDeleteFiles + // and ValidateDeletedDataFiles on the same validateNewDeletes flag. The + // second check rejects concurrent overwrite/delete commits in the replaced + // partitions; without it a concurrent delete in a replaced partition would + // commit silently. + if (replace_by_row_filter_) { + ICEBERG_RETURN_UNEXPECTED(ValidateNoNewDeleteFiles( + current_metadata, starting_snapshot_id_, Expressions::AlwaysTrue(), snapshot, + io, IsCaseSensitive())); + ICEBERG_RETURN_UNEXPECTED(ValidateDeletedDataFiles( + current_metadata, starting_snapshot_id_, Expressions::AlwaysTrue(), snapshot, + io, IsCaseSensitive())); + } else { + ICEBERG_RETURN_UNEXPECTED(ValidateNoNewDeleteFiles( + current_metadata, starting_snapshot_id_, replaced_partitions_, snapshot, io)); + ICEBERG_RETURN_UNEXPECTED(ValidateDeletedDataFiles( + current_metadata, starting_snapshot_id_, replaced_partitions_, snapshot, io)); + } + } + return {}; +} + +} // namespace iceberg diff --git a/src/iceberg/update/replace_partitions.h b/src/iceberg/update/replace_partitions.h new file mode 100644 index 000000000..70515054b --- /dev/null +++ b/src/iceberg/update/replace_partitions.h @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +/// \file iceberg/update/replace_partitions.h + +#include +#include +#include +#include + +#include "iceberg/iceberg_export.h" +#include "iceberg/result.h" +#include "iceberg/type_fwd.h" +#include "iceberg/update/merging_snapshot_update.h" +#include "iceberg/util/partition_value_util.h" + +namespace iceberg { + +/// \brief Replaces partitions in a table with new data files. +/// +/// ReplacePartitions dynamically identifies which partitions to overwrite based +/// on the data files added via AddFile(). All existing data files in each +/// touched partition are marked DELETED, and the new files are written as the +/// sole data in those partitions. Partitions not referenced by any added file +/// are left unchanged. +/// +/// This operation produces a snapshot with operation="overwrite" and +/// "replace-partitions"="true" in the summary. For unpartitioned tables, all +/// existing files are replaced. +/// +/// When committing, these changes are applied to the latest table snapshot. +/// Commit conflicts are resolved by re-applying to the new latest snapshot +/// and reattempting the commit. +/// +/// \note This is provided to implement SQL compatible with Hive table +/// operations but is not recommended. Instead, use OverwriteFiles to +/// explicitly overwrite data. +class ICEBERG_EXPORT ReplacePartitions : public MergingSnapshotUpdate { + public: + /// \brief Create a new ReplacePartitions instance. + /// + /// \param table_name The name of the table + /// \param ctx The transaction context + /// \return A Result containing the ReplacePartitions instance or an error + static Result> Make( + std::string table_name, std::shared_ptr ctx); + + /// \brief Add a data file and mark its partition for replacement. + /// + /// Each call registers the file's partition so all existing data files in + /// that partition are replaced. Duplicate files (same path) are ignored. + /// + /// \param file The data file to add (must have partition_spec_id set) + /// \return Reference to this for method chaining + ReplacePartitions& AddFile(const std::shared_ptr& file); + + /// \brief Fail the commit if any existing data file would be deleted. + /// + /// This validation is useful to ensure the operation is only applied to + /// tables where no data currently exists in the affected partitions. + /// + /// \return Reference to this for method chaining + ReplacePartitions& ValidateAppendOnly(); + + /// \brief Set the snapshot ID used as the baseline for conflict validation. + /// + /// Validations check changes that occurred after this snapshot ID. If not + /// set, all ancestor snapshots through the initial snapshot are validated. + /// + /// \param snapshot_id A snapshot ID + /// \return Reference to this for method chaining + ReplacePartitions& ValidateFromSnapshot(int64_t snapshot_id); + + /// \brief Enable validation that no conflicting data files were added concurrently. + /// + /// Fails the commit if a concurrent operation added a data file in any of + /// the partitions being replaced after the snapshot set by + /// ValidateFromSnapshot(). + /// + /// \return Reference to this for method chaining + ReplacePartitions& ValidateNoConflictingData(); + + /// \brief Enable validation that no conflicting delete files were added concurrently. + /// + /// Fails the commit if a concurrent operation added a delete file covering + /// any of the partitions being replaced after the snapshot set by + /// ValidateFromSnapshot(). + /// + /// \return Reference to this for method chaining + ReplacePartitions& ValidateNoConflictingDeletes(); + + std::string operation() override; + + protected: + Status Validate(const TableMetadata& current_metadata, + const std::shared_ptr& snapshot) override; + + private: + explicit ReplacePartitions(std::string table_name, + std::shared_ptr ctx); + + std::optional starting_snapshot_id_; + bool validate_conflicting_data_{false}; + bool validate_conflicting_deletes_{false}; + // True once an AddFile() call has staged a file whose partition spec is + // unpartitioned. Java's BaseReplacePartitions treats this case as a + // table-wide replace (DeleteByRowFilter(AlwaysTrue())) and runs conflict + // validation against AlwaysTrue rather than a partition set — mirror that. + bool replace_by_row_filter_{false}; + // Partitions touched by AddFile() in partitioned specs. Used to scope + // conflict validation to the overwritten partitions in the partitioned + // case; ignored when replace_by_row_filter_ is true. + PartitionSet replaced_partitions_; +}; + +} // namespace iceberg