Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/iceberg/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ set(ICEBERG_SOURCES
file_writer.cc
inspect/history_table.cc
inspect/metadata_table.cc
inspect/row_builder_internal.cc

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can just name it row_builder.cc, because the _internal suffix is used to mark header files that do not need to be installed.

inspect/snapshots_table.cc
inheritable_metadata.cc
json_serde.cc
Expand Down
134 changes: 134 additions & 0 deletions src/iceberg/inspect/row_builder_internal.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#include "iceberg/inspect/row_builder_internal.h"

#include <utility>

#include <nanoarrow/nanoarrow.h>

#include "iceberg/arrow/nanoarrow_status_internal.h"
#include "iceberg/arrow_c_data_guard_internal.h"
#include "iceberg/schema.h"
#include "iceberg/schema_internal.h"

namespace iceberg {

// Creates a builder whose underlying nanoarrow array mirrors `schema`.
//
// The Iceberg schema is converted with ToArrowSchema, a nanoarrow array is
// initialized from the converted schema, and the array is put into appending
// mode. Any failing step is returned as an error.
Result<ArrowRowBuilder> ArrowRowBuilder::Make(const Schema& schema) {
  ArrowSchema arrow_schema;
  ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(schema, &arrow_schema));
  internal::ArrowSchemaGuard schema_guard(&arrow_schema);

  // Hand the array to the builder *before* initializing it. make_unique
  // value-initializes the struct, so `release` starts out null and the
  // destructor is a no-op until initialization succeeds. From then on every
  // early return below releases the array through ~ArrowRowBuilder instead of
  // merely freeing the holder struct and leaking the nanoarrow buffers.
  ArrowRowBuilder builder(std::make_unique<ArrowArray>());

  ArrowError error;
  ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR(
      ArrowArrayInitFromSchema(builder.array_.get(), &arrow_schema, &error), error);
  ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayStartAppending(builder.array_.get()));

  return builder;
}

ArrowRowBuilder::ArrowRowBuilder(std::unique_ptr<ArrowArray>&& array) noexcept
: array_(std::move(array)) {}

ArrowRowBuilder::ArrowRowBuilder(ArrowRowBuilder&& other) noexcept
: array_(std::move(other.array_)) {}

// Move-assigns from `other`: the array currently held (if any) is released
// first, then ownership of `other`'s array is taken. Self-assignment is a
// no-op.
ArrowRowBuilder& ArrowRowBuilder::operator=(ArrowRowBuilder&& other) noexcept {
  if (this == &other) {
    return *this;
  }

  // Release what we hold now; unique_ptr alone would only free the struct.
  ArrowArray* const current = array_.get();
  if (current != nullptr && current->release != nullptr) {
    ArrowArrayRelease(current);
  }

  array_ = std::move(other.array_);
  return *this;
}

// Releases the underlying array unless there is none or it has already been
// released / handed off (release callback is null).
ArrowRowBuilder::~ArrowRowBuilder() {
  ArrowArray* const held = array_.get();
  if (held == nullptr || held->release == nullptr) {
    return;
  }
  ArrowArrayRelease(held);
}

// Number of children of the underlying struct array, or 0 when the builder
// holds no array.
int64_t ArrowRowBuilder::num_columns() const {
  if (array_ == nullptr) {
    return 0;
  }
  return array_->n_children;
}

// Returns the child array at `index`, or nullptr when the builder holds no
// array or `index` is outside [0, n_children).
ArrowArray* ArrowRowBuilder::column(int64_t index) {
  const bool in_range =
      array_ != nullptr && index >= 0 && index < array_->n_children;
  return in_range ? array_->children[index] : nullptr;
}

// Closes the current row by finishing one element on the top-level array.
// Returns an error if nanoarrow rejects the element.
Status ArrowRowBuilder::FinishRow() {
  ArrowArray* const batch = array_.get();
  ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayFinishElement(batch));
  return {};
}

// Finalizes the array and hands it to the caller.
//
// On success the array struct is copied out and the builder's copy has its
// release callback cleared, so the destructor will not release the data a
// second time. On failure the builder keeps the array and an error is returned.
Result<ArrowArray> ArrowRowBuilder::Finish() && {
  ArrowArray* const batch = array_.get();

  ArrowError error;
  ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR(
      ArrowArrayFinishBuildingDefault(batch, &error), error);

  ArrowArray finished = *batch;
  // Ownership now lives in `finished`; mark our copy as released.
  batch->release = nullptr;
  return finished;
}

