diff --git a/paimon-common/src/main/java/org/apache/paimon/format/SupportsWriterMetadata.java b/paimon-common/src/main/java/org/apache/paimon/format/SupportsWriterMetadata.java new file mode 100644 index 000000000000..baed0c7b404d --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/format/SupportsWriterMetadata.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.format; + +import java.util.Map; + +/** Writer capability for adding format-specific file metadata before closing the file. */ +public interface SupportsWriterMetadata { + + /** Adds raw metadata entries to the file footer. */ + void addMetadata(Map metadata); +} diff --git a/paimon-format/pom.xml b/paimon-format/pom.xml index b4136797b8d7..8927cbc77b20 100644 --- a/paimon-format/pom.xml +++ b/paimon-format/pom.xml @@ -47,6 +47,12 @@ under the License. provided + + org.apache.arrow + arrow-vector + ${arrow.version} + + org.xerial.snappy snappy-java diff --git a/paimon-format/src/main/java/org/apache/paimon/format/FormatMetadataUtils.java b/paimon-format/src/main/java/org/apache/paimon/format/FormatMetadataUtils.java new file mode 100644 index 000000000000..ca3da141260c --- /dev/null +++ b/paimon-format/src/main/java/org/apache/paimon/format/FormatMetadataUtils.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.format; + +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.Schema; + +import java.nio.ByteBuffer; +import java.util.Base64; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Optional; + +/** Utilities for format metadata encoded at file boundaries. */ +public class FormatMetadataUtils { + + public static final String ARROW_SCHEMA_METADATA_KEY = "ARROW:schema"; + + private FormatMetadataUtils() {} + + public static Map encodeMetadata(Map metadata) { + Map encoded = new LinkedHashMap<>(); + for (Map.Entry entry : metadata.entrySet()) { + encoded.put(entry.getKey(), Base64.getEncoder().encodeToString(entry.getValue())); + } + return encoded; + } + + public static Map decodeMetadata(Map metadata) { + Map decoded = new LinkedHashMap<>(); + for (Map.Entry entry : metadata.entrySet()) { + decoded.put(entry.getKey(), Base64.getDecoder().decode(entry.getValue())); + } + return decoded; + } + + public static Optional readArrowSchema(String encodedSchema) { + if (encodedSchema == null) { + return Optional.empty(); + } + byte[] schemaBytes = Base64.getDecoder().decode(encodedSchema); + return Optional.of(Schema.deserializeMessage(ByteBuffer.wrap(schemaBytes))); + } + + /** Returns metadata for top-level Arrow fields only. */ + public static Map> readFieldMetadata(Schema arrowSchema) { + Map> result = new LinkedHashMap<>(); + for (Field field : arrowSchema.getFields()) { + result.put( + field.getName(), + Collections.unmodifiableMap(new LinkedHashMap<>(field.getMetadata()))); + } + return Collections.unmodifiableMap(result); + } +} diff --git a/paimon-format/src/main/java/org/apache/paimon/format/SupportsReaderArrowSchema.java b/paimon-format/src/main/java/org/apache/paimon/format/SupportsReaderArrowSchema.java new file mode 100644 index 000000000000..43e525ddd90b --- /dev/null +++ b/paimon-format/src/main/java/org/apache/paimon/format/SupportsReaderArrowSchema.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.format; + +import org.apache.arrow.vector.types.pojo.Schema; + +import java.io.IOException; +import java.util.Optional; + +/** Reader capability for formats that can recover Arrow schema from file metadata. */ +public interface SupportsReaderArrowSchema { + + Optional readArrowSchema() throws IOException; +} diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java index 81bfdd318535..c0010b26ce15 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java @@ -24,8 +24,10 @@ import org.apache.paimon.data.columnar.ColumnarRowIterator; import org.apache.paimon.data.columnar.VectorizedColumnBatch; import org.apache.paimon.data.columnar.VectorizedRowIterator; +import org.apache.paimon.format.FormatMetadataUtils; import org.apache.paimon.format.FormatReaderFactory; import org.apache.paimon.format.OrcFormatReaderContext; +import org.apache.paimon.format.SupportsReaderArrowSchema; import org.apache.paimon.format.fs.HadoopReadOnlyFileSystem; import org.apache.paimon.format.orc.filter.OrcFilters; import org.apache.paimon.fs.FileIO; @@ -39,6 +41,7 @@ import org.apache.paimon.utils.Pool; import org.apache.paimon.utils.RoaringBitmap32; +import org.apache.arrow.vector.types.pojo.Schema; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; @@ -54,7 +57,9 @@ import javax.annotation.Nullable; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.List; +import java.util.Optional; import static org.apache.paimon.format.orc.OrcTypeUtil.convertToOrcSchema; import static org.apache.paimon.format.orc.reader.AbstractOrcColumnVector.createPaimonVector; @@ -104,7 +109,7 @@ public OrcVectorizedReader createReader(FormatReaderFactory.Context context) Pool poolOfBatches = createPoolOfBatches(context.filePath(), poolSize, context.fileIO()); - RecordReader orcReader = + OrcRecordReader orcReader = createRecordReader( hadoopConfig, schema, @@ -224,12 +229,14 @@ private ColumnarRowIterator convertAndGetIterator( * batch is addressed by the starting row number of the batch, plus the number of records to be * skipped before. */ - private static final class OrcVectorizedReader implements FileRecordReader { + private static final class OrcVectorizedReader + implements FileRecordReader, SupportsReaderArrowSchema { - private final RecordReader orcReader; + private final OrcRecordReader orcReader; private final Pool pool; - private OrcVectorizedReader(final RecordReader orcReader, final Pool pool) { + private OrcVectorizedReader( + final OrcRecordReader orcReader, final Pool pool) { this.orcReader = checkNotNull(orcReader, "orcReader"); this.pool = checkNotNull(pool, "pool"); } @@ -240,8 +247,8 @@ public ColumnarRowIterator readBatch() throws IOException { final OrcReaderBatch batch = getCachedEntry(); final VectorizedRowBatch orcVectorBatch = batch.orcVectorizedRowBatch(); - long rowNumber = orcReader.getRowNumber(); - if (!nextBatch(orcReader, orcVectorBatch)) { + long rowNumber = orcReader.recordReader.getRowNumber(); + if (!nextBatch(orcReader.recordReader, orcVectorBatch)) { batch.recycle(); return null; } @@ -249,9 +256,31 @@ public ColumnarRowIterator readBatch() throws IOException { return batch.convertAndGetIterator(orcVectorBatch, rowNumber); } + @Override + public Optional readArrowSchema() { + org.apache.orc.Reader fileReader = orcReader.fileReader; + if (!fileReader + .getMetadataKeys() + .contains(FormatMetadataUtils.ARROW_SCHEMA_METADATA_KEY)) { + return Optional.empty(); + } + return FormatMetadataUtils.readArrowSchema( + StandardCharsets.UTF_8 + .decode( + fileReader + .getMetadataValue( + FormatMetadataUtils.ARROW_SCHEMA_METADATA_KEY) + .duplicate()) + .toString()); + } + @Override public void close() throws IOException { - orcReader.close(); + try { + orcReader.recordReader.close(); + } finally { + orcReader.fileReader.close(); + } } private OrcReaderBatch getCachedEntry() throws IOException { @@ -264,7 +293,18 @@ private OrcReaderBatch getCachedEntry() throws IOException { } } - private static RecordReader createRecordReader( + private static final class OrcRecordReader { + + private final org.apache.orc.Reader fileReader; + private final RecordReader recordReader; + + private OrcRecordReader(org.apache.orc.Reader fileReader, RecordReader recordReader) { + this.fileReader = fileReader; + this.recordReader = recordReader; + } + } + + private static OrcRecordReader createRecordReader( org.apache.hadoop.conf.Configuration conf, TypeDescription schema, List conjunctPredicates, @@ -313,7 +353,7 @@ private static RecordReader createRecordReader( // assign ids schema.getId(); - return orcRowsReader; + return new OrcRecordReader(orcReader, orcRowsReader); } catch (IOException e) { // exception happened, we need to close the reader IOUtils.closeQuietly(orcReader); diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/OrcBulkWriter.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/OrcBulkWriter.java index c44e3f26d671..32809c59ebdc 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/OrcBulkWriter.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/OrcBulkWriter.java @@ -20,7 +20,9 @@ import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.format.FormatMetadataUtils; import org.apache.paimon.format.FormatWriter; +import org.apache.paimon.format.SupportsWriterMetadata; import org.apache.paimon.fs.PositionOutputStream; import org.apache.paimon.options.MemorySize; @@ -28,16 +30,22 @@ import org.apache.orc.Writer; import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; import static org.apache.paimon.utils.Preconditions.checkNotNull; /** A {@link FormatWriter} implementation that writes data in ORC format. */ -public class OrcBulkWriter implements FormatWriter { +public class OrcBulkWriter implements FormatWriter, SupportsWriterMetadata { private final Writer writer; private final Vectorizer vectorizer; private final VectorizedRowBatch rowBatch; private final PositionOutputStream underlyingStream; + private final Map metadata; private long currentBatchMemoryUsage = 0; private final long memoryLimit; @@ -54,6 +62,15 @@ public OrcBulkWriter( this.rowBatch = vectorizer.getSchema().createRowBatch(batchSize); this.underlyingStream = underlyingStream; this.memoryLimit = memoryLimit.getBytes(); + this.metadata = new HashMap<>(); + } + + @Override + public void addMetadata(Map metadata) { + for (Map.Entry entry : metadata.entrySet()) { + this.metadata.put( + entry.getKey(), Arrays.copyOf(entry.getValue(), entry.getValue().length)); + } } @Override @@ -75,6 +92,12 @@ private void flush() throws IOException { @Override public void close() throws IOException { flush(); + for (Map.Entry entry : + FormatMetadataUtils.encodeMetadata(metadata).entrySet()) { + writer.addUserMetadata( + entry.getKey(), + ByteBuffer.wrap(entry.getValue().getBytes(StandardCharsets.UTF_8))); + } writer.close(); } diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetWriterFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetWriterFactory.java index 282805897a5f..7747be7a3b0b 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetWriterFactory.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetWriterFactory.java @@ -34,6 +34,8 @@ import org.apache.parquet.io.OutputFile; import java.io.IOException; +import java.util.HashMap; +import java.util.Map; /** A factory that creates a Parquet {@link FormatWriter}. */ public class ParquetWriterFactory implements FormatWriterFactory, SupportsVariantInference { @@ -57,8 +59,10 @@ public FormatWriter create(PositionOutputStream stream, String compression) thro compression = null; } - final ParquetWriter writer = writerBuilder.createWriter(out, compression); - return new ParquetBulkWriter(writer); + Map metadata = new HashMap<>(); + final ParquetWriter writer = + writerBuilder.createWriter(out, compression, () -> metadata); + return new ParquetBulkWriter(writer, metadata); } @Override @@ -73,7 +77,9 @@ public FormatWriter createWithShreddingSchema( ParquetBuilder newBuilder = ((RowDataParquetBuilder) writerBuilder) .withShreddingSchemas(inferredShreddingSchema); - final ParquetWriter writer = newBuilder.createWriter(out, compression); - return new ParquetBulkWriter(writer); + Map metadata = new HashMap<>(); + final ParquetWriter writer = + newBuilder.createWriter(out, compression, () -> metadata); + return new ParquetBulkWriter(writer, metadata); } } diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/VectorizedParquetRecordReader.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/VectorizedParquetRecordReader.java index af63a7d9f96b..047027538628 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/VectorizedParquetRecordReader.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/VectorizedParquetRecordReader.java @@ -20,6 +20,8 @@ import org.apache.paimon.data.InternalRow; import org.apache.paimon.data.columnar.writable.WritableColumnVector; +import org.apache.paimon.format.FormatMetadataUtils; +import org.apache.paimon.format.SupportsReaderArrowSchema; import org.apache.paimon.format.parquet.type.ParquetField; import org.apache.paimon.format.parquet.type.ParquetPrimitiveField; import org.apache.paimon.fs.FileIO; @@ -41,6 +43,7 @@ import java.util.Arrays; import java.util.HashSet; import java.util.List; +import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; @@ -48,7 +51,8 @@ import static org.apache.paimon.format.parquet.reader.ParquetReaderUtil.createReadableColumnVectors; /** Record reader for parquet. */ -public class VectorizedParquetRecordReader implements FileRecordReader { +public class VectorizedParquetRecordReader + implements FileRecordReader, SupportsReaderArrowSchema { private ParquetFileReader reader; @@ -269,6 +273,16 @@ private void initColumnReader(PageReadStore pages, ParquetColumnVector cv) throw } } + @Override + public Optional readArrowSchema() + throws IOException { + return FormatMetadataUtils.readArrowSchema( + reader.getFooter() + .getFileMetaData() + .getKeyValueMetaData() + .get(FormatMetadataUtils.ARROW_SCHEMA_METADATA_KEY)); + } + @Override public void close() throws IOException { if (reader != null) { diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/ParquetBuilder.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/ParquetBuilder.java index 808febd9a2cc..601e95a4fd4e 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/ParquetBuilder.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/ParquetBuilder.java @@ -23,6 +23,8 @@ import java.io.IOException; import java.io.Serializable; +import java.util.Map; +import java.util.function.Supplier; /** * A builder to create a {@link ParquetWriter} from a Parquet {@link OutputFile}. @@ -34,4 +36,10 @@ public interface ParquetBuilder extends Serializable { /** Creates and configures a parquet writer to the given output file. */ ParquetWriter createWriter(OutputFile out, String compression) throws IOException; + + default ParquetWriter createWriter( + OutputFile out, String compression, Supplier> metadataSupplier) + throws IOException { + return createWriter(out, compression); + } } diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/ParquetBulkWriter.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/ParquetBulkWriter.java index d7282f699f1e..4b15c71be784 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/ParquetBulkWriter.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/ParquetBulkWriter.java @@ -20,6 +20,7 @@ import org.apache.paimon.data.InternalRow; import org.apache.paimon.format.FormatWriter; +import org.apache.paimon.format.SupportsWriterMetadata; import org.apache.parquet.hadoop.ParquetWriter; import org.apache.parquet.hadoop.metadata.ParquetMetadata; @@ -27,11 +28,14 @@ import javax.annotation.Nullable; import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; import static org.apache.paimon.utils.Preconditions.checkNotNull; /** A simple {@link FormatWriter} implementation that wraps a {@link ParquetWriter}. */ -public class ParquetBulkWriter implements FormatWriter { +public class ParquetBulkWriter implements FormatWriter, SupportsWriterMetadata { /** The ParquetWriter to write to. */ private final ParquetWriter parquetWriter; @@ -39,13 +43,29 @@ public class ParquetBulkWriter implements FormatWriter { /** Cached footer metadata after close, used to avoid re-reading the file for stats. */ @Nullable private ParquetMetadata footerMetadata; + private final Map metadata; + /** * Creates a new ParquetBulkWriter wrapping the given ParquetWriter. * * @param parquetWriter The ParquetWriter to write to. */ public ParquetBulkWriter(ParquetWriter parquetWriter) { + this(parquetWriter, new HashMap<>()); + } + + public ParquetBulkWriter( + ParquetWriter parquetWriter, Map metadata) { this.parquetWriter = checkNotNull(parquetWriter, "parquetWriter"); + this.metadata = checkNotNull(metadata, "metadata"); + } + + @Override + public void addMetadata(Map metadata) { + for (Map.Entry entry : metadata.entrySet()) { + this.metadata.put( + entry.getKey(), Arrays.copyOf(entry.getValue(), entry.getValue().length)); + } } @Override diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/ParquetRowDataBuilder.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/ParquetRowDataBuilder.java index 14970e548e9b..60fd1097fed3 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/ParquetRowDataBuilder.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/ParquetRowDataBuilder.java @@ -19,12 +19,14 @@ package org.apache.paimon.format.parquet.writer; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.format.FormatMetadataUtils; import org.apache.paimon.format.parquet.VariantUtils; import org.apache.paimon.types.RowType; import org.apache.hadoop.conf.Configuration; import org.apache.parquet.hadoop.ParquetWriter; import org.apache.parquet.hadoop.api.WriteSupport; +import org.apache.parquet.hadoop.api.WriteSupport.FinalizedWriteContext; import org.apache.parquet.io.OutputFile; import org.apache.parquet.io.api.RecordConsumer; import org.apache.parquet.schema.MessageType; @@ -32,6 +34,8 @@ import javax.annotation.Nullable; import java.util.HashMap; +import java.util.Map; +import java.util.function.Supplier; import static org.apache.paimon.format.parquet.ParquetSchemaConverter.convertToParquetMessageType; @@ -41,12 +45,20 @@ public class ParquetRowDataBuilder private final RowType rowType; @Nullable private final RowType shreddingSchemas; + private Supplier> metadataSupplier; public ParquetRowDataBuilder( OutputFile path, RowType rowType, @Nullable RowType shreddingSchemas) { super(path); this.rowType = rowType; this.shreddingSchemas = shreddingSchemas; + this.metadataSupplier = HashMap::new; + } + + public ParquetRowDataBuilder withMetadataSupplier( + Supplier> metadataSupplier) { + this.metadataSupplier = metadataSupplier; + return this; } @Override @@ -89,5 +101,11 @@ public void prepareForWrite(RecordConsumer recordConsumer) { public void write(InternalRow record) { this.writer.write(record); } + + @Override + public FinalizedWriteContext finalizeWrite() { + return new FinalizedWriteContext( + FormatMetadataUtils.encodeMetadata(metadataSupplier.get())); + } } } diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/RowDataParquetBuilder.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/RowDataParquetBuilder.java index 2e84df0932a0..e87367f9ef40 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/RowDataParquetBuilder.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/RowDataParquetBuilder.java @@ -35,6 +35,9 @@ import javax.annotation.Nullable; import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.function.Supplier; /** A {@link ParquetBuilder} for {@link InternalRow}. */ public class RowDataParquetBuilder implements ParquetBuilder { @@ -58,8 +61,16 @@ public RowDataParquetBuilder withShreddingSchemas(RowType shreddingSchemas) { @Override public ParquetWriter createWriter(OutputFile out, String compression) throws IOException { + return createWriter(out, compression, HashMap::new); + } + + @Override + public ParquetWriter createWriter( + OutputFile out, String compression, Supplier> metadataSupplier) + throws IOException { ParquetRowDataBuilder builder = new ParquetRowDataBuilder(out, rowType, shreddingSchemas) + .withMetadataSupplier(metadataSupplier) .withConf(conf) .withCompressionCodec(getCompressionCodec(getCompression(compression))) .withRowGroupSize( diff --git a/paimon-format/src/test/java/org/apache/paimon/format/FormatMetadataUtilsTest.java b/paimon-format/src/test/java/org/apache/paimon/format/FormatMetadataUtilsTest.java new file mode 100644 index 000000000000..8b3b85dde8da --- /dev/null +++ b/paimon-format/src/test/java/org/apache/paimon/format/FormatMetadataUtilsTest.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.format; + +import org.junit.jupiter.api.Test; + +import java.nio.charset.StandardCharsets; +import java.util.Base64; +import java.util.LinkedHashMap; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link FormatMetadataUtils}. */ +public class FormatMetadataUtilsTest { + + @Test + public void testDecodeMetadata() { + Map metadata = new LinkedHashMap<>(); + metadata.put( + "encoded", + Base64.getEncoder() + .encodeToString("paimon-value".getBytes(StandardCharsets.UTF_8))); + + Map decoded = FormatMetadataUtils.decodeMetadata(metadata); + + assertThat(new String(decoded.get("encoded"), StandardCharsets.UTF_8)) + .isEqualTo("paimon-value"); + } +} diff --git a/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcFormatReadWriteTest.java b/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcFormatReadWriteTest.java index fb625c68da75..57e772cbaa80 100644 --- a/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcFormatReadWriteTest.java +++ b/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcFormatReadWriteTest.java @@ -24,24 +24,38 @@ import org.apache.paimon.data.serializer.InternalRowSerializer; import org.apache.paimon.format.FileFormat; import org.apache.paimon.format.FileFormatFactory; +import org.apache.paimon.format.FormatMetadataUtils; import org.apache.paimon.format.FormatReadWriteTest; import org.apache.paimon.format.FormatReaderContext; import org.apache.paimon.format.FormatWriter; import org.apache.paimon.format.OrcOptions; +import org.apache.paimon.format.SupportsReaderArrowSchema; +import org.apache.paimon.format.SupportsWriterMetadata; import org.apache.paimon.fs.PositionOutputStream; import org.apache.paimon.options.Options; +import org.apache.paimon.reader.FileRecordReader; import org.apache.paimon.reader.RecordReader; import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.RowType; +import org.apache.arrow.vector.types.Types; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.FieldType; +import org.apache.arrow.vector.types.pojo.Schema; import org.junit.jupiter.api.Test; import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.Optional; import java.util.TimeZone; import static org.assertj.core.api.Assertions.assertThat; @@ -81,6 +95,73 @@ protected OrcFormatReadWriteTest() { super("orc"); } + @Test + public void testWriteMetadata() throws IOException { + RowType rowType = + DataTypes.ROW( + DataTypes.FIELD(0, "id", DataTypes.INT()), + DataTypes.FIELD(1, "name", DataTypes.STRING())); + + PositionOutputStream out = fileIO.newOutputStream(file, false); + FormatWriter writer = newFormat.createWriterFactory(rowType).create(out, "zstd"); + Map fieldMetadata = new HashMap<>(); + fieldMetadata.put("paimon.test.field-key", "field-value"); + fieldMetadata.put("paimon.test.field-version", "1"); + Schema arrowSchema = + new Schema( + Arrays.asList( + new Field( + "id", + new FieldType( + true, + Types.MinorType.INT.getType(), + null, + Collections.singletonMap("PARQUET:field_id", "0")), + null), + new Field( + "name", + new FieldType( + true, + Types.MinorType.VARCHAR.getType(), + null, + fieldMetadata), + null))); + byte[] arrowSchemaBytes = arrowSchema.serializeAsMessage(); + Map metadata = new HashMap<>(); + metadata.put("paimon.test.key", "paimon-test-value".getBytes(StandardCharsets.UTF_8)); + metadata.put(FormatMetadataUtils.ARROW_SCHEMA_METADATA_KEY, arrowSchemaBytes); + ((SupportsWriterMetadata) writer).addMetadata(metadata); + writer.addElement(GenericRow.of(1, org.apache.paimon.data.BinaryString.fromString("one"))); + writer.close(); + out.close(); + + try (org.apache.orc.Reader reader = + OrcReaderFactory.createReader( + new org.apache.hadoop.conf.Configuration(false), fileIO, file, null)) { + ByteBuffer value = reader.getMetadataValue("paimon.test.key"); + Map decodedMetadata = + FormatMetadataUtils.decodeMetadata( + Collections.singletonMap( + "paimon.test.key", + StandardCharsets.UTF_8.decode(value.duplicate()).toString())); + assertThat(new String(decodedMetadata.get("paimon.test.key"), StandardCharsets.UTF_8)) + .isEqualTo("paimon-test-value"); + } + + FormatReaderContext context = + new FormatReaderContext(fileIO, file, fileIO.getFileSize(file)); + try (FileRecordReader reader = + newFormat + .createReaderFactory(rowType, rowType, Collections.emptyList()) + .createReader(context)) { + Optional readArrowSchema = + ((SupportsReaderArrowSchema) reader).readArrowSchema(); + assertThat(readArrowSchema).hasValue(arrowSchema); + assertThat(FormatMetadataUtils.readFieldMetadata(readArrowSchema.get())) + .containsEntry("name", fieldMetadata); + } + } + @Test public void testTimestampLTZWithLegacyWriteAndRead() throws IOException { RowType rowType = DataTypes.ROW(DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()); diff --git a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFormatReadWriteTest.java b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFormatReadWriteTest.java index 137da29aa6a0..33b31ebc3279 100644 --- a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFormatReadWriteTest.java +++ b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFormatReadWriteTest.java @@ -20,15 +20,25 @@ import org.apache.paimon.data.BinaryString; import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.InternalRow; import org.apache.paimon.format.FileFormat; import org.apache.paimon.format.FileFormatFactory; +import org.apache.paimon.format.FormatMetadataUtils; import org.apache.paimon.format.FormatReadWriteTest; +import org.apache.paimon.format.FormatReaderContext; import org.apache.paimon.format.FormatWriter; +import org.apache.paimon.format.SupportsReaderArrowSchema; +import org.apache.paimon.format.SupportsWriterMetadata; import org.apache.paimon.fs.PositionOutputStream; import org.apache.paimon.options.Options; +import org.apache.paimon.reader.FileRecordReader; import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.RowType; +import org.apache.arrow.vector.types.Types; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.FieldType; +import org.apache.arrow.vector.types.pojo.Schema; import org.apache.parquet.column.values.bloomfilter.BloomFilter; import org.apache.parquet.hadoop.ParquetFileReader; import org.apache.parquet.hadoop.metadata.BlockMetaData; @@ -40,9 +50,13 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.ThreadLocalRandom; /** A parquet {@link FormatReadWriteTest}. */ @@ -58,6 +72,74 @@ protected FileFormat fileFormat() { new FileFormatFactory.FormatContext(new Options(), 1024, 1024)); } + @Test + public void testWriteMetadata() throws Exception { + ParquetFileFormat format = + new ParquetFileFormat( + new FileFormatFactory.FormatContext(new Options(), 1024, 1024)); + RowType rowType = + DataTypes.ROW( + DataTypes.FIELD(0, "id", DataTypes.INT()), + DataTypes.FIELD(1, "name", DataTypes.STRING())); + + PositionOutputStream out = fileIO.newOutputStream(file, false); + FormatWriter writer = format.createWriterFactory(rowType).create(out, "zstd"); + Map fieldMetadata = new HashMap<>(); + fieldMetadata.put("paimon.test.field-key", "field-value"); + fieldMetadata.put("paimon.test.field-version", "1"); + Schema arrowSchema = + new Schema( + Arrays.asList( + new Field( + "id", + new FieldType( + true, + Types.MinorType.INT.getType(), + null, + Collections.singletonMap("PARQUET:field_id", "0")), + null), + new Field( + "name", + new FieldType( + true, + Types.MinorType.VARCHAR.getType(), + null, + fieldMetadata), + null))); + byte[] arrowSchemaBytes = arrowSchema.serializeAsMessage(); + Map metadata = new HashMap<>(); + metadata.put("paimon.test.key", "paimon-test-value".getBytes(StandardCharsets.UTF_8)); + metadata.put(FormatMetadataUtils.ARROW_SCHEMA_METADATA_KEY, arrowSchemaBytes); + ((SupportsWriterMetadata) writer).addMetadata(metadata); + writer.addElement(GenericRow.of(1, BinaryString.fromString("one"))); + writer.close(); + out.close(); + + try (ParquetFileReader reader = + ParquetUtil.getParquetReader( + fileIO, file, fileIO.getFileSize(file), new Options())) { + Map fileMetadata = + reader.getFooter().getFileMetaData().getKeyValueMetaData(); + Map decodedMetadata = FormatMetadataUtils.decodeMetadata(fileMetadata); + Assertions.assertThat( + new String( + decodedMetadata.get("paimon.test.key"), StandardCharsets.UTF_8)) + .isEqualTo("paimon-test-value"); + } + + FormatReaderContext context = + new FormatReaderContext(fileIO, file, fileIO.getFileSize(file)); + try (FileRecordReader reader = + format.createReaderFactory(rowType, rowType, Collections.emptyList()) + .createReader(context)) { + Optional readArrowSchema = + ((SupportsReaderArrowSchema) reader).readArrowSchema(); + Assertions.assertThat(readArrowSchema).hasValue(arrowSchema); + Assertions.assertThat(FormatMetadataUtils.readFieldMetadata(readArrowSchema.get())) + .containsEntry("name", fieldMetadata); + } + } + @ParameterizedTest @ValueSource(booleans = {true, false}) public void testEnableBloomFilter(boolean enabled) throws Exception {