Re-add 1.18 and 1.19 changes because they inherit base class changes
mxm committed Jan 31, 2025
1 parent 887ca3e commit 0e96bea
Showing 11 changed files with 197 additions and 16 deletions.
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.flink;

import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.LogicalType;

public class FlinkRowData {

  private FlinkRowData() {}

  public static RowData.FieldGetter createFieldGetter(LogicalType fieldType, int fieldPos) {
    RowData.FieldGetter flinkFieldGetter = RowData.createFieldGetter(fieldType, fieldPos);
    return rowData -> {
      // Be sure to check for null values, even if the field is required. Flink's
      // RowData.createFieldGetter(..) only adds a null check for nullable types; for required
      // (non-nullable) types it reads the field directly. Without this explicit null check, the
      // null flag of BinaryRowData would be ignored and random bytes would be parsed as actual
      // values, producing incorrect writes instead of failing with a NullPointerException.
      if (!fieldType.isNullable() && rowData.isNullAt(fieldPos)) {
        return null;
      }
      return flinkFieldGetter.getFieldOrNull(rowData);
    };
  }
}
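
Why the explicit check matters: below is a minimal, self-contained sketch of the failure mode this getter guards against. The NullFlagDemo class and the single NOT NULL BIGINT column are hypothetical; RowDataSerializer, BinaryRowData, and the two field getters are the real Flink and Iceberg classes used in this commit.

import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.iceberg.flink.FlinkRowData;

public class NullFlagDemo {
  public static void main(String[] args) {
    // A one-column row type whose only field is declared NOT NULL.
    RowType rowType = RowType.of(new BigIntType(false));

    // Build a row that violates the constraint and serialize it to BinaryRowData,
    // which stores a null flag next to the (zeroed) fixed-length field slot.
    GenericRowData row = new GenericRowData(1);
    row.setField(0, null);
    BinaryRowData binaryRow = new RowDataSerializer(rowType).toBinaryRow(row);

    // Flink's stock getter skips the null check for non-nullable types and reads
    // the raw field slot, so the null silently becomes a concrete value.
    RowData.FieldGetter flinkGetter = RowData.createFieldGetter(rowType.getTypeAt(0), 0);
    System.out.println(flinkGetter.getFieldOrNull(binaryRow)); // prints 0, not null

    // The wrapper from this commit consults the null flag first, so the null
    // surfaces and downstream writers fail fast instead of writing garbage.
    RowData.FieldGetter fixedGetter = FlinkRowData.createFieldGetter(rowType.getTypeAt(0), 0);
    System.out.println(fixedGetter.getFieldOrNull(binaryRow)); // prints null
  }
}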
@@ -35,6 +35,7 @@
import org.apache.flink.table.types.logical.RowType.RowField;
import org.apache.flink.table.types.logical.SmallIntType;
import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.iceberg.flink.FlinkRowData;
import org.apache.iceberg.parquet.ParquetValueReaders;
import org.apache.iceberg.parquet.ParquetValueWriter;
import org.apache.iceberg.parquet.ParquetValueWriters;
@@ -492,7 +493,7 @@ private static class RowDataWriter extends ParquetValueWriters.StructWriter<RowD
      super(writers);
      fieldGetter = new RowData.FieldGetter[types.size()];
      for (int i = 0; i < types.size(); i += 1) {
-        fieldGetter[i] = RowData.createFieldGetter(types.get(i), i);
+        fieldGetter[i] = FlinkRowData.createFieldGetter(types.get(i), i);
      }
    }

@@ -67,13 +67,13 @@ public abstract class AbstractTestFlinkAvroReaderWriter extends DataTest {
  @Override
  protected void writeAndValidate(Schema schema) throws IOException {
    List<Record> expectedRecords = RandomGenericData.generate(schema, NUM_RECORDS, 1991L);
-    writeAndValidate(schema, expectedRecords, NUM_RECORDS);
+    writeAndValidate(schema, expectedRecords);
  }

  protected abstract Avro.ReadBuilder createAvroReadBuilder(File recordsFile, Schema schema);

-  private void writeAndValidate(Schema schema, List<Record> expectedRecords, int numRecord)
-      throws IOException {
+  @Override
+  protected void writeAndValidate(Schema schema, List<Record> expectedRecords) throws IOException {
    RowType flinkSchema = FlinkSchemaUtil.convert(schema);
    List<RowData> expectedRows = Lists.newArrayList(RandomRowData.convert(schema, expectedRecords));

@@ -93,7 +93,7 @@ private void writeAndValidate(Schema schema, List<Record> expectedRecords, int n
    try (CloseableIterable<RowData> reader = createAvroReadBuilder(recordsFile, schema).build()) {
      Iterator<Record> expected = expectedRecords.iterator();
      Iterator<RowData> rows = reader.iterator();
-      for (int i = 0; i < numRecord; i++) {
+      for (int i = 0; i < expectedRecords.size(); i++) {
        assertThat(rows).hasNext();
        TestHelpers.assertRowData(schema.asStruct(), flinkSchema, expected.next(), rows.next());
      }
@@ -120,7 +120,7 @@ private void writeAndValidate(Schema schema, List<Record> expectedRecords, int n
            .build()) {
      Iterator<RowData> expected = expectedRows.iterator();
      Iterator<Record> records = reader.iterator();
-      for (int i = 0; i < numRecord; i += 1) {
+      for (int i = 0; i < expectedRecords.size(); i += 1) {
        assertThat(records).hasNext();
        TestHelpers.assertRowData(schema.asStruct(), flinkSchema, records.next(), expected.next());
      }
@@ -177,6 +177,6 @@ public void testNumericTypes() throws IOException {
            1643811742000L,
            10.24d));

-    writeAndValidate(SCHEMA_NUM_TYPE, expected, 2);
+    writeAndValidate(SCHEMA_NUM_TYPE, expected);
  }
}
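
The @Override annotations above target a new writeAndValidate(Schema, List<Record>) hook on the shared DataTest base class. That base-class change is not part of this diff; per the commit message, the 1.18 and 1.19 copies are re-added because they inherit it. A rough sketch of what such a hook presumably looks like follows (names and the default body are inferred, not taken from this diff):

import java.io.IOException;
import java.util.List;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.Record;

// Hypothetical sketch of the inherited hook; the real DataTest may differ.
public abstract class DataTest {
  // Existing entry point: each format test generates its own random records.
  protected abstract void writeAndValidate(Schema schema) throws IOException;

  // New entry point: callers supply the records, letting tests exercise
  // specific layouts such as BinaryRowData rows with null flags set.
  protected void writeAndValidate(Schema schema, List<Record> expectedData) throws IOException {
    throw new UnsupportedOperationException("Not implemented for this test");
  }
}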
@@ -45,8 +45,13 @@ public class TestFlinkOrcReaderWriter extends DataTest {

  @Override
  protected void writeAndValidate(Schema schema) throws IOException {
-    RowType flinkSchema = FlinkSchemaUtil.convert(schema);
    List<Record> expectedRecords = RandomGenericData.generate(schema, NUM_RECORDS, 1990L);
+    writeAndValidate(schema, expectedRecords);
+  }
+
+  @Override
+  protected void writeAndValidate(Schema schema, List<Record> expectedRecords) throws IOException {
+    RowType flinkSchema = FlinkSchemaUtil.convert(schema);
    List<RowData> expectedRows = Lists.newArrayList(RandomRowData.convert(schema, expectedRecords));

    File recordsFile = File.createTempFile("junit", null, temp.toFile());
@@ -24,7 +24,10 @@
import java.io.IOException;
import java.nio.file.Path;
import java.util.Iterator;
+import java.util.List;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.iceberg.Files;
import org.apache.iceberg.Schema;
@@ -33,10 +36,12 @@
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.parquet.GenericParquetReaders;
import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.RowDataConverter;
import org.apache.iceberg.flink.TestHelpers;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.junit.jupiter.api.io.TempDir;

public class TestFlinkParquetWriter extends DataTest {
@@ -91,4 +96,16 @@ protected void writeAndValidate(Schema schema) throws IOException {
                schema, NUM_RECORDS, 21124, NUM_RECORDS / 20)),
        schema);
  }

+  @Override
+  protected void writeAndValidate(Schema schema, List<Record> expectedData) throws IOException {
+    RowDataSerializer rowDataSerializer = new RowDataSerializer(FlinkSchemaUtil.convert(schema));
+    List<RowData> binaryRowList = Lists.newArrayList();
+    for (Record record : expectedData) {
+      RowData rowData = RowDataConverter.convert(schema, record);
+      // Serialize to BinaryRowData: only the binary layout carries the null flag next to raw
+      // field bytes, which is the case the new FlinkRowData field getter guards against.
+      BinaryRowData binaryRow = rowDataSerializer.toBinaryRow(rowData);
+      binaryRowList.add(binaryRow);
+    }
+    writeAndValidate(binaryRowList, schema);
+  }
}
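
The remaining files repeat the changes above for the second Flink version in this commit; the 1.18 and 1.19 modules carry identical copies of these sources.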
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.flink;

import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.LogicalType;

public class FlinkRowData {

  private FlinkRowData() {}

  public static RowData.FieldGetter createFieldGetter(LogicalType fieldType, int fieldPos) {
    RowData.FieldGetter flinkFieldGetter = RowData.createFieldGetter(fieldType, fieldPos);
    return rowData -> {
      // Be sure to check for null values, even if the field is required. Flink's
      // RowData.createFieldGetter(..) only adds a null check for nullable types; for required
      // (non-nullable) types it reads the field directly. Without this explicit null check, the
      // null flag of BinaryRowData would be ignored and random bytes would be parsed as actual
      // values, producing incorrect writes instead of failing with a NullPointerException.
      if (!fieldType.isNullable() && rowData.isNullAt(fieldPos)) {
        return null;
      }
      return flinkFieldGetter.getFieldOrNull(rowData);
    };
  }
}
@@ -35,6 +35,7 @@
import org.apache.flink.table.types.logical.RowType.RowField;
import org.apache.flink.table.types.logical.SmallIntType;
import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.iceberg.flink.FlinkRowData;
import org.apache.iceberg.parquet.ParquetValueReaders;
import org.apache.iceberg.parquet.ParquetValueWriter;
import org.apache.iceberg.parquet.ParquetValueWriters;
@@ -492,7 +493,7 @@ private static class RowDataWriter extends ParquetValueWriters.StructWriter<RowD
      super(writers);
      fieldGetter = new RowData.FieldGetter[types.size()];
      for (int i = 0; i < types.size(); i += 1) {
-        fieldGetter[i] = RowData.createFieldGetter(types.get(i), i);
+        fieldGetter[i] = FlinkRowData.createFieldGetter(types.get(i), i);
      }
    }

@@ -67,13 +67,13 @@ public abstract class AbstractTestFlinkAvroReaderWriter extends DataTest {
  @Override
  protected void writeAndValidate(Schema schema) throws IOException {
    List<Record> expectedRecords = RandomGenericData.generate(schema, NUM_RECORDS, 1991L);
-    writeAndValidate(schema, expectedRecords, NUM_RECORDS);
+    writeAndValidate(schema, expectedRecords);
  }

  protected abstract Avro.ReadBuilder createAvroReadBuilder(File recordsFile, Schema schema);

-  private void writeAndValidate(Schema schema, List<Record> expectedRecords, int numRecord)
-      throws IOException {
+  @Override
+  protected void writeAndValidate(Schema schema, List<Record> expectedRecords) throws IOException {
    RowType flinkSchema = FlinkSchemaUtil.convert(schema);
    List<RowData> expectedRows = Lists.newArrayList(RandomRowData.convert(schema, expectedRecords));

@@ -93,7 +93,7 @@ private void writeAndValidate(Schema schema, List<Record> expectedRecords, int n
    try (CloseableIterable<RowData> reader = createAvroReadBuilder(recordsFile, schema).build()) {
      Iterator<Record> expected = expectedRecords.iterator();
      Iterator<RowData> rows = reader.iterator();
-      for (int i = 0; i < numRecord; i++) {
+      for (int i = 0; i < expectedRecords.size(); i++) {
        assertThat(rows).hasNext();
        TestHelpers.assertRowData(schema.asStruct(), flinkSchema, expected.next(), rows.next());
      }
@@ -120,7 +120,7 @@ private void writeAndValidate(Schema schema, List<Record> expectedRecords, int n
            .build()) {
      Iterator<RowData> expected = expectedRows.iterator();
      Iterator<Record> records = reader.iterator();
-      for (int i = 0; i < numRecord; i += 1) {
+      for (int i = 0; i < expectedRecords.size(); i += 1) {
        assertThat(records).hasNext();
        TestHelpers.assertRowData(schema.asStruct(), flinkSchema, records.next(), expected.next());
      }
@@ -177,6 +177,6 @@ public void testNumericTypes() throws IOException {
            1643811742000L,
            10.24d));

-    writeAndValidate(SCHEMA_NUM_TYPE, expected, 2);
+    writeAndValidate(SCHEMA_NUM_TYPE, expected);
  }
}
@@ -45,8 +45,13 @@ public class TestFlinkOrcReaderWriter extends DataTest {

  @Override
  protected void writeAndValidate(Schema schema) throws IOException {
-    RowType flinkSchema = FlinkSchemaUtil.convert(schema);
    List<Record> expectedRecords = RandomGenericData.generate(schema, NUM_RECORDS, 1990L);
+    writeAndValidate(schema, expectedRecords);
+  }
+
+  @Override
+  protected void writeAndValidate(Schema schema, List<Record> expectedRecords) throws IOException {
+    RowType flinkSchema = FlinkSchemaUtil.convert(schema);
    List<RowData> expectedRows = Lists.newArrayList(RandomRowData.convert(schema, expectedRecords));

    File recordsFile = File.createTempFile("junit", null, temp.toFile());
@@ -24,7 +24,10 @@
import java.io.IOException;
import java.nio.file.Path;
import java.util.Iterator;
+import java.util.List;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.iceberg.Files;
import org.apache.iceberg.Schema;
@@ -33,10 +36,12 @@
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.parquet.GenericParquetReaders;
import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.RowDataConverter;
import org.apache.iceberg.flink.TestHelpers;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.junit.jupiter.api.io.TempDir;

public class TestFlinkParquetWriter extends DataTest {
@@ -91,4 +96,16 @@ protected void writeAndValidate(Schema schema) throws IOException {
                schema, NUM_RECORDS, 21124, NUM_RECORDS / 20)),
        schema);
  }

+  @Override
+  protected void writeAndValidate(Schema schema, List<Record> expectedData) throws IOException {
+    RowDataSerializer rowDataSerializer = new RowDataSerializer(FlinkSchemaUtil.convert(schema));
+    List<RowData> binaryRowList = Lists.newArrayList();
+    for (Record record : expectedData) {
+      RowData rowData = RowDataConverter.convert(schema, record);
+      // Serialize to BinaryRowData: only the binary layout carries the null flag next to raw
+      // field bytes, which is the case the new FlinkRowData field getter guards against.
+      BinaryRowData binaryRow = rowDataSerializer.toBinaryRow(rowData);
+      binaryRowList.add(binaryRow);
+    }
+    writeAndValidate(binaryRowList, schema);
+  }
}