// Appends exactly one null slot to `array`.
Status AppendNull(ArrowArray* array) {
  constexpr int64_t kNullCount = 1;
  ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendNull(array, kNullCount));
  return {};
}

// Appends a boolean to `array`, encoded as the integer 1 (true) or 0 (false)
// and forwarded through ArrowArrayAppendInt.
Status AppendBoolean(ArrowArray* array, bool value) {
  const int64_t encoded = value ? 1 : 0;
  ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendInt(array, encoded));
  return {};
}

// Appends a single integer to `array` via ArrowArrayAppendInt and converts a
// nanoarrow failure into an error Status.
Status AppendInt(ArrowArray* array, int64_t value) {
  ICEBERG_NANOARROW_RETURN_UNEXPECTED(
      ArrowArrayAppendInt(array, value));
  return {};
}

// Appends the bytes referenced by `value` to `array` as one string element.
Status AppendString(ArrowArray* array, std::string_view value) {
  const auto size_bytes = static_cast<int64_t>(value.size());
  ArrowStringView view(value.data(), size_bytes);
  ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendString(array, view));
  return {};
}

// Appends one map<string, string> element to `array`.
//
// `array` is treated as a map builder: its first child is the entries struct,
// and that struct's first and second children receive the keys and values
// respectively. None of this is validated here. Entries are appended in the
// iteration order of `entries`.
Status AppendStringMap(ArrowArray* array,
                       const std::unordered_map<std::string, std::string>& entries) {
  ArrowArray* const entry_builder = array->children[0];
  ArrowArray* const keys = entry_builder->children[0];
  ArrowArray* const values = entry_builder->children[1];

  for (auto it = entries.begin(); it != entries.end(); ++it) {
    ICEBERG_RETURN_UNEXPECTED(AppendString(keys, it->first));
    ICEBERG_RETURN_UNEXPECTED(AppendString(values, it->second));
    // One key/value pair makes up one element of the entries struct.
    ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayFinishElement(entry_builder));
  }

  // Close the map element itself; this also runs when `entries` is empty.
  ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayFinishElement(array));
  return {};
}

} // namespace iceberg
114 changes: 114 additions & 0 deletions src/iceberg/inspect/row_builder_internal.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#pragma once

/// \file iceberg/inspect/row_builder_internal.h

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why put it here instead of treating it as a general utility for building Arrow arrays?

/// Internal Arrow row-building utilities shared by metadata tables.
///
/// Metadata tables (snapshots, history, manifests, ...) materialize in-memory
/// structures into Arrow batches that conform to the table's Iceberg schema.
/// `ArrowRowBuilder` wraps a nanoarrow `ArrowArray` initialized from such a
/// schema and exposes per-column builders plus typed append helpers so each
/// metadata table can emit rows without re-implementing the nanoarrow
/// boilerplate.

#include <cstdint>
#include <memory>
#include <string_view>
#include <unordered_map>

#include "iceberg/arrow_c_data.h"
#include "iceberg/iceberg_export.h"
#include "iceberg/result.h"
#include "iceberg/type_fwd.h"

