Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve] update Redis connector config option #8631

Merged
merged 5 commits into from
Feb 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,6 @@ private Set<String> buildWhiteList() {
whiteList.add("PaimonSinkOptions");
whiteList.add("TDengineSourceOptions");
whiteList.add("PulsarSourceOptions");
whiteList.add("RedisSinkOptions");
whiteList.add("FakeSourceOptions");
whiteList.add("HbaseSinkOptions");
whiteList.add("MongodbSinkOptions");
Expand Down Expand Up @@ -231,7 +230,6 @@ private Set<String> buildWhiteList() {
whiteList.add("SocketSourceOptions");
whiteList.add("OpenMldbSourceOptions");
whiteList.add("Web3jSourceOptions");
whiteList.add("RedisSourceOptions");
whiteList.add("PostgresIncrementalSourceOptions");
whiteList.add("SqlServerIncrementalSourceOptions");
whiteList.add("OracleIncrementalSourceOptions");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

import java.util.List;

public class RedisConfig {
public class RedisBaseOptions {

public static final String CONNECTOR_IDENTITY = "Redis";

Expand All @@ -31,11 +31,6 @@ public enum RedisMode {
CLUSTER;
}

public enum HashKeyParseMode {
ALL,
KV;
}

public static final Option<String> HOST =
Options.key("host")
.stringType()
Expand Down Expand Up @@ -85,16 +80,16 @@ public enum HashKeyParseMode {
.noDefaultValue()
.withDescription("redis data types, support string hash list set zset.");

public static final Option<RedisConfig.Format> FORMAT =
public static final Option<RedisBaseOptions.Format> FORMAT =
Options.key("format")
.enumType(RedisConfig.Format.class)
.defaultValue(RedisConfig.Format.JSON)
.enumType(RedisBaseOptions.Format.class)
.defaultValue(RedisBaseOptions.Format.JSON)
.withDescription(
"the format of upstream data, now only support json and text, default json.");

public static final Option<RedisConfig.RedisMode> MODE =
public static final Option<RedisBaseOptions.RedisMode> MODE =
Options.key("mode")
.enumType(RedisConfig.RedisMode.class)
.enumType(RedisBaseOptions.RedisMode.class)
.defaultValue(RedisMode.SINGLE)
.withDescription(
"redis mode, support single or cluster, default value is single");
Expand All @@ -106,19 +101,6 @@ public enum HashKeyParseMode {
.withDescription(
"redis nodes information, used in cluster mode, must like as the following format: [host1:port1, host2:port2]");

public static final Option<RedisConfig.HashKeyParseMode> HASH_KEY_PARSE_MODE =
Options.key("hash_key_parse_mode")
.enumType(RedisConfig.HashKeyParseMode.class)
.defaultValue(HashKeyParseMode.ALL)
.withDescription(
"hash key parse mode, support all or kv, default value is all");

public static final Option<Long> EXPIRE =
Options.key("expire")
.longType()
.defaultValue(-1L)
.withDescription("Set redis expiration time.");

public static final Option<Integer> BATCH_SIZE =
Options.key("batch_size")
.intType()
Expand All @@ -127,32 +109,6 @@ public enum HashKeyParseMode {
"batch_size is used to control the size of a batch of data during read and write operations"
+ ",default 10");

public static final Option<Boolean> SUPPORT_CUSTOM_KEY =
Options.key("support_custom_key")
.booleanType()
.defaultValue(false)
.withDescription(
"if true, the key can be customized by the field value in the upstream data.");

public static final Option<String> VALUE_FIELD =
Options.key("value_field")
.stringType()
.noDefaultValue()
.withDescription(
"The field of value you want to write to redis, support string list set zset");

public static final Option<String> HASH_KEY_FIELD =
Options.key("hash_key_field")
.stringType()
.noDefaultValue()
.withDescription("The field of hash key you want to write to redis");

public static final Option<String> HASH_VALUE_FIELD =
Options.key("hash_value_field")
.stringType()
.noDefaultValue()
.withDescription("The field of hash value you want to write to redis");

public enum Format {
JSON,
// TEXT will be supported later
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,11 @@ public class RedisParameters implements Serializable {
private String keysPattern;
private String keyField;
private RedisDataType redisDataType;
private RedisConfig.RedisMode mode;
private RedisConfig.HashKeyParseMode hashKeyParseMode;
private RedisBaseOptions.RedisMode mode;
private RedisSourceOptions.HashKeyParseMode hashKeyParseMode;
private List<String> redisNodes = Collections.emptyList();
private long expire = RedisConfig.EXPIRE.defaultValue();
private int batchSize = RedisConfig.BATCH_SIZE.defaultValue();
private long expire = RedisSinkOptions.EXPIRE.defaultValue();
private int batchSize = RedisBaseOptions.BATCH_SIZE.defaultValue();
private Boolean supportCustomKey;
private String valueField;
private String hashKeyField;
Expand All @@ -66,63 +66,63 @@ public class RedisParameters implements Serializable {

public void buildWithConfig(ReadonlyConfig config) {
// set host
this.host = config.get(RedisConfig.HOST);
this.host = config.get(RedisBaseOptions.HOST);
// set port
this.port = config.get(RedisConfig.PORT);
this.port = config.get(RedisBaseOptions.PORT);
// set db_num
this.dbNum = config.get(RedisConfig.DB_NUM);
this.dbNum = config.get(RedisBaseOptions.DB_NUM);
// set hash key mode
this.hashKeyParseMode = config.get(RedisConfig.HASH_KEY_PARSE_MODE);
this.hashKeyParseMode = config.get(RedisSourceOptions.HASH_KEY_PARSE_MODE);
// set expire
this.expire = config.get(RedisConfig.EXPIRE);
this.expire = config.get(RedisSinkOptions.EXPIRE);
// set auth
if (config.getOptional(RedisConfig.AUTH).isPresent()) {
this.auth = config.get(RedisConfig.AUTH);
if (config.getOptional(RedisBaseOptions.AUTH).isPresent()) {
this.auth = config.get(RedisBaseOptions.AUTH);
}
// set user
if (config.getOptional(RedisConfig.USER).isPresent()) {
this.user = config.get(RedisConfig.USER);
if (config.getOptional(RedisBaseOptions.USER).isPresent()) {
this.user = config.get(RedisBaseOptions.USER);
}
// set mode
this.mode = config.get(RedisConfig.MODE);
this.mode = config.get(RedisBaseOptions.MODE);
// set redis nodes information
if (config.getOptional(RedisConfig.NODES).isPresent()) {
this.redisNodes = config.get(RedisConfig.NODES);
if (config.getOptional(RedisBaseOptions.NODES).isPresent()) {
this.redisNodes = config.get(RedisBaseOptions.NODES);
}
// set key
if (config.getOptional(RedisConfig.KEY).isPresent()) {
this.keyField = config.get(RedisConfig.KEY);
if (config.getOptional(RedisBaseOptions.KEY).isPresent()) {
this.keyField = config.get(RedisBaseOptions.KEY);
}
// set keysPattern
if (config.getOptional(RedisConfig.KEY_PATTERN).isPresent()) {
this.keysPattern = config.get(RedisConfig.KEY_PATTERN);
if (config.getOptional(RedisBaseOptions.KEY_PATTERN).isPresent()) {
this.keysPattern = config.get(RedisBaseOptions.KEY_PATTERN);
}
// set redis data type verification factory createAndPrepareSource
this.redisDataType = config.get(RedisConfig.DATA_TYPE);
this.redisDataType = config.get(RedisBaseOptions.DATA_TYPE);
// Indicates the number of keys to attempt to return per iteration.default 10
this.batchSize = config.get(RedisConfig.BATCH_SIZE);
this.batchSize = config.get(RedisBaseOptions.BATCH_SIZE);
// set support custom key
if (config.getOptional(RedisConfig.SUPPORT_CUSTOM_KEY).isPresent()) {
this.supportCustomKey = config.get(RedisConfig.SUPPORT_CUSTOM_KEY);
if (config.getOptional(RedisSinkOptions.SUPPORT_CUSTOM_KEY).isPresent()) {
this.supportCustomKey = config.get(RedisSinkOptions.SUPPORT_CUSTOM_KEY);
}
// set value field
if (config.getOptional(RedisConfig.VALUE_FIELD).isPresent()) {
this.valueField = config.get(RedisConfig.VALUE_FIELD);
if (config.getOptional(RedisSinkOptions.VALUE_FIELD).isPresent()) {
this.valueField = config.get(RedisSinkOptions.VALUE_FIELD);
}
// set hash key field
if (config.getOptional(RedisConfig.HASH_KEY_FIELD).isPresent()) {
this.hashKeyField = config.get(RedisConfig.HASH_KEY_FIELD);
if (config.getOptional(RedisSinkOptions.HASH_KEY_FIELD).isPresent()) {
this.hashKeyField = config.get(RedisSinkOptions.HASH_KEY_FIELD);
}
// set hash value field
if (config.getOptional(RedisConfig.HASH_VALUE_FIELD).isPresent()) {
this.hashValueField = config.get(RedisConfig.HASH_VALUE_FIELD);
if (config.getOptional(RedisSinkOptions.HASH_VALUE_FIELD).isPresent()) {
this.hashValueField = config.get(RedisSinkOptions.HASH_VALUE_FIELD);
}
}

public RedisClient buildRedisClient() {
Jedis jedis = this.buildJedis();
this.redisVersion = extractRedisVersion(jedis);
if (mode.equals(RedisConfig.RedisMode.SINGLE)) {
if (mode.equals(RedisBaseOptions.RedisMode.SINGLE)) {
return new RedisSingleClient(this, jedis, redisVersion);
} else {
return new RedisClusterClient(this, jedis, redisVersion);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.redis.config;

import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;

public class RedisSinkOptions extends RedisBaseOptions {

public static final Option<Long> EXPIRE =
Options.key("expire")
.longType()
.defaultValue(-1L)
.withDescription("Set redis expiration time.");

public static final Option<Boolean> SUPPORT_CUSTOM_KEY =
Options.key("support_custom_key")
.booleanType()
.defaultValue(false)
.withDescription(
"if true, the key can be customized by the field value in the upstream data.");
public static final Option<String> VALUE_FIELD =
Options.key("value_field")
.stringType()
.noDefaultValue()
.withDescription(
"The field of value you want to write to redis, support string list set zset");
public static final Option<String> HASH_KEY_FIELD =
Options.key("hash_key_field")
.stringType()
.noDefaultValue()
.withDescription("The field of hash key you want to write to redis");

public static final Option<String> HASH_VALUE_FIELD =
Options.key("hash_value_field")
.stringType()
.noDefaultValue()
.withDescription("The field of hash value you want to write to redis");
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.redis.config;

import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;

public class RedisSourceOptions extends RedisBaseOptions {
public enum HashKeyParseMode {
ALL,
KV;
}

public static final Option<HashKeyParseMode> HASH_KEY_PARSE_MODE =
Options.key("hash_key_parse_mode")
.enumType(HashKeyParseMode.class)
.defaultValue(HashKeyParseMode.ALL)
.withDescription(
"hash key parse mode, support all or kv, default value is all");
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisConfig;
import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisBaseOptions;
import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisParameters;

import java.io.IOException;
Expand All @@ -46,7 +46,7 @@ public RedisSink(ReadonlyConfig config, CatalogTable table) {

@Override
public String getPluginName() {
return RedisConfig.CONNECTOR_IDENTITY;
return RedisBaseOptions.CONNECTOR_IDENTITY;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisConfig;
import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisBaseOptions;
import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisSinkOptions;

import com.google.auto.service.AutoService;

Expand All @@ -45,20 +46,26 @@ public TableSink createSink(TableSinkFactoryContext context) {
public OptionRule optionRule() {
return OptionRule.builder()
.required(
RedisConfig.HOST, RedisConfig.PORT, RedisConfig.KEY, RedisConfig.DATA_TYPE)
RedisBaseOptions.HOST,
RedisBaseOptions.PORT,
RedisBaseOptions.KEY,
RedisBaseOptions.DATA_TYPE)
.optional(
RedisConfig.MODE,
RedisConfig.AUTH,
RedisConfig.USER,
RedisConfig.KEY_PATTERN,
RedisConfig.FORMAT,
RedisConfig.EXPIRE,
RedisConfig.SUPPORT_CUSTOM_KEY,
RedisConfig.VALUE_FIELD,
RedisConfig.HASH_KEY_FIELD,
RedisConfig.HASH_VALUE_FIELD,
RedisBaseOptions.MODE,
RedisBaseOptions.AUTH,
RedisBaseOptions.USER,
RedisBaseOptions.KEY_PATTERN,
RedisBaseOptions.FORMAT,
RedisSinkOptions.EXPIRE,
RedisSinkOptions.SUPPORT_CUSTOM_KEY,
RedisSinkOptions.VALUE_FIELD,
RedisSinkOptions.HASH_KEY_FIELD,
RedisSinkOptions.HASH_VALUE_FIELD,
SinkCommonOptions.MULTI_TABLE_SINK_REPLICA)
.conditional(RedisConfig.MODE, RedisConfig.RedisMode.CLUSTER, RedisConfig.NODES)
.conditional(
RedisBaseOptions.MODE,
RedisBaseOptions.RedisMode.CLUSTER,
RedisBaseOptions.NODES)
.build();
}
}
Loading