Skip to content

Commit

Permalink
Add Catalog support for FilterRowKindTransform
Browse files Browse the repository at this point in the history
  • Loading branch information
EricJoy2048 committed Mar 28, 2023
1 parent 87505e6 commit 322d250
Show file tree
Hide file tree
Showing 8 changed files with 139 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -92,17 +92,16 @@ public Builder constraintKey(List<ConstraintKey> constraintKeys) {
public TableSchema build() {
return new TableSchema(columns, primaryKey, constraintKeys);
}
}

public TableSchema copy() {
List<Column> copyColumns =
columns.stream().map(Column::copy).collect(Collectors.toList());
List<ConstraintKey> copyConstraintKeys =
constraintKeys.stream().map(ConstraintKey::copy).collect(Collectors.toList());
return TableSchema.builder()
.constraintKey(copyConstraintKeys)
.columns(copyColumns)
.primaryKey(primaryKey.copy())
.build();
}
public TableSchema copy() {
List<Column> copyColumns = columns.stream().map(Column::copy).collect(Collectors.toList());
List<ConstraintKey> copyConstraintKeys =
constraintKeys.stream().map(ConstraintKey::copy).collect(Collectors.toList());
return TableSchema.builder()
.constraintKey(copyConstraintKeys)
.columns(copyColumns)
.primaryKey(primaryKey == null ? null : primaryKey.copy())
.build();
}
}
5 changes: 5 additions & 0 deletions seatunnel-examples/seatunnel-engine-examples/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -62,5 +62,10 @@
<artifactId>connector-console</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-assert</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,33 @@

package org.apache.seatunnel.transform.common;

import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;

