Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions cpp/src/arrow/ipc/message.cc
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,8 @@ class Message::MessageImpl {

if (message_->custom_metadata() != nullptr) {
// Deserialize from Flatbuffers if first time called
std::shared_ptr<KeyValueMetadata> md;
RETURN_NOT_OK(internal::GetKeyValueMetadata(message_->custom_metadata(), &md));
custom_metadata_ = std::move(md); // const-ify
ARROW_ASSIGN_OR_RAISE(custom_metadata_,
internal::GetKeyValueMetadata(message_->custom_metadata()));
}

return Status::OK();
Expand Down
18 changes: 9 additions & 9 deletions cpp/src/arrow/ipc/message_internal_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,8 @@ TEST(TestMessageInternal, TestByteIdentical) {
auto schema = ::arrow::schema({f0}, Endianness::Little, metadata);

// Serialize the Schema to a Buffer
std::shared_ptr<Buffer> out_buffer;
ASSERT_OK(
WriteSchemaMessage(*schema, mapper, IpcWriteOptions::Defaults(), &out_buffer));
ASSERT_OK_AND_ASSIGN(auto out_buffer,
WriteSchemaMessage(*schema, mapper, IpcWriteOptions::Defaults()));

// This is example output from macOS+ARM+LLVM
const uint8_t expected[] = {
Expand Down Expand Up @@ -104,9 +103,9 @@ TEST(TestMessageInternal, TestEndiannessRoundtrip) {
auto schema = ::arrow::schema({f0}, endianness, metadata);

// Serialize the Schema to a Buffer
std::shared_ptr<Buffer> out_buffer;
ASSERT_OK(
WriteSchemaMessage(*schema, mapper, IpcWriteOptions::Defaults(), &out_buffer));
ASSERT_OK_AND_ASSIGN(
auto out_buffer,
WriteSchemaMessage(*schema, mapper, IpcWriteOptions::Defaults()));

// Re-open to a new Message and parse Schema
ASSERT_OK_AND_ASSIGN(auto message, Message::Open(out_buffer, /*body=*/nullptr));
Expand Down Expand Up @@ -183,9 +182,10 @@ class MessageDecodingTest : public ::testing::Test {
auto buffer_md = std::vector{BufferMetadata{.offset = 64, .length = 1},
BufferMetadata{.offset = 72, .length = 10}};
std::shared_ptr<Buffer> out;
RETURN_NOT_OK(WriteRecordBatchMessage(/*length=*/kNumRows, params.body_length,
params.custom_metadata, field_md, buffer_md,
/*variadic_counts=*/{}, params.options, &out));
ARROW_ASSIGN_OR_RAISE(
out, WriteRecordBatchMessage(/*length=*/kNumRows, params.body_length,
params.custom_metadata, field_md, buffer_md,
/*variadic_counts=*/{}, params.options));
ARROW_ASSIGN_OR_RAISE(out, EncapsulateMetadata(out, params.options));
// Generate a dummy body of the advertised length
ARROW_ASSIGN_OR_RAISE(auto body_bytes, AllocateBuffer(params.body_length));
Expand Down
40 changes: 18 additions & 22 deletions cpp/src/arrow/ipc/metadata_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -866,7 +866,8 @@ Status FieldFromFlatbuffer(const flatbuf::Field* field, FieldPosition field_pos,
std::shared_ptr<DataType> type;

std::shared_ptr<KeyValueMetadata> metadata;
RETURN_NOT_OK(internal::GetKeyValueMetadata(field->custom_metadata(), &metadata));
ARROW_ASSIGN_OR_RAISE(metadata,
internal::GetKeyValueMetadata(field->custom_metadata()));

// Reconstruct the data type
// 1. Data type children
Expand Down Expand Up @@ -1279,11 +1280,10 @@ Status MakeSparseTensor(FBB& fbb, const SparseTensor& sparse_tensor, int64_t bod

} // namespace

Status GetKeyValueMetadata(const KVVector* fb_metadata,
std::shared_ptr<KeyValueMetadata>* out) {
Result<std::shared_ptr<KeyValueMetadata>> GetKeyValueMetadata(
const KVVector* fb_metadata) {
if (fb_metadata == nullptr) {
*out = nullptr;
return Status::OK();
return nullptr;
}

auto metadata = std::make_shared<KeyValueMetadata>();
Expand All @@ -1295,36 +1295,33 @@ Status GetKeyValueMetadata(const KVVector* fb_metadata,
metadata->Append(pair->key()->str(), pair->value()->str());
}

*out = std::move(metadata);
return Status::OK();
return metadata;
}

Status WriteSchemaMessage(const Schema& schema, const DictionaryFieldMapper& mapper,
const IpcWriteOptions& options, std::shared_ptr<Buffer>* out) {
Result<std::shared_ptr<Buffer>> WriteSchemaMessage(const Schema& schema,
const DictionaryFieldMapper& mapper,
const IpcWriteOptions& options) {
FBB fbb;
flatbuffers::Offset<flatbuf::Schema> fb_schema;
RETURN_NOT_OK(SchemaToFlatbuffer(fbb, schema, mapper, &fb_schema));
return WriteFBMessage(fbb, flatbuf::MessageHeader::MessageHeader_Schema,
fb_schema.Union(),
/*body_length=*/0, options.metadata_version,
/*custom_metadata=*/nullptr, options.memory_pool)
.Value(out);
/*custom_metadata=*/nullptr, options.memory_pool);
}

Status WriteRecordBatchMessage(
Result<std::shared_ptr<Buffer>> WriteRecordBatchMessage(
int64_t length, int64_t body_length,
const std::shared_ptr<const KeyValueMetadata>& custom_metadata,
const std::vector<FieldMetadata>& nodes, const std::vector<BufferMetadata>& buffers,
const std::vector<int64_t>& variadic_buffer_counts, const IpcWriteOptions& options,
std::shared_ptr<Buffer>* out) {
const std::vector<int64_t>& variadic_buffer_counts, const IpcWriteOptions& options) {
FBB fbb;
RecordBatchOffset record_batch;
RETURN_NOT_OK(MakeRecordBatch(fbb, length, body_length, nodes, buffers,
variadic_buffer_counts, options, &record_batch));
return WriteFBMessage(fbb, flatbuf::MessageHeader::MessageHeader_RecordBatch,
record_batch.Union(), body_length, options.metadata_version,
custom_metadata, options.memory_pool)
.Value(out);
custom_metadata, options.memory_pool);
}

Result<std::shared_ptr<Buffer>> WriteTensorMessage(const Tensor& tensor,
Expand Down Expand Up @@ -1373,12 +1370,11 @@ Result<std::shared_ptr<Buffer>> WriteSparseTensorMessage(
/*custom_metadata=*/nullptr, options.memory_pool);
}

Status WriteDictionaryMessage(
Result<std::shared_ptr<Buffer>> WriteDictionaryMessage(
int64_t id, bool is_delta, int64_t length, int64_t body_length,
const std::shared_ptr<const KeyValueMetadata>& custom_metadata,
const std::vector<FieldMetadata>& nodes, const std::vector<BufferMetadata>& buffers,
const std::vector<int64_t>& variadic_buffer_counts, const IpcWriteOptions& options,
std::shared_ptr<Buffer>* out) {
const std::vector<int64_t>& variadic_buffer_counts, const IpcWriteOptions& options) {
FBB fbb;
RecordBatchOffset record_batch;
RETURN_NOT_OK(MakeRecordBatch(fbb, length, body_length, nodes, buffers,
Expand All @@ -1387,8 +1383,7 @@ Status WriteDictionaryMessage(
flatbuf::CreateDictionaryBatch(fbb, id, record_batch, is_delta).Union();
return WriteFBMessage(fbb, flatbuf::MessageHeader::MessageHeader_DictionaryBatch,
dictionary_batch, body_length, options.metadata_version,
custom_metadata, options.memory_pool)
.Value(out);
custom_metadata, options.memory_pool);
}

static flatbuffers::Offset<flatbuffers::Vector<const flatbuf::Block*>>
Expand Down Expand Up @@ -1462,7 +1457,8 @@ Status GetSchema(const void* opaque_schema, DictionaryMemo* dictionary_memo,
}

std::shared_ptr<KeyValueMetadata> metadata;
RETURN_NOT_OK(internal::GetKeyValueMetadata(schema->custom_metadata(), &metadata));
ARROW_ASSIGN_OR_RAISE(metadata,
internal::GetKeyValueMetadata(schema->custom_metadata()));
// set endianness using the value in flatbuf schema
auto endianness = schema->endianness() == flatbuf::Endianness::Endianness_Little
? Endianness::Little
Expand Down
19 changes: 9 additions & 10 deletions cpp/src/arrow/ipc/metadata_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,8 @@ Status GetSparseTensorMetadata(const Buffer& metadata, std::shared_ptr<DataType>
SparseTensorFormat::type* sparse_tensor_format_id);

ARROW_EXPORT
Status GetKeyValueMetadata(const KVVector* fb_metadata,
std::shared_ptr<KeyValueMetadata>* out);
Result<std::shared_ptr<KeyValueMetadata>> GetKeyValueMetadata(
const KVVector* fb_metadata);

ARROW_EXPORT
Status ConcreteTypeFromFlatbuffer(flatbuf::Type type, const void* type_data,
Expand Down Expand Up @@ -193,17 +193,17 @@ static inline Status VerifyMessage(const uint8_t* data, int64_t size,

// Serialize arrow::Schema as a Flatbuffer
ARROW_EXPORT
Status WriteSchemaMessage(const Schema& schema, const DictionaryFieldMapper& mapper,
const IpcWriteOptions& options, std::shared_ptr<Buffer>* out);
Result<std::shared_ptr<Buffer>> WriteSchemaMessage(const Schema& schema,
const DictionaryFieldMapper& mapper,
const IpcWriteOptions& options);

// This function is used in a unit test
ARROW_EXPORT
Status WriteRecordBatchMessage(
Result<std::shared_ptr<Buffer>> WriteRecordBatchMessage(
const int64_t length, const int64_t body_length,
const std::shared_ptr<const KeyValueMetadata>& custom_metadata,
const std::vector<FieldMetadata>& nodes, const std::vector<BufferMetadata>& buffers,
const std::vector<int64_t>& variadic_counts, const IpcWriteOptions& options,
std::shared_ptr<Buffer>* out);
const std::vector<int64_t>& variadic_counts, const IpcWriteOptions& options);

ARROW_EXPORT
Result<std::shared_ptr<Buffer>> WriteTensorMessage(const Tensor& tensor,
Expand All @@ -222,13 +222,12 @@ Status WriteFileFooter(const Schema& schema, const std::vector<FileBlock>& dicti
io::OutputStream* out);

ARROW_EXPORT
Status WriteDictionaryMessage(
Result<std::shared_ptr<Buffer>> WriteDictionaryMessage(
const int64_t id, const bool is_delta, const int64_t length,
const int64_t body_length,
const std::shared_ptr<const KeyValueMetadata>& custom_metadata,
const std::vector<FieldMetadata>& nodes, const std::vector<BufferMetadata>& buffers,
const std::vector<int64_t>& variadic_counts, const IpcWriteOptions& options,
std::shared_ptr<Buffer>* out);
const std::vector<int64_t>& variadic_counts, const IpcWriteOptions& options);

static inline Result<std::shared_ptr<Buffer>> WriteFlatbufferBuilder(
flatbuffers::FlatBufferBuilder& fbb, // NOLINT non-const reference
Expand Down
10 changes: 5 additions & 5 deletions cpp/src/arrow/ipc/read_write_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -156,11 +156,11 @@ TEST_P(TestMessage, SerializeCustomMetadata) {
nullptr, key_value_metadata({}, {}),
key_value_metadata({"foo", "bar"}, {"fizz", "buzz"})};
for (auto metadata : cases) {
std::shared_ptr<Buffer> serialized;
ASSERT_OK(internal::WriteRecordBatchMessage(
/*length=*/0, /*body_length=*/0, metadata,
/*nodes=*/{},
/*buffers=*/{}, /*variadic_counts=*/{}, options_, &serialized));
ASSERT_OK_AND_ASSIGN(auto serialized,
internal::WriteRecordBatchMessage(
/*length=*/0, /*body_length=*/0, metadata,
/*nodes=*/{},
/*buffers=*/{}, /*variadic_counts=*/{}, options_));
ASSERT_OK_AND_ASSIGN(std::unique_ptr<Message> message,
Message::Open(serialized, /*body=*/nullptr));

Expand Down
15 changes: 8 additions & 7 deletions cpp/src/arrow/ipc/reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -765,7 +765,8 @@ Status GetCompressionExperimental(const flatbuf::Message* message,
if (message->custom_metadata() != nullptr) {
// TODO: Ensure this deserialization only ever happens once
std::shared_ptr<KeyValueMetadata> metadata;
RETURN_NOT_OK(internal::GetKeyValueMetadata(message->custom_metadata(), &metadata));
ARROW_ASSIGN_OR_RAISE(metadata,
internal::GetKeyValueMetadata(message->custom_metadata()));
int index = metadata->FindKey("ARROW:experimental_compression");
if (index != -1) {
// Arrow 0.17 stored string in upper case, internal utils now require lower case
Expand Down Expand Up @@ -810,8 +811,8 @@ Result<RecordBatchWithMetadata> ReadRecordBatchInternal(

std::shared_ptr<KeyValueMetadata> custom_metadata;
if (message->custom_metadata() != nullptr) {
RETURN_NOT_OK(
internal::GetKeyValueMetadata(message->custom_metadata(), &custom_metadata));
ARROW_ASSIGN_OR_RAISE(custom_metadata,
internal::GetKeyValueMetadata(message->custom_metadata()));
}
ARROW_ASSIGN_OR_RAISE(auto record_batch,
LoadRecordBatch(batch, schema, inclusion_mask, context, file));
Expand Down Expand Up @@ -1426,8 +1427,8 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
ARROW_ASSIGN_OR_RAISE(auto message, GetFlatbufMessage(message_obj));
std::shared_ptr<KeyValueMetadata> custom_metadata;
if (message->custom_metadata() != nullptr) {
RETURN_NOT_OK(
internal::GetKeyValueMetadata(message->custom_metadata(), &custom_metadata));
ARROW_ASSIGN_OR_RAISE(custom_metadata,
internal::GetKeyValueMetadata(message->custom_metadata()));
}
return RecordBatchWithMetadata{std::move(batch), std::move(custom_metadata)};
}
Expand Down Expand Up @@ -1928,8 +1929,8 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
auto fb_metadata = self->footer_->custom_metadata();
if (fb_metadata != nullptr) {
std::shared_ptr<KeyValueMetadata> md;
RETURN_NOT_OK(internal::GetKeyValueMetadata(fb_metadata, &md));
self->metadata_ = std::move(md); // const-ify
ARROW_ASSIGN_OR_RAISE(self->metadata_,
internal::GetKeyValueMetadata(fb_metadata));
}
return Status::OK();
});
Expand Down
21 changes: 14 additions & 7 deletions cpp/src/arrow/ipc/writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -180,9 +180,11 @@ class RecordBatchSerializer {

// Override this for writing dictionary metadata
virtual Status SerializeMetadata(int64_t num_rows) {
return WriteRecordBatchMessage(num_rows, out_->body_length, custom_metadata_,
field_nodes_, buffer_meta_, variadic_counts_, options_,
&out_->metadata);
ARROW_ASSIGN_OR_RAISE(
out_->metadata,
WriteRecordBatchMessage(num_rows, out_->body_length, custom_metadata_,
field_nodes_, buffer_meta_, variadic_counts_, options_));
return Status::OK();
}

bool ShouldCompress(int64_t uncompressed_size, int64_t compressed_size) const {
Expand Down Expand Up @@ -746,9 +748,12 @@ class DictionarySerializer : public RecordBatchSerializer {
is_delta_(is_delta) {}

Status SerializeMetadata(int64_t num_rows) override {
return WriteDictionaryMessage(dictionary_id_, is_delta_, num_rows, out_->body_length,
custom_metadata_, field_nodes_, buffer_meta_,
variadic_counts_, options_, &out_->metadata);
ARROW_ASSIGN_OR_RAISE(
out_->metadata,
WriteDictionaryMessage(dictionary_id_, is_delta_, num_rows, out_->body_length,
custom_metadata_, field_nodes_, buffer_meta_,
variadic_counts_, options_));
return Status::OK();
}

Status Assemble(const std::shared_ptr<Array>& dictionary) {
Expand Down Expand Up @@ -804,7 +809,9 @@ Status WriteIpcPayload(const IpcPayload& payload, const IpcWriteOptions& options
Status GetSchemaPayload(const Schema& schema, const IpcWriteOptions& options,
const DictionaryFieldMapper& mapper, IpcPayload* out) {
out->type = MessageType::SCHEMA;
return internal::WriteSchemaMessage(schema, mapper, options, &out->metadata);
ARROW_ASSIGN_OR_RAISE(out->metadata,
internal::WriteSchemaMessage(schema, mapper, options));
return Status::OK();
}

Status GetDictionaryPayload(int64_t id, const std::shared_ptr<Array>& dictionary,
Expand Down
Loading