diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableSchema.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableSchema.java index f2b33baa6b0..7fd277d2b06 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableSchema.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableSchema.java @@ -92,17 +92,16 @@ public Builder constraintKey(List constraintKeys) { public TableSchema build() { return new TableSchema(columns, primaryKey, constraintKeys); } + } - public TableSchema copy() { - List copyColumns = - columns.stream().map(Column::copy).collect(Collectors.toList()); - List copyConstraintKeys = - constraintKeys.stream().map(ConstraintKey::copy).collect(Collectors.toList()); - return TableSchema.builder() - .constraintKey(copyConstraintKeys) - .columns(copyColumns) - .primaryKey(primaryKey.copy()) - .build(); - } + public TableSchema copy() { + List copyColumns = columns.stream().map(Column::copy).collect(Collectors.toList()); + List copyConstraintKeys = + constraintKeys.stream().map(ConstraintKey::copy).collect(Collectors.toList()); + return TableSchema.builder() + .constraintKey(copyConstraintKeys) + .columns(copyColumns) + .primaryKey(primaryKey == null ? null : primaryKey.copy()) + .build(); } } diff --git a/seatunnel-examples/seatunnel-engine-examples/pom.xml b/seatunnel-examples/seatunnel-engine-examples/pom.xml index 7acd3ca4d8c..33dcaed895f 100644 --- a/seatunnel-examples/seatunnel-engine-examples/pom.xml +++ b/seatunnel-examples/seatunnel-engine-examples/pom.xml @@ -62,5 +62,10 @@ connector-console ${project.version} + + org.apache.seatunnel + connector-assert + ${project.version} + diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/FilterRowTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/FilterRowTransform.java index 939710fb1cd..d4aaa5ec4be 100644 --- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/FilterRowTransform.java +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/FilterRowTransform.java @@ -17,11 +17,33 @@ package org.apache.seatunnel.transform.common; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.TableIdentifier; +import org.apache.seatunnel.api.table.catalog.TableSchema; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; -public abstract class FilterRowTransform extends AbstractSeaTunnelTransform { +import lombok.NoArgsConstructor; +import lombok.NonNull; + +@NoArgsConstructor +public abstract class FilterRowTransform extends AbstractCatalogSupportTransform { + + public FilterRowTransform(@NonNull CatalogTable inputCatalogTable) { + super(inputCatalogTable); + } + @Override protected SeaTunnelRowType transformRowType(SeaTunnelRowType inputRowType) { return inputRowType; } + + @Override + protected TableSchema transformTableSchema() { + return inputCatalogTable.getTableSchema().copy(); + } + + @Override + protected TableIdentifier transformTableIdentifier() { + return inputCatalogTable.getTableId().copy(); + } } diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/FilterRowKindTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filterrowkind/FilterRowKindTransform.java similarity index 63% rename from seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/FilterRowKindTransform.java rename to seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filterrowkind/FilterRowKindTransform.java index 34b0c8491fa..1959e2d3bbd 100644 --- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/FilterRowKindTransform.java +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filterrowkind/FilterRowKindTransform.java @@ -15,12 +15,13 @@ * limitations under the License. */ -package org.apache.seatunnel.transform; +package org.apache.seatunnel.transform.filterrowkind; import org.apache.seatunnel.shade.com.typesafe.config.Config; -import org.apache.seatunnel.api.configuration.Option; -import org.apache.seatunnel.api.configuration.Options; +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.configuration.util.ConfigValidator; +import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.type.RowKind; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.transform.SeaTunnelTransform; @@ -29,44 +30,41 @@ import org.apache.seatunnel.transform.common.FilterRowTransform; import com.google.auto.service.AutoService; +import lombok.NoArgsConstructor; +import lombok.NonNull; import lombok.ToString; import java.util.Collections; import java.util.HashSet; -import java.util.List; import java.util.Set; @ToString(of = {"includeKinds", "excludeKinds"}) @AutoService(SeaTunnelTransform.class) +@NoArgsConstructor public class FilterRowKindTransform extends FilterRowTransform { - public static final Option> INCLUDE_KINDS = - Options.key("include_kinds") - .listType(RowKind.class) - .noDefaultValue() - .withDescription("the row kinds to include"); - public static final Option> EXCLUDE_KINDS = - Options.key("exclude_kinds") - .listType(RowKind.class) - .noDefaultValue() - .withDescription("the row kinds to exclude"); + public static String PLUGIN_NAME = "FilterRowKind"; private Set includeKinds = Collections.emptySet(); private Set excludeKinds = Collections.emptySet(); + public FilterRowKindTransform( + @NonNull ReadonlyConfig config, @NonNull CatalogTable inputCatalogTable) { + super(inputCatalogTable); + initConfig(config); + } + @Override public String getPluginName() { - return "FilterRowKind"; + return PLUGIN_NAME; } - @Override - protected void setConfig(Config pluginConfig) { - if (pluginConfig.hasPath(INCLUDE_KINDS.key())) { - includeKinds = - new HashSet<>(pluginConfig.getEnumList(RowKind.class, INCLUDE_KINDS.key())); - } - if (pluginConfig.hasPath(EXCLUDE_KINDS.key())) { + private void initConfig(ReadonlyConfig config) { + if (config.get(FilterRowKinkTransformConfig.INCLUDE_KINDS) == null) { excludeKinds = - new HashSet<>(pluginConfig.getEnumList(RowKind.class, EXCLUDE_KINDS.key())); + new HashSet(config.get(FilterRowKinkTransformConfig.EXCLUDE_KINDS)); + } else { + includeKinds = + new HashSet(config.get(FilterRowKinkTransformConfig.INCLUDE_KINDS)); } if ((includeKinds.isEmpty() && excludeKinds.isEmpty()) || (!includeKinds.isEmpty() && !excludeKinds.isEmpty())) { @@ -74,16 +72,25 @@ protected void setConfig(Config pluginConfig) { CommonErrorCode.ILLEGAL_ARGUMENT, String.format( "These options(%s,%s) are mutually exclusive, allowing only one set of options to be configured.", - INCLUDE_KINDS.key(), EXCLUDE_KINDS.key())); + FilterRowKinkTransformConfig.INCLUDE_KINDS.key(), + FilterRowKinkTransformConfig.EXCLUDE_KINDS.key())); } } + @Override + protected void setConfig(Config pluginConfig) { + ConfigValidator.of(ReadonlyConfig.fromConfig(pluginConfig)) + .validate(new FilterRowKindTransformFactory().optionRule()); + initConfig(ReadonlyConfig.fromConfig(pluginConfig)); + } + @Override protected SeaTunnelRow transformRow(SeaTunnelRow inputRow) { - if (!excludeKinds.isEmpty()) { - return excludeKinds.contains(inputRow.getRowKind()) ? null : inputRow; + if (!this.excludeKinds.isEmpty()) { + return this.excludeKinds.contains(inputRow.getRowKind()) ? null : inputRow; } - if (!includeKinds.isEmpty()) { + if (!this.includeKinds.isEmpty()) { + Set includeKinds = this.includeKinds; return includeKinds.contains(inputRow.getRowKind()) ? inputRow : null; } throw new SeaTunnelRuntimeException( diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/FilterRowKindTransformFactory.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filterrowkind/FilterRowKindTransformFactory.java similarity index 62% rename from seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/FilterRowKindTransformFactory.java rename to seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filterrowkind/FilterRowKindTransformFactory.java index 5f4b68dc819..9e89ebe5e2c 100644 --- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/FilterRowKindTransformFactory.java +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filterrowkind/FilterRowKindTransformFactory.java @@ -15,26 +15,36 @@ * limitations under the License. */ -package org.apache.seatunnel.transform; +package org.apache.seatunnel.transform.filterrowkind; import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.connector.TableTransform; import org.apache.seatunnel.api.table.factory.Factory; +import org.apache.seatunnel.api.table.factory.TableFactoryContext; import org.apache.seatunnel.api.table.factory.TableTransformFactory; import com.google.auto.service.AutoService; -import static org.apache.seatunnel.transform.FilterRowKindTransform.EXCLUDE_KINDS; -import static org.apache.seatunnel.transform.FilterRowKindTransform.INCLUDE_KINDS; - @AutoService(Factory.class) public class FilterRowKindTransformFactory implements TableTransformFactory { @Override public String factoryIdentifier() { - return "FilterRowKind"; + return FilterRowKindTransform.PLUGIN_NAME; } @Override public OptionRule optionRule() { - return OptionRule.builder().exclusive(EXCLUDE_KINDS, INCLUDE_KINDS).build(); + return OptionRule.builder() + .exclusive( + FilterRowKinkTransformConfig.EXCLUDE_KINDS, + FilterRowKinkTransformConfig.INCLUDE_KINDS) + .build(); + } + + @Override + public TableTransform createTransform(TableFactoryContext context) { + CatalogTable catalogTable = context.getCatalogTable(); + return () -> new FilterRowKindTransform(context.getOptions(), catalogTable); } } diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filterrowkind/FilterRowKinkTransformConfig.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filterrowkind/FilterRowKinkTransformConfig.java new file mode 100644 index 00000000000..f31a56a0a63 --- /dev/null +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filterrowkind/FilterRowKinkTransformConfig.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.transform.filterrowkind; + +import org.apache.seatunnel.api.configuration.Option; +import org.apache.seatunnel.api.configuration.Options; +import org.apache.seatunnel.api.table.type.RowKind; + +import lombok.Getter; +import lombok.Setter; + +import java.io.Serializable; +import java.util.List; + +@Getter +@Setter +public class FilterRowKinkTransformConfig implements Serializable { + + public static final Option> INCLUDE_KINDS = + Options.key("include_kinds") + .listType(RowKind.class) + .noDefaultValue() + .withDescription("the row kinds to include"); + public static final Option> EXCLUDE_KINDS = + Options.key("exclude_kinds") + .listType(RowKind.class) + .noDefaultValue() + .withDescription("the row kinds to exclude"); +} diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/split/SplitTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/split/SplitTransform.java index 4d7b543fbbf..d77ef843571 100644 --- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/split/SplitTransform.java +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/split/SplitTransform.java @@ -32,6 +32,7 @@ import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor; import com.google.auto.service.AutoService; +import lombok.NoArgsConstructor; import lombok.NonNull; import java.util.Arrays; @@ -41,14 +42,11 @@ import java.util.stream.IntStream; @AutoService(SeaTunnelTransform.class) +@NoArgsConstructor public class SplitTransform extends MultipleFieldOutputTransform { private SplitTransformConfig splitTransformConfig; private int splitFieldIndex; - public SplitTransform() { - super(); - } - public SplitTransform( @NonNull SplitTransformConfig splitTransformConfig, @NonNull CatalogTable catalogTable) { diff --git a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/FilterRowKindTransformFactoryTest.java b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/FilterRowKindTransformFactoryTest.java index 0cdc9e67b69..0637a8a7078 100644 --- a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/FilterRowKindTransformFactoryTest.java +++ b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/FilterRowKindTransformFactoryTest.java @@ -17,6 +17,8 @@ package org.apache.seatunnel.transform; +import org.apache.seatunnel.transform.filterrowkind.FilterRowKindTransformFactory; + import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test;