Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 12 additions & 14 deletions src/iceberg/avro/avro_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

#include <memory>

#include <arrow/array.h>
#include <arrow/array/builder_base.h>
#include <arrow/c/bridge.h>
#include <arrow/record_batch.h>
Expand Down Expand Up @@ -178,12 +179,6 @@ class GenericDatumBackend : public AvroWriteBackend {

class AvroWriter::Impl {
public:
// Releases the held C ArrowSchema, if one is still held, when the writer
// implementation is destroyed.
~Impl() {
  // Skip the release call when the release callback is null.
  if (arrow_schema_.release == nullptr) {
    return;
  }
  ArrowSchemaRelease(&arrow_schema_);
}

Status Open(const WriterOptions& options) {
write_schema_ = options.schema;

Expand Down Expand Up @@ -227,19 +222,22 @@ class AvroWriter::Impl {
options.properties.Get(WriterProperties::kAvroSyncInterval), codec,
compression_level, metadata));

ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*write_schema_, &arrow_schema_));
ArrowSchema c_schema;
ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*write_schema_, &c_schema));
ICEBERG_ARROW_ASSIGN_OR_RETURN(arrow_schema_, ::arrow::ImportSchema(&c_schema));
return {};
}

Status Write(ArrowArray* data) {
ICEBERG_ARROW_ASSIGN_OR_RETURN(auto result,
::arrow::ImportArray(data, &arrow_schema_));
ICEBERG_ARROW_ASSIGN_OR_RETURN(auto batch,
::arrow::ImportRecordBatch(data, arrow_schema_));

for (int64_t i = 0; i < result->length(); i++) {
ICEBERG_RETURN_UNEXPECTED(backend_->WriteRow(*write_schema_, *result, i));
ICEBERG_ARROW_ASSIGN_OR_RETURN(auto struct_array, batch->ToStructArray());
for (int64_t i = 0; i < struct_array->length(); i++) {
ICEBERG_RETURN_UNEXPECTED(backend_->WriteRow(*write_schema_, *struct_array, i));
}

num_records_ += result->length();
num_records_ += struct_array->length();
return {};
}

Expand Down Expand Up @@ -278,8 +276,8 @@ class AvroWriter::Impl {
std::shared_ptr<::avro::ValidSchema> avro_schema_;
// Arrow output stream of the Avro file to write
std::shared_ptr<::arrow::io::OutputStream> arrow_output_stream_;
// Arrow schema to write data.
ArrowSchema arrow_schema_;
// Arrow schema to import C data batches.
std::shared_ptr<::arrow::Schema> arrow_schema_;
// Total length of the written Avro file.
int64_t total_bytes_ = 0;
// Number of records written.
Expand Down
2 changes: 1 addition & 1 deletion src/iceberg/manifest/manifest_adapter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,6 @@ Status ManifestAdapter::StartAppending() {
return InvalidArgument("Adapter buffer not empty, cannot start appending.");
}
array_ = {};
size_ = 0;
ArrowError error;
ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR(
ArrowArrayInitFromSchema(&array_, &schema_, &error), error);
Expand All @@ -138,6 +137,7 @@ Result<ArrowArray*> ManifestAdapter::FinishAppending() {
ArrowError error;
ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR(
ArrowArrayFinishBuildingDefault(&array_, &error), error);
size_ = 0;
return &array_;
}

Expand Down
53 changes: 45 additions & 8 deletions src/iceberg/test/executor_util_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
#include <atomic>
#include <concepts>
#include <functional>
#include <memory>
#include <optional>
#include <ranges>
#include <span>
#include <string>
#include <tuple>
#include <unordered_map>
Expand Down Expand Up @@ -51,19 +53,15 @@ struct IntTask {
}
};

// Callable that ignores its bool argument and always returns a Result holding
// an empty std::vector<bool>.
struct BoolTask {
  Result<std::vector<bool>> operator()(bool) {
    using Output = std::vector<bool>;
    return Result<Output>{Output{}};
  }
};

static_assert(internal::ParallelCollectible<std::vector<int>&, IntTask>);
static_assert(
std::same_as<decltype(ParallelCollect(std::nullopt, std::declval<std::vector<int>&>(),
IntTask{})),
Result<std::vector<int>>>);
static_assert(!internal::ParallelCollectible<decltype(std::views::iota(0, 3)), IntTask>);
static_assert(!internal::ParallelCollectible<std::vector<bool>&, BoolTask>);
static_assert(internal::ParallelCollectible<decltype(std::views::iota(0, 3)), IntTask>);
static_assert(std::same_as<decltype(ParallelCollect(std::nullopt, std::views::iota(0, 3),
IntTask{})),
Result<std::vector<int>>>);

} // namespace

Expand Down Expand Up @@ -139,6 +137,45 @@ TEST(ParallelCollectTest, CollectsSingleRange) {
EXPECT_THAT(*result, UnorderedElementsAre(2, 4, 6));
}

