diff --git a/.gitignore b/.gitignore index 458f876f7..6fb9e41ed 100644 --- a/.gitignore +++ b/.gitignore @@ -16,6 +16,9 @@ # under the License. build/ +build_bundle/ +build_core/ +builddir/ cmake-build/ cmake-build-debug/ cmake-build-release/ diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index 1bf1f10db..b62d1f1f3 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -37,6 +37,7 @@ set(ICEBERG_SOURCES expression/projections.cc expression/residual_evaluator.cc expression/rewrite_not.cc + expression/sanitize_expression.cc expression/strict_metrics_evaluator.cc expression/term.cc file_io.cc diff --git a/src/iceberg/catalog/memory/in_memory_catalog.cc b/src/iceberg/catalog/memory/in_memory_catalog.cc index 6148ef4e6..a079f4e10 100644 --- a/src/iceberg/catalog/memory/in_memory_catalog.cc +++ b/src/iceberg/catalog/memory/in_memory_catalog.cc @@ -23,6 +23,7 @@ #include #include "iceberg/file_io.h" +#include "iceberg/metrics/metrics_reporters.h" #include "iceberg/table.h" #include "iceberg/table_identifier.h" #include "iceberg/table_metadata.h" @@ -352,7 +353,14 @@ InMemoryCatalog::InMemoryCatalog( properties_(std::move(properties)), file_io_(std::move(file_io)), warehouse_location_(std::move(warehouse_location)), - root_namespace_(std::make_unique()) {} + root_namespace_(std::make_unique()) { + auto it = properties_.find(std::string(kMetricsReporterImpl)); + if (it != properties_.end() && !it->second.empty() && + it->second != kMetricsReporterTypeNoop) { + ICEBERG_ASSIGN_OR_THROW(auto reporter, MetricsReporters::Load(properties_)); + reporter_ = std::shared_ptr(std::move(reporter)); + } +} InMemoryCatalog::~InMemoryCatalog() = default; @@ -429,7 +437,8 @@ Result> InMemoryCatalog::CreateTable( ICEBERG_RETURN_UNEXPECTED( root_namespace_->UpdateTableMetadataLocation(identifier, metadata_file_location)); return Table::Make(identifier, std::move(table_metadata), - std::move(metadata_file_location), file_io_, shared_from_this()); + std::move(metadata_file_location), file_io_, shared_from_this(), + reporter_); } Result> InMemoryCatalog::UpdateTable( @@ -480,7 +489,7 @@ Result> InMemoryCatalog::UpdateTable( TableMetadataUtil::DeleteRemovedMetadataFiles(*file_io_, base.get(), *updated); return Table::Make(identifier, std::move(updated), std::move(new_metadata_location), - file_io_, shared_from_this()); + file_io_, shared_from_this(), reporter_); } Result> InMemoryCatalog::StageCreateTable( @@ -501,7 +510,7 @@ Result> InMemoryCatalog::StageCreateTable( TableMetadata::Make(*schema, *spec, *order, base_location, properties)); ICEBERG_ASSIGN_OR_RAISE( auto table, StagedTable::Make(identifier, std::move(table_metadata), "", file_io_, - shared_from_this())); + shared_from_this(), reporter_)); return Transaction::Make(std::move(table), TransactionKind::kCreate); } @@ -581,7 +590,7 @@ Result> InMemoryCatalog::LoadTable( ICEBERG_ASSIGN_OR_RAISE(auto metadata, TableMetadataUtil::Read(*file_io_, metadata_location)); return Table::Make(identifier, std::move(metadata), std::move(metadata_location), - file_io_, shared_from_this()); + file_io_, shared_from_this(), reporter_); } Result> InMemoryCatalog::RegisterTable( @@ -601,7 +610,7 @@ Result> InMemoryCatalog::RegisterTable( return UnknownError("The registry failed."); } return Table::Make(identifier, std::move(metadata), metadata_file_location, file_io_, - shared_from_this()); + shared_from_this(), reporter_); } } // namespace iceberg diff --git a/src/iceberg/catalog/memory/in_memory_catalog.h b/src/iceberg/catalog/memory/in_memory_catalog.h index 22a596c10..c3ed3aecc 100644 --- a/src/iceberg/catalog/memory/in_memory_catalog.h +++ b/src/iceberg/catalog/memory/in_memory_catalog.h @@ -19,6 +19,7 @@ #pragma once +#include #include #include "iceberg/catalog.h" @@ -106,6 +107,7 @@ class ICEBERG_EXPORT InMemoryCatalog std::string warehouse_location_; std::unique_ptr root_namespace_; mutable std::shared_mutex mutex_; + std::shared_ptr reporter_; }; } // namespace iceberg diff --git a/src/iceberg/catalog/rest/CMakeLists.txt b/src/iceberg/catalog/rest/CMakeLists.txt index b6438486a..f64860ff4 100644 --- a/src/iceberg/catalog/rest/CMakeLists.txt +++ b/src/iceberg/catalog/rest/CMakeLists.txt @@ -33,6 +33,7 @@ set(ICEBERG_REST_SOURCES resource_paths.cc rest_catalog.cc rest_file_io.cc + rest_metrics_reporter.cc rest_util.cc types.cc) diff --git a/src/iceberg/catalog/rest/catalog_properties.h b/src/iceberg/catalog/rest/catalog_properties.h index 0515926c7..4711d2d13 100644 --- a/src/iceberg/catalog/rest/catalog_properties.h +++ b/src/iceberg/catalog/rest/catalog_properties.h @@ -55,6 +55,12 @@ class ICEBERG_REST_EXPORT RestCatalogProperties inline static Entry kNamespaceSeparator{"namespace-separator", "%1F"}; /// \brief The snapshot loading mode (ALL or REFS). inline static Entry kSnapshotLoadingMode{"snapshot-loading-mode", "ALL"}; + /// \brief Whether to report metrics to the REST catalog server (default: true). + /// + /// When true and the server advertises the ReportMetrics endpoint, RestCatalog + /// automatically POSTs scan and commit reports to the per-table metrics endpoint. + inline static Entry kMetricsReportingEnabled{ + "rest-metrics-reporting-enabled", "true"}; /// \brief The prefix for HTTP headers. inline static constexpr std::string_view kHeaderPrefix = "header."; diff --git a/src/iceberg/catalog/rest/meson.build b/src/iceberg/catalog/rest/meson.build index 48254614f..65a67ffb8 100644 --- a/src/iceberg/catalog/rest/meson.build +++ b/src/iceberg/catalog/rest/meson.build @@ -30,6 +30,7 @@ iceberg_rest_sources = files( 'resource_paths.cc', 'rest_catalog.cc', 'rest_file_io.cc', + 'rest_metrics_reporter.cc', 'rest_util.cc', 'types.cc', ) diff --git a/src/iceberg/catalog/rest/rest_catalog.cc b/src/iceberg/catalog/rest/rest_catalog.cc index 4cb4fd349..080fa3eb8 100644 --- a/src/iceberg/catalog/rest/rest_catalog.cc +++ b/src/iceberg/catalog/rest/rest_catalog.cc @@ -39,9 +39,11 @@ #include "iceberg/catalog/rest/json_serde_internal.h" #include "iceberg/catalog/rest/resource_paths.h" #include "iceberg/catalog/rest/rest_file_io.h" +#include "iceberg/catalog/rest/rest_metrics_reporter.h" #include "iceberg/catalog/rest/rest_util.h" #include "iceberg/catalog/rest/types.h" #include "iceberg/json_serde_internal.h" +#include "iceberg/metrics/metrics_reporters.h" #include "iceberg/partition_spec.h" #include "iceberg/result.h" #include "iceberg/schema.h" @@ -52,6 +54,7 @@ #include "iceberg/table_update.h" #include "iceberg/transaction.h" #include "iceberg/util/macros.h" +#include "iceberg/util/string_util.h" namespace iceberg::rest { @@ -451,7 +454,7 @@ Result> RestCatalog::Make( // Get snapshot loading mode ICEBERG_ASSIGN_OR_RAISE(auto snapshot_mode, final_config.SnapshotLoadingMode()); - auto client = std::make_unique(final_config.ExtractHeaders()); + auto client = std::make_shared(final_config.ExtractHeaders()); ICEBERG_ASSIGN_OR_RAISE(auto catalog_session, auth_manager->CatalogSession(*client, final_config.configs())); @@ -459,14 +462,23 @@ Result> RestCatalog::Make( ICEBERG_ASSIGN_OR_RAISE(auto file_io, MakeCatalogFileIO(final_config)); auto default_context = SessionContext::Empty(); - return std::shared_ptr(new RestCatalog( + auto catalog = std::shared_ptr(new RestCatalog( std::move(final_config), std::move(file_io), std::move(client), std::move(paths), std::move(endpoints), std::move(auth_manager), std::move(catalog_session), snapshot_mode, std::move(default_context))); + const auto& props = final_config.configs(); + if (auto it = props.find(std::string(kMetricsReporterImpl)); + it != props.end() && !it->second.empty() && + it->second != kMetricsReporterTypeNoop) { + ICEBERG_ASSIGN_OR_RAISE(auto reporter, MetricsReporters::Load(props)); + catalog->reporter_ = std::shared_ptr(std::move(reporter)); + } + + return catalog; } RestCatalog::RestCatalog(RestCatalogProperties config, std::shared_ptr file_io, - std::unique_ptr client, + std::shared_ptr client, std::unique_ptr paths, std::unordered_set endpoints, std::unique_ptr auth_manager, @@ -483,6 +495,14 @@ RestCatalog::RestCatalog(RestCatalogProperties config, std::shared_ptr f snapshot_mode_(snapshot_mode), default_context_(std::move(default_context)) { ICEBERG_DCHECK(catalog_session_ != nullptr, "catalog_session must not be null"); + const auto& props = config_.configs(); + auto it = props.find(std::string(kMetricsReporterImpl)); + if (it != props.end() && !it->second.empty() && + it->second != kMetricsReporterTypeNoop) { + if (auto r = MetricsReporters::Load(props); r.has_value()) { + reporter_ = std::shared_ptr(std::move(r.value())); + } + } } std::string_view RestCatalog::name() const { return name_; } @@ -524,6 +544,21 @@ Result> RestCatalog::TableFileIO( return file_io_; } +std::shared_ptr RestCatalog::MakeTableReporter( + const TableIdentifier& identifier) const { + auto enabled = config_.Get(RestCatalogProperties::kMetricsReportingEnabled); + if (StringUtils::ToLower(enabled) == "true" && + supported_endpoints_.contains(Endpoint::ReportMetrics())) { + auto path = paths_->Metrics(identifier); + if (path.has_value()) { + auto rest_reporter = + std::make_shared(client_, *path, catalog_session_); + return MetricsReporters::Combine(reporter_, rest_reporter); + } + } + return reporter_; +} + Result> RestCatalog::ListNamespaces( const Namespace& ns, auth::AuthSession& session) const { ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::ListNamespaces()); @@ -764,7 +799,7 @@ Result> RestCatalog::StageCreateTable( auto staged_table, StagedTable::Make(identifier, std::move(result.metadata), std::move(result.metadata_location), std::move(table_io), - std::move(table_catalog))); + std::move(table_catalog), MakeTableReporter(identifier))); return Transaction::Make(std::move(staged_table), TransactionKind::kCreate); } @@ -875,7 +910,7 @@ Result> RestCatalog::MakeTableFromLoadResult( shared_from_this(), context, identifier, table_config, table_session); return Table::Make(identifier, std::move(result.metadata), std::move(result.metadata_location), std::move(table_io), - std::move(table_catalog)); + std::move(table_catalog), MakeTableReporter(identifier)); } Result> RestCatalog::MakeTableFromCommitResponse( @@ -891,7 +926,7 @@ Result> RestCatalog::MakeTableFromCommitResponse( shared_from_this(), context, identifier, table_config, table_session); return Table::Make(identifier, std::move(response.metadata), std::move(response.metadata_location), std::move(table_io), - std::move(table_catalog)); + std::move(table_catalog), MakeTableReporter(identifier)); } } // namespace iceberg::rest diff --git a/src/iceberg/catalog/rest/rest_catalog.h b/src/iceberg/catalog/rest/rest_catalog.h index 76d2e54dc..0cc13a534 100644 --- a/src/iceberg/catalog/rest/rest_catalog.h +++ b/src/iceberg/catalog/rest/rest_catalog.h @@ -31,6 +31,7 @@ #include "iceberg/catalog/session_catalog.h" #include "iceberg/catalog/session_context.h" #include "iceberg/result.h" +#include "iceberg/type_fwd.h" /// \file iceberg/catalog/rest/rest_catalog.h /// RestCatalog implementation for Iceberg REST API. @@ -63,7 +64,7 @@ class ICEBERG_REST_EXPORT RestCatalog final class TableScopedCatalog; RestCatalog(RestCatalogProperties config, std::shared_ptr file_io, - std::unique_ptr client, std::unique_ptr paths, + std::shared_ptr client, std::unique_ptr paths, std::unordered_set endpoints, std::unique_ptr auth_manager, std::shared_ptr catalog_session, @@ -147,6 +148,15 @@ class ICEBERG_REST_EXPORT RestCatalog final Result LoadTableInternal(const TableIdentifier& identifier, auth::AuthSession& session) const; + /// \brief Build the per-table metrics reporter. + /// + /// When rest-metrics-reporting-enabled is true and the server advertises the + /// ReportMetrics endpoint, returns a CompositeMetricsReporter combining configured + /// reporter with a RestMetricsReporter targeting this table. Otherwise returns the + /// configured reporter. + std::shared_ptr MakeTableReporter( + const TableIdentifier& identifier) const; + Result CreateTableInternal( const TableIdentifier& identifier, const std::shared_ptr& schema, const std::shared_ptr& spec, const std::shared_ptr& order, @@ -173,7 +183,7 @@ class ICEBERG_REST_EXPORT RestCatalog final RestCatalogProperties config_; std::shared_ptr file_io_; - std::unique_ptr client_; + std::shared_ptr client_; std::unique_ptr paths_; std::string name_; std::unordered_set supported_endpoints_; @@ -182,6 +192,7 @@ class ICEBERG_REST_EXPORT RestCatalog final SnapshotMode snapshot_mode_; SessionContext default_context_; std::weak_ptr default_catalog_; + std::shared_ptr reporter_; }; } // namespace iceberg::rest diff --git a/src/iceberg/catalog/rest/rest_metrics_reporter.cc b/src/iceberg/catalog/rest/rest_metrics_reporter.cc new file mode 100644 index 000000000..bd70c823c --- /dev/null +++ b/src/iceberg/catalog/rest/rest_metrics_reporter.cc @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/catalog/rest/rest_metrics_reporter.h" + +#include +#include + +#include + +#include "iceberg/catalog/rest/auth/auth_session.h" +#include "iceberg/catalog/rest/error_handlers.h" +#include "iceberg/catalog/rest/http_client.h" +#include "iceberg/metrics/json_serde_internal.h" +#include "iceberg/metrics/metrics_reporter.h" + +namespace iceberg::rest { + +namespace { + +constexpr std::string_view kReportType = "report-type"; +constexpr std::string_view kScanReportType = "scan-report"; +constexpr std::string_view kCommitReportType = "commit-report"; + +} // namespace + +RestMetricsReporter::RestMetricsReporter(std::shared_ptr client, + std::string metrics_endpoint, + std::shared_ptr session) + : client_(std::move(client)), + metrics_endpoint_(std::move(metrics_endpoint)), + session_(std::move(session)) {} + +Status RestMetricsReporter::Report(const MetricsReport& report) { + // Serialize the report variant to JSON. + Result json_result = std::visit( + [](const auto& r) -> Result { return ToJson(r); }, report); + if (!json_result) { + return {}; + } + + // Inject "report-type" required by the REST spec (not included in core ToJson). + auto& json = json_result.value(); + json[kReportType] = + std::holds_alternative(report) ? kScanReportType : kCommitReportType; + + // POST to the metrics endpoint; suppress errors to match Java fire-and-forget behavior. + std::ignore = client_->Post(metrics_endpoint_, json.dump(), /*headers=*/{}, + *DefaultErrorHandler::Instance(), *session_); + return {}; +} + +} // namespace iceberg::rest diff --git a/src/iceberg/catalog/rest/rest_metrics_reporter.h b/src/iceberg/catalog/rest/rest_metrics_reporter.h new file mode 100644 index 000000000..9efe46720 --- /dev/null +++ b/src/iceberg/catalog/rest/rest_metrics_reporter.h @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include + +#include "iceberg/catalog/rest/iceberg_rest_export.h" +#include "iceberg/catalog/rest/type_fwd.h" +#include "iceberg/metrics/metrics_reporter.h" + +/// \file iceberg/catalog/rest/rest_metrics_reporter.h +/// \brief MetricsReporter that POSTs reports to the Iceberg REST metrics endpoint. + +namespace iceberg::rest { + +/// \brief Reports scan and commit metrics to the Iceberg REST catalog metrics endpoint. +/// +/// This is the default metrics reporter wired automatically by RestCatalog for each +/// table, mirroring Java's RESTMetricsReporter. It POSTs the serialized report to +/// POST /v1/{prefix}/namespaces/{namespace}/tables/{table}/metrics. +/// This C++ implementation calls HttpClient::Post() synchronously. +/// A future improvement would be to introduce a thread pool. +/// +/// This implementation uses the catalog-level AuthSession. Per-table auth is a future +/// improvement consistent with the existing FIXME for per-table FileIO in +/// RestCatalog::LoadTable. +class ICEBERG_REST_EXPORT RestMetricsReporter : public MetricsReporter { + public: + /// \param client Shared ownership of the HTTP client; must not be null. + /// \param metrics_endpoint Pre-built path from ResourcePaths::Metrics(). + /// \param session Auth session used to authenticate the POST request. + RestMetricsReporter(std::shared_ptr client, std::string metrics_endpoint, + std::shared_ptr session); + + /// \brief POST the report to the metrics endpoint, suppressing all errors. + Status Report(const MetricsReport& report) override; + + private: + std::shared_ptr client_; + std::string metrics_endpoint_; + std::shared_ptr session_; +}; + +} // namespace iceberg::rest diff --git a/src/iceberg/catalog/sql/sql_catalog.cc b/src/iceberg/catalog/sql/sql_catalog.cc index cfe155f76..bf1bb95c6 100644 --- a/src/iceberg/catalog/sql/sql_catalog.cc +++ b/src/iceberg/catalog/sql/sql_catalog.cc @@ -25,6 +25,7 @@ #include "iceberg/catalog/sql/config.h" #include "iceberg/file_io.h" +#include "iceberg/metrics/metrics_reporters.h" #include "iceberg/table.h" #include "iceberg/table_identifier.h" #include "iceberg/table_metadata.h" @@ -147,6 +148,15 @@ Result> SqlCatalog::Make( auto catalog = std::shared_ptr( new SqlCatalog(config, std::move(file_io), std::move(store))); ICEBERG_RETURN_UNEXPECTED(catalog->store_->Initialize()); + + const auto& props = catalog->config_.props; + if (auto it = props.find(std::string(kMetricsReporterImpl)); + it != props.end() && !it->second.empty() && + it->second != kMetricsReporterTypeNoop) { + ICEBERG_ASSIGN_OR_RAISE(auto reporter, MetricsReporters::Load(props)); + catalog->reporter_ = std::shared_ptr(std::move(reporter)); + } + return catalog; } @@ -372,7 +382,7 @@ Result> SqlCatalog::LoadTableFrom( ICEBERG_ASSIGN_OR_RAISE(auto metadata, TableMetadataUtil::Read(*file_io_, metadata_location)); return Table::Make(identifier, std::move(metadata), metadata_location, file_io_, - shared_from_this()); + shared_from_this(), reporter_); } Result> SqlCatalog::LoadTable(const TableIdentifier& identifier) { @@ -410,7 +420,7 @@ Result> SqlCatalog::CreateTable( store_->InsertTable(ns_str, identifier.name, metadata_location)); return Table::Make(identifier, std::move(metadata), metadata_location, file_io_, - shared_from_this()); + shared_from_this(), reporter_); } Result> SqlCatalog::UpdateTable( @@ -475,7 +485,7 @@ Result> SqlCatalog::UpdateTable( } return Table::Make(identifier, std::move(updated), new_metadata_location, file_io_, - shared_from_this()); + shared_from_this(), reporter_); } Result> SqlCatalog::StageCreateTable( @@ -502,7 +512,7 @@ Result> SqlCatalog::StageCreateTable( base_location, properties)); ICEBERG_ASSIGN_OR_RAISE(auto table, StagedTable::Make(identifier, std::move(metadata), "", file_io_, - shared_from_this())); + shared_from_this(), reporter_)); return Transaction::Make(std::move(table), TransactionKind::kCreate); } @@ -582,7 +592,7 @@ Result> SqlCatalog::RegisterTable( store_->InsertTable(ns_str, identifier.name, metadata_file_location)); return Table::Make(identifier, std::move(metadata), metadata_file_location, file_io_, - shared_from_this()); + shared_from_this(), reporter_); } // -------------------------------------------------------------------------- diff --git a/src/iceberg/catalog/sql/sql_catalog.h b/src/iceberg/catalog/sql/sql_catalog.h index 35ef107b1..ccf03723f 100644 --- a/src/iceberg/catalog/sql/sql_catalog.h +++ b/src/iceberg/catalog/sql/sql_catalog.h @@ -184,6 +184,7 @@ class ICEBERG_SQL_CATALOG_EXPORT SqlCatalog SqlCatalogConfig config_; std::shared_ptr file_io_; std::shared_ptr store_; + std::shared_ptr reporter_; }; } // namespace iceberg::sql diff --git a/src/iceberg/delete_file_index.cc b/src/iceberg/delete_file_index.cc index 9a8be06b8..301d7051e 100644 --- a/src/iceberg/delete_file_index.cc +++ b/src/iceberg/delete_file_index.cc @@ -35,6 +35,7 @@ #include "iceberg/manifest/manifest_list.h" #include "iceberg/manifest/manifest_reader.h" #include "iceberg/metadata_columns.h" +#include "iceberg/metrics/scan_report.h" #include "iceberg/partition_spec.h" #include "iceberg/schema.h" #include "iceberg/util/checked_cast.h" @@ -535,6 +536,11 @@ DeleteFileIndex::Builder& DeleteFileIndex::Builder::PlanWith(OptionalExecutor ex executor_ = executor; return *this; } +DeleteFileIndex::Builder& DeleteFileIndex::Builder::ScanMetrics( + class iceberg::ScanMetrics* scan_metrics) { + scan_metrics_ = scan_metrics; + return *this; +} Result> DeleteFileIndex::Builder::LoadDeleteFiles() { // TODO(zehua): Replace with a thread-safe LRU cache. @@ -638,8 +644,10 @@ Result> DeleteFileIndex::Builder::LoadDeleteFiles() { ICEBERG_ASSIGN_OR_RAISE(auto should_match, manifest_evaluator->Evaluate(manifest)); if (!should_match) { + if (scan_metrics_) scan_metrics_->skipped_delete_manifests->Increment(1); return manifest_result; } + if (scan_metrics_) scan_metrics_->scanned_delete_manifests->Increment(1); } // Read manifest entries @@ -675,6 +683,9 @@ Result> DeleteFileIndex::Builder::LoadDeleteFiles() { ContentFileUtil::DropUnselectedStats(*entry.data_file, columns); manifest_result.emplace_back(std::move(entry)); } + // Entries filtered out by min_sequence_number_ are not counted as skipped; + // Java's ScanMetricsUtil only counts manifest-level and partition-evaluator + // skips, not sequence-number-based dedup filtering. } return manifest_result; }); @@ -820,12 +831,30 @@ Result> DeleteFileIndex::Builder::Build() { } } - return std::unique_ptr(new DeleteFileIndex( + auto index = std::unique_ptr(new DeleteFileIndex( global_deletes->empty() ? nullptr : std::move(global_deletes), eq_deletes_by_partition->empty() ? nullptr : std::move(eq_deletes_by_partition), pos_deletes_by_partition->empty() ? nullptr : std::move(pos_deletes_by_partition), pos_deletes_by_path->empty() ? nullptr : std::move(pos_deletes_by_path), dv_by_path->empty() ? nullptr : std::move(dv_by_path))); + + if (scan_metrics_) { + for (const auto& delete_file : index->ReferencedDeleteFiles()) { + scan_metrics_->indexed_delete_files->Increment(1); + + if (delete_file->content == DataFile::Content::kPositionDeletes) { + if (ContentFileUtil::IsDV(*delete_file)) { + scan_metrics_->dvs->Increment(1); + } else { + scan_metrics_->positional_delete_files->Increment(1); + } + } else if (delete_file->content == DataFile::Content::kEqualityDeletes) { + scan_metrics_->equality_delete_files->Increment(1); + } + } + } + + return index; } } // namespace iceberg diff --git a/src/iceberg/delete_file_index.h b/src/iceberg/delete_file_index.h index 555114a23..13c2471c3 100644 --- a/src/iceberg/delete_file_index.h +++ b/src/iceberg/delete_file_index.h @@ -40,6 +40,8 @@ namespace iceberg { +class ScanMetrics; + namespace internal { /// \brief Wrapper for equality delete files that caches converted bounds. @@ -362,6 +364,10 @@ class ICEBERG_EXPORT DeleteFileIndex::Builder : public ErrorCollector { /// \param executor Executor to use, or std::nullopt to read manifests serially. /// \return Reference to this for method chaining. Builder& PlanWith(OptionalExecutor executor); + /// \brief Attach scan metrics for counting scanned/skipped delete manifests. + /// + /// Non-owning pointer; the pointed-to ScanMetrics must outlive the Build() call. + Builder& ScanMetrics(class iceberg::ScanMetrics* scan_metrics); /// \brief Build the DeleteFileIndex. Result> Build(); @@ -398,6 +404,7 @@ class ICEBERG_EXPORT DeleteFileIndex::Builder : public ErrorCollector { OptionalExecutor executor_; bool case_sensitive_ = true; bool ignore_residuals_ = false; + class iceberg::ScanMetrics* scan_metrics_ = nullptr; }; } // namespace iceberg diff --git a/src/iceberg/expression/sanitize_expression.cc b/src/iceberg/expression/sanitize_expression.cc new file mode 100644 index 000000000..f0ddbd55c --- /dev/null +++ b/src/iceberg/expression/sanitize_expression.cc @@ -0,0 +1,347 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/expression/sanitize_expression.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "iceberg/expression/binder.h" +#include "iceberg/expression/literal.h" +#include "iceberg/expression/predicate.h" +#include "iceberg/expression/term.h" +#include "iceberg/transform.h" +#include "iceberg/type.h" +#include "iceberg/util/bucket_util.h" +#include "iceberg/util/checked_cast.h" +#include "iceberg/util/temporal_util.h" + +namespace iceberg { + +namespace { + +std::string SanitizeDate(int32_t days, int32_t today) { + std::string is_past = today > days ? "ago" : "from-now"; + int32_t diff = std::abs(today - days); + if (diff == 0) { + return "(date-today)"; + } else if (diff < 90) { + return "(date-" + std::to_string(diff) + "-days-" + is_past + ")"; + } + + return "(date)"; +} + +std::string SanitizeTimestamp(int64_t micros, int64_t now) { + constexpr int64_t kMicrosPerHour = 60LL * 60LL * 1'000'000LL; + constexpr int64_t kFiveMinutesInMicros = 5LL * 60LL * 1'000'000LL; + constexpr int64_t kThreeDaysInHours = 3LL * 24LL; + constexpr int64_t kNinetyDaysInHours = 90LL * 24LL; + + std::string is_past = now > micros ? "ago" : "from-now"; + int64_t diff = std::abs(now - micros); + if (diff < kFiveMinutesInMicros) { + return "(timestamp-about-now)"; + } + + int64_t hours = diff / kMicrosPerHour; + if (hours <= kThreeDaysInHours) { + return "(timestamp-" + std::to_string(hours) + "-hours-" + is_past + ")"; + } else if (hours < kNinetyDaysInHours) { + int64_t days = hours / 24; + return "(timestamp-" + std::to_string(days) + "-days-" + is_past + ")"; + } + + return "(timestamp)"; +} + +std::string SanitizeNumber(double value, std::string_view type) { + int32_t num_digits = + value == 0 ? 1 : static_cast(std::log10(std::abs(value))) + 1; + return std::format("({}-digit-{})", num_digits, type); +} + +Result SanitizeSimpleString(std::string_view value) { + ICEBERG_ASSIGN_OR_RAISE(auto hash, + BucketUtils::BucketIndex(Literal::String(std::string(value)), + std::numeric_limits::max())); + return std::format("(hash-{:08x})", hash); +} + +Result SanitizeString(std::string_view value, int64_t now, int32_t today) { + static const std::regex kDate(R"(\d{4}-\d{2}-\d{2})"); + static const std::regex kTime(R"(\d{2}:\d{2}(:\d{2}(.\d{1,9})?)?)"); + static const std::regex kTimestamp( + R"(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}(:\d{2}(.\d{1,9})?)?)"); + static const std::regex kTimestampTz( + R"(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}(:\d{2}(.\d{1,9})?)?([-+]\d{2}:\d{2}|Z))"); + static const std::regex kTimestampNs( + R"(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}(:\d{2}(.\d{7,9})?)?)"); + static const std::regex kTimestampTzNs( + R"(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}(:\d{2}(.\d{7,9})?)?([-+]\d{2}:\d{2}|Z))"); + + try { + if (std::regex_match(value.begin(), value.end(), kDate)) { + auto days = TemporalUtils::ParseDay(value); + return days.has_value() ? SanitizeDate(*days, today) : SanitizeSimpleString(value); + } + if (std::regex_match(value.begin(), value.end(), kTimestampNs)) { + auto nanos = TemporalUtils::ParseTimestampNs(value); + return nanos.has_value() + ? SanitizeTimestamp(TemporalUtils::NanosToMicros(*nanos), now) + : SanitizeSimpleString(value); + } + if (std::regex_match(value.begin(), value.end(), kTimestampTzNs)) { + auto nanos = TemporalUtils::ParseTimestampNsWithZone(value); + return nanos.has_value() + ? SanitizeTimestamp(TemporalUtils::NanosToMicros(*nanos), now) + : SanitizeSimpleString(value); + } + if (std::regex_match(value.begin(), value.end(), kTimestamp)) { + auto micros = TemporalUtils::ParseTimestamp(value); + return micros.has_value() ? SanitizeTimestamp(*micros, now) + : SanitizeSimpleString(value); + } + if (std::regex_match(value.begin(), value.end(), kTimestampTz)) { + auto micros = TemporalUtils::ParseTimestampWithZone(value); + return micros.has_value() ? SanitizeTimestamp(*micros, now) + : SanitizeSimpleString(value); + } + if (std::regex_match(value.begin(), value.end(), kTime)) { + return std::string("(time)"); + } + return SanitizeSimpleString(value); + } catch (const std::exception&) { + // Don't throw when parsing failed in sanitizeString default to simple string + // sanitization. + return SanitizeSimpleString(value); + } +} + +Result SanitizePlaceholder(const Literal& literal, int64_t now, + int32_t today) { + if (literal.IsNull()) { + return std::string("(null)"); + } + const auto& value = literal.value(); + switch (literal.type()->type_id()) { + case TypeId::kString: + return SanitizeString(std::get(value), now, today); + case TypeId::kDate: + return SanitizeDate(std::get(value), today); + case TypeId::kTimestamp: + case TypeId::kTimestampTz: + return SanitizeTimestamp(std::get(value), now); + case TypeId::kTimestampNs: + case TypeId::kTimestampTzNs: + return SanitizeTimestamp(TemporalUtils::NanosToMicros(std::get(value)), + now); + case TypeId::kTime: + return std::string("(time)"); + case TypeId::kInt: + return SanitizeNumber(std::get(value), "int"); + case TypeId::kLong: + return SanitizeNumber(static_cast(std::get(value)), "int"); + case TypeId::kFloat: + return SanitizeNumber(std::get(value), "float"); + case TypeId::kDouble: + return SanitizeNumber(std::get(value), "float"); + default: + return SanitizeSimpleString(literal.ToString()); + } +} + +Result SanitizeLiteral(const Literal& literal, int64_t now, int32_t today) { + ICEBERG_ASSIGN_OR_RAISE(auto placeholder, SanitizePlaceholder(literal, now, today)); + return Literal::String(std::move(placeholder)); +} + +// Mirrors Java's ExpressionUtil.unbind(BoundTerm): a transform term (bucket/day/etc.) +// is rebuilt as a transform term over a fresh reference, instead of being collapsed to +// a plain column reference. +Result>> MakeSanitizedTransformTerm( + std::string_view name, const std::shared_ptr& transform) { + ICEBERG_ASSIGN_OR_RAISE(auto named_ref, NamedReference::Make(std::string(name))); + std::shared_ptr shared_ref = std::move(named_ref); + ICEBERG_ASSIGN_OR_RAISE(auto unbound_transform, + UnboundTransform::Make(std::move(shared_ref), transform)); + return std::shared_ptr>(std::move(unbound_transform)); +} + +template +Result> MakeSanitizedPredicateOverTerm( + Expression::Operation op, std::shared_ptr> term, + std::vector values) { + if (values.empty()) { + ICEBERG_ASSIGN_OR_RAISE(auto pred, UnboundPredicateImpl::Make(op, term)); + return std::shared_ptr(std::move(pred)); + } + if (values.size() == 1) { + ICEBERG_ASSIGN_OR_RAISE( + auto pred, UnboundPredicateImpl::Make(op, term, std::move(values[0]))); + return std::shared_ptr(std::move(pred)); + } + ICEBERG_ASSIGN_OR_RAISE(auto pred, + UnboundPredicateImpl::Make(op, term, std::move(values))); + return std::shared_ptr(std::move(pred)); +} + +// Rebuilds a sanitized predicate over a bound `term`, preserving whether it was a plain +// column reference or a transform -- only the literal values are replaced with +// placeholders. +Result> MakeSanitizedPredicate( + Expression::Operation op, const std::shared_ptr& term, + std::vector values) { + if (term->kind() == Term::Kind::kTransform) { + const auto& bound_transform = internal::checked_cast(*term); + ICEBERG_ASSIGN_OR_RAISE(auto transform_term, + MakeSanitizedTransformTerm(term->reference()->name(), + bound_transform.transform())); + return MakeSanitizedPredicateOverTerm(op, std::move(transform_term), + std::move(values)); + } + ICEBERG_ASSIGN_OR_RAISE(auto named_ref, + NamedReference::Make(std::string(term->reference()->name()))); + std::shared_ptr> ref_term = std::move(named_ref); + return MakeSanitizedPredicateOverTerm(op, std::move(ref_term), + std::move(values)); +} + +// Rebuilds a sanitized predicate over an unbound `term`. +Result> MakeSanitizedPredicate(Expression::Operation op, + const Term& term, + std::vector values) { + if (term.kind() == Term::Kind::kTransform) { + const auto& unbound_transform = internal::checked_cast(term); + // reference() is non-const on Unbound but never mutates state; same pattern as + // json_serde.cc's ToJson(const UnboundTransform&). + auto& mut = const_cast(unbound_transform); + ICEBERG_ASSIGN_OR_RAISE(auto transform_term, + MakeSanitizedTransformTerm(mut.reference()->name(), + unbound_transform.transform())); + return MakeSanitizedPredicateOverTerm(op, std::move(transform_term), + std::move(values)); + } + const auto& named_reference = internal::checked_cast(term); + ICEBERG_ASSIGN_OR_RAISE(auto named_ref, + NamedReference::Make(std::string(named_reference.name()))); + std::shared_ptr> ref_term = std::move(named_ref); + return MakeSanitizedPredicateOverTerm(op, std::move(ref_term), + std::move(values)); +} + +} // namespace + +SanitizeExpression::SanitizeExpression() { + auto now = std::chrono::system_clock::now(); + now_ = std::chrono::duration_cast(now.time_since_epoch()) + .count(); + today_ = static_cast( + std::chrono::duration_cast(now.time_since_epoch()).count()); +} + +Result> SanitizeExpression::Sanitize( + const std::shared_ptr& expr) { + ICEBERG_DCHECK(expr != nullptr, "Expression cannot be null"); + SanitizeExpression visitor; + return iceberg::Visit, SanitizeExpression>(expr, visitor); +} + +Result> SanitizeExpression::AlwaysTrue() { + return True::Instance(); +} + +Result> SanitizeExpression::AlwaysFalse() { + return False::Instance(); +} + +Result> SanitizeExpression::Not( + const std::shared_ptr& child_result) { + return iceberg::Not::MakeFolded(child_result); +} + +Result> SanitizeExpression::And( + const std::shared_ptr& left_result, + const std::shared_ptr& right_result) { + return iceberg::And::MakeFolded(left_result, right_result); +} + +Result> SanitizeExpression::Or( + const std::shared_ptr& left_result, + const std::shared_ptr& right_result) { + return iceberg::Or::MakeFolded(left_result, right_result); +} + +Result> SanitizeExpression::Predicate( + const std::shared_ptr& pred) { + switch (pred->kind()) { + case BoundPredicate::Kind::kUnary: + return MakeSanitizedPredicate(pred->op(), pred->term(), {}); + case BoundPredicate::Kind::kLiteral: { + const auto& literal_pred = + internal::checked_cast(*pred); + ICEBERG_ASSIGN_OR_RAISE(auto placeholder, + SanitizeLiteral(literal_pred.literal(), now_, today_)); + return MakeSanitizedPredicate(pred->op(), pred->term(), {std::move(placeholder)}); + } + case BoundPredicate::Kind::kSet: { + const auto& set_pred = internal::checked_cast(*pred); + std::vector placeholders; + placeholders.reserve(set_pred.literal_set().size()); + for (const auto& literal : set_pred.literal_set()) { + ICEBERG_ASSIGN_OR_RAISE(auto placeholder, SanitizeLiteral(literal, now_, today_)); + placeholders.push_back(std::move(placeholder)); + } + return MakeSanitizedPredicate(pred->op(), pred->term(), std::move(placeholders)); + } + } + return InvalidExpression("Unsupported bound predicate kind for sanitization"); +} + +Result> SanitizeExpression::Predicate( + const std::shared_ptr& pred) { + auto literals = pred->literals(); + std::vector placeholders; + placeholders.reserve(literals.size()); + for (const auto& literal : literals) { + ICEBERG_ASSIGN_OR_RAISE(auto placeholder, SanitizeLiteral(literal, now_, today_)); + placeholders.push_back(std::move(placeholder)); + } + return MakeSanitizedPredicate(pred->op(), pred->unbound_term(), + std::move(placeholders)); +} + +Result> SanitizeExpression::Sanitize( + const Schema& schema, const std::shared_ptr& expr, bool case_sensitive) { + auto bound = Binder::Bind(schema, expr, case_sensitive); + if (bound.has_value()) { + return Sanitize(*bound); + } + return Sanitize(expr); +} +// TODO : add StringSanitizer for logging. +} // namespace iceberg diff --git a/src/iceberg/expression/sanitize_expression.h b/src/iceberg/expression/sanitize_expression.h new file mode 100644 index 000000000..d547bc9b9 --- /dev/null +++ b/src/iceberg/expression/sanitize_expression.h @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +/// \file iceberg/expression/sanitize_expression.h +/// Replace literal values in an expression with type-aware placeholders. + +#include +#include +#include + +#include "iceberg/expression/expression_visitor.h" +#include "iceberg/iceberg_export.h" +#include "iceberg/type_fwd.h" + +namespace iceberg { + +/// \brief Rewrites an expression tree so that literal values are replaced with +/// type-aware placeholders (e.g. "(2-digit-int)", "(hash-3f9a1c02)", +/// "(date-5-days-ago)"), while preserving the predicate/column/operator structure. +/// +/// Mirrors Java's `org.apache.iceberg.expressions.ExpressionUtil.sanitize`. Used before +/// handing a scan's row filter to a MetricsReporter so that literal predicate values +/// (which may be sensitive, e.g. PII) are never exposed to metrics consumers. +class ICEBERG_EXPORT SanitizeExpression + : public ExpressionVisitor> { + public: + /// \brief Sanitize an expression tree, replacing literals with placeholders. + static Result> Sanitize( + const std::shared_ptr& expr); + + /// \brief Bind `expr` to `schema` first, falling back to sanitizing the unbound + /// expression if binding fails. Mirrors Java's `ExpressionUtil.sanitize(StructType, + /// Expression, boolean)`. + static Result> Sanitize( + const Schema& schema, const std::shared_ptr& expr, bool case_sensitive); + + Result> AlwaysTrue() override; + Result> AlwaysFalse() override; + Result> Not( + const std::shared_ptr& child_result) override; + Result> And( + const std::shared_ptr& left_result, + const std::shared_ptr& right_result) override; + Result> Or( + const std::shared_ptr& left_result, + const std::shared_ptr& right_result) override; + Result> Predicate( + const std::shared_ptr& pred) override; + Result> Predicate( + const std::shared_ptr& pred) override; + + private: + SanitizeExpression(); + + /// Current time, microseconds since epoch, captured once per Sanitize() call. + int64_t now_; + /// Current day, days since epoch (UTC), captured once per Sanitize() call. + int32_t today_; +}; + +} // namespace iceberg diff --git a/src/iceberg/manifest/manifest_group.cc b/src/iceberg/manifest/manifest_group.cc index ec5eb66bc..d17b414bd 100644 --- a/src/iceberg/manifest/manifest_group.cc +++ b/src/iceberg/manifest/manifest_group.cc @@ -36,6 +36,7 @@ #include "iceberg/expression/residual_evaluator.h" #include "iceberg/file_io.h" #include "iceberg/manifest/manifest_reader.h" +#include "iceberg/metrics/scan_report.h" #include "iceberg/partition_spec.h" #include "iceberg/row/manifest_wrapper.h" #include "iceberg/schema.h" @@ -198,6 +199,12 @@ ManifestGroup& ManifestGroup::PlanWith(OptionalExecutor executor) { return *this; } +ManifestGroup& ManifestGroup::ScanMetrics( + std::shared_ptr scan_metrics) { + scan_metrics_ = std::move(scan_metrics); + return *this; +} + Result>> ManifestGroup::PlanFiles() { auto create_file_scan_tasks = [this](std::vector&& entries, @@ -212,6 +219,21 @@ Result>> ManifestGroup::PlanFiles() { ContentFileUtil::DropUnselectedStats(*entry.data_file, ctx.columns_to_keep_stats); } ICEBERG_ASSIGN_OR_RAISE(auto delete_files, ctx.deletes->ForEntry(entry)); + // Mirrors Java's ScanMetricsUtil.fileTask(): counted once per FileScanTask (i.e. + // once per data file), so a delete file shared by N data files contributes here + // N times, unlike indexed_delete_files which is deduplicated in DeleteFileIndex. + if (scan_metrics_) { + scan_metrics_->total_file_size_in_bytes->Increment( + ContentFileUtil::ContentSizeInBytes(*entry.data_file)); + scan_metrics_->result_data_files->Increment(1); + scan_metrics_->result_delete_files->Increment( + static_cast(delete_files.size())); + int64_t deletes_size = 0; + for (const auto& delete_file : delete_files) { + deletes_size += ContentFileUtil::ContentSizeInBytes(*delete_file); + } + scan_metrics_->total_delete_file_size_in_bytes->Increment(deletes_size); + } ICEBERG_ASSIGN_OR_RAISE(auto residual, ctx.residuals->ResidualFor(entry.data_file->partition)); tasks.push_back(std::make_shared( @@ -229,6 +251,7 @@ Result>> ManifestGroup::PlanFiles() { for (auto& task : tasks) { file_tasks.push_back(internal::checked_pointer_cast(task)); } + return file_tasks; } @@ -254,6 +277,9 @@ Result>> ManifestGroup::Plan( return residual_cache[spec_id].get(); }; + if (scan_metrics_) { + delete_index_builder_.ScanMetrics(scan_metrics_.get()); + } ICEBERG_ASSIGN_OR_RAISE(auto delete_index, delete_index_builder_.Build()); bool drop_stats = ManifestReader::ShouldDropStats(columns_); @@ -346,6 +372,10 @@ Result> ManifestGroup::MakeReader( .CaseSensitive(case_sensitive_) .Select(std::move(columns)); + if (scan_metrics_) { + reader->SkipCounter(scan_metrics_->skipped_data_files); + } + return reader; } @@ -408,12 +438,19 @@ ManifestGroup::ReadEntries() { manifest_evaluator->Evaluate(manifest)); if (!should_match) { // Skip this manifest because it doesn't match partition filter + if (scan_metrics_) { + scan_metrics_->skipped_data_manifests->Increment(1); + } return {}; } + if (scan_metrics_) { + scan_metrics_->scanned_data_manifests->Increment(1); + } if (ignore_deleted_) { // only scan manifests that have entries other than deletes if (!manifest.has_added_files() && !manifest.has_existing_files()) { + if (scan_metrics_) scan_metrics_->skipped_data_files->Increment(1); return {}; } } @@ -421,6 +458,7 @@ ManifestGroup::ReadEntries() { if (ignore_existing_) { // only scan manifests that have entries other than existing if (!manifest.has_added_files() && !manifest.has_deleted_files()) { + if (scan_metrics_) scan_metrics_->skipped_data_files->Increment(1); return {}; } } diff --git a/src/iceberg/manifest/manifest_group.h b/src/iceberg/manifest/manifest_group.h index 09ae4a503..0cc07d618 100644 --- a/src/iceberg/manifest/manifest_group.h +++ b/src/iceberg/manifest/manifest_group.h @@ -130,6 +130,9 @@ class ICEBERG_EXPORT ManifestGroup : public ErrorCollector { /// \return Reference to this for method chaining. ManifestGroup& PlanWith(OptionalExecutor executor); + /// \brief Attach scan metrics to receive per-manifest and per-file counters. + ManifestGroup& ScanMetrics(std::shared_ptr scan_metrics); + /// \brief Plan scan tasks for all matching data files. Result>> PlanFiles(); @@ -173,6 +176,7 @@ class ICEBERG_EXPORT ManifestGroup : public ErrorCollector { bool ignore_deleted_ = false; bool ignore_existing_ = false; bool ignore_residuals_ = false; + std::shared_ptr scan_metrics_; }; } // namespace iceberg diff --git a/src/iceberg/manifest/manifest_reader.cc b/src/iceberg/manifest/manifest_reader.cc index 8757b5d61..c57c30a8c 100644 --- a/src/iceberg/manifest/manifest_reader.cc +++ b/src/iceberg/manifest/manifest_reader.cc @@ -772,6 +772,11 @@ ManifestReader& ManifestReaderImpl::TryDropStats() { return *this; } +ManifestReader& ManifestReaderImpl::SkipCounter(std::shared_ptr counter) { + skip_counter_ = std::move(counter); + return *this; +} + bool ManifestReaderImpl::HasPartitionFilter() const { ICEBERG_DCHECK(part_filter_, "Partition filter is not set"); return part_filter_->op() != Expression::Operation::kTrue; @@ -928,6 +933,7 @@ Result> ManifestReaderImpl::ReadEntries(bool only_liv ICEBERG_ASSIGN_OR_RAISE(bool partition_match, evaluator->Evaluate(entry.data_file->partition)); if (!partition_match) { + if (skip_counter_) skip_counter_->Increment(1); continue; } } @@ -935,11 +941,13 @@ Result> ManifestReaderImpl::ReadEntries(bool only_liv ICEBERG_ASSIGN_OR_RAISE(bool metrics_match, metrics_evaluator->Evaluate(*entry.data_file)); if (!metrics_match) { + if (skip_counter_) skip_counter_->Increment(1); continue; } } ICEBERG_ASSIGN_OR_RAISE(bool in_partition_set, InPartitionSet(*entry.data_file)); if (!in_partition_set) { + if (skip_counter_) skip_counter_->Increment(1); continue; } } diff --git a/src/iceberg/manifest/manifest_reader.h b/src/iceberg/manifest/manifest_reader.h index b2d1c6505..72cb9ae56 100644 --- a/src/iceberg/manifest/manifest_reader.h +++ b/src/iceberg/manifest/manifest_reader.h @@ -30,6 +30,7 @@ #include #include "iceberg/iceberg_export.h" +#include "iceberg/metrics/counter.h" #include "iceberg/result.h" #include "iceberg/type_fwd.h" @@ -76,6 +77,9 @@ class ICEBERG_EXPORT ManifestReader { /// \brief Try to drop stats from returned DataFile objects. virtual ManifestReader& TryDropStats() = 0; + /// \brief Set a counter to increment for each entry skipped by per-entry filters. + virtual ManifestReader& SkipCounter(std::shared_ptr counter) = 0; + /// \brief Determine whether stats should be dropped based on selected columns. /// /// Returns true if the selected columns do not include any stats columns, or only diff --git a/src/iceberg/manifest/manifest_reader_internal.h b/src/iceberg/manifest/manifest_reader_internal.h index 53ce2fcb5..15847ecf8 100644 --- a/src/iceberg/manifest/manifest_reader_internal.h +++ b/src/iceberg/manifest/manifest_reader_internal.h @@ -77,6 +77,8 @@ class ManifestReaderImpl : public ManifestReader { ManifestReader& TryDropStats() override; + ManifestReader& SkipCounter(std::shared_ptr counter) override; + private: /// \brief Read entries with optional live-only filtering. Result> ReadEntries(bool only_live); @@ -114,6 +116,7 @@ class ManifestReaderImpl : public ManifestReader { std::shared_ptr part_filter_{True::Instance()}; std::shared_ptr row_filter_{True::Instance()}; std::shared_ptr partition_set_; + std::shared_ptr skip_counter_; bool case_sensitive_{true}; bool drop_stats_{false}; diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build index 71a4498f1..4fa7d2166 100644 --- a/src/iceberg/meson.build +++ b/src/iceberg/meson.build @@ -62,6 +62,7 @@ iceberg_sources = files( 'expression/projections.cc', 'expression/residual_evaluator.cc', 'expression/rewrite_not.cc', + 'expression/sanitize_expression.cc', 'expression/strict_metrics_evaluator.cc', 'expression/term.cc', 'file_io.cc', diff --git a/src/iceberg/table.cc b/src/iceberg/table.cc index 9dbc5acf7..2d4223077 100644 --- a/src/iceberg/table.cc +++ b/src/iceberg/table.cc @@ -19,10 +19,12 @@ #include "iceberg/table.h" +#include #include #include "iceberg/catalog.h" #include "iceberg/location_provider.h" +#include "iceberg/metrics/metrics_reporters.h" #include "iceberg/partition_spec.h" #include "iceberg/result.h" #include "iceberg/schema.h" @@ -55,7 +57,8 @@ Result> Table::Make(TableIdentifier identifier, std::shared_ptr metadata, std::string metadata_location, std::shared_ptr io, - std::shared_ptr catalog) { + std::shared_ptr catalog, + std::shared_ptr reporter) { if (metadata == nullptr) [[unlikely]] { return InvalidArgument("Metadata cannot be null"); } @@ -70,19 +73,20 @@ Result> Table::Make(TableIdentifier identifier, } return std::shared_ptr(new Table(std::move(identifier), std::move(metadata), std::move(metadata_location), std::move(io), - std::move(catalog))); + std::move(catalog), std::move(reporter))); } Table::~Table() = default; Table::Table(TableIdentifier identifier, std::shared_ptr metadata, std::string metadata_location, std::shared_ptr io, - std::shared_ptr catalog) + std::shared_ptr catalog, std::shared_ptr reporter) : identifier_(std::move(identifier)), metadata_(std::move(metadata)), metadata_location_(std::move(metadata_location)), io_(std::move(io)), catalog_(std::move(catalog)), + reporter_(std::move(reporter)), metadata_cache_(std::make_unique(metadata_.get())) {} const std::string& Table::uuid() const { return metadata_->table_uuid; } @@ -155,12 +159,44 @@ const std::shared_ptr& Table::metadata() const { return metadata_ const std::shared_ptr& Table::catalog() const { return catalog_; } +std::string Table::FullyQualifiedName() const { + if (!catalog_) { + return identifier_.ToString(); + } + std::string_view catalog_name = catalog_->name(); + std::string result; + if (catalog_name.contains('/') || catalog_name.contains(':')) { + result = catalog_name; + if (!catalog_name.ends_with('/')) { + result += '/'; + } + } else { + result = std::string(catalog_name) + '.'; + } + for (const auto& level : identifier_.ns.levels) { + result += level + '.'; + } + result += identifier_.name; + return result; +} + +const std::shared_ptr& Table::reporter() const { return reporter_; } + +void Table::CombineReporter(std::shared_ptr additional) { + reporter_ = MetricsReporters::Combine(reporter_, std::move(additional)); +} + Result> Table::location_provider() const { return LocationProvider::Make(metadata_->location, metadata_->properties); } Result> Table::NewScan() const { - return DataTableScanBuilder::Make(metadata_, io_); + ICEBERG_ASSIGN_OR_RAISE(auto builder, DataTableScanBuilder::Make(metadata_, io_)); + builder->TableName(FullyQualifiedName()); + if (reporter_) { + builder->MetricsReporter(reporter_); + } + return builder; } Result> Table::NewIncrementalAppendScan() @@ -218,7 +254,11 @@ Result> Table::NewUpdateLocation() { Result> Table::NewFastAppend() { ICEBERG_ASSIGN_OR_RAISE( auto ctx, TransactionContext::Make(shared_from_this(), TransactionKind::kUpdate)); - return FastAppend::Make(name().name, std::move(ctx)); + ICEBERG_ASSIGN_OR_RAISE(auto op, FastAppend::Make(name().name, std::move(ctx))); + if (reporter_) { + op->ReportWith(reporter_); + } + return op; } Result> Table::NewMergeAppend() { @@ -264,7 +304,7 @@ Result> Table::NewSnapshotManager() { Result> StagedTable::Make( TableIdentifier identifier, std::shared_ptr metadata, std::string metadata_location, std::shared_ptr io, - std::shared_ptr catalog) { + std::shared_ptr catalog, std::shared_ptr reporter) { if (metadata == nullptr) [[unlikely]] { return InvalidArgument("Metadata cannot be null"); } @@ -274,9 +314,9 @@ Result> StagedTable::Make( if (catalog == nullptr) [[unlikely]] { return InvalidArgument("Catalog cannot be null"); } - return std::shared_ptr( - new StagedTable(std::move(identifier), std::move(metadata), - std::move(metadata_location), std::move(io), std::move(catalog))); + return std::shared_ptr(new StagedTable( + std::move(identifier), std::move(metadata), std::move(metadata_location), + std::move(io), std::move(catalog), std::move(reporter))); } StagedTable::~StagedTable() = default; diff --git a/src/iceberg/table.h b/src/iceberg/table.h index 64ed21ef8..06a0f33be 100644 --- a/src/iceberg/table.h +++ b/src/iceberg/table.h @@ -26,6 +26,7 @@ #include #include "iceberg/iceberg_export.h" +#include "iceberg/metrics/metrics_reporter.h" #include "iceberg/snapshot.h" #include "iceberg/table_identifier.h" #include "iceberg/type_fwd.h" @@ -43,17 +44,25 @@ class ICEBERG_EXPORT Table : public std::enable_shared_from_this
{ /// \param[in] metadata_location The location of the table metadata file. /// \param[in] io The FileIO to read and write table data and metadata files. /// \param[in] catalog The catalog that this table belongs to. - static Result> Make(TableIdentifier identifier, - std::shared_ptr metadata, - std::string metadata_location, - std::shared_ptr io, - std::shared_ptr catalog); + /// \param[in] reporter Optional metrics reporter for this table. Defaults to nullptr + /// (noop). + static Result> Make( + TableIdentifier identifier, std::shared_ptr metadata, + std::string metadata_location, std::shared_ptr io, + std::shared_ptr catalog, + std::shared_ptr reporter = nullptr); virtual ~Table(); /// \brief Returns the identifier of this table const TableIdentifier& name() const { return identifier_; } + /// \brief Returns the fully-qualified name of this table for metrics reporting. + /// + /// Combines the owning catalog's name with the table identifier (e.g. + /// "catalog.namespace.table") + std::string FullyQualifiedName() const; + /// \brief Returns the UUID of the table const std::string& uuid() const; @@ -117,6 +126,15 @@ class ICEBERG_EXPORT Table : public std::enable_shared_from_this
{ /// \brief Returns the catalog that this table belongs to const std::shared_ptr& catalog() const; + /// \brief Returns the metrics reporter for this table. + const std::shared_ptr& reporter() const; + + /// \brief Add an additional metrics reporter, combining with any existing one. + /// + /// If a reporter is already set, + /// the new reporter is combined into a CompositeMetricsReporter. + void CombineReporter(std::shared_ptr additional); + /// \brief Returns a LocationProvider for this table Result> location_provider() const; @@ -194,13 +212,15 @@ class ICEBERG_EXPORT Table : public std::enable_shared_from_this
{ protected: Table(TableIdentifier identifier, std::shared_ptr metadata, std::string metadata_location, std::shared_ptr io, - std::shared_ptr catalog); + std::shared_ptr catalog, + std::shared_ptr reporter = nullptr); const TableIdentifier identifier_; std::shared_ptr metadata_; std::string metadata_location_; std::shared_ptr io_; std::shared_ptr catalog_; + std::shared_ptr reporter_; std::unique_ptr metadata_cache_; }; @@ -210,7 +230,8 @@ class ICEBERG_EXPORT StagedTable final : public Table { static Result> Make( TableIdentifier identifier, std::shared_ptr metadata, std::string metadata_location, std::shared_ptr io, - std::shared_ptr catalog); + std::shared_ptr catalog, + std::shared_ptr reporter = nullptr); ~StagedTable() override; diff --git a/src/iceberg/table_scan.cc b/src/iceberg/table_scan.cc index fb4fbf3c5..b8a69b26b 100644 --- a/src/iceberg/table_scan.cc +++ b/src/iceberg/table_scan.cc @@ -19,14 +19,19 @@ #include "iceberg/table_scan.h" +#include #include #include #include "iceberg/expression/binder.h" #include "iceberg/expression/expression.h" #include "iceberg/expression/residual_evaluator.h" +#include "iceberg/expression/sanitize_expression.h" #include "iceberg/manifest/manifest_entry.h" #include "iceberg/manifest/manifest_group.h" +#include "iceberg/metrics/metrics_context.h" +#include "iceberg/metrics/metrics_reporters.h" +#include "iceberg/metrics/scan_report.h" #include "iceberg/result.h" #include "iceberg/schema.h" #include "iceberg/snapshot.h" @@ -413,6 +418,21 @@ TableScanBuilder::ResolveSnapshotSchema() { return snapshot_schema_; } +template +TableScanBuilder& TableScanBuilder::MetricsReporter( + std::shared_ptr reporter) { + context_.metrics_reporter = + MetricsReporters::Combine(context_.metrics_reporter, std::move(reporter)); + return *this; +} + +template +TableScanBuilder& TableScanBuilder::TableName( + std::string table_name) { + context_.table_name = std::move(table_name); + return *this; +} + template Result> TableScanBuilder::Build() { ICEBERG_RETURN_UNEXPECTED(CheckErrors()); @@ -528,6 +548,10 @@ Result>> DataTableScan::PlanFiles() co return std::vector>{}; } + auto metrics_context = MetricsContext::Default(); + std::shared_ptr scan_metrics = ScanMetrics::Make(*metrics_context); + auto timed = scan_metrics->total_planning_duration->Start(); + TableMetadataCache metadata_cache(metadata_.get()); ICEBERG_ASSIGN_OR_RAISE(auto specs_by_id, metadata_cache.GetPartitionSpecsById()); @@ -535,6 +559,11 @@ Result>> DataTableScan::PlanFiles() co ICEBERG_ASSIGN_OR_RAISE(auto data_manifests, snapshot_cache.DataManifests(io_)); ICEBERG_ASSIGN_OR_RAISE(auto delete_manifests, snapshot_cache.DeleteManifests(io_)); + scan_metrics->total_data_manifests->Increment( + static_cast(data_manifests.size())); + scan_metrics->total_delete_manifests->Increment( + static_cast(delete_manifests.size())); + ICEBERG_ASSIGN_OR_RAISE( auto manifest_group, ManifestGroup::Make(io_, schema_, specs_by_id, @@ -545,11 +574,50 @@ Result>> DataTableScan::PlanFiles() co .FilterData(filter()) .IgnoreDeleted() .ColumnsToKeepStats(context_.columns_to_keep_stats) - .PlanWith(context_.plan_executor); + .PlanWith(context_.plan_executor) + .ScanMetrics(scan_metrics); if (context_.ignore_residuals) { manifest_group->IgnoreResiduals(); } - return manifest_group->PlanFiles(); + ICEBERG_ASSIGN_OR_RAISE(auto tasks, manifest_group->PlanFiles()); + + timed.Stop(); + + if (context_.metrics_reporter) { + ICEBERG_ASSIGN_OR_RAISE(auto projected_schema, ResolveProjectedSchema()); + const auto& schema_ptr = projected_schema.get(); + + ICEBERG_ASSIGN_OR_RAISE(auto projected_id_set, + GetProjectedIdsVisitor::GetProjectedIds( + *schema_ptr, /*include_struct_ids=*/true)); + std::vector projected_field_ids(projected_id_set.begin(), + projected_id_set.end()); + std::ranges::sort(projected_field_ids); + + std::vector projected_field_names; + projected_field_names.reserve(projected_field_ids.size()); + for (int32_t field_id : projected_field_ids) { + ICEBERG_ASSIGN_OR_RAISE(auto field_name, schema_ptr->FindColumnNameById(field_id)); + projected_field_names.emplace_back(field_name.value_or("")); + } + + ICEBERG_ASSIGN_OR_RAISE(auto sanitized_filter, + SanitizeExpression::Sanitize(filter())); + + ScanReport report{ + .table_name = context_.table_name, + .snapshot_id = snapshot->snapshot_id, + .filter = std::move(sanitized_filter), + .schema_id = schema_ptr->schema_id(), + .projected_field_ids = std::move(projected_field_ids), + .projected_field_names = std::move(projected_field_names), + .scan_metrics = scan_metrics->ToResult(), + .metadata = context_.options, + }; + (void)context_.metrics_reporter->Report(report); + } + + return tasks; } // Friend function template for IncrementalScan that implements the shared PlanFiles diff --git a/src/iceberg/table_scan.h b/src/iceberg/table_scan.h index 3e4f14d55..c9447eecc 100644 --- a/src/iceberg/table_scan.h +++ b/src/iceberg/table_scan.h @@ -28,6 +28,7 @@ #include #include "iceberg/iceberg_export.h" +#include "iceberg/metrics/metrics_reporter.h" #include "iceberg/result.h" #include "iceberg/table_metadata.h" #include "iceberg/type_fwd.h" @@ -230,6 +231,10 @@ struct TableScanContext { std::string branch{}; std::optional min_rows_requested; OptionalExecutor plan_executor; + /// \brief Fully-qualified table name for metrics reporting. + std::string table_name; + /// \brief Reporter to receive ScanReport after PlanFiles() completes. + std::shared_ptr metrics_reporter; // Validate the context parameters to see if they have conflicts. [[nodiscard]] Status Validate() const; @@ -383,6 +388,15 @@ class ICEBERG_TEMPLATE_CLASS_EXPORT TableScanBuilder : public ErrorCollector { TableScanBuilder& UseBranch(const std::string& branch) requires IsIncrementalScan; + /// \brief Add a metrics reporter for this scan. + /// + /// May be called multiple times; each call combines with the previous reporter + /// via MetricsReporters::Combine(). Mirrors Java TableScan.metricsReporter(). + TableScanBuilder& MetricsReporter(std::shared_ptr reporter); + + /// \brief Set the table name for metrics reporting. + TableScanBuilder& TableName(std::string table_name); + /// \brief Builds and returns a TableScan instance. /// \return A Result containing the TableScan or an error. Result> Build(); diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt index fcbc22126..c78656142 100644 --- a/src/iceberg/test/CMakeLists.txt +++ b/src/iceberg/test/CMakeLists.txt @@ -222,6 +222,7 @@ if(ICEBERG_BUILD_BUNDLE) file_scan_task_test.cc incremental_append_scan_test.cc incremental_changelog_scan_test.cc + scan_planning_metrics_test.cc table_scan_test.cc) add_iceberg_test(table_update_test @@ -301,6 +302,7 @@ if(ICEBERG_BUILD_REST) endpoint_test.cc rest_file_io_test.cc rest_json_serde_test.cc + rest_metrics_reporter_test.cc rest_util_test.cc) if(ICEBERG_SIGV4) diff --git a/src/iceberg/test/delete_file_index_test.cc b/src/iceberg/test/delete_file_index_test.cc index 0c8c8821b..e9afd82f6 100644 --- a/src/iceberg/test/delete_file_index_test.cc +++ b/src/iceberg/test/delete_file_index_test.cc @@ -36,6 +36,8 @@ #include "iceberg/manifest/manifest_reader.h" #include "iceberg/manifest/manifest_writer.h" #include "iceberg/metadata_columns.h" +#include "iceberg/metrics/metrics_context.h" +#include "iceberg/metrics/scan_report.h" #include "iceberg/partition_spec.h" #include "iceberg/schema.h" #include "iceberg/test/matchers.h" @@ -189,13 +191,17 @@ class DeleteFileIndexTest : public testing::TestWithParam { Result> BuildIndex( std::vector delete_manifests, - std::optional after_sequence_number = std::nullopt) { + std::optional after_sequence_number = std::nullopt, + ScanMetrics* scan_metrics = nullptr) { ICEBERG_ASSIGN_OR_RAISE(auto builder, DeleteFileIndex::BuilderFor(file_io_, schema_, GetSpecsById(), std::move(delete_manifests))); if (after_sequence_number.has_value()) { builder.AfterSequenceNumber(after_sequence_number.value()); } + if (scan_metrics != nullptr) { + builder.ScanMetrics(scan_metrics); + } return builder.Build(); } @@ -260,6 +266,41 @@ TEST_P(DeleteFileIndexTest, TestMinSequenceNumberFilteringForFiles) { EXPECT_EQ(deletes[0]->file_path, "/path/to/eq-delete-2.parquet"); } +TEST_P(DeleteFileIndexTest, TestMinSequenceNumberFilteringDoesNotCountAsSkipped) { + auto version = GetParam(); + + auto eq_delete_1 = MakeEqualityDeleteFile("/path/to/eq-delete-1.parquet", + PartitionValues(std::vector{}), + unpartitioned_spec_->spec_id()); + auto eq_delete_2 = MakeEqualityDeleteFile("/path/to/eq-delete-2.parquet", + PartitionValues(std::vector{}), + unpartitioned_spec_->spec_id()); + + std::vector entries; + // Dropped by the after_sequence_number filter (seq 4 is not > 4). + entries.push_back( + MakeDeleteEntry(/*snapshot_id=*/1000L, /*sequence_number=*/4, eq_delete_1)); + // Kept (seq 6 > 4). + entries.push_back( + MakeDeleteEntry(/*snapshot_id=*/1000L, /*sequence_number=*/6, eq_delete_2)); + + auto manifest = WriteDeleteManifest(version, /*snapshot_id=*/1000L, std::move(entries), + unpartitioned_spec_); + + auto metrics_context = MetricsContext::Default(); + auto scan_metrics = ScanMetrics::Make(*metrics_context); + ICEBERG_UNWRAP_OR_FAIL(auto index, BuildIndex({manifest}, /*after_sequence_number=*/4, + scan_metrics.get())); + + ICEBERG_UNWRAP_OR_FAIL(auto deletes, index->ForDataFile(0, *unpartitioned_file_)); + EXPECT_EQ(deletes.size(), 1); + + // Java drops delete files filtered out by min_sequence_number without counting them + // as skipped; only the kept file should be counted as indexed. + EXPECT_EQ(scan_metrics->skipped_delete_files->value(), 0); + EXPECT_EQ(scan_metrics->indexed_delete_files->value(), 1); +} + TEST_P(DeleteFileIndexTest, TestUnpartitionedDeletes) { auto version = GetParam(); @@ -986,6 +1027,48 @@ TEST_P(DeleteFileIndexTest, TestReferencedDeleteFiles) { "/path/to/global-eq-delete.parquet")); } +TEST_P(DeleteFileIndexTest, TestDeleteFileCountedOnceAcrossMultipleDataFiles) { + auto version = GetParam(); + + // A single global equality delete file applies to every unpartitioned data file. + auto global_eq_delete = MakeEqualityDeleteFile("/path/to/global-eq-delete.parquet", + PartitionValues(std::vector{}), + unpartitioned_spec_->spec_id()); + + std::vector entries; + entries.push_back( + MakeDeleteEntry(/*snapshot_id=*/1000L, /*sequence_number=*/1, global_eq_delete)); + + auto manifest = WriteDeleteManifest(version, /*snapshot_id=*/1000L, std::move(entries), + unpartitioned_spec_); + + auto metrics_context = MetricsContext::Default(); + auto scan_metrics = ScanMetrics::Make(*metrics_context); + ICEBERG_UNWRAP_OR_FAIL( + auto index, + BuildIndex({manifest}, /*after_sequence_number=*/std::nullopt, scan_metrics.get())); + + // The same delete file matches two different data files... + auto other_unpartitioned_file = + MakeDataFile("/path/to/data-other.parquet", PartitionValues(std::vector{}), + unpartitioned_spec_->spec_id()); + ICEBERG_UNWRAP_OR_FAIL(auto deletes_for_first, + index->ForDataFile(0, *unpartitioned_file_)); + ICEBERG_UNWRAP_OR_FAIL(auto deletes_for_second, + index->ForDataFile(0, *other_unpartitioned_file)); + EXPECT_EQ(deletes_for_first.size(), 1); + EXPECT_EQ(deletes_for_second.size(), 1); + + // ...but it must only be counted once in indexed_delete_files/type counters, since + // those are counted at index-build time rather than once per data file it is matched + // against. result_delete_files/total_delete_file_size_in_bytes are not incremented + // here at all -- they are counted once per FileScanTask in ManifestGroup, mirroring + // Java's ScanMetricsUtil.fileTask() (BuildIndex() alone doesn't exercise that path). + EXPECT_EQ(scan_metrics->indexed_delete_files->value(), 1); + EXPECT_EQ(scan_metrics->result_delete_files->value(), 0); + EXPECT_EQ(scan_metrics->equality_delete_files->value(), 1); +} + TEST_P(DeleteFileIndexTest, TestExistingDeleteFiles) { auto version = GetParam(); diff --git a/src/iceberg/test/expression_visitor_test.cc b/src/iceberg/test/expression_visitor_test.cc index 97d70d7a8..81b5d4e65 100644 --- a/src/iceberg/test/expression_visitor_test.cc +++ b/src/iceberg/test/expression_visitor_test.cc @@ -17,11 +17,14 @@ * under the License. */ +#include + #include #include "iceberg/expression/binder.h" #include "iceberg/expression/expressions.h" #include "iceberg/expression/rewrite_not.h" +#include "iceberg/expression/sanitize_expression.h" #include "iceberg/result.h" #include "iceberg/schema.h" #include "iceberg/test/matchers.h" @@ -506,6 +509,157 @@ TEST_F(RewriteNotTest, ComplexExpression) { EXPECT_EQ(rewritten->op(), Expression::Operation::kOr); } +class SanitizeExpressionTest : public ExpressionVisitorTest {}; + +TEST_F(SanitizeExpressionTest, Constants) { + auto true_expr = Expressions::AlwaysTrue(); + ICEBERG_UNWRAP_OR_FAIL(auto sanitized_true, SanitizeExpression::Sanitize(true_expr)); + EXPECT_TRUE(sanitized_true->Equals(*True::Instance())); + + auto false_expr = Expressions::AlwaysFalse(); + ICEBERG_UNWRAP_OR_FAIL(auto sanitized_false, SanitizeExpression::Sanitize(false_expr)); + EXPECT_TRUE(sanitized_false->Equals(*False::Instance())); +} + +TEST_F(SanitizeExpressionTest, BoundLiteralPredicateHidesValue) { + auto unbound_pred = Expressions::Equal("name", Literal::String("alice@example.com")); + ICEBERG_UNWRAP_OR_FAIL(auto bound_pred, Bind(unbound_pred)); + + ICEBERG_UNWRAP_OR_FAIL(auto sanitized, SanitizeExpression::Sanitize(bound_pred)); + EXPECT_EQ(sanitized->op(), Expression::Operation::kEq); + EXPECT_TRUE(sanitized->is_unbound_predicate()); + EXPECT_THAT( + sanitized->ToString(), + ::testing::MatchesRegex(R"re(ref\(name="name"\) == "\(hash-[0-9a-f]{8}\)")re")); + EXPECT_THAT(sanitized->ToString(), ::testing::Not(::testing::HasSubstr("alice"))); +} + +TEST_F(SanitizeExpressionTest, UnboundLiteralPredicateHidesValue) { + auto unbound_pred = Expressions::GreaterThan("age", Literal::Int(42)); + + ICEBERG_UNWRAP_OR_FAIL(auto sanitized, SanitizeExpression::Sanitize(unbound_pred)); + EXPECT_EQ(sanitized->op(), Expression::Operation::kGt); + EXPECT_EQ(sanitized->ToString(), "ref(name=\"age\") > \"(2-digit-int)\""); + EXPECT_THAT(sanitized->ToString(), ::testing::Not(::testing::HasSubstr("42"))); +} + +TEST_F(SanitizeExpressionTest, UnaryPredicateNeedsNoLiteral) { + auto unbound_pred = Expressions::IsNull("salary"); + ICEBERG_UNWRAP_OR_FAIL(auto bound_pred, Bind(unbound_pred)); + + ICEBERG_UNWRAP_OR_FAIL(auto sanitized, SanitizeExpression::Sanitize(bound_pred)); + EXPECT_EQ(sanitized->op(), Expression::Operation::kIsNull); + EXPECT_EQ(sanitized->ToString(), "is_null(ref(name=\"salary\"))"); +} + +TEST_F(SanitizeExpressionTest, SetPredicateSanitizesEachElement) { + auto unbound_pred = Expressions::In( + "name", + {Literal::String("alice"), Literal::String("bob"), Literal::String("carol")}); + ICEBERG_UNWRAP_OR_FAIL(auto bound_pred, Bind(unbound_pred)); + + ICEBERG_UNWRAP_OR_FAIL(auto sanitized, SanitizeExpression::Sanitize(bound_pred)); + EXPECT_EQ(sanitized->op(), Expression::Operation::kIn); + EXPECT_THAT( + sanitized->ToString(), + ::testing::MatchesRegex( + R"re(ref\(name="name"\) in \["\(hash-[0-9a-f]{8}\)"(, "\(hash-[0-9a-f]{8}\)"){2}\])re")); + for (const auto* leaked : {"alice", "bob", "carol"}) { + EXPECT_THAT(sanitized->ToString(), ::testing::Not(::testing::HasSubstr(leaked))); + } +} + +TEST_F(SanitizeExpressionTest, FractionalFloatLiteralDigitCount) { + auto unbound_pred = Expressions::LessThan("salary", Literal::Double(0.05)); + + ICEBERG_UNWRAP_OR_FAIL(auto sanitized, SanitizeExpression::Sanitize(unbound_pred)); + EXPECT_EQ(sanitized->ToString(), "ref(name=\"salary\") < \"(0-digit-float)\""); +} + +TEST_F(SanitizeExpressionTest, TimestampLiteralBucketsByHoursAgo) { + auto fifty_hours_ago = std::chrono::system_clock::now() - std::chrono::hours(50); + int64_t micros = std::chrono::duration_cast( + fifty_hours_ago.time_since_epoch()) + .count(); + auto unbound_pred = Expressions::LessThan("ts", Literal::Timestamp(micros)); + + ICEBERG_UNWRAP_OR_FAIL(auto sanitized, SanitizeExpression::Sanitize(unbound_pred)); + EXPECT_THAT(sanitized->ToString(), + ::testing::MatchesRegex( + R"re(ref\(name="ts"\) < "\(timestamp-(49|50)-hours-ago\)")re")); +} + +TEST_F(SanitizeExpressionTest, TimestampLiteralBucketsByDaysAgo) { + auto ten_days_ago = std::chrono::system_clock::now() - std::chrono::hours(240); + int64_t micros = std::chrono::duration_cast( + ten_days_ago.time_since_epoch()) + .count(); + auto unbound_pred = Expressions::LessThan("ts", Literal::Timestamp(micros)); + + ICEBERG_UNWRAP_OR_FAIL(auto sanitized, SanitizeExpression::Sanitize(unbound_pred)); + EXPECT_THAT( + sanitized->ToString(), + ::testing::MatchesRegex(R"re(ref\(name="ts"\) < "\(timestamp-10-days-ago\)")re")); +} + +TEST_F(SanitizeExpressionTest, UnboundPredicateOverTransformKeepsTransform) { + auto bucket_term = Expressions::Bucket("id", 16); + auto unbound_pred = Expressions::Equal(bucket_term, Literal::Int(5)); + + ICEBERG_UNWRAP_OR_FAIL(auto sanitized, SanitizeExpression::Sanitize(unbound_pred)); + EXPECT_EQ(sanitized->ToString(), "bucket[16](ref(name=\"id\")) == \"(1-digit-int)\""); +} + +TEST_F(SanitizeExpressionTest, BoundPredicateOverTransformKeepsTransform) { + // Regression test: Java's unbind(BoundTerm) rebuilds a BoundTransform term as a + // transform term, not a plain reference; BoundPredicate::reference() alone would + // silently discard the transform. + auto bucket_term = Expressions::Bucket("id", 16); + auto unbound_pred = Expressions::Equal(bucket_term, Literal::Int(5)); + ICEBERG_UNWRAP_OR_FAIL(auto bound_pred, Bind(unbound_pred)); + + ICEBERG_UNWRAP_OR_FAIL(auto sanitized, SanitizeExpression::Sanitize(bound_pred)); + EXPECT_TRUE(sanitized->is_unbound_predicate()); + EXPECT_EQ(sanitized->ToString(), "bucket[16](ref(name=\"id\")) == \"(1-digit-int)\""); +} + +TEST_F(SanitizeExpressionTest, PreservesAndOrNotStructure) { + auto pred1 = Expressions::Equal("name", Literal::String("alice@example.com")); + auto pred2 = Expressions::GreaterThan("age", Literal::Int(25)); + ICEBERG_UNWRAP_OR_FAIL(auto bound_pred1, Bind(pred1)); + ICEBERG_UNWRAP_OR_FAIL(auto bound_pred2, Bind(pred2)); + auto not_pred2 = Expressions::Not(bound_pred2); + auto and_expr = Expressions::And(bound_pred1, not_pred2); + + ICEBERG_UNWRAP_OR_FAIL(auto sanitized, SanitizeExpression::Sanitize(and_expr)); + EXPECT_EQ(sanitized->op(), Expression::Operation::kAnd); + EXPECT_THAT(sanitized->ToString(), ::testing::Not(::testing::HasSubstr("alice"))); + EXPECT_THAT(sanitized->ToString(), ::testing::Not(::testing::HasSubstr("25"))); +} + +TEST_F(SanitizeExpressionTest, BindWithFallbackMatchesUnboundOnSuccess) { + auto unbound_pred = Expressions::GreaterThan("age", Literal::Int(42)); + + ICEBERG_UNWRAP_OR_FAIL(auto sanitized_unbound, + SanitizeExpression::Sanitize(unbound_pred)); + ICEBERG_UNWRAP_OR_FAIL(auto sanitized_bound, + SanitizeExpression::Sanitize(*schema_, unbound_pred, + /*case_sensitive=*/true)); + EXPECT_EQ(sanitized_bound->ToString(), sanitized_unbound->ToString()); +} + +TEST_F(SanitizeExpressionTest, BindWithFallbackFallsBackOnUnknownColumn) { + // "not_a_column" isn't in schema_, so binding fails and Sanitize() should fall back + // to sanitizing the unbound expression rather than propagating the bind error. + auto unbound_pred = Expressions::GreaterThan("not_a_column", Literal::Int(42)); + + ICEBERG_UNWRAP_OR_FAIL(auto sanitized, + SanitizeExpression::Sanitize(*schema_, unbound_pred, + /*case_sensitive=*/true)); + EXPECT_EQ(sanitized->op(), Expression::Operation::kGt); + EXPECT_EQ(sanitized->ToString(), "ref(name=\"not_a_column\") > \"(2-digit-int)\""); +} + class ReferenceVisitorTest : public ExpressionVisitorTest {}; TEST_F(ReferenceVisitorTest, Constants) { diff --git a/src/iceberg/test/fast_append_test.cc b/src/iceberg/test/fast_append_test.cc index 8853d419c..b46a652ae 100644 --- a/src/iceberg/test/fast_append_test.cc +++ b/src/iceberg/test/fast_append_test.cc @@ -20,10 +20,12 @@ #include "iceberg/update/fast_append.h" #include +#include #include #include #include #include +#include #include #include @@ -33,6 +35,9 @@ #include "iceberg/constants.h" #include "iceberg/manifest/manifest_entry.h" #include "iceberg/manifest/manifest_writer.h" +#include "iceberg/metrics/commit_report.h" +#include "iceberg/metrics/metrics_reporter.h" +#include "iceberg/metrics/metrics_reporters.h" #include "iceberg/partition_spec.h" #include "iceberg/schema.h" #include "iceberg/snapshot.h" @@ -40,6 +45,8 @@ #include "iceberg/test/matchers.h" #include "iceberg/test/update_test_base.h" #include "iceberg/transaction.h" +#include "iceberg/update/update_properties.h" +#include "iceberg/util/uuid.h" namespace iceberg { @@ -314,4 +321,173 @@ TEST_F(SnapshotUpdateTest, ConcurrentManifestPaths) { } } +// --------------------------------------------------------------------------- +// Metrics integration tests +// --------------------------------------------------------------------------- + +namespace { + +class CapturingReporter final : public MetricsReporter { + public: + Status Report(const MetricsReport& report) override { + reports_.push_back(report); + return {}; + } + const std::vector& reports() const { return reports_; } + void clear() { reports_.clear(); } + + private: + std::vector reports_; +}; + +CapturingReporter* g_capturing_reporter = nullptr; + +void RegisterCapturingReporter() { + static std::once_flag flag; + std::call_once(flag, [] { + (void)MetricsReporters::Register( + "fast.append.test.reporter", + [](const auto&) -> Result> { + auto ptr = std::make_unique(); + g_capturing_reporter = ptr.get(); + return ptr; + }); + }); +} + +} // namespace + +// Test fixture that creates an InMemoryCatalog with a CapturingReporter so +// CommitReports emitted by Transaction::Commit() are observable. +class FastAppendMetricsTest : public ::testing::Test { + protected: + static void SetUpTestSuite() { + avro::RegisterAll(); + RegisterCapturingReporter(); + } + + void SetUp() override { + table_ident_ = TableIdentifier{.name = "metrics_test_table"}; + table_location_ = "/warehouse/metrics_test_table"; + + file_io_ = arrow::ArrowFileSystemFileIO::MakeMockFileIO(); + catalog_ = InMemoryCatalog::Make( + "metrics_test_catalog", file_io_, "/warehouse/", + {{std::string(kMetricsReporterImpl), "fast.append.test.reporter"}}); + + auto arrow_fs = std::dynamic_pointer_cast<::arrow::fs::internal::MockFileSystem>( + static_cast(*file_io_).fs()); + ASSERT_TRUE(arrow_fs != nullptr); + ASSERT_TRUE(arrow_fs->CreateDir(table_location_ + "/metadata").ok()); + + auto metadata_location = std::format("{}/metadata/00001-{}.metadata.json", + table_location_, Uuid::GenerateV7().ToString()); + ICEBERG_UNWRAP_OR_FAIL( + auto metadata, ReadTableMetadataFromResource("TableMetadataV2ValidMinimal.json")); + metadata->location = table_location_; + ASSERT_THAT(TableMetadataUtil::Write(*file_io_, metadata_location, *metadata), + IsOk()); + ICEBERG_UNWRAP_OR_FAIL(table_, + catalog_->RegisterTable(table_ident_, metadata_location)); + + ICEBERG_UNWRAP_OR_FAIL(spec_, table_->spec()); + ICEBERG_UNWRAP_OR_FAIL(schema_, table_->schema()); + } + + std::shared_ptr MakeDataFile(const std::string& path, int64_t record_count, + int64_t size, int64_t partition_value = 0) { + auto data_file = std::make_shared(); + data_file->content = DataFile::Content::kData; + data_file->file_path = table_location_ + path; + data_file->file_format = FileFormatType::kParquet; + data_file->partition = + PartitionValues(std::vector{Literal::Long(partition_value)}); + data_file->file_size_in_bytes = size; + data_file->record_count = record_count; + data_file->partition_spec_id = spec_->spec_id(); + return data_file; + } + + TableIdentifier table_ident_; + std::string table_location_; + std::shared_ptr file_io_; + std::shared_ptr catalog_; + std::shared_ptr
table_; + std::shared_ptr spec_; + std::shared_ptr schema_; +}; + +// A CommitReport must be emitted once for each FastAppend commit that creates a +// new snapshot. Validate table_name, snapshot_id, operation, and attempt count. +TEST_F(FastAppendMetricsTest, CommitReportFiredAfterFastAppend) { + ASSERT_NE(g_capturing_reporter, nullptr); + + std::shared_ptr fast_append; + ICEBERG_UNWRAP_OR_FAIL(fast_append, table_->NewFastAppend()); + fast_append->AppendFile(MakeDataFile("/data/file_a.parquet", 100, 1024, 1024)); + ASSERT_THAT(fast_append->Commit(), IsOk()); + + ASSERT_THAT(table_->Refresh(), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto snapshot, table_->current_snapshot()); + + const auto& reports = g_capturing_reporter->reports(); + ASSERT_EQ(reports.size(), 1u); + ASSERT_TRUE(std::holds_alternative(reports[0])); + + const auto& report = std::get(reports[0]); + EXPECT_EQ(report.table_name, table_->FullyQualifiedName()); + EXPECT_EQ(report.table_name, "metrics_test_catalog." + table_ident_.ToString()); + EXPECT_EQ(report.snapshot_id, snapshot->snapshot_id); + EXPECT_EQ(report.operation, "append"); + ASSERT_TRUE(report.commit_metrics.attempts.has_value()); + EXPECT_EQ(report.commit_metrics.attempts->value, 1); +} + +// A reporter set directly on the FastAppend via ReportWith() must take precedence +// over the table's configured reporter, mirroring Java's SnapshotProducer.reportWith(). +TEST_F(FastAppendMetricsTest, ReportWithOverridesTableReporter) { + ASSERT_NE(g_capturing_reporter, nullptr); + + auto override_reporter = std::make_shared(); + + std::shared_ptr fast_append; + ICEBERG_UNWRAP_OR_FAIL(fast_append, table_->NewFastAppend()); + fast_append->ReportWith(override_reporter); + fast_append->AppendFile(MakeDataFile("/data/file_a.parquet", 100, 1024, 1024)); + ASSERT_THAT(fast_append->Commit(), IsOk()); + + // The override reporter receives the CommitReport. + ASSERT_EQ(override_reporter->reports().size(), 1u); + EXPECT_TRUE(std::holds_alternative(override_reporter->reports()[0])); + + // The table's own reporter must not receive it. + EXPECT_TRUE(g_capturing_reporter->reports().empty()); +} + +// A property-only commit must NOT emit a CommitReport because it does not +// create a new snapshot. This covers the original bug where comparing a +// pre-commit snapshot ID of -1 against the existing snapshot ID would be +// skipped by the has_value() guard. +TEST_F(FastAppendMetricsTest, CommitReportNotFiredForPropertyOnlyCommit) { + ASSERT_NE(g_capturing_reporter, nullptr); + + // First do a FastAppend to create a snapshot, then clear the recorder. + std::shared_ptr fast_append; + ICEBERG_UNWRAP_OR_FAIL(fast_append, table_->NewFastAppend()); + fast_append->AppendFile(MakeDataFile("/data/file_a.parquet", 100, 1024, 1024)); + ASSERT_THAT(fast_append->Commit(), IsOk()); + ASSERT_EQ(g_capturing_reporter->reports().size(), 1u); + g_capturing_reporter->clear(); + + // Property-only commit on a table that already has a snapshot. + ASSERT_THAT(table_->Refresh(), IsOk()); + std::shared_ptr update_props; + ICEBERG_UNWRAP_OR_FAIL(update_props, table_->NewUpdateProperties()); + update_props->Set("test-key", "test-value"); + ASSERT_THAT(update_props->Commit(), IsOk()); + + // No new snapshot was created, so no CommitReport must be emitted. + EXPECT_TRUE(g_capturing_reporter->reports().empty()); +} + } // namespace iceberg diff --git a/src/iceberg/test/in_memory_catalog_test.cc b/src/iceberg/test/in_memory_catalog_test.cc index af6921f1e..0134ef5d2 100644 --- a/src/iceberg/test/in_memory_catalog_test.cc +++ b/src/iceberg/test/in_memory_catalog_test.cc @@ -28,6 +28,7 @@ #include #include "iceberg/arrow/arrow_io_internal.h" +#include "iceberg/exception.h" #include "iceberg/partition_spec.h" #include "iceberg/schema.h" #include "iceberg/sort_order.h" @@ -91,6 +92,13 @@ class InMemoryCatalogTest : public ::testing::Test { std::vector created_temp_paths_; }; +TEST_F(InMemoryCatalogTest, InvalidMetricsReporterImplThrows) { + std::unordered_map properties = { + {"metrics-reporter-impl", "this-reporter-type-does-not-exist"}}; + EXPECT_THROW(InMemoryCatalog("test_catalog", file_io_, "/tmp/warehouse/", properties), + IcebergError); +} + TEST_F(InMemoryCatalogTest, CatalogName) { EXPECT_EQ(catalog_->name(), "test_catalog"); auto tablesRs = catalog_->ListTables(Namespace{{}}); diff --git a/src/iceberg/test/rest_metrics_reporter_test.cc b/src/iceberg/test/rest_metrics_reporter_test.cc new file mode 100644 index 000000000..08f0613e4 --- /dev/null +++ b/src/iceberg/test/rest_metrics_reporter_test.cc @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/catalog/rest/rest_metrics_reporter.h" + +#include + +#include + +#include "iceberg/catalog/rest/auth/auth_session.h" +#include "iceberg/catalog/rest/http_client.h" +#include "iceberg/metrics/commit_report.h" +#include "iceberg/metrics/metrics_reporter.h" +#include "iceberg/metrics/scan_report.h" +#include "iceberg/test/matchers.h" + +namespace iceberg::rest { + +class RestMetricsReporterTest : public ::testing::Test { + protected: + void SetUp() override { + client_ = std::make_shared(); + session_ = auth::AuthSession::MakeDefault({}); + } + + std::shared_ptr client_; + std::shared_ptr session_; +}; + +// Report() must return OK even when the HTTP call fails (connection refused). +// This validates the fire-and-forget error-suppression contract matching Java behavior. +TEST_F(RestMetricsReporterTest, ReportSuppressesHttpErrorsForScanReport) { + RestMetricsReporter reporter(client_, "http://localhost:0/v1/ns/tables/tbl/metrics", + session_); + + ScanReport report; + report.table_name = "ns.tbl"; + report.snapshot_id = 42; + report.schema_id = 0; + // Leave filter/metrics as default; serialization should still succeed. + + EXPECT_THAT(reporter.Report(report), IsOk()); +} + +TEST_F(RestMetricsReporterTest, ReportSuppressesHttpErrorsForCommitReport) { + RestMetricsReporter reporter(client_, "http://localhost:0/v1/ns/tables/tbl/metrics", + session_); + + CommitReport report; + report.table_name = "ns.tbl"; + report.snapshot_id = 99; + report.sequence_number = 1; + report.operation = "append"; + + EXPECT_THAT(reporter.Report(report), IsOk()); +} + +} // namespace iceberg::rest diff --git a/src/iceberg/test/scan_planning_metrics_test.cc b/src/iceberg/test/scan_planning_metrics_test.cc new file mode 100644 index 000000000..498754885 --- /dev/null +++ b/src/iceberg/test/scan_planning_metrics_test.cc @@ -0,0 +1,572 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/// \file scan_planning_metrics_test.cc +/// End-to-end tests for scan planning metrics, mirroring Java's +/// ScanPlanningAndReportingTestBase. + +#include +#include +#include +#include +#include + +#include +#include + +#include "iceberg/expression/expressions.h" +#include "iceberg/manifest/manifest_entry.h" +#include "iceberg/metrics/metrics_reporter.h" +#include "iceberg/metrics/scan_report.h" +#include "iceberg/result.h" +#include "iceberg/snapshot.h" +#include "iceberg/table_metadata.h" +#include "iceberg/table_scan.h" +#include "iceberg/test/matchers.h" +#include "iceberg/test/scan_test_base.h" +#include "iceberg/transform.h" +#include "iceberg/util/timepoint.h" + +namespace iceberg { + +namespace { + +/// Reporter that captures the most recent ScanReport for assertions. +class CapturingReporter final : public MetricsReporter { + public: + Status Report(const MetricsReport& report) override { + if (std::holds_alternative(report)) { + last_ = std::get(report); + } + return {}; + } + + const std::optional& last() const { return last_; } + void clear() { last_.reset(); } + + private: + std::optional last_; +}; + +} // namespace + +class ScanPlanningMetricsTest : public ScanTestBase { + protected: + void SetUp() override { + ScanTestBase::SetUp(); + reporter_ = std::make_shared(); + + ICEBERG_UNWRAP_OR_FAIL( + id_identity_spec_, + PartitionSpec::Make(/*spec_id=*/2, + {PartitionField(/*source_id=*/1, /*field_id=*/1001, "id", + Transform::Identity())})); + } + + /// \brief Build a DataFile with optional lower/upper bounds on the "id" field. + std::shared_ptr MakeDataFile(const std::string& path, + const PartitionValues& partition, + int32_t spec_id, int64_t record_count = 1, + std::optional lower_id = std::nullopt, + std::optional upper_id = std::nullopt) { + auto file = std::make_shared(DataFile{ + .file_path = path, + .file_format = FileFormatType::kParquet, + .partition = partition, + .record_count = record_count, + .file_size_in_bytes = 10, + .sort_order_id = 0, + .partition_spec_id = spec_id, + }); + if (lower_id.has_value()) { + file->lower_bounds[1] = Literal::Int(lower_id.value()).Serialize().value(); + } + if (upper_id.has_value()) { + file->upper_bounds[1] = Literal::Int(upper_id.value()).Serialize().value(); + } + return file; + } + + /// \brief Build a positional-delete DataFile. + std::shared_ptr MakePositionDeleteFile( + const std::string& path, const PartitionValues& partition, int32_t spec_id, + std::optional referenced_file = std::nullopt) { + return std::make_shared(DataFile{ + .content = DataFile::Content::kPositionDeletes, + .file_path = path, + .file_format = FileFormatType::kParquet, + .partition = partition, + .record_count = 1, + .file_size_in_bytes = 10, + .referenced_data_file = std::move(referenced_file), + .partition_spec_id = spec_id, + }); + } + + /// \brief Build a global (unpartitioned) equality-delete DataFile. + std::shared_ptr MakeEqualityDeleteFile(const std::string& path, + const PartitionValues& partition, + int32_t spec_id, + std::vector equality_ids = {1}) { + return std::make_shared(DataFile{ + .content = DataFile::Content::kEqualityDeletes, + .file_path = path, + .file_format = FileFormatType::kParquet, + .partition = partition, + .record_count = 1, + .file_size_in_bytes = 10, + .equality_ids = std::move(equality_ids), + .partition_spec_id = spec_id, + }); + } + + /// \brief Build a single-snapshot TableMetadata from a manifest list path. + std::shared_ptr BuildMetadata( + int64_t snapshot_id, const std::string& manifest_list_path, + std::shared_ptr spec = nullptr) { + if (!spec) spec = partitioned_spec_; + const auto ts = TimePointMsFromUnixMs(1609459200000L); + auto snapshot = + std::make_shared(Snapshot{.snapshot_id = snapshot_id, + .parent_snapshot_id = std::nullopt, + .sequence_number = 1L, + .timestamp_ms = ts, + .manifest_list = manifest_list_path, + .schema_id = schema_->schema_id()}); + return std::make_shared( + TableMetadata{.format_version = 2, + .table_uuid = "test-table-uuid", + .location = "/tmp/table", + .last_sequence_number = 1L, + .last_updated_ms = ts, + .last_column_id = 2, + .schemas = {schema_}, + .current_schema_id = schema_->schema_id(), + .partition_specs = {spec, unpartitioned_spec_}, + .default_spec_id = spec->spec_id(), + .last_partition_id = 1001, + .current_snapshot_id = snapshot_id, + .snapshots = {snapshot}, + .snapshot_log = {SnapshotLogEntry{.timestamp_ms = ts, + .snapshot_id = snapshot_id}}, + .default_sort_order_id = 0, + .refs = {{"main", std::make_shared(SnapshotRef{ + .snapshot_id = snapshot_id, + .retention = SnapshotRef::Branch{}})}}}); + } + + /// \brief Wrapper matching WriteManifestList(format_version, snap_id, seq, manifests). + std::string WriteManifestList(int8_t format_version, int64_t snapshot_id, + int64_t sequence_number, + const std::vector& manifests) { + return ScanTestBase::WriteManifestList(format_version, snapshot_id, + /*parent_snapshot_id=*/0L, sequence_number, + manifests); + } + + std::shared_ptr reporter_; + std::shared_ptr id_identity_spec_; +}; + +// --------------------------------------------------------------------------- +// Test 1: Verify a ScanReport is fired and contains basic accurate fields. +// Mirrors Java's scanningWithMultipleReporters(). +// --------------------------------------------------------------------------- +TEST_P(ScanPlanningMetricsTest, ScanReportFiredAfterPlanFiles) { + auto version = GetParam(); + constexpr int64_t kSnapshotId = 2000L; + const auto part = PartitionValues({Literal::Int(0)}); + + auto data_manifest = WriteDataManifest( + version, kSnapshotId, + {MakeEntry(ManifestStatus::kAdded, kSnapshotId, /*sequence_number=*/1, + MakeDataFile("/data/file_a.parquet", part, partitioned_spec_->spec_id(), + /*record_count=*/100, + /*lower_id=*/1, /*upper_id=*/50))}, + partitioned_spec_); + auto manifest_list = + WriteManifestList(version, kSnapshotId, /*sequence_number=*/1, {data_manifest}); + auto metadata = BuildMetadata(kSnapshotId, manifest_list); + + reporter_->clear(); + ICEBERG_UNWRAP_OR_FAIL(auto builder, DataTableScanBuilder::Make(metadata, file_io_)); + builder->MetricsReporter(reporter_).TableName("test.table"); + ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build()); + ICEBERG_UNWRAP_OR_FAIL(auto tasks, scan->PlanFiles()); + ASSERT_EQ(tasks.size(), 1u); + + ASSERT_TRUE(reporter_->last().has_value()); + const auto& report = *reporter_->last(); + EXPECT_EQ(report.table_name, "test.table"); + EXPECT_EQ(report.snapshot_id, kSnapshotId); + + const auto& m = report.scan_metrics; + ASSERT_TRUE(m.total_planning_duration.has_value()); + EXPECT_EQ(m.total_planning_duration->count, 1); + ASSERT_TRUE(m.result_data_files.has_value()); + EXPECT_EQ(m.result_data_files->value, 1); +} + +// --------------------------------------------------------------------------- +// Test 2: Two manifests, 3 total data files — verify all 12 counters. +// Mirrors Java's scanningWithMultipleDataManifests() (unfiltered sub-scan). +// --------------------------------------------------------------------------- +TEST_P(ScanPlanningMetricsTest, ScanningWithMultipleDataManifests) { + auto version = GetParam(); + constexpr int64_t kSnapshotId = 2001L; + const auto part = PartitionValues({Literal::Int(0)}); + + auto manifest1 = WriteDataManifest( + version, kSnapshotId, + {MakeEntry( + ManifestStatus::kAdded, kSnapshotId, /*sequence_number=*/1, + MakeDataFile("/data/file_a.parquet", part, partitioned_spec_->spec_id())), + MakeEntry( + ManifestStatus::kAdded, kSnapshotId, /*sequence_number=*/1, + MakeDataFile("/data/file_b.parquet", part, partitioned_spec_->spec_id()))}, + partitioned_spec_); + + auto manifest2 = WriteDataManifest( + version, kSnapshotId, + {MakeEntry( + ManifestStatus::kAdded, kSnapshotId, /*sequence_number=*/1, + MakeDataFile("/data/file_c.parquet", part, partitioned_spec_->spec_id()))}, + partitioned_spec_); + + auto manifest_list = WriteManifestList(version, kSnapshotId, /*sequence_number=*/1, + {manifest1, manifest2}); + auto metadata = BuildMetadata(kSnapshotId, manifest_list); + + reporter_->clear(); + ICEBERG_UNWRAP_OR_FAIL(auto builder, DataTableScanBuilder::Make(metadata, file_io_)); + builder->MetricsReporter(reporter_).TableName("test.table"); + ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build()); + ICEBERG_UNWRAP_OR_FAIL(auto tasks, scan->PlanFiles()); + ASSERT_EQ(tasks.size(), 3u); + + ASSERT_TRUE(reporter_->last().has_value()); + const auto& m = reporter_->last()->scan_metrics; + + ASSERT_TRUE(m.result_data_files.has_value()); + EXPECT_EQ(m.result_data_files->value, 3); + ASSERT_TRUE(m.result_delete_files.has_value()); + EXPECT_EQ(m.result_delete_files->value, 0); + ASSERT_TRUE(m.scanned_data_manifests.has_value()); + EXPECT_EQ(m.scanned_data_manifests->value, 2); + ASSERT_TRUE(m.scanned_delete_manifests.has_value()); + EXPECT_EQ(m.scanned_delete_manifests->value, 0); + ASSERT_TRUE(m.skipped_data_manifests.has_value()); + EXPECT_EQ(m.skipped_data_manifests->value, 0); + ASSERT_TRUE(m.skipped_delete_manifests.has_value()); + EXPECT_EQ(m.skipped_delete_manifests->value, 0); + ASSERT_TRUE(m.total_data_manifests.has_value()); + EXPECT_EQ(m.total_data_manifests->value, 2); + ASSERT_TRUE(m.total_delete_manifests.has_value()); + EXPECT_EQ(m.total_delete_manifests->value, 0); + ASSERT_TRUE(m.skipped_data_files.has_value()); + EXPECT_EQ(m.skipped_data_files->value, 0); + ASSERT_TRUE(m.skipped_delete_files.has_value()); + EXPECT_EQ(m.skipped_delete_files->value, 0); +} + +// --------------------------------------------------------------------------- +// Test 3: Partition filter prunes one of two manifests. +// Uses an identity(id) partition so the manifest evaluator can prune by +// the id range recorded in each manifest's partition field summary. +// Mirrors Java's scanningWithMultipleDataManifests() (filtered sub-scan). +// --------------------------------------------------------------------------- +TEST_P(ScanPlanningMetricsTest, ScanningWithManifestPruning) { + auto version = GetParam(); + constexpr int64_t kSnapshotId = 2002L; + + // Manifest 1: id partition = 1 (files with id=1) + const auto part1 = PartitionValues({Literal::Int(1)}); + auto manifest1 = WriteDataManifest( + version, kSnapshotId, + {MakeEntry( + ManifestStatus::kAdded, kSnapshotId, /*sequence_number=*/1, + MakeDataFile("/data/file_a.parquet", part1, id_identity_spec_->spec_id()))}, + id_identity_spec_); + + // Manifest 2: id partition = 2 (files with id=2) + const auto part2 = PartitionValues({Literal::Int(2)}); + auto manifest2 = WriteDataManifest( + version, kSnapshotId, + {MakeEntry( + ManifestStatus::kAdded, kSnapshotId, /*sequence_number=*/1, + MakeDataFile("/data/file_b.parquet", part2, id_identity_spec_->spec_id()))}, + id_identity_spec_); + + auto manifest_list = WriteManifestList(version, kSnapshotId, /*sequence_number=*/1, + {manifest1, manifest2}); + auto metadata = BuildMetadata(kSnapshotId, manifest_list, id_identity_spec_); + + // Filter id = 1: only manifest 1 survives the manifest-level evaluator. + reporter_->clear(); + ICEBERG_UNWRAP_OR_FAIL(auto builder, DataTableScanBuilder::Make(metadata, file_io_)); + builder->MetricsReporter(reporter_) + .TableName("test.table") + .Filter(Expressions::Equal("id", Literal::Int(1))); + ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build()); + ICEBERG_UNWRAP_OR_FAIL(auto tasks, scan->PlanFiles()); + ASSERT_EQ(tasks.size(), 1u); + EXPECT_EQ(tasks[0]->data_file()->file_path, "/data/file_a.parquet"); + + ASSERT_TRUE(reporter_->last().has_value()); + const auto& m = reporter_->last()->scan_metrics; + + ASSERT_TRUE(m.total_data_manifests.has_value()); + EXPECT_EQ(m.total_data_manifests->value, 2); + ASSERT_TRUE(m.scanned_data_manifests.has_value()); + EXPECT_EQ(m.scanned_data_manifests->value, 1); + ASSERT_TRUE(m.skipped_data_manifests.has_value()); + EXPECT_EQ(m.skipped_data_manifests->value, 1); + ASSERT_TRUE(m.result_data_files.has_value()); + EXPECT_EQ(m.result_data_files->value, 1); + ASSERT_TRUE(m.skipped_data_files.has_value()); + EXPECT_EQ(m.skipped_data_files->value, 0); +} + +// --------------------------------------------------------------------------- +// Test 4: Row-stats filter skips one entry inside a scanned manifest. +// Both files live in the same manifest; only the inclusive metrics evaluator +// (lower/upper bounds on "id") can distinguish them. +// Mirrors Java's scanningWithSkippedDataFiles(). +// --------------------------------------------------------------------------- +TEST_P(ScanPlanningMetricsTest, ScanningWithSkippedDataFiles) { + auto version = GetParam(); + constexpr int64_t kSnapshotId = 2003L; + const auto part = PartitionValues({Literal::Int(0)}); + + // Both files share the same bucket partition so the manifest is not pruned. + // file_a covers id [1, 50]; file_b covers id [51, 100]. + auto data_manifest = WriteDataManifest( + version, kSnapshotId, + {MakeEntry(ManifestStatus::kAdded, kSnapshotId, /*sequence_number=*/1, + MakeDataFile("/data/file_a.parquet", part, partitioned_spec_->spec_id(), + /*record_count=*/50, /*lower_id=*/1, /*upper_id=*/50)), + MakeEntry(ManifestStatus::kAdded, kSnapshotId, /*sequence_number=*/1, + MakeDataFile("/data/file_b.parquet", part, partitioned_spec_->spec_id(), + /*record_count=*/50, /*lower_id=*/51, /*upper_id=*/100))}, + partitioned_spec_); + auto manifest_list = + WriteManifestList(version, kSnapshotId, /*sequence_number=*/1, {data_manifest}); + auto metadata = BuildMetadata(kSnapshotId, manifest_list); + + // Filter id = 25: within file_a's range, outside file_b's range. + reporter_->clear(); + ICEBERG_UNWRAP_OR_FAIL(auto builder, DataTableScanBuilder::Make(metadata, file_io_)); + builder->MetricsReporter(reporter_) + .TableName("test.table") + .Filter(Expressions::Equal("id", Literal::Int(25))); + ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build()); + ICEBERG_UNWRAP_OR_FAIL(auto tasks, scan->PlanFiles()); + ASSERT_EQ(tasks.size(), 1u); + EXPECT_EQ(tasks[0]->data_file()->file_path, "/data/file_a.parquet"); + + ASSERT_TRUE(reporter_->last().has_value()); + const auto& m = reporter_->last()->scan_metrics; + + ASSERT_TRUE(m.total_data_manifests.has_value()); + EXPECT_EQ(m.total_data_manifests->value, 1); + ASSERT_TRUE(m.scanned_data_manifests.has_value()); + EXPECT_EQ(m.scanned_data_manifests->value, 1); + ASSERT_TRUE(m.skipped_data_manifests.has_value()); + EXPECT_EQ(m.skipped_data_manifests->value, 0); + ASSERT_TRUE(m.result_data_files.has_value()); + EXPECT_EQ(m.result_data_files->value, 1); + ASSERT_TRUE(m.skipped_data_files.has_value()); + EXPECT_EQ(m.skipped_data_files->value, 1); +} + +// --------------------------------------------------------------------------- +// Test 5: Scan with positional delete files — verify delete file counters. +// Mirrors Java's scanningWithDeletes(). +// --------------------------------------------------------------------------- +TEST_P(ScanPlanningMetricsTest, ScanningWithDeleteFiles) { + auto version = GetParam(); + if (version < 2) { + GTEST_SKIP() << "Delete files are only supported in format version 2+"; + } + constexpr int64_t kSnapshotId = 2004L; + const auto part = PartitionValues({Literal::Int(0)}); + + auto data_manifest = WriteDataManifest( + version, kSnapshotId, + {MakeEntry( + ManifestStatus::kAdded, kSnapshotId, /*sequence_number=*/1, + MakeDataFile("/data/file_a.parquet", part, partitioned_spec_->spec_id())), + MakeEntry( + ManifestStatus::kAdded, kSnapshotId, /*sequence_number=*/1, + MakeDataFile("/data/file_b.parquet", part, partitioned_spec_->spec_id()))}, + partitioned_spec_); + + // One positional-delete file covering file_a. + auto delete_manifest = WriteDeleteManifest( + version, kSnapshotId, + {MakeEntry( + ManifestStatus::kAdded, kSnapshotId, /*sequence_number=*/2, + MakePositionDeleteFile("/data/pos_delete.parquet", part, + partitioned_spec_->spec_id(), "/data/file_a.parquet"))}, + partitioned_spec_); + + auto manifest_list = WriteManifestList(version, kSnapshotId, /*sequence_number=*/2, + {data_manifest, delete_manifest}); + auto metadata = BuildMetadata(kSnapshotId, manifest_list); + + reporter_->clear(); + ICEBERG_UNWRAP_OR_FAIL(auto builder, DataTableScanBuilder::Make(metadata, file_io_)); + builder->MetricsReporter(reporter_).TableName("test.table"); + ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build()); + ICEBERG_UNWRAP_OR_FAIL(auto tasks, scan->PlanFiles()); + ASSERT_EQ(tasks.size(), 2u); + + ASSERT_TRUE(reporter_->last().has_value()); + const auto& m = reporter_->last()->scan_metrics; + + ASSERT_TRUE(m.result_data_files.has_value()); + EXPECT_EQ(m.result_data_files->value, 2); + ASSERT_TRUE(m.result_delete_files.has_value()); + EXPECT_EQ(m.result_delete_files->value, 1); + ASSERT_TRUE(m.scanned_data_manifests.has_value()); + EXPECT_EQ(m.scanned_data_manifests->value, 1); + ASSERT_TRUE(m.scanned_delete_manifests.has_value()); + EXPECT_EQ(m.scanned_delete_manifests->value, 1); + ASSERT_TRUE(m.total_data_manifests.has_value()); + EXPECT_EQ(m.total_data_manifests->value, 1); + ASSERT_TRUE(m.total_delete_manifests.has_value()); + EXPECT_EQ(m.total_delete_manifests->value, 1); + ASSERT_TRUE(m.indexed_delete_files.has_value()); + EXPECT_EQ(m.indexed_delete_files->value, 1); + ASSERT_TRUE(m.positional_delete_files.has_value()); + EXPECT_EQ(m.positional_delete_files->value, 1); + ASSERT_TRUE(m.equality_delete_files.has_value()); + EXPECT_EQ(m.equality_delete_files->value, 0); + ASSERT_TRUE(m.dvs.has_value()); + EXPECT_EQ(m.dvs->value, 0); +} + +// --------------------------------------------------------------------------- +// Test: a single delete file applying to multiple data files is deduplicated in +// indexed_delete_files (counted once, at index-build time), but result_delete_files and +// total_delete_file_size_in_bytes are counted once per FileScanTask/data file, mirroring +// Java's ScanMetricsUtil.fileTask() vs. ScanMetricsUtil.indexedDeleteFile() split. +// --------------------------------------------------------------------------- +TEST_P(ScanPlanningMetricsTest, ScanningWithDeleteFileSharedAcrossDataFiles) { + auto version = GetParam(); + if (version < 2) { + GTEST_SKIP() << "Delete files are only supported in format version 2+"; + } + constexpr int64_t kSnapshotId = 2005L; + const auto empty_partition = PartitionValues(std::vector{}); + + auto data_manifest = WriteDataManifest( + version, kSnapshotId, + {MakeEntry(ManifestStatus::kAdded, kSnapshotId, /*sequence_number=*/1, + MakeDataFile("/data/file_a.parquet", empty_partition, + unpartitioned_spec_->spec_id())), + MakeEntry(ManifestStatus::kAdded, kSnapshotId, /*sequence_number=*/1, + MakeDataFile("/data/file_b.parquet", empty_partition, + unpartitioned_spec_->spec_id()))}, + unpartitioned_spec_); + + // A single global equality-delete file applies to every unpartitioned data file. + auto delete_manifest = WriteDeleteManifest( + version, kSnapshotId, + {MakeEntry(ManifestStatus::kAdded, kSnapshotId, /*sequence_number=*/2, + MakeEqualityDeleteFile("/data/global-eq-delete.parquet", empty_partition, + unpartitioned_spec_->spec_id()))}, + unpartitioned_spec_); + + auto manifest_list = WriteManifestList(version, kSnapshotId, /*sequence_number=*/2, + {data_manifest, delete_manifest}); + auto metadata = BuildMetadata(kSnapshotId, manifest_list, unpartitioned_spec_); + + reporter_->clear(); + ICEBERG_UNWRAP_OR_FAIL(auto builder, DataTableScanBuilder::Make(metadata, file_io_)); + builder->MetricsReporter(reporter_).TableName("test.table"); + ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build()); + ICEBERG_UNWRAP_OR_FAIL(auto tasks, scan->PlanFiles()); + ASSERT_EQ(tasks.size(), 2u); + + ASSERT_TRUE(reporter_->last().has_value()); + const auto& m = reporter_->last()->scan_metrics; + + // Deduplicated: the delete file is indexed once, regardless of how many data files it + // matches. + ASSERT_TRUE(m.indexed_delete_files.has_value()); + EXPECT_EQ(m.indexed_delete_files->value, 1); + ASSERT_TRUE(m.equality_delete_files.has_value()); + EXPECT_EQ(m.equality_delete_files->value, 1); + + // Not deduplicated: counted once per FileScanTask, so the shared delete file + // contributes to both data files' tasks. + ASSERT_TRUE(m.result_delete_files.has_value()); + EXPECT_EQ(m.result_delete_files->value, 2); +} + +// --------------------------------------------------------------------------- +// Test: ScanReport includes nested projected field ids/names, not just the +// top-level struct field. Mirrors Java's use of TypeUtil.getProjectedIds(). +// --------------------------------------------------------------------------- +TEST_P(ScanPlanningMetricsTest, ScanReportIncludesNestedProjectedFields) { + auto version = GetParam(); + constexpr int64_t kSnapshotId = 2020L; + + // Override the base flat schema with one that has a nested struct column. + schema_ = std::make_shared( + std::vector{ + SchemaField::MakeRequired(1, "id", int32()), + SchemaField::MakeRequired( + 2, "location", + struct_({SchemaField::MakeRequired(3, "lat", float64()), + SchemaField::MakeRequired(4, "long", float64())}))}, + /*schema_id=*/0); + + auto data_manifest = WriteDataManifest( + version, kSnapshotId, + {MakeEntry( + ManifestStatus::kAdded, kSnapshotId, /*sequence_number=*/1, + MakeDataFile("/data/file_a.parquet", PartitionValues(std::vector{}), + unpartitioned_spec_->spec_id()))}, + unpartitioned_spec_); + auto manifest_list = + WriteManifestList(version, kSnapshotId, /*sequence_number=*/1, {data_manifest}); + auto metadata = BuildMetadata(kSnapshotId, manifest_list, unpartitioned_spec_); + + reporter_->clear(); + ICEBERG_UNWRAP_OR_FAIL(auto builder, DataTableScanBuilder::Make(metadata, file_io_)); + builder->MetricsReporter(reporter_).TableName("test.table"); + ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build()); + ICEBERG_UNWRAP_OR_FAIL(auto tasks, scan->PlanFiles()); + ASSERT_EQ(tasks.size(), 1u); + + ASSERT_TRUE(reporter_->last().has_value()); + const auto& report = *reporter_->last(); + + EXPECT_THAT(report.projected_field_ids, ::testing::UnorderedElementsAre(1, 2, 3, 4)); + EXPECT_THAT( + report.projected_field_names, + ::testing::UnorderedElementsAre("id", "location", "location.lat", "location.long")); +} + +INSTANTIATE_TEST_SUITE_P(ScanPlanningMetricsVersions, ScanPlanningMetricsTest, + testing::Values(2, 3)); + +} // namespace iceberg diff --git a/src/iceberg/test/table_test.cc b/src/iceberg/test/table_test.cc index ef21f12d6..e72475ebd 100644 --- a/src/iceberg/test/table_test.cc +++ b/src/iceberg/test/table_test.cc @@ -167,4 +167,53 @@ TEST(StaticTableTest, NewMutatingOperationsAreNotSupported) { EXPECT_THAT(table->NewSnapshotManager(), IsError(ErrorKind::kNotSupported)); } +class TableFullyQualifiedNameTest : public ::testing::Test { + protected: + void SetUp() override { + io_ = std::make_shared(); + catalog_ = std::make_shared(); + auto schema = std::make_shared( + std::vector{SchemaField::MakeRequired(1, "id", int64())}, 1); + metadata_ = std::make_shared( + TableMetadata{.format_version = 2, .schemas = {schema}, .current_schema_id = 1}); + } + + Result> MakeTable(std::shared_ptr catalog) { + TableIdentifier ident{.ns = Namespace{.levels = {"db"}}, .name = "test_table"}; + return Table::Make(ident, metadata_, "s3://bucket/meta.json", io_, + std::move(catalog)); + } + + std::shared_ptr io_; + std::shared_ptr catalog_; + std::shared_ptr metadata_; +}; + +TEST_F(TableFullyQualifiedNameTest, NoCatalog) { + TableIdentifier ident{.ns = Namespace{.levels = {"db"}}, .name = "test_table"}; + ICEBERG_UNWRAP_OR_FAIL( + auto table, StaticTable::Make(ident, metadata_, "s3://bucket/meta.json", io_)); + EXPECT_EQ(table->FullyQualifiedName(), "db.test_table"); +} + +TEST_F(TableFullyQualifiedNameTest, DotJoinedCatalogName) { + EXPECT_CALL(*catalog_, name()).WillRepeatedly(::testing::Return("my_catalog")); + ICEBERG_UNWRAP_OR_FAIL(auto table, MakeTable(catalog_)); + EXPECT_EQ(table->FullyQualifiedName(), "my_catalog.db.test_table"); +} + +TEST_F(TableFullyQualifiedNameTest, UriCatalogNameWithoutTrailingSlash) { + EXPECT_CALL(*catalog_, name()) + .WillRepeatedly(::testing::Return("thrift://localhost:9083")); + ICEBERG_UNWRAP_OR_FAIL(auto table, MakeTable(catalog_)); + EXPECT_EQ(table->FullyQualifiedName(), "thrift://localhost:9083/db.test_table"); +} + +TEST_F(TableFullyQualifiedNameTest, UriCatalogNameWithTrailingSlash) { + EXPECT_CALL(*catalog_, name()) + .WillRepeatedly(::testing::Return("hdfs://nameservice/warehouse/")); + ICEBERG_UNWRAP_OR_FAIL(auto table, MakeTable(catalog_)); + EXPECT_EQ(table->FullyQualifiedName(), "hdfs://nameservice/warehouse/db.test_table"); +} + } // namespace iceberg diff --git a/src/iceberg/transaction.cc b/src/iceberg/transaction.cc index 169e7ec90..0287110aa 100644 --- a/src/iceberg/transaction.cc +++ b/src/iceberg/transaction.cc @@ -23,6 +23,8 @@ #include #include "iceberg/catalog.h" +#include "iceberg/metrics/commit_report.h" +#include "iceberg/metrics/metrics_context.h" #include "iceberg/schema.h" #include "iceberg/snapshot.h" #include "iceberg/statistics_file.h" @@ -274,6 +276,10 @@ Status Transaction::ApplyUpdateSnapshot(SnapshotUpdate& update) { ICEBERG_ASSIGN_OR_RAISE(auto result, update.Apply()); + if (const auto& override_reporter = update.reporter()) { + snapshot_reporter_ = override_reporter; + } + // Create a temp builder to check if this is an empty update auto temp_update = TableMetadataBuilder::BuildFrom(&base); if (base.SnapshotById(result.snapshot->snapshot_id).has_value()) { @@ -368,16 +374,28 @@ Result> Transaction::Commit() { int32_t min_wait_ms = props.Get(TableProperties::kCommitMinRetryWaitMs); int32_t max_wait_ms = props.Get(TableProperties::kCommitMaxRetryWaitMs); int32_t total_timeout_ms = props.Get(TableProperties::kCommitTotalRetryTimeMs); + int64_t pre_commit_snapshot_id = -1; + if (auto pre = ctx_->table->metadata()->Snapshot(); pre.has_value() && pre.value()) { + pre_commit_snapshot_id = pre.value()->snapshot_id; + } + + auto metrics_context = MetricsContext::Default(); + auto commit_metrics = CommitMetrics::Make(*metrics_context); + auto timed = commit_metrics->total_duration->Start(); bool is_first_attempt = true; auto commit_result = MakeCommitRetryRunner(num_retries, min_wait_ms, max_wait_ms, total_timeout_ms) - .Run([this, &is_first_attempt]() -> Result> { + .Run([this, &is_first_attempt, + &commit_metrics]() -> Result> { + commit_metrics->attempts->Increment(1); auto result = CommitOnce(is_first_attempt); is_first_attempt = false; return result; }); + timed.Stop(); + Result finalize_result = commit_result.has_value() ? Result(commit_result.value()->metadata().get()) @@ -393,6 +411,30 @@ Result> Transaction::Commit() { committed_ = true; ctx_->table = std::move(commit_result.value()); + // Fire CommitReport only when a new snapshot was produced (not for property-only + // commits). A SnapshotUpdate's own ReportWith() override (captured into + // snapshot_reporter_ by ApplyUpdateSnapshot()) takes precedence over the table's + // reporter. + std::shared_ptr reporter = + snapshot_reporter_ ? snapshot_reporter_ : ctx_->table->reporter(); + if (reporter) { + auto snapshot_result = ctx_->table->metadata()->Snapshot(); + if (snapshot_result.has_value() && snapshot_result.value() && + snapshot_result.value()->snapshot_id != pre_commit_snapshot_id) { + const auto& snapshot = snapshot_result.value(); + const auto op = snapshot->Operation(); + CommitReport report{ + .table_name = ctx_->table->FullyQualifiedName(), + .snapshot_id = snapshot->snapshot_id, + .sequence_number = snapshot->sequence_number, + .operation = op.has_value() ? std::string(op.value()) : "", + .commit_metrics = CommitMetricsResult::From(*commit_metrics, snapshot->summary), + .metadata = {}, + }; + (void)reporter->Report(report); + } + } + return ctx_->table; } @@ -490,6 +532,9 @@ Result> Transaction::NewFastAppend() { ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr fast_append, FastAppend::Make(ctx_->table->name().name, ctx_)); ICEBERG_RETURN_UNEXPECTED(AddUpdate(fast_append)); + if (const auto& r = ctx_->table->reporter()) { + fast_append->ReportWith(r); + } return fast_append; } diff --git a/src/iceberg/transaction.h b/src/iceberg/transaction.h index 49b607d60..35a0292b1 100644 --- a/src/iceberg/transaction.h +++ b/src/iceberg/transaction.h @@ -163,6 +163,11 @@ class ICEBERG_EXPORT Transaction : public std::enable_shared_from_this ctx_; // Keep track of all created pending updates. std::vector> pending_updates_; + // Reporter override captured from the most recently applied SnapshotUpdate's + // ReportWith(), if any. Captured in ApplyUpdateSnapshot() rather than looked up from + // pending_updates_, since the table-created commit path (PendingUpdate::Commit()) + // applies the update directly without ever registering it there. + std::shared_ptr snapshot_reporter_; // To make the state simple, we require updates are added and committed in order. bool last_update_committed_ = true; // Tracks if transaction has been committed to prevent double-commit diff --git a/src/iceberg/type_fwd.h b/src/iceberg/type_fwd.h index 784b3e03b..c1f3399e2 100644 --- a/src/iceberg/type_fwd.h +++ b/src/iceberg/type_fwd.h @@ -225,6 +225,10 @@ class LocationProvider; class SessionCatalog; struct SessionContext; +/// \brief Metrics reporting. +class MetricsReporter; +class ScanMetrics; + /// \brief Table. class Table; class TableProperties; diff --git a/src/iceberg/update/snapshot_update.h b/src/iceberg/update/snapshot_update.h index fa8dcbf12..3f168136a 100644 --- a/src/iceberg/update/snapshot_update.h +++ b/src/iceberg/update/snapshot_update.h @@ -30,6 +30,7 @@ #include #include "iceberg/iceberg_export.h" +#include "iceberg/metrics/metrics_reporter.h" #include "iceberg/result.h" #include "iceberg/snapshot.h" #include "iceberg/type_fwd.h" @@ -57,6 +58,20 @@ class ICEBERG_EXPORT SnapshotUpdate : public PendingUpdate { Kind kind() const override { return Kind::kUpdateSnapshot; } bool IsRetryable() const override { return true; } + /// \brief Set the metrics reporter for this snapshot update. + /// + /// \param reporter The metrics reporter to use. + /// \return Reference to this for method chaining. + auto& ReportWith(this auto& self, std::shared_ptr reporter) { + static_cast(self).reporter_ = std::move(reporter); + return self; + } + + /// \brief Get the metrics reporter explicitly set via ReportWith(), if any. + /// + /// \return The reporter override for this operation, or null if none was set. + const std::shared_ptr& reporter() const { return reporter_; } + /// \brief Set a callback to delete files instead of the table's default. /// /// \param delete_func A function used to delete file locations. @@ -237,6 +252,10 @@ class ICEBERG_EXPORT SnapshotUpdate : public PendingUpdate { SnapshotSummaryBuilder BuildManifestCountSummary( std::span manifests, int32_t replaced_manifests_count); + protected: + /// \brief Reporter to receive CommitReport after a successful commit. + std::shared_ptr reporter_; + private: /// \brief Returns the snapshot summary from the implementation and updates totals. Result> ComputeSummary( diff --git a/src/iceberg/util/content_file_util.cc b/src/iceberg/util/content_file_util.cc index 89c875087..571fddcc8 100644 --- a/src/iceberg/util/content_file_util.cc +++ b/src/iceberg/util/content_file_util.cc @@ -83,6 +83,10 @@ std::string ContentFileUtil::DVDesc(const DataFile& file) { file.referenced_data_file.value_or("")); } +int64_t ContentFileUtil::ContentSizeInBytes(const DataFile& file) { + return file.content_size_in_bytes.value_or(file.file_size_in_bytes); +} + void ContentFileUtil::DropAllStats(DataFile& data_file) { data_file.column_sizes.clear(); data_file.value_counts.clear(); diff --git a/src/iceberg/util/content_file_util.h b/src/iceberg/util/content_file_util.h index f547716d2..0db2e755d 100644 --- a/src/iceberg/util/content_file_util.h +++ b/src/iceberg/util/content_file_util.h @@ -51,6 +51,10 @@ struct ICEBERG_EXPORT ContentFileUtil { /// \brief Generate a description string for a deletion vector. static std::string DVDesc(const DataFile& file); + /// \brief Size in bytes of the portion of `file` relevant to a scan task — the DV's + /// own content size when set, otherwise the file's total size. + static int64_t ContentSizeInBytes(const DataFile& file); + /// \brief In-place drop stats. static void DropAllStats(DataFile& data_file);