Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.format;

import java.util.Map;

/**
 * Writer capability for adding format-specific file metadata before closing the file.
 *
 * <p>Values are passed as raw bytes; how they are encoded into the file is left to the
 * implementing writer.
 */
public interface SupportsWriterMetadata {

/**
 * Adds raw metadata entries to the file footer.
 *
 * @param metadata metadata keys mapped to their raw value bytes
 */
void addMetadata(Map<String, byte[]> metadata);
}
6 changes: 6 additions & 0 deletions paimon-format/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@ under the License.
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-vector</artifactId>
<version>${arrow.version}</version>
</dependency>

<dependency>
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.format;

import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.Schema;

import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

/**
 * Static helpers for format metadata stored at file boundaries: Base64 conversion of raw
 * metadata values and recovery of an Arrow {@link Schema} from its encoded form.
 */
public class FormatMetadataUtils {

    /** Metadata key under which the encoded Arrow schema is stored. */
    public static final String ARROW_SCHEMA_METADATA_KEY = "ARROW:schema";

    private FormatMetadataUtils() {}

    /**
     * Base64-encodes every value of the given metadata map.
     *
     * @param metadata metadata keys mapped to raw value bytes
     * @return a new map with the same keys, in the same iteration order, holding Base64 text
     */
    public static Map<String, String> encodeMetadata(Map<String, byte[]> metadata) {
        final Base64.Encoder base64 = Base64.getEncoder();
        final Map<String, String> textByKey = new LinkedHashMap<>();
        metadata.forEach((key, rawValue) -> textByKey.put(key, base64.encodeToString(rawValue)));
        return textByKey;
    }

    /**
     * Base64-decodes every value of the given metadata map; the inverse of {@link
     * #encodeMetadata(Map)}.
     *
     * @param metadata metadata keys mapped to Base64 text
     * @return a new map with the same keys, in the same iteration order, holding decoded bytes
     */
    public static Map<String, byte[]> decodeMetadata(Map<String, String> metadata) {
        final Base64.Decoder base64 = Base64.getDecoder();
        final Map<String, byte[]> bytesByKey = new LinkedHashMap<>();
        metadata.forEach((key, encodedValue) -> bytesByKey.put(key, base64.decode(encodedValue)));
        return bytesByKey;
    }

    /**
     * Deserializes an Arrow schema from its Base64-encoded message form.
     *
     * @param encodedSchema Base64 text of the serialized schema message, may be {@code null}
     * @return the deserialized schema, or empty when {@code encodedSchema} is {@code null}
     */
    public static Optional<Schema> readArrowSchema(String encodedSchema) {
        if (encodedSchema != null) {
            ByteBuffer serializedSchema = ByteBuffer.wrap(Base64.getDecoder().decode(encodedSchema));
            return Optional.of(Schema.deserializeMessage(serializedSchema));
        }
        return Optional.empty();
    }

