From 6ee3bbb3d4588da6db25c29c87c165a9efd1cc24 Mon Sep 17 00:00:00 2001 From: Zehua Zou Date: Tue, 23 Jun 2026 15:46:17 +0800 Subject: [PATCH] feat: parallelize writing manifests --- src/iceberg/avro/avro_writer.cc | 26 +++-- src/iceberg/manifest/manifest_adapter.cc | 2 +- src/iceberg/test/executor_util_test.cc | 53 ++++++++-- src/iceberg/test/fast_append_test.cc | 85 ++++++++++++++++ .../test/merging_snapshot_update_test.cc | 45 +++++++++ src/iceberg/update/snapshot_update.cc | 97 +++++++++++++------ src/iceberg/update/snapshot_update.h | 21 ++++ src/iceberg/util/executor_util_internal.h | 41 +++++--- 8 files changed, 304 insertions(+), 66 deletions(-) diff --git a/src/iceberg/avro/avro_writer.cc b/src/iceberg/avro/avro_writer.cc index 63fc31462..01d8cb3b1 100644 --- a/src/iceberg/avro/avro_writer.cc +++ b/src/iceberg/avro/avro_writer.cc @@ -21,6 +21,7 @@ #include +#include #include #include #include @@ -178,12 +179,6 @@ class GenericDatumBackend : public AvroWriteBackend { class AvroWriter::Impl { public: - ~Impl() { - if (arrow_schema_.release != nullptr) { - ArrowSchemaRelease(&arrow_schema_); - } - } - Status Open(const WriterOptions& options) { write_schema_ = options.schema; @@ -227,19 +222,22 @@ class AvroWriter::Impl { options.properties.Get(WriterProperties::kAvroSyncInterval), codec, compression_level, metadata)); - ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*write_schema_, &arrow_schema_)); + ArrowSchema c_schema; + ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*write_schema_, &c_schema)); + ICEBERG_ARROW_ASSIGN_OR_RETURN(arrow_schema_, ::arrow::ImportSchema(&c_schema)); return {}; } Status Write(ArrowArray* data) { - ICEBERG_ARROW_ASSIGN_OR_RETURN(auto result, - ::arrow::ImportArray(data, &arrow_schema_)); + ICEBERG_ARROW_ASSIGN_OR_RETURN(auto batch, + ::arrow::ImportRecordBatch(data, arrow_schema_)); - for (int64_t i = 0; i < result->length(); i++) { - ICEBERG_RETURN_UNEXPECTED(backend_->WriteRow(*write_schema_, *result, i)); + ICEBERG_ARROW_ASSIGN_OR_RETURN(auto struct_array, batch->ToStructArray()); + for (int64_t i = 0; i < struct_array->length(); i++) { + ICEBERG_RETURN_UNEXPECTED(backend_->WriteRow(*write_schema_, *struct_array, i)); } - num_records_ += result->length(); + num_records_ += struct_array->length(); return {}; } @@ -278,8 +276,8 @@ class AvroWriter::Impl { std::shared_ptr<::avro::ValidSchema> avro_schema_; // Arrow output stream of the Avro file to write std::shared_ptr<::arrow::io::OutputStream> arrow_output_stream_; - // Arrow schema to write data. - ArrowSchema arrow_schema_; + // Arrow schema to import C data batches. + std::shared_ptr<::arrow::Schema> arrow_schema_; // Total length of the written Avro file. int64_t total_bytes_ = 0; // Number of records written. diff --git a/src/iceberg/manifest/manifest_adapter.cc b/src/iceberg/manifest/manifest_adapter.cc index b37d82702..526d31271 100644 --- a/src/iceberg/manifest/manifest_adapter.cc +++ b/src/iceberg/manifest/manifest_adapter.cc @@ -126,7 +126,6 @@ Status ManifestAdapter::StartAppending() { return InvalidArgument("Adapter buffer not empty, cannot start appending."); } array_ = {}; - size_ = 0; ArrowError error; ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR( ArrowArrayInitFromSchema(&array_, &schema_, &error), error); @@ -138,6 +137,7 @@ Result ManifestAdapter::FinishAppending() { ArrowError error; ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR( ArrowArrayFinishBuildingDefault(&array_, &error), error); + size_ = 0; return &array_; } diff --git a/src/iceberg/test/executor_util_test.cc b/src/iceberg/test/executor_util_test.cc index 3bb9ad9fc..3a0ddbff6 100644 --- a/src/iceberg/test/executor_util_test.cc +++ b/src/iceberg/test/executor_util_test.cc @@ -20,8 +20,10 @@ #include #include #include +#include #include #include +#include #include #include #include @@ -51,19 +53,15 @@ struct IntTask { } }; -struct BoolTask { - Result> operator()(bool) { - return Result>{std::vector{}}; - } -}; - static_assert(internal::ParallelCollectible&, IntTask>); static_assert( std::same_as&>(), IntTask{})), Result>>); -static_assert(!internal::ParallelCollectible); -static_assert(!internal::ParallelCollectible&, BoolTask>); +static_assert(internal::ParallelCollectible); +static_assert(std::same_as>>); } // namespace @@ -139,6 +137,45 @@ TEST(ParallelCollectTest, CollectsSingleRange) { EXPECT_THAT(*result, UnorderedElementsAre(2, 4, 6)); } +TEST(ParallelCollectTest, CollectsIotaView) { + auto input = std::views::iota(1, 4); + + auto result = ParallelCollect(std::nullopt, input, [](int value) { + return Result>{{value * 2}}; + }); + + EXPECT_THAT(result, IsOk()); + EXPECT_THAT(*result, UnorderedElementsAre(2, 4, 6)); +} + +TEST(ParallelCollectTest, CollectsTransformView) { + std::vector values = {1, 2, 3, 4}; + std::span files(values); + auto input = std::views::iota(0, 2) | std::views::transform([files](int index) { + return files.subspan(index * 2, 2); + }); + + auto result = ParallelCollect(std::nullopt, input, [](std::span group) { + return Result>{{group.front(), group.back()}}; + }); + + EXPECT_THAT(result, IsOk()); + EXPECT_THAT(*result, ElementsAre(1, 2, 3, 4)); +} + +TEST(ParallelCollectTest, CollectsMoveOnlyPrvalues) { + auto input = std::views::iota(1, 4) | std::views::transform([](int value) { + return std::make_unique(value); + }); + + auto result = ParallelCollect(std::nullopt, input, [](std::unique_ptr value) { + return Result>{{*value}}; + }); + + EXPECT_THAT(result, IsOk()); + EXPECT_THAT(*result, ElementsAre(1, 2, 3)); +} + TEST(ParallelCollectTest, KeepsTupleResultFromSingleRange) { std::vector input = {1, 2}; diff --git a/src/iceberg/test/fast_append_test.cc b/src/iceberg/test/fast_append_test.cc index 8853d419c..d9e9a7eb5 100644 --- a/src/iceberg/test/fast_append_test.cc +++ b/src/iceberg/test/fast_append_test.cc @@ -20,6 +20,7 @@ #include "iceberg/update/fast_append.h" #include +#include #include #include #include @@ -32,14 +33,18 @@ #include "iceberg/avro/avro_register.h" #include "iceberg/constants.h" #include "iceberg/manifest/manifest_entry.h" +#include "iceberg/manifest/manifest_reader.h" #include "iceberg/manifest/manifest_writer.h" #include "iceberg/partition_spec.h" #include "iceberg/schema.h" #include "iceberg/snapshot.h" #include "iceberg/table_metadata.h" +#include "iceberg/table_properties.h" +#include "iceberg/test/executor.h" #include "iceberg/test/matchers.h" #include "iceberg/test/update_test_base.h" #include "iceberg/transaction.h" +#include "iceberg/update/update_properties.h" // IWYU pragma: keep namespace iceberg { @@ -117,6 +122,29 @@ class FastAppendTest : public UpdateTestBase { return writer->ToManifestFile(); } + void SetManifestTargetSizeBytes(int64_t size_bytes) { + ICEBERG_UNWRAP_OR_FAIL(auto props, table_->NewUpdateProperties()); + props->Set(std::string(TableProperties::kManifestTargetSizeBytes.key()), + std::to_string(size_bytes)); + EXPECT_THAT(props->Commit(), IsOk()); + EXPECT_THAT(table_->Refresh(), IsOk()); + } + + Result> CurrentDataManifests() { + ICEBERG_ASSIGN_OR_RAISE(auto snapshot, table_->current_snapshot()); + SnapshotCache snapshot_cache(snapshot.get()); + ICEBERG_ASSIGN_OR_RAISE(auto manifests, snapshot_cache.DataManifests(file_io_)); + return std::vector(manifests.begin(), manifests.end()); + } + + Result> ReadEntries(const ManifestFile& manifest) { + ICEBERG_ASSIGN_OR_RAISE( + auto spec, table_->metadata()->PartitionSpecById(manifest.partition_spec_id)); + ICEBERG_ASSIGN_OR_RAISE(auto reader, + ManifestReader::Make(manifest, file_io_, schema_, spec)); + return reader->Entries(); + } + std::shared_ptr spec_; std::shared_ptr schema_; std::shared_ptr file_a_; @@ -183,6 +211,63 @@ TEST_F(FastAppendTest, AppendManyFiles) { EXPECT_EQ(snapshot->summary.at("added-files-size"), std::to_string(total_size)); } +TEST_F(FastAppendTest, WriteManifestGroups) { + SetManifestTargetSizeBytes(std::numeric_limits::max()); + + test::ThreadExecutor executor; + std::shared_ptr fast_append; + ICEBERG_UNWRAP_OR_FAIL(fast_append, table_->NewFastAppend()); + fast_append->WriteManifestsWith(executor, 3); + + constexpr size_t kFileCount = 15'000; + constexpr size_t kGroupSize = 7'500; + std::vector> files; + files.reserve(kFileCount); + for (size_t index = 0; index < kFileCount; ++index) { + auto data_file = + CreateDataFile(std::format("/data/group_{}.parquet", index), + /*record_count=*/1, /*size=*/1, static_cast(index % 2)); + fast_append->AppendFile(data_file); + files.push_back(std::move(data_file)); + } + + EXPECT_THAT(fast_append->Commit(), IsOk()); + + EXPECT_THAT(table_->Refresh(), IsOk()); + EXPECT_EQ(executor.submit_count(), 2); + ICEBERG_UNWRAP_OR_FAIL(auto manifests, CurrentDataManifests()); + ASSERT_EQ(manifests.size(), 2U); + + for (size_t group_index = 0; group_index < manifests.size(); ++group_index) { + ASSERT_TRUE(manifests[group_index].added_files_count.has_value()); + EXPECT_EQ(manifests[group_index].added_files_count.value(), kGroupSize); + + ICEBERG_UNWRAP_OR_FAIL(auto entries, ReadEntries(manifests[group_index])); + ASSERT_EQ(entries.size(), kGroupSize); + const size_t offset = group_index * kGroupSize; + for (size_t entry_index = 0; entry_index < entries.size(); ++entry_index) { + ASSERT_NE(entries[entry_index].data_file, nullptr); + EXPECT_EQ(entries[entry_index].data_file->file_path, + files[offset + entry_index]->file_path); + } + } +} + +TEST_F(FastAppendTest, InvalidManifestParallelism) { + test::ThreadExecutor executor; + std::shared_ptr fast_append; + ICEBERG_UNWRAP_OR_FAIL(fast_append, table_->NewFastAppend()); + fast_append->WriteManifestsWith(executor, 0); + fast_append->AppendFile(file_a_); + + auto result = fast_append->Commit(); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT( + result, + HasErrorMessage("Manifest write parallelism must be greater than 0, but was: 0")); + EXPECT_EQ(executor.submit_count(), 0); +} + TEST_F(FastAppendTest, EmptyTableAppendUpdatesSequenceNumbers) { EXPECT_THAT(table_->current_snapshot(), HasErrorMessage("No current snapshot")); const int64_t base_sequence_number = table_->metadata()->last_sequence_number; diff --git a/src/iceberg/test/merging_snapshot_update_test.cc b/src/iceberg/test/merging_snapshot_update_test.cc index 1b50a124e..9841d74a9 100644 --- a/src/iceberg/test/merging_snapshot_update_test.cc +++ b/src/iceberg/test/merging_snapshot_update_test.cc @@ -19,7 +19,10 @@ #include "iceberg/update/merging_snapshot_update.h" +#include #include +#include +#include #include #include #include @@ -40,6 +43,7 @@ #include "iceberg/table.h" #include "iceberg/table_metadata.h" #include "iceberg/table_properties.h" +#include "iceberg/test/executor.h" #include "iceberg/test/matchers.h" #include "iceberg/test/update_test_base.h" #include "iceberg/transaction.h" @@ -92,6 +96,16 @@ class TestMergeAppend : public MergingSnapshotUpdate { Result> DataSpec() const { return MergingSnapshotUpdate::DataSpec(); } + Result> WriteDeletesForTest( + std::span> files, + const std::shared_ptr& spec) { + auto entries = files | std::views::transform([](const auto& file) { + return ContentFileWithSequenceNumber{ + .file = file, .data_sequence_number = std::nullopt}; + }) | + std::ranges::to(); + return WriteDeleteManifests(entries, spec); + } int64_t GeneratedSnapshotId() { return SnapshotId(); } void SetDataSeqNumber(int64_t seq) { SetNewDataFilesDataSequenceNumber(seq); } void SetCaseSensitive(bool case_sensitive) { CaseSensitive(case_sensitive); } @@ -266,6 +280,14 @@ class MergingSnapshotUpdateTest : public MinimalUpdateTestBase { table_->metadata()->format_version = format_version; } + void SetManifestTargetSizeBytes(int64_t size_bytes) { + ICEBERG_UNWRAP_OR_FAIL(auto props, table_->NewUpdateProperties()); + props->Set(std::string(TableProperties::kManifestTargetSizeBytes.key()), + std::to_string(size_bytes)); + EXPECT_THAT(props->Commit(), IsOk()); + EXPECT_THAT(table_->Refresh(), IsOk()); + } + // Commit file_a_ with FastAppend and refresh the table. void CommitFileA() { ICEBERG_UNWRAP_OR_FAIL(auto fa, table_->NewFastAppend()); @@ -639,6 +661,29 @@ TEST_F(MergingSnapshotUpdateTest, AddDeleteFileWithExplicitSequenceWritesSequenc EXPECT_EQ(entries[0].sequence_number.value(), 17); } +TEST_F(MergingSnapshotUpdateTest, WriteDeleteGroups) { + SetManifestTargetSizeBytes(std::numeric_limits::max()); + + test::ThreadExecutor executor; + ICEBERG_UNWRAP_OR_FAIL(auto op, NewMergeAppend()); + op->WriteManifestsWith(executor, 3); + + constexpr size_t kFileCount = 15'000; + auto files = std::views::iota(0UZ, kFileCount) | + std::views::transform([this](size_t index) { + return MakeDeleteFile(std::format("/delete/group_{}.parquet", index), + static_cast(index % 2)); + }) | + std::ranges::to(); + ICEBERG_UNWRAP_OR_FAIL(auto manifests, op->WriteDeletesForTest(files, spec_)); + + EXPECT_EQ(executor.submit_count(), 2); + ASSERT_EQ(manifests.size(), 2U); + for (const auto& manifest : manifests) { + EXPECT_EQ(manifest.content, ManifestContent::kDeletes); + } +} + TEST_F(MergingSnapshotUpdateTest, ApplyRebuildsDeleteSummaryAfterPreparingDeletes) { auto del_file = MakeDeleteFile("/delete/del_a.parquet", 1L); diff --git a/src/iceberg/update/snapshot_update.cc b/src/iceberg/update/snapshot_update.cc index 40669a35f..1d6916da6 100644 --- a/src/iceberg/update/snapshot_update.cc +++ b/src/iceberg/update/snapshot_update.cc @@ -19,8 +19,10 @@ #include "iceberg/update/snapshot_update.h" +#include #include #include +#include #include "iceberg/constants.h" #include "iceberg/file_io.h" @@ -32,6 +34,7 @@ #include "iceberg/partition_summary_internal.h" #include "iceberg/table.h" // IWYU pragma: keep #include "iceberg/transaction.h" +#include "iceberg/util/executor_util_internal.h" #include "iceberg/util/macros.h" #include "iceberg/util/snapshot_util_internal.h" #include "iceberg/util/string_util.h" @@ -79,6 +82,29 @@ Status UpdateTotal(std::unordered_map& summary, return {}; } +constexpr size_t kMinManifestWriterGroupSize = 10'000; + +template +Result> WriteManifestGroups(OptionalExecutor executor, + int32_t max_parallelism, + std::span files, + WriteGroup&& write_group) { + const auto limit = static_cast( + (files.size() + kMinManifestWriterGroupSize / 2) / kMinManifestWriterGroupSize); + const auto group_count = + static_cast(std::max(1, std::min(max_parallelism, limit))); + const size_t group_size = (files.size() + group_count - 1) / group_count; + // TODO(zehua): Replace the manual offset calculation with `std::views::chunk` + // once the supported libc++ provides it. + auto groups = + std::views::iota(0UZ, group_count) | + std::views::transform([files, group_size](size_t group_index) { + const size_t offset = group_index * group_size; + return files.subspan(offset, std::min(group_size, files.size() - offset)); + }); + return ParallelCollect(executor, groups, std::forward(write_group)); +} + // Add metadata to a manifest file by reading it and extracting statistics. Result AddMetadata(const ManifestFile& manifest, std::shared_ptr io, const TableMetadata& metadata) { @@ -175,7 +201,6 @@ void SnapshotUpdate::SetSummaryProperty(const std::string& property, summary_.Set(property, value); } -// TODO(xxx): Split files into independent rolling-writer groups before parallelizing. Result> SnapshotUpdate::WriteDataManifests( std::span> files, const std::shared_ptr& spec, @@ -185,23 +210,28 @@ Result> SnapshotUpdate::WriteDataManifests( } ICEBERG_ASSIGN_OR_RAISE(auto current_schema, base().Schema()); - RollingManifestWriter rolling_writer( - [this, spec, schema = std::move(current_schema), - snapshot_id = SnapshotId()]() -> Result> { - return ManifestWriter::MakeWriter( - base().format_version, snapshot_id, ManifestPath(), ctx_->table->io(), - std::move(spec), std::move(schema), ManifestContent::kData); - }, - target_manifest_size_bytes_); - - for (const auto& file : files) { - ICEBERG_RETURN_UNEXPECTED(rolling_writer.WriteAddedEntry(file, data_sequence_number)); - } - ICEBERG_RETURN_UNEXPECTED(rolling_writer.Close()); - return rolling_writer.ToManifestFiles(); + const int8_t format_version = base().format_version; + const int64_t snapshot_id = SnapshotId(); + auto make_writer = [&]() { + return ManifestWriter::MakeWriter(format_version, snapshot_id, ManifestPath(), + ctx_->table->io(), spec, current_schema, + ManifestContent::kData); + }; + + return WriteManifestGroups( + write_manifest_executor_, write_manifest_parallelism_, files, + [&](std::span> group) + -> Result> { + RollingManifestWriter rolling_writer(make_writer, target_manifest_size_bytes_); + for (const auto& file : group) { + ICEBERG_RETURN_UNEXPECTED( + rolling_writer.WriteAddedEntry(file, data_sequence_number)); + } + ICEBERG_RETURN_UNEXPECTED(rolling_writer.Close()); + return rolling_writer.ToManifestFiles(); + }); } -// TODO(xxx): Split files into independent rolling-writer groups before parallelizing. Result> SnapshotUpdate::WriteDeleteManifests( std::span files, const std::shared_ptr& spec) { @@ -210,21 +240,26 @@ Result> SnapshotUpdate::WriteDeleteManifests( } ICEBERG_ASSIGN_OR_RAISE(auto current_schema, base().Schema()); - RollingManifestWriter rolling_writer( - [this, spec, schema = std::move(current_schema), - snapshot_id = SnapshotId()]() -> Result> { - return ManifestWriter::MakeWriter( - base().format_version, snapshot_id, ManifestPath(), ctx_->table->io(), - std::move(spec), std::move(schema), ManifestContent::kDeletes); - }, - target_manifest_size_bytes_); - - for (const auto& entry : files) { - ICEBERG_RETURN_UNEXPECTED( - rolling_writer.WriteAddedEntry(entry.file, entry.data_sequence_number)); - } - ICEBERG_RETURN_UNEXPECTED(rolling_writer.Close()); - return rolling_writer.ToManifestFiles(); + const int8_t format_version = base().format_version; + const int64_t snapshot_id = SnapshotId(); + auto make_writer = [&]() { + return ManifestWriter::MakeWriter(format_version, snapshot_id, ManifestPath(), + ctx_->table->io(), spec, current_schema, + ManifestContent::kDeletes); + }; + + return WriteManifestGroups( + write_manifest_executor_, write_manifest_parallelism_, files, + [&](std::span group) + -> Result> { + RollingManifestWriter rolling_writer(make_writer, target_manifest_size_bytes_); + for (const auto& entry : group) { + ICEBERG_RETURN_UNEXPECTED( + rolling_writer.WriteAddedEntry(entry.file, entry.data_sequence_number)); + } + ICEBERG_RETURN_UNEXPECTED(rolling_writer.Close()); + return rolling_writer.ToManifestFiles(); + }); } int64_t SnapshotUpdate::SnapshotId() { diff --git a/src/iceberg/update/snapshot_update.h b/src/iceberg/update/snapshot_update.h index fa8dcbf12..d92c445ca 100644 --- a/src/iceberg/update/snapshot_update.h +++ b/src/iceberg/update/snapshot_update.h @@ -124,6 +124,25 @@ class ICEBERG_EXPORT SnapshotUpdate : public PendingUpdate { return self; } + /// \brief Configure an executor and max writer count for writing new manifests. + /// + /// If this method is not called, manifest writes remain serial. When configured, + /// files may be split into independent rolling-writer groups. + /// + /// \note Custom FileIO implementations and registered writer factories used for + /// manifest writes must support concurrent calls when an executor is configured. + auto& WriteManifestsWith(this auto& self, Executor& executor, int32_t parallelism) { + if (parallelism <= 0) [[unlikely]] { + return self.AddError( + ErrorKind::kInvalidArgument, + "Manifest write parallelism must be greater than 0, but was: {}", parallelism); + } + + self.write_manifest_executor_ = std::ref(executor); + self.write_manifest_parallelism_ = parallelism; + return self; + } + /// \brief Apply the update's changes to create a new snapshot. /// /// This method validates the changes, applies them to the current base @@ -251,6 +270,8 @@ class ICEBERG_EXPORT SnapshotUpdate : public PendingUpdate { private: const bool can_inherit_snapshot_id_{true}; const std::string commit_uuid_; + OptionalExecutor write_manifest_executor_; + int32_t write_manifest_parallelism_{1}; std::atomic manifest_count_{0}; std::atomic attempt_{0}; std::vector manifest_lists_; diff --git a/src/iceberg/util/executor_util_internal.h b/src/iceberg/util/executor_util_internal.h index 5eed3a8bd..9e8ac3782 100644 --- a/src/iceberg/util/executor_util_internal.h +++ b/src/iceberg/util/executor_util_internal.h @@ -42,6 +42,10 @@ struct ParallelReduce; namespace internal { +template +using ParallelCollectArgT = + std::conditional_t, Ref, std::remove_cvref_t&&>; + template concept ParallelReducible = requires(std::vector& values) { typename ParallelReduce::result_type; @@ -51,10 +55,9 @@ concept ParallelReducible = requires(std::vector& values) { }; template -using ParallelCollectValueT = - ResultValueT&, - std::add_lvalue_reference_t>>>>; +using ParallelCollectValueT = ResultValueT&, + ParallelCollectArgT>>>; template struct ParallelCollectTraits { @@ -67,10 +70,13 @@ struct ParallelCollectTraits { template concept ParallelCollectible = std::ranges::forward_range && std::ranges::sized_range && - std::is_lvalue_reference_v> && + (std::is_lvalue_reference_v> || + std::constructible_from< + std::remove_cvref_t>, + std::ranges::range_reference_t>) && requires(std::remove_reference_t& task, - std::ranges::range_reference_t item) { - { std::invoke(task, item) } -> AsResult; + ParallelCollectArgT> item) { + { std::invoke(task, std::forward(item)) } -> AsResult; requires(!std::same_as>); requires std::default_initializable>; requires ParallelReducible, Options...>; @@ -198,13 +204,24 @@ auto ParallelCollect(OptionalExecutor executor, Args&&... args) { [&](std::index_sequence) { ( [&] { + using item_ref = std::ranges::range_reference_t< + typename internal::ParallelCollectTraits::input_type>; + for (auto&& [item, value] : std::views::zip(std::get(args_tuple), std::get(values_tuple))) { - group.Submit([&]() -> Status { - ICEBERG_ASSIGN_OR_RAISE(value, - std::invoke(std::get(args_tuple), item)); - return {}; - }); + if constexpr (std::is_lvalue_reference_v) { + group.Submit([&]() -> Status { + ICEBERG_ASSIGN_OR_RAISE( + value, std::invoke(std::get(args_tuple), item)); + return {}; + }); + } else { + group.Submit([&, item = std::move(item)]() mutable -> Status { + ICEBERG_ASSIGN_OR_RAISE( + value, std::invoke(std::get(args_tuple), std::move(item))); + return {}; + }); + } } }(), ...);