public abstract class FilterRowTransform extends AbstractSeaTunnelTransform {
import lombok.NoArgsConstructor;
import lombok.NonNull;

@NoArgsConstructor
public abstract class FilterRowTransform extends AbstractCatalogSupportTransform {

public FilterRowTransform(@NonNull CatalogTable inputCatalogTable) {
super(inputCatalogTable);
}

@Override
protected SeaTunnelRowType transformRowType(SeaTunnelRowType inputRowType) {
return inputRowType;
}

@Override
protected TableSchema transformTableSchema() {
return inputCatalogTable.getTableSchema().copy();
}

@Override
protected TableIdentifier transformTableIdentifier() {
return inputCatalogTable.getTableId().copy();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@
* limitations under the License.
*/

package org.apache.seatunnel.transform;
package org.apache.seatunnel.transform.filterrowkind;

import org.apache.seatunnel.shade.com.typesafe.config.Config;

import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.ConfigValidator;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.RowKind;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.transform.SeaTunnelTransform;
Expand All @@ -29,61 +30,66 @@
import org.apache.seatunnel.transform.common.FilterRowTransform;

import com.google.auto.service.AutoService;
import lombok.NoArgsConstructor;
import lombok.NonNull;
import lombok.ToString;

import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

@ToString(of = {"includeKinds", "excludeKinds"})
@AutoService(SeaTunnelTransform.class)
@NoArgsConstructor
public class FilterRowKindTransform extends FilterRowTransform {
public static final Option<List<RowKind>> INCLUDE_KINDS =
Options.key("include_kinds")
.listType(RowKind.class)
.noDefaultValue()
.withDescription("the row kinds to include");
public static final Option<List<RowKind>> EXCLUDE_KINDS =
Options.key("exclude_kinds")
.listType(RowKind.class)
.noDefaultValue()
.withDescription("the row kinds to exclude");
public static String PLUGIN_NAME = "FilterRowKind";

private ReadonlyConfig config;

private Set<RowKind> includeKinds = Collections.emptySet();
private Set<RowKind> excludeKinds = Collections.emptySet();

@Override
public String getPluginName() {
return "FilterRowKind";
}

@Override
protected void setConfig(Config pluginConfig) {
if (pluginConfig.hasPath(INCLUDE_KINDS.key())) {
includeKinds =
new HashSet<>(pluginConfig.getEnumList(RowKind.class, INCLUDE_KINDS.key()));
}
if (pluginConfig.hasPath(EXCLUDE_KINDS.key())) {
public FilterRowKindTransform(
@NonNull ReadonlyConfig config, @NonNull CatalogTable inputCatalogTable) {
super(inputCatalogTable);
this.config = config;
if (config.get(FilterRowKinkTransformConfig.INCLUDE_KINDS) == null) {
excludeKinds =
new HashSet<>(pluginConfig.getEnumList(RowKind.class, EXCLUDE_KINDS.key()));
new HashSet<RowKind>(config.get(FilterRowKinkTransformConfig.EXCLUDE_KINDS));
} else {
includeKinds =
new HashSet<RowKind>(config.get(FilterRowKinkTransformConfig.INCLUDE_KINDS));
}
if ((includeKinds.isEmpty() && excludeKinds.isEmpty())
|| (!includeKinds.isEmpty() && !excludeKinds.isEmpty())) {
throw new SeaTunnelRuntimeException(
CommonErrorCode.ILLEGAL_ARGUMENT,
String.format(
"These options(%s,%s) are mutually exclusive, allowing only one set of options to be configured.",
INCLUDE_KINDS.key(), EXCLUDE_KINDS.key()));
FilterRowKinkTransformConfig.INCLUDE_KINDS.key(),
FilterRowKinkTransformConfig.EXCLUDE_KINDS.key()));
}
}

@Override
public String getPluginName() {
return PLUGIN_NAME;
}

@Override
protected void setConfig(Config pluginConfig) {
ConfigValidator.of(ReadonlyConfig.fromConfig(pluginConfig))
.validate(new FilterRowKindTransformFactory().optionRule());
this.config = ReadonlyConfig.fromConfig(pluginConfig);
}

@Override
protected SeaTunnelRow transformRow(SeaTunnelRow inputRow) {
if (!excludeKinds.isEmpty()) {
return excludeKinds.contains(inputRow.getRowKind()) ? null : inputRow;
if (!this.excludeKinds.isEmpty()) {
return this.excludeKinds.contains(inputRow.getRowKind()) ? null : inputRow;
}
if (!includeKinds.isEmpty()) {
if (!this.includeKinds.isEmpty()) {
Set<RowKind> includeKinds = this.includeKinds;
return includeKinds.contains(inputRow.getRowKind()) ? inputRow : null;
}
throw new SeaTunnelRuntimeException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,26 +15,36 @@
* limitations under the License.
*/

package org.apache.seatunnel.transform;
package org.apache.seatunnel.transform.filterrowkind;

import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.connector.TableTransform;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableFactoryContext;
import org.apache.seatunnel.api.table.factory.TableTransformFactory;

import com.google.auto.service.AutoService;

import static org.apache.seatunnel.transform.FilterRowKindTransform.EXCLUDE_KINDS;
import static org.apache.seatunnel.transform.FilterRowKindTransform.INCLUDE_KINDS;

@AutoService(Factory.class)
public class FilterRowKindTransformFactory implements TableTransformFactory {
@Override
public String factoryIdentifier() {
return "FilterRowKind";
return FilterRowKindTransform.PLUGIN_NAME;
}

@Override
public OptionRule optionRule() {
return OptionRule.builder().exclusive(EXCLUDE_KINDS, INCLUDE_KINDS).build();
return OptionRule.builder()
.exclusive(
FilterRowKinkTransformConfig.EXCLUDE_KINDS,
FilterRowKinkTransformConfig.INCLUDE_KINDS)
.build();
}

@Override
public TableTransform createTransform(TableFactoryContext context) {
CatalogTable catalogTable = context.getCatalogTable();
return () -> new FilterRowKindTransform(context.getOptions(), catalogTable);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.transform.filterrowkind;

import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.api.table.type.RowKind;

import lombok.Getter;
import lombok.Setter;

import java.io.Serializable;
import java.util.List;

@Getter
@Setter
public class FilterRowKinkTransformConfig implements Serializable {

public static final Option<List<RowKind>> INCLUDE_KINDS =
Options.key("include_kinds")
.listType(RowKind.class)
.noDefaultValue()
.withDescription("the row kinds to include");
public static final Option<List<RowKind>> EXCLUDE_KINDS =
Options.key("exclude_kinds")
.listType(RowKind.class)
.noDefaultValue()
.withDescription("the row kinds to exclude");
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor;

import com.google.auto.service.AutoService;
import lombok.NoArgsConstructor;
import lombok.NonNull;

import java.util.Arrays;
Expand All @@ -41,14 +42,11 @@
import java.util.stream.IntStream;

@AutoService(SeaTunnelTransform.class)
@NoArgsConstructor
public class SplitTransform extends MultipleFieldOutputTransform {
private SplitTransformConfig splitTransformConfig;
private int splitFieldIndex;

public SplitTransform() {
super();
}

public SplitTransform(
@NonNull SplitTransformConfig splitTransformConfig,
@NonNull CatalogTable catalogTable) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.seatunnel.transform;

import org.apache.seatunnel.transform.filterrowkind.FilterRowKindTransformFactory;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

Expand Down

0 comments on commit 322d250

Please sign in to comment.