    /**
     * Returns metadata for top-level Arrow fields only.
     *
     * @param arrowSchema schema whose top-level fields are inspected
     * @return an unmodifiable map from field name to an unmodifiable copy of that field's
     *     metadata, in schema field order
     */
    public static Map<String, Map<String, String>> readFieldMetadata(Schema arrowSchema) {
        final Map<String, Map<String, String>> metadataByField = new LinkedHashMap<>();
        for (Field topLevelField : arrowSchema.getFields()) {
            String fieldName = topLevelField.getName();
            // Copy before wrapping so the result does not alias the field's own map.
            Map<String, String> snapshot = new LinkedHashMap<>(topLevelField.getMetadata());
            metadataByField.put(fieldName, Collections.unmodifiableMap(snapshot));
        }
        return Collections.unmodifiableMap(metadataByField);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.format;

import org.apache.arrow.vector.types.pojo.Schema;

import java.io.IOException;
import java.util.Optional;

/** Reader capability for formats that can recover Arrow schema from file metadata. */
public interface SupportsReaderArrowSchema {

/**
 * Reads the Arrow schema recorded in the file metadata.
 *
 * @return the recovered schema; presumably empty when the file carries no Arrow schema
 *     (NOTE(review): inferred from the Optional return type, confirm per implementation)
 * @throws IOException declared for implementations whose metadata access can fail
 */
Optional<Schema> readArrowSchema() throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@
import org.apache.paimon.data.columnar.ColumnarRowIterator;
import org.apache.paimon.data.columnar.VectorizedColumnBatch;
import org.apache.paimon.data.columnar.VectorizedRowIterator;
import org.apache.paimon.format.FormatMetadataUtils;
import org.apache.paimon.format.FormatReaderFactory;
import org.apache.paimon.format.OrcFormatReaderContext;
import org.apache.paimon.format.SupportsReaderArrowSchema;
import org.apache.paimon.format.fs.HadoopReadOnlyFileSystem;
import org.apache.paimon.format.orc.filter.OrcFilters;
import org.apache.paimon.fs.FileIO;
Expand All @@ -39,6 +41,7 @@
import org.apache.paimon.utils.Pool;
import org.apache.paimon.utils.RoaringBitmap32;

import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
Expand All @@ -54,7 +57,9 @@
import javax.annotation.Nullable;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Optional;

import static org.apache.paimon.format.orc.OrcTypeUtil.convertToOrcSchema;
import static org.apache.paimon.format.orc.reader.AbstractOrcColumnVector.createPaimonVector;
Expand Down Expand Up @@ -104,7 +109,7 @@ public OrcVectorizedReader createReader(FormatReaderFactory.Context context)
Pool<OrcReaderBatch> poolOfBatches =
createPoolOfBatches(context.filePath(), poolSize, context.fileIO());

RecordReader orcReader =
OrcRecordReader orcReader =
createRecordReader(
hadoopConfig,
schema,
Expand Down Expand Up @@ -224,12 +229,14 @@ private ColumnarRowIterator convertAndGetIterator(
* batch is addressed by the starting row number of the batch, plus the number of records to be
* skipped before.
*/
private static final class OrcVectorizedReader implements FileRecordReader<InternalRow> {
private static final class OrcVectorizedReader
implements FileRecordReader<InternalRow>, SupportsReaderArrowSchema {

private final RecordReader orcReader;
private final OrcRecordReader orcReader;
private final Pool<OrcReaderBatch> pool;

private OrcVectorizedReader(final RecordReader orcReader, final Pool<OrcReaderBatch> pool) {
private OrcVectorizedReader(
final OrcRecordReader orcReader, final Pool<OrcReaderBatch> pool) {
this.orcReader = checkNotNull(orcReader, "orcReader");
this.pool = checkNotNull(pool, "pool");
}
Expand All @@ -240,18 +247,40 @@ public ColumnarRowIterator readBatch() throws IOException {
final OrcReaderBatch batch = getCachedEntry();
final VectorizedRowBatch orcVectorBatch = batch.orcVectorizedRowBatch();

long rowNumber = orcReader.getRowNumber();
if (!nextBatch(orcReader, orcVectorBatch)) {
long rowNumber = orcReader.recordReader.getRowNumber();
if (!nextBatch(orcReader.recordReader, orcVectorBatch)) {
batch.recycle();
return null;
}

return batch.convertAndGetIterator(orcVectorBatch, rowNumber);
}

/**
 * Recovers the Arrow schema stored in the ORC file's user metadata.
 *
 * @return the schema decoded from the {@code ARROW:schema} entry, or empty when the file has
 *     no such entry
 */
@Override
public Optional<Schema> readArrowSchema() {
    final String schemaKey = FormatMetadataUtils.ARROW_SCHEMA_METADATA_KEY;
    final org.apache.orc.Reader reader = orcReader.fileReader;
    if (reader.getMetadataKeys().contains(schemaKey)) {
        // Decode from a duplicate so the position of the buffer returned by ORC is untouched.
        String encodedSchema =
                StandardCharsets.UTF_8
                        .decode(reader.getMetadataValue(schemaKey).duplicate())
                        .toString();
        return FormatMetadataUtils.readArrowSchema(encodedSchema);
    }
    return Optional.empty();
}

/** Closes the ORC row reader and then the ORC file reader held by this reader. */
@Override
public void close() throws IOException {
// NOTE(review): this listing is a diff view without +/- markers; the bare orcReader.close()
// below looks like the pre-change statement that the try/finally replaces. Confirm against
// the real file.
orcReader.close();
// Close the row reader first; the finally block closes the file reader even if that throws.
try {
orcReader.recordReader.close();
} finally {
orcReader.fileReader.close();
}
}

private OrcReaderBatch getCachedEntry() throws IOException {
Expand All @@ -264,7 +293,18 @@ private OrcReaderBatch getCachedEntry() throws IOException {
}
}

private static RecordReader createRecordReader(
/**
 * Holder for an ORC file reader together with a row reader, so the owning reader can use
 * and close both.
 */
private static final class OrcRecordReader {

    private final org.apache.orc.Reader fileReader;
    private final RecordReader recordReader;

    private OrcRecordReader(org.apache.orc.Reader openedFile, RecordReader rows) {
        fileReader = openedFile;
        recordReader = rows;
    }
}

private static OrcRecordReader createRecordReader(
org.apache.hadoop.conf.Configuration conf,
TypeDescription schema,
List<OrcFilters.Predicate> conjunctPredicates,
Expand Down Expand Up @@ -313,7 +353,7 @@ private static RecordReader createRecordReader(
// assign ids
schema.getId();

return orcRowsReader;
return new OrcRecordReader(orcReader, orcRowsReader);
} catch (IOException e) {
// exception happened, we need to close the reader
IOUtils.closeQuietly(orcReader);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,32 @@

import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.format.FormatMetadataUtils;
import org.apache.paimon.format.FormatWriter;
import org.apache.paimon.format.SupportsWriterMetadata;
import org.apache.paimon.fs.PositionOutputStream;
import org.apache.paimon.options.MemorySize;

import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.Writer;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import static org.apache.paimon.utils.Preconditions.checkNotNull;

/** A {@link FormatWriter} implementation that writes data in ORC format. */
public class OrcBulkWriter implements FormatWriter {
public class OrcBulkWriter implements FormatWriter, SupportsWriterMetadata {

private final Writer writer;
private final Vectorizer<InternalRow> vectorizer;
private final VectorizedRowBatch rowBatch;
private final PositionOutputStream underlyingStream;
private final Map<String, byte[]> metadata;

private long currentBatchMemoryUsage = 0;
private final long memoryLimit;
Expand All @@ -54,6 +62,15 @@ public OrcBulkWriter(
this.rowBatch = vectorizer.getSchema().createRowBatch(batchSize);
this.underlyingStream = underlyingStream;
this.memoryLimit = memoryLimit.getBytes();
this.metadata = new HashMap<>();
}

/**
 * Buffers the given entries; an entry added later under the same key replaces the earlier one.
 *
 * @param metadata metadata keys mapped to raw value bytes
 */
@Override
public void addMetadata(Map<String, byte[]> metadata) {
    // Store a copy of each value so later changes to the caller's arrays are not picked up.
    metadata.forEach(
            (key, rawValue) -> this.metadata.put(key, Arrays.copyOf(rawValue, rawValue.length)));
}

@Override
Expand All @@ -75,6 +92,12 @@ private void flush() throws IOException {
/**
 * Flushes pending rows, writes the buffered metadata as ORC user metadata and closes the
 * writer.
 *
 * @throws IOException if flushing or closing the underlying ORC writer fails
 */
@Override
public void close() throws IOException {
    flush();

    // Values are stored as the UTF-8 bytes of their Base64 text.
    final Map<String, String> encodedMetadata = FormatMetadataUtils.encodeMetadata(metadata);
    for (Map.Entry<String, String> encodedEntry : encodedMetadata.entrySet()) {
        byte[] utf8Value = encodedEntry.getValue().getBytes(StandardCharsets.UTF_8);
        writer.addUserMetadata(encodedEntry.getKey(), ByteBuffer.wrap(utf8Value));
    }

    writer.close();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import org.apache.parquet.io.OutputFile;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/** A factory that creates a Parquet {@link FormatWriter}. */
public class ParquetWriterFactory implements FormatWriterFactory, SupportsVariantInference {
Expand All @@ -57,8 +59,10 @@ public FormatWriter create(PositionOutputStream stream, String compression) thro
compression = null;
}

final ParquetWriter<InternalRow> writer = writerBuilder.createWriter(out, compression);
return new ParquetBulkWriter(writer);
Map<String, byte[]> metadata = new HashMap<>();
final ParquetWriter<InternalRow> writer =
writerBuilder.createWriter(out, compression, () -> metadata);
return new ParquetBulkWriter(writer, metadata);
}

@Override
Expand All @@ -73,7 +77,9 @@ public FormatWriter createWithShreddingSchema(
ParquetBuilder<InternalRow> newBuilder =
((RowDataParquetBuilder) writerBuilder)
.withShreddingSchemas(inferredShreddingSchema);
final ParquetWriter<InternalRow> writer = newBuilder.createWriter(out, compression);
return new ParquetBulkWriter(writer);
Map<String, byte[]> metadata = new HashMap<>();
final ParquetWriter<InternalRow> writer =
newBuilder.createWriter(out, compression, () -> metadata);
return new ParquetBulkWriter(writer, metadata);
}
}
Loading