namespace iceberg {

/// \brief Builds an Arrow struct array (a batch) for an arbitrary Iceberg schema.
///
/// The builder owns a nanoarrow `ArrowArray` and releases it on destruction
/// unless ownership has been handed out through `Finish()`. The type is
/// move-only. A moved-from builder holds no array: `num_columns()` returns 0
/// and `column()` returns nullptr.
///
/// Typical usage:
/// \code
/// ICEBERG_ASSIGN_OR_RAISE(auto builder, ArrowRowBuilder::Make(schema));
/// for (const auto& row : rows) {
///   ICEBERG_RETURN_UNEXPECTED(AppendInt(builder.column(0), row.id));
///   ICEBERG_RETURN_UNEXPECTED(AppendString(builder.column(1), row.name));
///   ICEBERG_RETURN_UNEXPECTED(builder.FinishRow());
/// }
/// ICEBERG_ASSIGN_OR_RAISE(auto array, std::move(builder).Finish());
/// \endcode
class ICEBERG_EXPORT ArrowRowBuilder {
public:
/// \brief Create a row builder for the given Iceberg schema.
///
/// The schema is converted to an Arrow schema, a nanoarrow array is
/// initialized from it, and the array is put into appending mode.
///
/// \param schema The Iceberg schema the produced batch should conform to.
/// \return The builder, or an error if conversion or initialization fails.
static Result<ArrowRowBuilder> Make(const Schema& schema);

/// \brief Move-construct, taking over the array owned by `other`.
ArrowRowBuilder(ArrowRowBuilder&& other) noexcept;
/// \brief Move-assign; any array currently held is released first.
ArrowRowBuilder& operator=(ArrowRowBuilder&& other) noexcept;

// Copying is disabled: the builder is the sole owner of its array.
ArrowRowBuilder(const ArrowRowBuilder&) = delete;
ArrowRowBuilder& operator=(const ArrowRowBuilder&) = delete;

/// \brief Release the underlying array if it is still owned by the builder.
~ArrowRowBuilder();

/// \brief The number of top-level columns in the batch.
///
/// Returns 0 when the builder holds no array (e.g. after being moved from).
int64_t num_columns() const;

/// \brief Access the nanoarrow child builder for a top-level column.
///
/// \param index Zero-based column index.
/// \return The child array, or nullptr if `index` is out of range or the
/// builder holds no array. The pointer is owned by the builder.
ArrowArray* column(int64_t index);

/// \brief Finish the current row, advancing the struct length by one.
///
/// Call after appending exactly one value (or null) to every column.
///
/// \return An error if nanoarrow rejects the row.
Status FinishRow();

/// \brief Finish building and transfer ownership of the resulting array.
///
/// On success the returned `ArrowArray` carries the release callback and the
/// builder no longer releases the data. On failure the builder keeps
/// ownership of the array.
///
/// The builder must not be used after this call.
Result<ArrowArray> Finish() &&;

private:
/// Adopt an already-allocated array holder. Used by `Make()`.
explicit ArrowRowBuilder(std::unique_ptr<ArrowArray>&& array) noexcept;

/// The array being built; null after the builder has been moved from.
std::unique_ptr<ArrowArray> array_;
};

/// \brief Append a single null to a nanoarrow array builder.
///
/// \param array The column builder to append to.
ICEBERG_EXPORT Status AppendNull(ArrowArray* array);

/// \brief Append a boolean value to a nanoarrow array builder.
///
/// The value is appended through nanoarrow's integer append as 1 or 0.
ICEBERG_EXPORT Status AppendBoolean(ArrowArray* array, bool value);

/// \brief Append an integer value to a nanoarrow array builder.
///
/// The value is forwarded to nanoarrow's `ArrowArrayAppendInt`, which accepts
/// an `int64_t` argument; intended for int32/int64/timestamp columns.
/// NOTE(review): the storage width is presumably determined by the column's
/// own type rather than always being int64 -- confirm how out-of-range values
/// are handled for narrower columns.
ICEBERG_EXPORT Status AppendInt(ArrowArray* array, int64_t value);

/// \brief Append a string value to a nanoarrow array builder.
///
/// \param value The bytes to append; they are not required to outlive the call
/// as far as this wrapper is concerned (the view is only passed through).
ICEBERG_EXPORT Status AppendString(ArrowArray* array, std::string_view value);

/// \brief Append a map<string, string> value to a nanoarrow map array builder.
///
/// Appends one (possibly empty) map element. The iteration order of the
/// resulting entries is unspecified.
///
/// `array` must be a map builder whose first child is the entries struct with
/// key and value children; this layout is not validated.
ICEBERG_EXPORT Status AppendStringMap(
ArrowArray* array, const std::unordered_map<std::string, std::string>& entries);

} // namespace iceberg
1 change: 1 addition & 0 deletions src/iceberg/meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ iceberg_sources = files(
'inheritable_metadata.cc',
'inspect/history_table.cc',
'inspect/metadata_table.cc',
'inspect/row_builder_internal.cc',
'inspect/snapshots_table.cc',
'json_serde.cc',
'location_provider.cc',
Expand Down
7 changes: 6 additions & 1 deletion src/iceberg/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ add_iceberg_test(schema_test
add_iceberg_test(table_test
SOURCES
location_provider_test.cc
metadata_table_test.cc
metrics_config_test.cc
metrics_reporter_test.cc
metrics_test.cc
Expand Down Expand Up @@ -185,6 +184,12 @@ if(ICEBERG_BUILD_BUNDLE)

add_iceberg_test(catalog_test USE_BUNDLE SOURCES in_memory_catalog_test.cc)

add_iceberg_test(metadata_table_test
USE_BUNDLE
SOURCES
metadata_table_test.cc
row_builder_test.cc)

add_iceberg_test(eval_expr_test
USE_BUNDLE
SOURCES
Expand Down
Loading
Loading