// ParallelCollect over a std::views::iota view: each of 1, 2, 3 is mapped to a
// one-element set holding its double, and the collected result is expected to
// contain 2, 4 and 6 in any order.
TEST(ParallelCollectTest, CollectsIotaView) {
  auto numbers = std::views::iota(1, 4);

  auto collected = ParallelCollect(std::nullopt, numbers, [](int n) {
    return Result<std::unordered_set<int>>{{n * 2}};
  });

  EXPECT_THAT(collected, IsOk());
  EXPECT_THAT(*collected, UnorderedElementsAre(2, 4, 6));
}

// ParallelCollect over a transform view whose elements are two-element spans
// cut from a backing vector; the task echoes the first and last item of every
// span and the test expects the original order 1, 2, 3, 4.
TEST(ParallelCollectTest, CollectsTransformView) {
  std::vector<int> backing = {1, 2, 3, 4};
  std::span<const int> all_items(backing);

  // Maps group number g to the span covering items [2g, 2g + 2).
  auto slice_pair = [all_items](int group_number) {
    return all_items.subspan(group_number * 2, 2);
  };
  auto groups = std::views::iota(0, 2) | std::views::transform(slice_pair);

  auto collected = ParallelCollect(std::nullopt, groups, [](std::span<const int> pair) {
    return Result<std::vector<int>>{{pair.front(), pair.back()}};
  });

  EXPECT_THAT(collected, IsOk());
  EXPECT_THAT(*collected, ElementsAre(1, 2, 3, 4));
}

// ParallelCollect over a view that yields std::unique_ptr<int> prvalues: the
// task takes each pointer by value and returns the pointee, and the test
// expects 1, 2, 3 in order.
TEST(ParallelCollectTest, CollectsMoveOnlyPrvalues) {
  auto box = [](int n) { return std::make_unique<int>(n); };
  auto boxed = std::views::iota(1, 4) | std::views::transform(box);

  auto collected = ParallelCollect(std::nullopt, boxed, [](std::unique_ptr<int> owned) {
    return Result<std::vector<int>>{{*owned}};
  });

  EXPECT_THAT(collected, IsOk());
  EXPECT_THAT(*collected, ElementsAre(1, 2, 3));
}

