Skip to content

Commit

Permalink
[Fix] Fix PrimaryKey in transform (#5704)
Browse files Browse the repository at this point in the history
  • Loading branch information
EricJoy2048 authored Oct 25, 2023
1 parent 946d89c commit 9944684
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ protected TableSchema transformTableSchema() {
List<Column> outputColumns = new ArrayList<>(fieldMapper.size());
needReaderColIndex = new ArrayList<>(fieldMapper.size());
ArrayList<String> inputFieldNames = Lists.newArrayList(seaTunnelRowType.getFieldNames());
ArrayList<String> outputFieldNames = new ArrayList<>();
fieldMapper.forEach(
(key, value) -> {
int fieldIndex = inputFieldNames.indexOf(key);
Expand All @@ -110,18 +111,32 @@ protected TableSchema transformTableSchema() {
oldColumn.getDefaultValue(),
oldColumn.getComment());
outputColumns.add(outputColumn);
outputFieldNames.add(outputColumn.getName());
needReaderColIndex.add(fieldIndex);
});

List<ConstraintKey> outputConstraintKeys =
inputCatalogTable.getTableSchema().getConstraintKeys().stream()
.filter(
key -> {
List<String> constraintColumnNames =
key.getColumnNames().stream()
.map(
ConstraintKey.ConstraintKeyColumn
::getColumnName)
.collect(Collectors.toList());
return outputFieldNames.containsAll(constraintColumnNames);
})
.map(ConstraintKey::copy)
.collect(Collectors.toList());

PrimaryKey copiedPrimaryKey =
inputCatalogTable.getTableSchema().getPrimaryKey() == null
? null
: inputCatalogTable.getTableSchema().getPrimaryKey().copy();
PrimaryKey copiedPrimaryKey = null;
if (inputCatalogTable.getTableSchema().getPrimaryKey() != null
&& outputFieldNames.containsAll(
inputCatalogTable.getTableSchema().getPrimaryKey().getColumnNames())) {
copiedPrimaryKey = inputCatalogTable.getTableSchema().getPrimaryKey().copy();
}

return TableSchema.builder()
.primaryKey(copiedPrimaryKey)
.columns(outputColumns)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ protected TableSchema transformTableSchema() {
inputCatalogTable.getTableSchema().toPhysicalRowDataType();

inputValueIndex = new int[filterFields.size()];
ArrayList<String> outputFieldNames = new ArrayList<>();
for (int i = 0; i < filterFields.size(); i++) {
String field = filterFields.get(i);
int inputFieldIndex = seaTunnelRowType.indexOf(field);
Expand All @@ -97,21 +98,36 @@ protected TableSchema transformTableSchema() {
inputValueIndex[i] = inputFieldIndex;
outputColumns.add(
inputCatalogTable.getTableSchema().getColumns().get(inputFieldIndex).copy());
outputFieldNames.add(
inputCatalogTable.getTableSchema().getColumns().get(inputFieldIndex).getName());
}

List<ConstraintKey> copyConstraintKeys =
List<ConstraintKey> outputConstraintKeys =
inputCatalogTable.getTableSchema().getConstraintKeys().stream()
.filter(
key -> {
List<String> constraintColumnNames =
key.getColumnNames().stream()
.map(
ConstraintKey.ConstraintKeyColumn
::getColumnName)
.collect(Collectors.toList());
return outputFieldNames.containsAll(constraintColumnNames);
})
.map(ConstraintKey::copy)
.collect(Collectors.toList());

PrimaryKey copiedPrimaryKey =
inputCatalogTable.getTableSchema().getPrimaryKey() == null
? null
: inputCatalogTable.getTableSchema().getPrimaryKey().copy();
PrimaryKey copiedPrimaryKey = null;
if (inputCatalogTable.getTableSchema().getPrimaryKey() != null
&& outputFieldNames.containsAll(
inputCatalogTable.getTableSchema().getPrimaryKey().getColumnNames())) {
copiedPrimaryKey = inputCatalogTable.getTableSchema().getPrimaryKey().copy();
}

return TableSchema.builder()
.columns(outputColumns)
.primaryKey(copiedPrimaryKey)
.constraintKey(copyConstraintKeys)
.constraintKey(outputConstraintKeys)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.catalog.ConstraintKey;
import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
import org.apache.seatunnel.api.table.catalog.PrimaryKey;
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
Expand All @@ -38,7 +37,9 @@
import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

import static org.apache.seatunnel.transform.sql.SQLEngineFactory.EngineType.ZETA;

Expand Down Expand Up @@ -119,55 +120,32 @@ protected TableSchema transformTableSchema() {
tryOpen();
List<String> inputColumnsMapping = new ArrayList<>();
SeaTunnelRowType outRowType = sqlEngine.typeMapping(inputColumnsMapping);
List<String> outputColumns = Arrays.asList(outRowType.getFieldNames());

TableSchema.Builder builder = TableSchema.builder();
if (inputCatalogTable.getTableSchema().getPrimaryKey() != null) {
List<String> outPkColumnNames = new ArrayList<>();
for (String pkColumnName :
inputCatalogTable.getTableSchema().getPrimaryKey().getColumnNames()) {
for (int i = 0; i < inputColumnsMapping.size(); i++) {
if (pkColumnName.equals(inputColumnsMapping.get(i))) {
outPkColumnNames.add(outRowType.getFieldName(i));
}
}
}
if (!outPkColumnNames.isEmpty()) {
builder.primaryKey(
PrimaryKey.of(
inputCatalogTable.getTableSchema().getPrimaryKey().getPrimaryKey(),
outPkColumnNames));
}
}
if (inputCatalogTable.getTableSchema().getConstraintKeys() != null) {
List<ConstraintKey> outConstraintKey = new ArrayList<>();
for (ConstraintKey constraintKey :
inputCatalogTable.getTableSchema().getConstraintKeys()) {
List<ConstraintKey.ConstraintKeyColumn> outConstraintColumnKeys = new ArrayList<>();
for (ConstraintKey.ConstraintKeyColumn constraintKeyColumn :
constraintKey.getColumnNames()) {
String constraintColumnName = constraintKeyColumn.getColumnName();
for (int i = 0; i < inputColumnsMapping.size(); i++) {
if (constraintColumnName.equals(inputColumnsMapping.get(i))) {
outConstraintColumnKeys.add(
ConstraintKey.ConstraintKeyColumn.of(
outRowType.getFieldName(i),
constraintKeyColumn.getSortType()));
}
}
}
if (!outConstraintColumnKeys.isEmpty()) {
outConstraintKey.add(
ConstraintKey.of(
constraintKey.getConstraintType(),
constraintKey.getConstraintName(),
outConstraintColumnKeys));
}
}
if (!outConstraintKey.isEmpty()) {
builder.constraintKey(outConstraintKey);
}
if (inputCatalogTable.getTableSchema().getPrimaryKey() != null
&& outputColumns.containsAll(
inputCatalogTable.getTableSchema().getPrimaryKey().getColumnNames())) {
builder = builder.primaryKey(inputCatalogTable.getTableSchema().getPrimaryKey().copy());
}

List<ConstraintKey> outputConstraintKeys =
inputCatalogTable.getTableSchema().getConstraintKeys().stream()
.filter(
key -> {
List<String> constraintColumnNames =
key.getColumnNames().stream()
.map(
ConstraintKey.ConstraintKeyColumn
::getColumnName)
.collect(Collectors.toList());
return outputColumns.containsAll(constraintColumnNames);
})
.map(ConstraintKey::copy)
.collect(Collectors.toList());

builder = builder.constraintKey(outputConstraintKeys);

String[] fieldNames = outRowType.getFieldNames();
SeaTunnelDataType<?>[] fieldTypes = outRowType.getFieldTypes();
List<Column> columns = new ArrayList<>(fieldNames.length);
Expand Down

0 comments on commit 9944684

Please sign in to comment.