diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java index 50c1f365e5f..059341fe178 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java @@ -22,6 +22,7 @@ import static org.apache.parquet.avro.AvroWriteSupport.WRITE_FIXED_AS_INT96; import static org.apache.parquet.avro.AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.common.Constants; @@ -113,6 +114,20 @@ public void write(SeaTunnelRow seaTunnelRow) throws FileConnectorException { currentBatchSize++; } + protected SeaTunnelRowType buildSchemaWithRowType(SeaTunnelRowType seaTunnelRowType, List sinkColumnsIndex) { + SeaTunnelDataType[] fieldTypes = seaTunnelRowType.getFieldTypes(); + String[] fieldNames = seaTunnelRowType.getFieldNames(); + List newFieldNames = new ArrayList<>(); + List> newFieldTypes = new ArrayList<>(); + sinkColumnsIndex.forEach(index -> { + newFieldNames.add(fieldNames[index]); + newFieldTypes.add(fieldTypes[index]); + }); + return new SeaTunnelRowType( + newFieldNames.toArray(new String[0]), + newFieldTypes.toArray(new SeaTunnelDataType[0])); + } + /** * use hadoop conf generate hadoop configuration * diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/JsonWriteStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/JsonWriteStrategy.java index 9ae10fd35c2..351ed3d5055 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/JsonWriteStrategy.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/JsonWriteStrategy.java @@ -48,7 +48,8 @@ public JsonWriteStrategy(FileSinkConfig textFileSinkConfig) { @Override public void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType) { super.setSeaTunnelRowTypeInfo(seaTunnelRowType); - this.serializationSchema = new JsonSerializationSchema(seaTunnelRowType); + this.serializationSchema = new JsonSerializationSchema( + buildSchemaWithRowType(seaTunnelRowType, sinkColumnsIndexInRow)); } @Override @@ -57,7 +58,9 @@ public void write(@NonNull SeaTunnelRow seaTunnelRow) { String filePath = getOrCreateFilePathBeingWritten(seaTunnelRow); FSDataOutputStream fsDataOutputStream = getOrCreateOutputStream(filePath); try { - byte[] rowBytes = serializationSchema.serialize(seaTunnelRow); + byte[] rowBytes = serializationSchema.serialize(seaTunnelRow.copy(sinkColumnsIndexInRow.stream() + .mapToInt(Integer::intValue) + .toArray())); if (isFirstWrite.get(filePath)) { isFirstWrite.put(filePath, false); } else { diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java index b0ba6674631..4cdf356efac 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java @@ -66,7 +66,7 @@ public TextWriteStrategy(FileSinkConfig textFileSinkConfig) { public void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType) { super.setSeaTunnelRowTypeInfo(seaTunnelRowType); this.serializationSchema = TextSerializationSchema.builder() - .seaTunnelRowType(seaTunnelRowType) + .seaTunnelRowType(buildSchemaWithRowType(seaTunnelRowType, sinkColumnsIndexInRow)) .delimiter(fieldDelimiter) .dateFormatter(dateFormat) .dateTimeFormatter(dateTimeFormat) @@ -85,7 +85,9 @@ public void write(@NonNull SeaTunnelRow seaTunnelRow) { } else { fsDataOutputStream.write(rowDelimiter.getBytes()); } - fsDataOutputStream.write(serializationSchema.serialize(seaTunnelRow)); + fsDataOutputStream.write(serializationSchema.serialize(seaTunnelRow.copy(sinkColumnsIndexInRow.stream() + .mapToInt(Integer::intValue) + .toArray()))); } catch (IOException e) { throw new FileConnectorException(CommonErrorCode.FILE_OPERATION_FAILED, String.format("Write data to file [%s] failed", filePath), e);