diff --git a/cpp/src/arrow/ipc/message.cc b/cpp/src/arrow/ipc/message.cc index 8522475782e..4123a6b1735 100644 --- a/cpp/src/arrow/ipc/message.cc +++ b/cpp/src/arrow/ipc/message.cc @@ -102,9 +102,8 @@ class Message::MessageImpl { if (message_->custom_metadata() != nullptr) { // Deserialize from Flatbuffers if first time called - std::shared_ptr md; - RETURN_NOT_OK(internal::GetKeyValueMetadata(message_->custom_metadata(), &md)); - custom_metadata_ = std::move(md); // const-ify + ARROW_ASSIGN_OR_RAISE(custom_metadata_, + internal::GetKeyValueMetadata(message_->custom_metadata())); } return Status::OK(); diff --git a/cpp/src/arrow/ipc/message_internal_test.cc b/cpp/src/arrow/ipc/message_internal_test.cc index bc4506eaaed..c2566a39793 100644 --- a/cpp/src/arrow/ipc/message_internal_test.cc +++ b/cpp/src/arrow/ipc/message_internal_test.cc @@ -62,9 +62,8 @@ TEST(TestMessageInternal, TestByteIdentical) { auto schema = ::arrow::schema({f0}, Endianness::Little, metadata); // Serialize the Schema to a Buffer - std::shared_ptr out_buffer; - ASSERT_OK( - WriteSchemaMessage(*schema, mapper, IpcWriteOptions::Defaults(), &out_buffer)); + ASSERT_OK_AND_ASSIGN(auto out_buffer, + WriteSchemaMessage(*schema, mapper, IpcWriteOptions::Defaults())); // This is example output from macOS+ARM+LLVM const uint8_t expected[] = { @@ -104,9 +103,9 @@ TEST(TestMessageInternal, TestEndiannessRoundtrip) { auto schema = ::arrow::schema({f0}, endianness, metadata); // Serialize the Schema to a Buffer - std::shared_ptr out_buffer; - ASSERT_OK( - WriteSchemaMessage(*schema, mapper, IpcWriteOptions::Defaults(), &out_buffer)); + ASSERT_OK_AND_ASSIGN( + auto out_buffer, + WriteSchemaMessage(*schema, mapper, IpcWriteOptions::Defaults())); // Re-open to a new Message and parse Schema ASSERT_OK_AND_ASSIGN(auto message, Message::Open(out_buffer, /*body=*/nullptr)); @@ -183,9 +182,10 @@ class MessageDecodingTest : public ::testing::Test { auto buffer_md = std::vector{BufferMetadata{.offset = 64, .length = 1}, BufferMetadata{.offset = 72, .length = 10}}; std::shared_ptr out; - RETURN_NOT_OK(WriteRecordBatchMessage(/*length=*/kNumRows, params.body_length, - params.custom_metadata, field_md, buffer_md, - /*variadic_counts=*/{}, params.options, &out)); + ARROW_ASSIGN_OR_RAISE( + out, WriteRecordBatchMessage(/*length=*/kNumRows, params.body_length, + params.custom_metadata, field_md, buffer_md, + /*variadic_counts=*/{}, params.options)); ARROW_ASSIGN_OR_RAISE(out, EncapsulateMetadata(out, params.options)); // Generate a dummy body of the advertised length ARROW_ASSIGN_OR_RAISE(auto body_bytes, AllocateBuffer(params.body_length)); diff --git a/cpp/src/arrow/ipc/metadata_internal.cc b/cpp/src/arrow/ipc/metadata_internal.cc index 7f2a47b0694..a1308411e2b 100644 --- a/cpp/src/arrow/ipc/metadata_internal.cc +++ b/cpp/src/arrow/ipc/metadata_internal.cc @@ -866,7 +866,8 @@ Status FieldFromFlatbuffer(const flatbuf::Field* field, FieldPosition field_pos, std::shared_ptr type; std::shared_ptr metadata; - RETURN_NOT_OK(internal::GetKeyValueMetadata(field->custom_metadata(), &metadata)); + ARROW_ASSIGN_OR_RAISE(metadata, + internal::GetKeyValueMetadata(field->custom_metadata())); // Reconstruct the data type // 1. Data type children @@ -1279,11 +1280,10 @@ Status MakeSparseTensor(FBB& fbb, const SparseTensor& sparse_tensor, int64_t bod } // namespace -Status GetKeyValueMetadata(const KVVector* fb_metadata, - std::shared_ptr* out) { +Result> GetKeyValueMetadata( + const KVVector* fb_metadata) { if (fb_metadata == nullptr) { - *out = nullptr; - return Status::OK(); + return nullptr; } auto metadata = std::make_shared(); @@ -1295,36 +1295,33 @@ Status GetKeyValueMetadata(const KVVector* fb_metadata, metadata->Append(pair->key()->str(), pair->value()->str()); } - *out = std::move(metadata); - return Status::OK(); + return metadata; } -Status WriteSchemaMessage(const Schema& schema, const DictionaryFieldMapper& mapper, - const IpcWriteOptions& options, std::shared_ptr* out) { +Result> WriteSchemaMessage(const Schema& schema, + const DictionaryFieldMapper& mapper, + const IpcWriteOptions& options) { FBB fbb; flatbuffers::Offset fb_schema; RETURN_NOT_OK(SchemaToFlatbuffer(fbb, schema, mapper, &fb_schema)); return WriteFBMessage(fbb, flatbuf::MessageHeader::MessageHeader_Schema, fb_schema.Union(), /*body_length=*/0, options.metadata_version, - /*custom_metadata=*/nullptr, options.memory_pool) - .Value(out); + /*custom_metadata=*/nullptr, options.memory_pool); } -Status WriteRecordBatchMessage( +Result> WriteRecordBatchMessage( int64_t length, int64_t body_length, const std::shared_ptr& custom_metadata, const std::vector& nodes, const std::vector& buffers, - const std::vector& variadic_buffer_counts, const IpcWriteOptions& options, - std::shared_ptr* out) { + const std::vector& variadic_buffer_counts, const IpcWriteOptions& options) { FBB fbb; RecordBatchOffset record_batch; RETURN_NOT_OK(MakeRecordBatch(fbb, length, body_length, nodes, buffers, variadic_buffer_counts, options, &record_batch)); return WriteFBMessage(fbb, flatbuf::MessageHeader::MessageHeader_RecordBatch, record_batch.Union(), body_length, options.metadata_version, - custom_metadata, options.memory_pool) - .Value(out); + custom_metadata, options.memory_pool); } Result> WriteTensorMessage(const Tensor& tensor, @@ -1373,12 +1370,11 @@ Result> WriteSparseTensorMessage( /*custom_metadata=*/nullptr, options.memory_pool); } -Status WriteDictionaryMessage( +Result> WriteDictionaryMessage( int64_t id, bool is_delta, int64_t length, int64_t body_length, const std::shared_ptr& custom_metadata, const std::vector& nodes, const std::vector& buffers, - const std::vector& variadic_buffer_counts, const IpcWriteOptions& options, - std::shared_ptr* out) { + const std::vector& variadic_buffer_counts, const IpcWriteOptions& options) { FBB fbb; RecordBatchOffset record_batch; RETURN_NOT_OK(MakeRecordBatch(fbb, length, body_length, nodes, buffers, @@ -1387,8 +1383,7 @@ Status WriteDictionaryMessage( flatbuf::CreateDictionaryBatch(fbb, id, record_batch, is_delta).Union(); return WriteFBMessage(fbb, flatbuf::MessageHeader::MessageHeader_DictionaryBatch, dictionary_batch, body_length, options.metadata_version, - custom_metadata, options.memory_pool) - .Value(out); + custom_metadata, options.memory_pool); } static flatbuffers::Offset> @@ -1462,7 +1457,8 @@ Status GetSchema(const void* opaque_schema, DictionaryMemo* dictionary_memo, } std::shared_ptr metadata; - RETURN_NOT_OK(internal::GetKeyValueMetadata(schema->custom_metadata(), &metadata)); + ARROW_ASSIGN_OR_RAISE(metadata, + internal::GetKeyValueMetadata(schema->custom_metadata())); // set endianness using the value in flatbuf schema auto endianness = schema->endianness() == flatbuf::Endianness::Endianness_Little ? Endianness::Little diff --git a/cpp/src/arrow/ipc/metadata_internal.h b/cpp/src/arrow/ipc/metadata_internal.h index 2a9574d84a1..1997dfbcc45 100644 --- a/cpp/src/arrow/ipc/metadata_internal.h +++ b/cpp/src/arrow/ipc/metadata_internal.h @@ -162,8 +162,8 @@ Status GetSparseTensorMetadata(const Buffer& metadata, std::shared_ptr SparseTensorFormat::type* sparse_tensor_format_id); ARROW_EXPORT -Status GetKeyValueMetadata(const KVVector* fb_metadata, - std::shared_ptr* out); +Result> GetKeyValueMetadata( + const KVVector* fb_metadata); ARROW_EXPORT Status ConcreteTypeFromFlatbuffer(flatbuf::Type type, const void* type_data, @@ -193,17 +193,17 @@ static inline Status VerifyMessage(const uint8_t* data, int64_t size, // Serialize arrow::Schema as a Flatbuffer ARROW_EXPORT -Status WriteSchemaMessage(const Schema& schema, const DictionaryFieldMapper& mapper, - const IpcWriteOptions& options, std::shared_ptr* out); +Result> WriteSchemaMessage(const Schema& schema, + const DictionaryFieldMapper& mapper, + const IpcWriteOptions& options); // This function is used in a unit test ARROW_EXPORT -Status WriteRecordBatchMessage( +Result> WriteRecordBatchMessage( const int64_t length, const int64_t body_length, const std::shared_ptr& custom_metadata, const std::vector& nodes, const std::vector& buffers, - const std::vector& variadic_counts, const IpcWriteOptions& options, - std::shared_ptr* out); + const std::vector& variadic_counts, const IpcWriteOptions& options); ARROW_EXPORT Result> WriteTensorMessage(const Tensor& tensor, @@ -222,13 +222,12 @@ Status WriteFileFooter(const Schema& schema, const std::vector& dicti io::OutputStream* out); ARROW_EXPORT -Status WriteDictionaryMessage( +Result> WriteDictionaryMessage( const int64_t id, const bool is_delta, const int64_t length, const int64_t body_length, const std::shared_ptr& custom_metadata, const std::vector& nodes, const std::vector& buffers, - const std::vector& variadic_counts, const IpcWriteOptions& options, - std::shared_ptr* out); + const std::vector& variadic_counts, const IpcWriteOptions& options); static inline Result> WriteFlatbufferBuilder( flatbuffers::FlatBufferBuilder& fbb, // NOLINT non-const reference diff --git a/cpp/src/arrow/ipc/read_write_test.cc b/cpp/src/arrow/ipc/read_write_test.cc index 74e16dd8b1b..aa3f9fd77f2 100644 --- a/cpp/src/arrow/ipc/read_write_test.cc +++ b/cpp/src/arrow/ipc/read_write_test.cc @@ -156,11 +156,11 @@ TEST_P(TestMessage, SerializeCustomMetadata) { nullptr, key_value_metadata({}, {}), key_value_metadata({"foo", "bar"}, {"fizz", "buzz"})}; for (auto metadata : cases) { - std::shared_ptr serialized; - ASSERT_OK(internal::WriteRecordBatchMessage( - /*length=*/0, /*body_length=*/0, metadata, - /*nodes=*/{}, - /*buffers=*/{}, /*variadic_counts=*/{}, options_, &serialized)); + ASSERT_OK_AND_ASSIGN(auto serialized, + internal::WriteRecordBatchMessage( + /*length=*/0, /*body_length=*/0, metadata, + /*nodes=*/{}, + /*buffers=*/{}, /*variadic_counts=*/{}, options_)); ASSERT_OK_AND_ASSIGN(std::unique_ptr message, Message::Open(serialized, /*body=*/nullptr)); diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc index 76d286dbdcc..f8bbf6c13d3 100644 --- a/cpp/src/arrow/ipc/reader.cc +++ b/cpp/src/arrow/ipc/reader.cc @@ -765,7 +765,8 @@ Status GetCompressionExperimental(const flatbuf::Message* message, if (message->custom_metadata() != nullptr) { // TODO: Ensure this deserialization only ever happens once std::shared_ptr metadata; - RETURN_NOT_OK(internal::GetKeyValueMetadata(message->custom_metadata(), &metadata)); + ARROW_ASSIGN_OR_RAISE(metadata, + internal::GetKeyValueMetadata(message->custom_metadata())); int index = metadata->FindKey("ARROW:experimental_compression"); if (index != -1) { // Arrow 0.17 stored string in upper case, internal utils now require lower case @@ -810,8 +811,8 @@ Result ReadRecordBatchInternal( std::shared_ptr custom_metadata; if (message->custom_metadata() != nullptr) { - RETURN_NOT_OK( - internal::GetKeyValueMetadata(message->custom_metadata(), &custom_metadata)); + ARROW_ASSIGN_OR_RAISE(custom_metadata, + internal::GetKeyValueMetadata(message->custom_metadata())); } ARROW_ASSIGN_OR_RAISE(auto record_batch, LoadRecordBatch(batch, schema, inclusion_mask, context, file)); @@ -1426,8 +1427,8 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader { ARROW_ASSIGN_OR_RAISE(auto message, GetFlatbufMessage(message_obj)); std::shared_ptr custom_metadata; if (message->custom_metadata() != nullptr) { - RETURN_NOT_OK( - internal::GetKeyValueMetadata(message->custom_metadata(), &custom_metadata)); + ARROW_ASSIGN_OR_RAISE(custom_metadata, + internal::GetKeyValueMetadata(message->custom_metadata())); } return RecordBatchWithMetadata{std::move(batch), std::move(custom_metadata)}; } @@ -1928,8 +1929,8 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader { auto fb_metadata = self->footer_->custom_metadata(); if (fb_metadata != nullptr) { std::shared_ptr md; - RETURN_NOT_OK(internal::GetKeyValueMetadata(fb_metadata, &md)); - self->metadata_ = std::move(md); // const-ify + ARROW_ASSIGN_OR_RAISE(self->metadata_, + internal::GetKeyValueMetadata(fb_metadata)); } return Status::OK(); }); diff --git a/cpp/src/arrow/ipc/writer.cc b/cpp/src/arrow/ipc/writer.cc index 09a9aef8975..749eab85d8a 100644 --- a/cpp/src/arrow/ipc/writer.cc +++ b/cpp/src/arrow/ipc/writer.cc @@ -180,9 +180,11 @@ class RecordBatchSerializer { // Override this for writing dictionary metadata virtual Status SerializeMetadata(int64_t num_rows) { - return WriteRecordBatchMessage(num_rows, out_->body_length, custom_metadata_, - field_nodes_, buffer_meta_, variadic_counts_, options_, - &out_->metadata); + ARROW_ASSIGN_OR_RAISE( + out_->metadata, + WriteRecordBatchMessage(num_rows, out_->body_length, custom_metadata_, + field_nodes_, buffer_meta_, variadic_counts_, options_)); + return Status::OK(); } bool ShouldCompress(int64_t uncompressed_size, int64_t compressed_size) const { @@ -746,9 +748,12 @@ class DictionarySerializer : public RecordBatchSerializer { is_delta_(is_delta) {} Status SerializeMetadata(int64_t num_rows) override { - return WriteDictionaryMessage(dictionary_id_, is_delta_, num_rows, out_->body_length, - custom_metadata_, field_nodes_, buffer_meta_, - variadic_counts_, options_, &out_->metadata); + ARROW_ASSIGN_OR_RAISE( + out_->metadata, + WriteDictionaryMessage(dictionary_id_, is_delta_, num_rows, out_->body_length, + custom_metadata_, field_nodes_, buffer_meta_, + variadic_counts_, options_)); + return Status::OK(); } Status Assemble(const std::shared_ptr& dictionary) { @@ -804,7 +809,9 @@ Status WriteIpcPayload(const IpcPayload& payload, const IpcWriteOptions& options Status GetSchemaPayload(const Schema& schema, const IpcWriteOptions& options, const DictionaryFieldMapper& mapper, IpcPayload* out) { out->type = MessageType::SCHEMA; - return internal::WriteSchemaMessage(schema, mapper, options, &out->metadata); + ARROW_ASSIGN_OR_RAISE(out->metadata, + internal::WriteSchemaMessage(schema, mapper, options)); + return Status::OK(); } Status GetDictionaryPayload(int64_t id, const std::shared_ptr& dictionary,