TEST(ParallelCollectTest, KeepsTupleResultFromSingleRange) {
std::vector<int> input = {1, 2};

Expand Down
85 changes: 85 additions & 0 deletions src/iceberg/test/fast_append_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "iceberg/update/fast_append.h"

#include <format>
#include <limits>
#include <optional>
#include <string>
#include <thread>
Expand All @@ -32,14 +33,18 @@
#include "iceberg/avro/avro_register.h"
#include "iceberg/constants.h"
#include "iceberg/manifest/manifest_entry.h"
#include "iceberg/manifest/manifest_reader.h"
#include "iceberg/manifest/manifest_writer.h"
#include "iceberg/partition_spec.h"
#include "iceberg/schema.h"
#include "iceberg/snapshot.h"
#include "iceberg/table_metadata.h"
#include "iceberg/table_properties.h"
#include "iceberg/test/executor.h"
#include "iceberg/test/matchers.h"
#include "iceberg/test/update_test_base.h"
#include "iceberg/transaction.h"
#include "iceberg/update/update_properties.h" // IWYU pragma: keep

namespace iceberg {

Expand Down Expand Up @@ -117,6 +122,29 @@ class FastAppendTest : public UpdateTestBase {
return writer->ToManifestFile();
}

void SetManifestTargetSizeBytes(int64_t size_bytes) {
ICEBERG_UNWRAP_OR_FAIL(auto props, table_->NewUpdateProperties());
props->Set(std::string(TableProperties::kManifestTargetSizeBytes.key()),
std::to_string(size_bytes));
EXPECT_THAT(props->Commit(), IsOk());
EXPECT_THAT(table_->Refresh(), IsOk());
}

// Returns a copy of the data manifests reported for the table's current
// snapshot. Failures from looking up the snapshot or its manifests are passed
// to ICEBERG_ASSIGN_OR_RAISE (presumably returned to the caller — confirm
// against the macro definition).
Result<std::vector<ManifestFile>> CurrentDataManifests() {
  ICEBERG_ASSIGN_OR_RAISE(auto current, table_->current_snapshot());
  SnapshotCache cache(current.get());
  ICEBERG_ASSIGN_OR_RAISE(auto data_manifests, cache.DataManifests(file_io_));
  // Copy the manifests into an owning vector for the caller.
  return std::vector<ManifestFile>(data_manifests.begin(), data_manifests.end());
}

// Reads the entries of `manifest` with a ManifestReader built from the
// partition spec that the table metadata reports for the manifest's
// partition_spec_id.
Result<std::vector<ManifestEntry>> ReadEntries(const ManifestFile& manifest) {
  const auto spec_id = manifest.partition_spec_id;
  ICEBERG_ASSIGN_OR_RAISE(auto partition_spec,
                          table_->metadata()->PartitionSpecById(spec_id));
  ICEBERG_ASSIGN_OR_RAISE(
      auto manifest_reader,
      ManifestReader::Make(manifest, file_io_, schema_, partition_spec));
  return manifest_reader->Entries();
}

std::shared_ptr<PartitionSpec> spec_;
std::shared_ptr<Schema> schema_;
std::shared_ptr<DataFile> file_a_;
Expand Down Expand Up @@ -183,6 +211,63 @@ TEST_F(FastAppendTest, AppendManyFiles) {
EXPECT_EQ(snapshot->summary.at("added-files-size"), std::to_string(total_size));
}

// Appends 15,000 data files through a FastAppend configured with a
// ThreadExecutor and a manifest write parallelism of 3, with the manifest
// target size raised to the int64 maximum. The test expects exactly two
// executor submissions and two data manifests of 7,500 added files each, whose
// entries follow the order in which the files were appended.
TEST_F(FastAppendTest, WriteManifestGroups) {
  SetManifestTargetSizeBytes(std::numeric_limits<int64_t>::max());

  test::ThreadExecutor executor;
  std::shared_ptr<FastAppend> append_op;
  ICEBERG_UNWRAP_OR_FAIL(append_op, table_->NewFastAppend());
  append_op->WriteManifestsWith(executor, 3);

  constexpr size_t kFileCount = 15'000;
  constexpr size_t kGroupSize = 7'500;

  // Keep every appended file so that manifest entries can be compared by path.
  std::vector<std::shared_ptr<DataFile>> appended;
  appended.reserve(kFileCount);
  for (size_t file_no = 0; file_no < kFileCount; ++file_no) {
    auto created =
        CreateDataFile(std::format("/data/group_{}.parquet", file_no),
                       /*record_count=*/1, /*size=*/1, static_cast<int64_t>(file_no % 2));
    append_op->AppendFile(created);
    appended.push_back(std::move(created));
  }

  EXPECT_THAT(append_op->Commit(), IsOk());

  EXPECT_THAT(table_->Refresh(), IsOk());
  EXPECT_EQ(executor.submit_count(), 2);
  ICEBERG_UNWRAP_OR_FAIL(auto written, CurrentDataManifests());
  ASSERT_EQ(written.size(), 2U);

  for (size_t group_no = 0; group_no < written.size(); ++group_no) {
    const auto& manifest = written[group_no];
    ASSERT_TRUE(manifest.added_files_count.has_value());
    EXPECT_EQ(manifest.added_files_count.value(), kGroupSize);

    ICEBERG_UNWRAP_OR_FAIL(auto group_entries, ReadEntries(manifest));
    ASSERT_EQ(group_entries.size(), kGroupSize);

    // Position in `appended` of the first file expected in this manifest.
    const size_t first_file = group_no * kGroupSize;
    for (size_t pos = 0; pos < group_entries.size(); ++pos) {
      const auto& entry = group_entries[pos];
      ASSERT_NE(entry.data_file, nullptr);
      EXPECT_EQ(entry.data_file->file_path, appended[first_file + pos]->file_path);
    }
  }
}

// Configures a FastAppend with a manifest write parallelism of 0 and expects
// Commit() to report a validation failure with the message below, with nothing
// submitted to the executor.
TEST_F(FastAppendTest, InvalidManifestParallelism) {
  test::ThreadExecutor executor;
  std::shared_ptr<FastAppend> append_op;
  ICEBERG_UNWRAP_OR_FAIL(append_op, table_->NewFastAppend());

  // Parallelism 0 is the invalid value under test.
  append_op->WriteManifestsWith(executor, 0);
  append_op->AppendFile(file_a_);

  auto commit_status = append_op->Commit();
  EXPECT_THAT(commit_status, IsError(ErrorKind::kValidationFailed));
  EXPECT_THAT(
      commit_status,
      HasErrorMessage("Manifest write parallelism must be greater than 0, but was: 0"));
  EXPECT_EQ(executor.submit_count(), 0);
}

TEST_F(FastAppendTest, EmptyTableAppendUpdatesSequenceNumbers) {
EXPECT_THAT(table_->current_snapshot(), HasErrorMessage("No current snapshot"));
const int64_t base_sequence_number = table_->metadata()->last_sequence_number;
Expand Down
45 changes: 45 additions & 0 deletions src/iceberg/test/merging_snapshot_update_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@

#include "iceberg/update/merging_snapshot_update.h"

#include <limits>
#include <memory>
#include <optional>
#include <ranges>
#include <string>
#include <unordered_set>
#include <vector>
Expand All @@ -40,6 +43,7 @@
#include "iceberg/table.h"
#include "iceberg/table_metadata.h"
#include "iceberg/table_properties.h"
#include "iceberg/test/executor.h"
#include "iceberg/test/matchers.h"
#include "iceberg/test/update_test_base.h"
#include "iceberg/transaction.h"
Expand Down Expand Up @@ -92,6 +96,16 @@ class TestMergeAppend : public MergingSnapshotUpdate {
// Forwards to MergingSnapshotUpdate::DataSpec() and returns its result
// unchanged. NOTE(review): presumably re-exposes a non-public base member so
// tests can call it — confirm against the base class declaration.
Result<std::shared_ptr<PartitionSpec>> DataSpec() const {
return MergingSnapshotUpdate::DataSpec();
}
// Wraps every file in `files` in a ContentFileWithSequenceNumber that carries
// no data sequence number, then hands the entries and `spec` to
// WriteDeleteManifests and returns its result.
Result<std::vector<ManifestFile>> WriteDeletesForTest(
    std::span<const std::shared_ptr<DataFile>> files,
    const std::shared_ptr<PartitionSpec>& spec) {
  std::vector<ContentFileWithSequenceNumber> wrapped;
  wrapped.reserve(files.size());
  for (const auto& delete_file : files) {
    wrapped.push_back(ContentFileWithSequenceNumber{
        .file = delete_file, .data_sequence_number = std::nullopt});
  }
  return WriteDeleteManifests(wrapped, spec);
}
// Returns the value of SnapshotId(). NOTE(review): presumably a test-only
// accessor for a non-public member — confirm against the base class.
int64_t GeneratedSnapshotId() { return SnapshotId(); }
// Passes `seq` to SetNewDataFilesDataSequenceNumber().
void SetDataSeqNumber(int64_t seq) { SetNewDataFilesDataSequenceNumber(seq); }
// Passes `case_sensitive` to CaseSensitive(); any value that call returns is
// discarded.
void SetCaseSensitive(bool case_sensitive) { CaseSensitive(case_sensitive); }
Expand Down Expand Up @@ -266,6 +280,14 @@ class MergingSnapshotUpdateTest : public MinimalUpdateTestBase {
table_->metadata()->format_version = format_version;
}

void SetManifestTargetSizeBytes(int64_t size_bytes) {
ICEBERG_UNWRAP_OR_FAIL(auto props, table_->NewUpdateProperties());
props->Set(std::string(TableProperties::kManifestTargetSizeBytes.key()),
std::to_string(size_bytes));
EXPECT_THAT(props->Commit(), IsOk());
EXPECT_THAT(table_->Refresh(), IsOk());
}

// Commit file_a_ with FastAppend and refresh the table.
void CommitFileA() {
ICEBERG_UNWRAP_OR_FAIL(auto fa, table_->NewFastAppend());
Expand Down Expand Up @@ -639,6 +661,29 @@ TEST_F(MergingSnapshotUpdateTest, AddDeleteFileWithExplicitSequenceWritesSequenc
EXPECT_EQ(entries[0].sequence_number.value(), 17);
}

// Writes delete manifests for 15,000 delete files through a merge append
// configured with a ThreadExecutor and a manifest write parallelism of 3, with
// the manifest target size raised to the int64 maximum. The test expects two
// executor submissions and two manifests, each with delete content.
TEST_F(MergingSnapshotUpdateTest, WriteDeleteGroups) {
  SetManifestTargetSizeBytes(std::numeric_limits<int64_t>::max());

  test::ThreadExecutor executor;
  ICEBERG_UNWRAP_OR_FAIL(auto merge_op, NewMergeAppend());
  merge_op->WriteManifestsWith(executor, 3);

  constexpr size_t kFileCount = 15'000;
  std::vector<std::shared_ptr<DataFile>> delete_files;
  delete_files.reserve(kFileCount);
  for (size_t file_no = 0; file_no < kFileCount; ++file_no) {
    delete_files.push_back(
        MakeDeleteFile(std::format("/delete/group_{}.parquet", file_no),
                       static_cast<int64_t>(file_no % 2)));
  }
  ICEBERG_UNWRAP_OR_FAIL(auto written,
                         merge_op->WriteDeletesForTest(delete_files, spec_));

  EXPECT_EQ(executor.submit_count(), 2);
  ASSERT_EQ(written.size(), 2U);
  for (const auto& delete_manifest : written) {
    EXPECT_EQ(delete_manifest.content, ManifestContent::kDeletes);
  }
}

TEST_F(MergingSnapshotUpdateTest, ApplyRebuildsDeleteSummaryAfterPreparingDeletes) {
auto del_file = MakeDeleteFile("/delete/del_a.parquet", 1L);

Expand Down
Loading
Loading