[Feature][Connector V2] Add SaveMode for All Sink Connector #3824

Closed
EricJoy2048 opened this issue Dec 29, 2022 · 5 comments

@EricJoy2048

EricJoy2048 commented Dec 29, 2022

Background

Currently, SeaTunnel's sink connectors do not support a SaveMode feature. I am opening this issue to discuss how to add SaveMode to SeaTunnel.

Unified SaveMode Type

I have checked all Sink connectors. At present, SaveMode can be divided into two categories.

PathSaveMode

The SaveMode for Sink connectors that use a path to organize data, for example the File connectors.

public enum PathSaveMode {

    DROP_DATA,

    KEEP_DATA,

    ERROR
}

TableSaveMode

The SaveMode for Sink connectors that use tables or other table-like structures to organize data.

public enum TableSaveMode {
    DROP_TABLE,

    KEEP_TABLE_DROP_DATA,

    KEEP_TABLE_AND_DATA,

    ERROR_WHEN_EXISTS
}

Add interface for SaveMode

I will add two interfaces for SaveMode.

SupportPathSaveMode

The Sink Connectors which support PathSaveMode should implement this interface

/**
 * The Sink Connectors which support PathSaveMode should implement this interface
 */
public interface SupportPathSaveMode {
    /**
     * Every sink connector should use the same option name to configure SaveMode, so this interface provides checkOptions.
     * The default implementation checks that the `save_mode` parameter is present in the config and parses it.
     *
     * @param config config of Sink Connector
     * @return the PathSaveMode parsed from the config
     */
    default PathSaveMode checkOptions(Config config) {
        if (config.hasPath(SinkCommonOptions.PATH_SAVE_MODE.key())) {
            String pathSaveMode = config.getString(SinkCommonOptions.PATH_SAVE_MODE.key());
            return PathSaveMode.valueOf(pathSaveMode.toUpperCase(Locale.ROOT));
        } else {
            throw new SeaTunnelRuntimeException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
                SinkCommonOptions.PATH_SAVE_MODE.key() + " must be set in config");
        }
    }

    void handleSaveMode(PathSaveMode pathSaveMode);
}
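
As an illustration of what a handleSaveMode(PathSaveMode) implementation might do, here is a minimal sketch for a local directory using java.nio.file. LocalPathSaveModeHandler is a hypothetical name, the enum is copied so the sketch compiles on its own, and the assumption that ERROR means "fail if the path already exists" is mine; the proposal does not define that semantics for paths in this comment.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;

// Copy of the proposed enum so this sketch is self-contained.
enum PathSaveMode { DROP_DATA, KEEP_DATA, ERROR }

// Hypothetical helper, not part of SeaTunnel.
final class LocalPathSaveModeHandler {

    /** Applies the save mode to dir, then makes sure dir exists. */
    static void handle(Path dir, PathSaveMode mode) throws IOException {
        switch (mode) {
            case DROP_DATA:
                // Keep the directory, delete the regular files directly inside it.
                if (Files.isDirectory(dir)) {
                    try (DirectoryStream<Path> files = Files.newDirectoryStream(dir)) {
                        for (Path file : files) {
                            if (Files.isRegularFile(file)) {
                                Files.delete(file);
                            }
                        }
                    }
                }
                break;
            case KEEP_DATA:
                // Nothing to clean up; new files are written next to the old ones.
                break;
            case ERROR:
                // Assumption: ERROR means "refuse to write into an existing path".
                if (Files.exists(dir)) {
                    throw new IllegalStateException("Path already exists: " + dir);
                }
                break;
            default:
                throw new IllegalArgumentException("Unknown mode: " + mode);
        }
        Files.createDirectories(dir);
    }
}
```

A real File connector would go through its own file system abstraction (HDFS, S3, and so on) rather than java.nio.file; the sketch only shows how the three modes differ.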

SupportTableSaveMode

The Sink Connectors which support TableSaveMode should implement this interface.

/**
 * The Sink Connectors which support TableSaveMode should implement this interface
 */
public interface SupportTableSaveMode {

    /**
     * Every sink connector should use the same option name to configure SaveMode, so this interface provides checkOptions.
     * The default implementation checks that the `save_mode` parameter is present in the config and parses it.
     *
     * @param config config of Sink Connector
     * @return the TableSaveMode parsed from the config
     */
    default TableSaveMode checkOptions(Config config) {
        if (config.hasPath(SinkCommonOptions.TABLE_SAVE_MODE.key())) {
            String tableSaveMode = config.getString(SinkCommonOptions.TABLE_SAVE_MODE.key());
            return TableSaveMode.valueOf(tableSaveMode.toUpperCase(Locale.ROOT));
        } else {
            throw new SeaTunnelRuntimeException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
                SinkCommonOptions.TABLE_SAVE_MODE.key() + " must be set in config");
        }
    }

    void handleSaveMode(TableSaveMode tableSaveMode);
}


Add SaveMode Option to Sink Connector

To unify the parameter name across all Sink connectors that support SaveMode, I added two options for SaveMode. Both use the same key, save_mode.

    public static final Option<TableSaveMode> TABLE_SAVE_MODE =
        Options.key("save_mode")
            .enumType(TableSaveMode.class)
            .noDefaultValue()
            .withDescription("The table save mode");

    public static final Option<PathSaveMode> PATH_SAVE_MODE =
        Options.key("save_mode")
            .enumType(PathSaveMode.class)
            .noDefaultValue()
            .withDescription("The path save mode");

Automatically add the SaveMode Option to the OptionRule of each connector that supports SaveMode

To do this, I added a check for the SupportPathSaveMode and SupportTableSaveMode interfaces in FactoryUtil.sinkFullOptionRule.

/**
     * This method is called by SeaTunnel Web to get the full option rule of a sink.
     * @return
     */
    public static OptionRule sinkFullOptionRule(@NonNull TableSinkFactory factory) {
        OptionRule sinkOptionRule = factory.optionRule();
        if (sinkOptionRule == null) {
            throw new FactoryException("sinkOptionRule can not be null");
        }

        Class<? extends SeaTunnelSink> sinkClass = factory.getSinkClass();
        if (SupportTableSaveMode.class.isAssignableFrom(sinkClass)) {
            OptionRule sinkCommonOptionRule =
                OptionRule.builder().required(SinkCommonOptions.TABLE_SAVE_MODE).build();
            sinkOptionRule.getRequiredOptions().addAll(sinkCommonOptionRule.getRequiredOptions());
        }

        if (SupportPathSaveMode.class.isAssignableFrom(sinkClass)) {
            OptionRule sinkCommonOptionRule =
                OptionRule.builder().required(SinkCommonOptions.PATH_SAVE_MODE).build();
            sinkOptionRule.getRequiredOptions().addAll(sinkCommonOptionRule.getRequiredOptions());
        }

        return sinkOptionRule;
    }

Because the interface check needs to know the sink class, I added Class<? extends SeaTunnelSink> getSinkClass(); to TableSinkFactory. Every Sink connector needs to implement the getSinkClass method.

public interface TableSinkFactory<IN, StateT, CommitInfoT, AggregatedCommitInfoT> extends Factory {

    /**
     * This method is not used yet, so the default implementation throws UnsupportedOperationException.
     *
     * @param context TableFactoryContext
     * @return the created TableSink
     */
    default TableSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT> createSink(TableFactoryContext context) {
        throw new UnsupportedOperationException("unsupported now");
    }

    Class<? extends SeaTunnelSink> getSinkClass();
}
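
The interface check in sinkFullOptionRule relies on Class#isAssignableFrom, whose direction is easy to get backwards: it is called on the supertype and takes the candidate subtype as its argument. The following self-contained sketch shows the direction; PlainSink, FileSink and AssignableCheck are hypothetical stand-ins, not SeaTunnel classes.

```java
// Hypothetical stand-ins so the sketch compiles on its own.
interface SupportPathSaveMode {}

class PlainSink {}

class FileSink extends PlainSink implements SupportPathSaveMode {}

final class AssignableCheck {

    /** Returns true when sinkClass implements SupportPathSaveMode. */
    static boolean supportsPathSaveMode(Class<?> sinkClass) {
        // Reads as: "a SupportPathSaveMode variable can hold an instance of sinkClass".
        return SupportPathSaveMode.class.isAssignableFrom(sinkClass);
    }

    /** The reversed call asks whether the interface is a subtype of the sink class. */
    static boolean reversedCheck(Class<?> sinkClass) {
        return sinkClass.isAssignableFrom(SupportPathSaveMode.class);
    }
}
```

When an instance is already available, as in the starter code, sink instanceof SupportPathSaveMode expresses the same test more directly.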

What stage should SaveMode be processed

I think the void handleSaveMode(PathSaveMode pathSaveMode); and void handleSaveMode(TableSaveMode tableSaveMode); methods should be called after prepare().

So I updated the starter code.

org.apache.seatunnel.core.starter.flink.execution.SinkExecuteProcessor#initializePlugins

    @Override
    protected List<SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable>> initializePlugins(List<URL> jarPaths, List<? extends Config> pluginConfigs) {
        SeaTunnelSinkPluginDiscovery sinkPluginDiscovery = new SeaTunnelSinkPluginDiscovery(addUrlToClassloader);
        List<URL> pluginJars = new ArrayList<>();
        List<SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable>> sinks = pluginConfigs.stream().map(sinkConfig -> {
            PluginIdentifier pluginIdentifier = PluginIdentifier.of(ENGINE_TYPE, PLUGIN_TYPE, sinkConfig.getString(PLUGIN_NAME));
            pluginJars.addAll(sinkPluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier)));
            SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable> seaTunnelSink =
                sinkPluginDiscovery.createPluginInstance(pluginIdentifier);
            seaTunnelSink.prepare(sinkConfig);
            seaTunnelSink.setJobContext(jobContext);
            if (seaTunnelSink instanceof SupportTableSaveMode) {
                SupportTableSaveMode saveModeSink = (SupportTableSaveMode) seaTunnelSink;
                TableSaveMode tableSaveMode = saveModeSink.checkOptions(sinkConfig);
                saveModeSink.handleSaveMode(tableSaveMode);
            }

            if (seaTunnelSink instanceof SupportPathSaveMode) {
                SupportPathSaveMode saveModeSink = (SupportPathSaveMode) seaTunnelSink;
                PathSaveMode pathSaveMode = saveModeSink.checkOptions(sinkConfig);
                saveModeSink.handleSaveMode(pathSaveMode);
            }

            return seaTunnelSink;
        }).distinct().collect(Collectors.toList());
        jarPaths.addAll(pluginJars);
        return sinks;
    }

About automatic table creation

I think if we support automatic table creation, we can do it after handleSaveMode in org.apache.seatunnel.core.starter.flink.execution.SinkExecuteProcessor#initializePlugins (or the equivalent method in the other starters).

@EricJoy2048

After the discussion in #3851, I will update the design here.

@EricJoy2048

EricJoy2048 commented Jan 6, 2023

Background

Currently, SeaTunnel's sink connectors do not support a SaveMode feature. I am opening this issue to discuss how to add SaveMode to SeaTunnel.

Unified SaveMode Type

I have checked all Sink connectors. At present, SaveMode can be unified into four modes.

/**
 * The unified SaveMode for Sink connectors, covering connectors that organize data by table (or table-like structures) and by path
 */
public enum DataSaveMode {
    // Drop the table for MySQL; drop the path for the File connector.
    DROP_SCHEMA,

    // Keep the table but delete its data for MySQL; keep the path but delete the files in it for the File connector.
    KEEP_SCHEMA_DROP_DATA,

    // Keep the table and its data and continue writing to the existing table for MySQL; keep the path and its files and create new files in the path for the File connector.
    KEEP_SCHEMA_AND_DATA,

    // Throw an error when the table already exists for MySQL; throw an error when the path already exists for the File connector.
    ERROR_WHEN_EXISTS
}

Add interface for SaveMode

I will add one interface for SaveMode.

SupportDataSaveMode

The Sink Connectors which support DataSaveMode should implement this interface

/**
 * The Sink Connectors which support data SaveMode should implement this interface
 */
public interface SupportDataSaveMode {

    /**
     * Every sink connector should use the same option name to configure SaveMode, so this interface provides checkOptions.
     * The default implementation checks that the `save_mode` parameter is present in the config
     * and that its value is one of the modes returned by supportedDataSaveModeValues().
     *
     * @param config config of Sink Connector
     */
    default void checkOptions(Config config) {
        if (config.hasPath(SinkCommonOptions.DATA_SAVE_MODE)) {
            String tableSaveMode = config.getString(SinkCommonOptions.DATA_SAVE_MODE);
            DataSaveMode dataSaveMode = DataSaveMode.valueOf(tableSaveMode.toUpperCase(Locale.ROOT));
            if (!supportedDataSaveModeValues().contains(dataSaveMode)) {
                throw new SeaTunnelRuntimeException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
                    "This connector does not support save mode: " + dataSaveMode);
            }
        } else {
            throw new SeaTunnelRuntimeException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
                SinkCommonOptions.DATA_SAVE_MODE + " must be set in config");
        }
    }

    DataSaveMode getDataSaveModeUsed();

    /**
     * Return the DataSaveMode list supported by this connector
     * @return
     */
    List<DataSaveMode> supportedDataSaveModeValues();

    void handleSaveMode(DataSaveMode dataSaveMode);
}
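
To make the validation flow concrete, here is a minimal self-contained sketch of how a connector could declare its supported modes and have an unsupported value rejected. It is simplified on purpose: a plain Map stands in for Config, IllegalArgumentException stands in for SeaTunnelRuntimeException, checkOptions returns the parsed mode instead of void, and SupportDataSaveModeSketch and AppendOnlySink are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Map;

// Copy of the proposed enum so this sketch is self-contained.
enum DataSaveMode { DROP_SCHEMA, KEEP_SCHEMA_DROP_DATA, KEEP_SCHEMA_AND_DATA, ERROR_WHEN_EXISTS }

// Simplified stand-in for the proposed interface (Map instead of Config).
interface SupportDataSaveModeSketch {
    String DATA_SAVE_MODE = "save_mode";

    /** The modes this connector is able to honor. */
    List<DataSaveMode> supportedDataSaveModeValues();

    /** Parses save_mode and rejects missing, unknown, or unsupported values. */
    default DataSaveMode checkOptions(Map<String, String> config) {
        String raw = config.get(DATA_SAVE_MODE);
        if (raw == null) {
            throw new IllegalArgumentException(DATA_SAVE_MODE + " must be set in config");
        }
        // valueOf itself throws IllegalArgumentException for names that are not enum constants.
        DataSaveMode mode = DataSaveMode.valueOf(raw.toUpperCase(Locale.ROOT));
        if (!supportedDataSaveModeValues().contains(mode)) {
            throw new IllegalArgumentException("This connector does not support save mode: " + mode);
        }
        return mode;
    }
}

// Hypothetical connector that can only append to existing data or fail.
final class AppendOnlySink implements SupportDataSaveModeSketch {
    @Override
    public List<DataSaveMode> supportedDataSaveModeValues() {
        return Arrays.asList(DataSaveMode.KEEP_SCHEMA_AND_DATA, DataSaveMode.ERROR_WHEN_EXISTS);
    }
}
```

With this shape, a config of save_mode = "drop_schema" fails fast for AppendOnlySink at validation time instead of surfacing later inside handleSaveMode.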

Add SaveMode Option to Sink Connector

To unify the parameter name across all Sink connectors that support SaveMode, I added one parameter for SaveMode.

public static final String DATA_SAVE_MODE = "save_mode";

Add a SingleChoiceOption so that each connector can declare the list of DataSaveMode values it supports. The methods used to construct this Option are not described in detail here.

public class SingleChoiceOption<T> extends Option<T> {

    @Getter
    private final List<T> optionValues;

    public SingleChoiceOption(String key,
                              TypeReference<T> typeReference,
                              List<T> optionValues,
                              T defaultValue) {
        super(key, typeReference, defaultValue);
        this.optionValues = optionValues;
    }
}

Automatically add the DataSaveMode Option to the OptionRule of each connector that supports DataSaveMode

To do this, I added a check for the SupportDataSaveMode interface in FactoryUtil.sinkFullOptionRule.

Please note that this relies on the createSink method of the TableSinkFactory interface, so a connector that implements SupportDataSaveMode must also implement TableSinkFactory#createSink.

/**
     * This method is called by SeaTunnel Web to get the full option rule of a sink.
     * @return
     */
    public static OptionRule sinkFullOptionRule(@NonNull TableSinkFactory factory) {
        OptionRule sinkOptionRule = factory.optionRule();
        if (sinkOptionRule == null) {
            throw new FactoryException("sinkOptionRule can not be null");
        }

        try {
            TableSink sink = factory.createSink(null);
            if (SupportDataSaveMode.class.isAssignableFrom(sink.getClass())) {
                SupportDataSaveMode supportDataSaveModeSink = (SupportDataSaveMode) sink;
                Option<DataSaveMode> saveMode =
                    Options.key(SinkCommonOptions.DATA_SAVE_MODE)
                        .singleChoice(DataSaveMode.class, supportDataSaveModeSink.supportedDataSaveModeValues())
                        .noDefaultValue()
                        .withDescription("data save mode");
                OptionRule sinkCommonOptionRule =
                    OptionRule.builder().required(saveMode).build();
                sinkOptionRule.getRequiredOptions().addAll(sinkCommonOptionRule.getRequiredOptions());
            }
        } catch (UnsupportedOperationException e) {
            LOG.warn("Adding the save mode option requires the sink connector to support creating a sink through TableSinkFactory");
        }

        return sinkOptionRule;
    }

What stage should DataSaveMode be processed

We will call checkOptions in SinkExecuteProcessor to check the config.

So I updated the starter code.

org.apache.seatunnel.core.starter.flink.execution.SinkExecuteProcessor#initializePlugins

    @Override
    protected List<SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable>> initializePlugins(List<URL> jarPaths, List<? extends Config> pluginConfigs) {
        SeaTunnelSinkPluginDiscovery sinkPluginDiscovery = new SeaTunnelSinkPluginDiscovery(addUrlToClassloader);
        List<URL> pluginJars = new ArrayList<>();
        List<SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable>> sinks = pluginConfigs.stream().map(sinkConfig -> {
            PluginIdentifier pluginIdentifier = PluginIdentifier.of(ENGINE_TYPE, PLUGIN_TYPE, sinkConfig.getString(PLUGIN_NAME));
            pluginJars.addAll(sinkPluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier)));
            SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable> seaTunnelSink =
                sinkPluginDiscovery.createPluginInstance(pluginIdentifier);
            seaTunnelSink.prepare(sinkConfig);
            seaTunnelSink.setJobContext(jobContext);
            if (seaTunnelSink instanceof SupportDataSaveMode) {
                SupportDataSaveMode saveModeSink = (SupportDataSaveMode) seaTunnelSink;
                saveModeSink.checkOptions(sinkConfig);
            }
            return seaTunnelSink;
        }).distinct().collect(Collectors.toList());
        jarPaths.addAll(pluginJars);
        return sinks;
    }

About the handleSaveMode method and automatic table creation

The data save mode already implies creating the schema at some point in its life cycle, so the automatic table creation logic can run when handleSaveMode is invoked.

But when is the best time to invoke this logic? To create a table automatically we need to know the actual schema of the sink connector, so invoking it right after SeaTunnelSink#setTypeInfo is a good choice, as shown below:

            seaTunnelSink.setTypeInfo((SeaTunnelRowType) TypeConverterUtils.convert(stream.getType()));
            if (seaTunnelSink instanceof SupportDataSaveMode) {
                SupportDataSaveMode saveModeSink = (SupportDataSaveMode) seaTunnelSink;
                DataSaveMode dataSaveMode = saveModeSink.getDataSaveModeUsed();
                saveModeSink.handleSaveMode(dataSaveMode);
            }

About how to get or modify metadata

The api module has a Catalog interface, which connectors use to communicate with the data source. Building on it for this feature is a good choice.

Connectors can implement their own Catalog and use it to carry out schema management.

void createTable(TablePath tablePath, CatalogTable table, boolean ignoreIfExists);
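
One possible way to map the four DataSaveMode values onto catalog operations is sketched below. InMemoryCatalog is a hypothetical in-memory stand-in (table name to rows), not the SeaTunnel Catalog interface, and its dropTable and truncateTable methods are assumptions made for illustration; only a createTable with an ignoreIfExists flag appears in the text above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Copy of the proposed enum so this sketch is self-contained.
enum DataSaveMode { DROP_SCHEMA, KEEP_SCHEMA_DROP_DATA, KEEP_SCHEMA_AND_DATA, ERROR_WHEN_EXISTS }

// Hypothetical in-memory stand-in for a catalog: table name -> rows.
final class InMemoryCatalog {
    private final Map<String, List<String>> tables = new HashMap<>();

    boolean tableExists(String table) {
        return tables.containsKey(table);
    }

    void createTable(String table, boolean ignoreIfExists) {
        if (tables.containsKey(table)) {
            if (!ignoreIfExists) {
                throw new IllegalStateException("Table already exists: " + table);
            }
            return;
        }
        tables.put(table, new ArrayList<>());
    }

    void dropTable(String table) {
        tables.remove(table);
    }

    void truncateTable(String table) {
        List<String> rows = tables.get(table);
        if (rows != null) {
            rows.clear();
        }
    }

    List<String> rows(String table) {
        return tables.get(table);
    }
}

final class SaveModeHandler {
    /** Leaves the table in the state the mode asks for, creating it when it is missing. */
    static void handleSaveMode(InMemoryCatalog catalog, String table, DataSaveMode mode) {
        switch (mode) {
            case DROP_SCHEMA:
                catalog.dropTable(table);
                catalog.createTable(table, false);
                break;
            case KEEP_SCHEMA_DROP_DATA:
                catalog.createTable(table, true);
                catalog.truncateTable(table);
                break;
            case KEEP_SCHEMA_AND_DATA:
                catalog.createTable(table, true);
                break;
            case ERROR_WHEN_EXISTS:
                // ignoreIfExists = false turns an existing table into an error.
                catalog.createTable(table, false);
                break;
            default:
                throw new IllegalArgumentException("Unknown mode: " + mode);
        }
    }
}
```

In this sketch every mode ends with the table present, which is why automatic table creation fits naturally inside handleSaveMode; whether a real connector should create missing tables in every mode is a design decision this issue leaves open.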

related issue #3271

@EricJoy2048

Does createTable need to be called in handleSaveMode()?

@github-actions

This issue has been automatically marked as stale because it has not had recent activity for 30 days. It will be closed in next 7 days if no further activity occurs.

@github-actions github-actions bot added the stale label Feb 16, 2023
@github-actions

This issue has been closed because it has not received response for too long time. You could reopen it if you encountered similar problems in the future.
