From 1163f8dac97df8a915ae04e1041898073606429c Mon Sep 17 00:00:00 2001 From: FuYouJ <1247908487@qq.com> Date: Sat, 29 Jun 2024 22:53:03 +0800 Subject: [PATCH 1/5] [improve][redis]using scan replace keys operation command --- docs/en/connector-v2/source/Redis.md | 5 +++ .../seatunnel/redis/config/RedisConfig.java | 9 ++++- .../seatunnel/redis/config/RedisDataType.java | 12 +++++++ .../redis/config/RedisParameters.java | 4 +++ .../redis/source/RedisSourceReader.java | 34 ++++++++++++++++--- 5 files changed, 59 insertions(+), 5 deletions(-) diff --git a/docs/en/connector-v2/source/Redis.md b/docs/en/connector-v2/source/Redis.md index 3029f8061dd..521bac7be25 100644 --- a/docs/en/connector-v2/source/Redis.md +++ b/docs/en/connector-v2/source/Redis.md @@ -22,6 +22,7 @@ Used to read data from Redis. | host | string | yes | - | | port | int | yes | - | | keys | string | yes | - | +| scan_count | string | yes | 10 | | data_type | string | yes | - | | user | string | no | - | | auth | string | no | - | @@ -113,6 +114,10 @@ each kv that in hash key it will be treated as a row and send it to upstream. 
keys pattern +### scan_count [string] + +indicates the number of keys to attempt to return per iteration.default 10 + **Tips:Redis source connector support fuzzy key matching, user needs to ensure that the matched keys are the same type** ### data_type [string] diff --git a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java index c8a0a02dc7c..073dbf8a8b0 100644 --- a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java +++ b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java @@ -83,7 +83,7 @@ public enum HashKeyParseMode { Options.key("data_type") .enumType(RedisDataType.class) .noDefaultValue() - .withDescription("redis data types, support key hash list set zset."); + .withDescription("redis data types, support string hash list set zset."); public static final Option FORMAT = Options.key("format") @@ -119,6 +119,13 @@ public enum HashKeyParseMode { .defaultValue(-1L) .withDescription("Set redis expiration time."); + public static final Option SCAN_COUNT = + Options.key("scan_count") + .intType() + .defaultValue(10) + .withDescription( + "Indicates the number of keys to attempt to return per iteration.default 10"); + public enum Format { JSON, // TEXT will be supported later diff --git a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisDataType.java b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisDataType.java index a315e0cdae0..aac874254d2 100644 --- a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisDataType.java +++ 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisDataType.java @@ -40,6 +40,18 @@ public List get(Jedis jedis, String key) { return Collections.singletonList(jedis.get(key)); } }, + STRING { + @Override + public void set(Jedis jedis, String key, String value, long expire) { + jedis.set(key, value); + expire(jedis, key, expire); + } + + @Override + public List get(Jedis jedis, String key) { + return Collections.singletonList(jedis.get(key)); + } + }, HASH { @Override public void set(Jedis jedis, String key, String value, long expire) { diff --git a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java index d32f3bbb0de..7a14748b509 100644 --- a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java +++ b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java @@ -49,6 +49,8 @@ public class RedisParameters implements Serializable { private List redisNodes = Collections.emptyList(); private long expire = RedisConfig.EXPIRE.defaultValue(); + private int scanCount = RedisConfig.SCAN_COUNT.defaultValue(); + public void buildWithConfig(ReadonlyConfig config) { // set host this.host = config.get(RedisConfig.HOST); @@ -84,6 +86,8 @@ public void buildWithConfig(ReadonlyConfig config) { } // set redis data type verification factory createAndPrepareSource this.redisDataType = config.get(RedisConfig.DATA_TYPE); + // Indicates the number of keys to attempt to return per iteration.default 10 + this.scanCount = config.get(RedisConfig.SCAN_COUNT); } public Jedis buildJedis() { diff --git 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSourceReader.java b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSourceReader.java index 7dcea693fa3..150d7bcda58 100644 --- a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSourceReader.java +++ b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSourceReader.java @@ -30,24 +30,30 @@ import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisParameters; import redis.clients.jedis.Jedis; +import redis.clients.jedis.params.ScanParams; +import redis.clients.jedis.resps.ScanResult; import java.io.IOException; import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.Set; public class RedisSourceReader extends AbstractSingleSplitReader { private final RedisParameters redisParameters; private final SingleSplitReaderContext context; private final DeserializationSchema deserializationSchema; private Jedis jedis; + private final ScanParams scanParams; + + private final String scanType; public RedisSourceReader( RedisParameters redisParameters, SingleSplitReaderContext context, DeserializationSchema deserializationSchema) { this.redisParameters = redisParameters; + this.scanParams = buildScanParams(redisParameters); + this.scanType = resolveScanType(redisParameters.getRedisDataType()); this.context = context; this.deserializationSchema = deserializationSchema; } @@ -66,10 +72,16 @@ public void close() throws IOException { @Override public void internalPollNext(Collector output) throws Exception { - Set keys = jedis.keys(redisParameters.getKeysPattern()); RedisDataType redisDataType = redisParameters.getRedisDataType(); - for (String key : keys) { - List values = redisDataType.get(jedis, key); + String cursor = 
ScanParams.SCAN_POINTER_START; + while (true) { + ScanResult scanResult = jedis.scan(cursor, scanParams, scanType); + cursor = scanResult.getCursor(); + // when cursor return "0", scan end + if (ScanParams.SCAN_POINTER_START.equals(cursor)) { + break; + } + List values = scanResult.getResult(); for (String value : values) { if (deserializationSchema == null) { output.collect(new SeaTunnelRow(new Object[] {value})); @@ -96,4 +108,18 @@ public void internalPollNext(Collector output) throws Exception { } context.signalNoMoreElement(); } + + private ScanParams buildScanParams(RedisParameters redisParameters) { + ScanParams params = new ScanParams(); + params.count(redisParameters.getScanCount()); + params.match(redisParameters.getKeysPattern()); + return params; + } + + private String resolveScanType(RedisDataType redisDataType) { + if (RedisDataType.KEY.equals(redisDataType)) { + return "STRING"; + } + return redisDataType.name(); + } } From d80cf030177189ea932c48be8496b18428e9f5c3 Mon Sep 17 00:00:00 2001 From: FuYouJ <1247908487@qq.com> Date: Sun, 30 Jun 2024 00:49:51 +0800 Subject: [PATCH 2/5] [improve][redis]issue#7030 when scan cursor return 0,maybe current scan has values --- .../seatunnel/redis/source/RedisSourceReader.java | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSourceReader.java b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSourceReader.java index 150d7bcda58..f4d392dd824 100644 --- a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSourceReader.java +++ b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSourceReader.java @@ -77,11 +77,10 @@ public void internalPollNext(Collector output) throws Exception { while 
(true) { ScanResult scanResult = jedis.scan(cursor, scanParams, scanType); cursor = scanResult.getCursor(); - // when cursor return "0", scan end - if (ScanParams.SCAN_POINTER_START.equals(cursor)) { + List values = scanResult.getResult(); + if (values == null || values.isEmpty()) { break; } - List values = scanResult.getResult(); for (String value : values) { if (deserializationSchema == null) { output.collect(new SeaTunnelRow(new Object[] {value})); @@ -105,6 +104,10 @@ public void internalPollNext(Collector output) throws Exception { } } } + // when cursor return "0", scan end + if (ScanParams.SCAN_POINTER_START.equals(cursor)) { + break; + } } context.signalNoMoreElement(); } From 9d858dd34c47faf3e7f7d5ef49f6bec061c429c6 Mon Sep 17 00:00:00 2001 From: FuYouJ <1247908487@qq.com> Date: Sun, 30 Jun 2024 20:25:38 +0800 Subject: [PATCH 3/5] [improve][redis]issue#7030,issue#7085,redis reader writer support batch --- docs/en/connector-v2/sink/Redis.md | 9 +- docs/en/connector-v2/source/Redis.md | 4 +- release-note.md | 1 + .../seatunnel/redis/client/RedisClient.java | 77 +++++++ .../redis/client/RedisClusterClient.java | 138 +++++++++++ .../redis/client/RedisSingleClient.java | 216 ++++++++++++++++++ .../seatunnel/redis/config/RedisConfig.java | 7 +- .../redis/config/RedisParameters.java | 17 +- .../seatunnel/redis/sink/RedisSinkWriter.java | 65 +++++- .../redis/source/RedisSourceReader.java | 163 +++++++++---- .../e2e/connector/redis/RedisIT.java | 115 ++++++++++ .../resources/redis-to-redis-by-db-num.conf | 2 + .../test/resources/redis-to-redis-expire.conf | 1 + .../src/test/resources/redis-to-redis.conf | 4 +- .../scan-hash-to-redis-list-hash-check.conf | 51 +++++ ...st-test-read-to-redis-list-test-check.conf | 51 +++++ .../scan-set-to-redis-list-set-check.conf | 51 +++++ .../test/resources/scan-string-to-redis.conf | 51 +++++ .../scan-zset-to-redis-list-zset-check.conf | 51 +++++ 19 files changed, 1005 insertions(+), 69 deletions(-) create mode 100644 
seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisClient.java create mode 100644 seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisClusterClient.java create mode 100644 seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisSingleClient.java create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/scan-hash-to-redis-list-hash-check.conf create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/scan-list-test-read-to-redis-list-test-check.conf create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/scan-set-to-redis-list-set-check.conf create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/scan-string-to-redis.conf create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/scan-zset-to-redis-list-zset-check.conf diff --git a/docs/en/connector-v2/sink/Redis.md b/docs/en/connector-v2/sink/Redis.md index f91e6bc6ec0..e223a28b71f 100644 --- a/docs/en/connector-v2/sink/Redis.md +++ b/docs/en/connector-v2/sink/Redis.md @@ -18,6 +18,7 @@ Used to write data to Redis. | port | int | yes | - | | key | string | yes | - | | data_type | string | yes | - | +| batch_size | int | no | 10 | | user | string | no | - | | auth | string | no | - | | db_num | int | no | 0 | @@ -83,8 +84,12 @@ Redis data types, support `key` `hash` `list` `set` `zset` - zset > Each data from upstream will be added to the configured zset key with a weight of 1. So the order of data in zset is based on the order of data consumption. - -### user [string] +> +> ### batch_size [int] +> +> Ensure the batch write size in single-machine mode; no guarantees in cluster mode. 
+> + ### user [string] redis authentication user, you need it when you connect to an encrypted cluster diff --git a/docs/en/connector-v2/source/Redis.md b/docs/en/connector-v2/source/Redis.md index 521bac7be25..a430b891681 100644 --- a/docs/en/connector-v2/source/Redis.md +++ b/docs/en/connector-v2/source/Redis.md @@ -22,7 +22,7 @@ Used to read data from Redis. | host | string | yes | - | | port | int | yes | - | | keys | string | yes | - | -| scan_count | string | yes | 10 | +| batch_size | int | yes | 10 | | data_type | string | yes | - | | user | string | no | - | | auth | string | no | - | @@ -114,7 +114,7 @@ each kv that in hash key it will be treated as a row and send it to upstream. keys pattern -### scan_count [string] +### batch_size [int] indicates the number of keys to attempt to return per iteration.default 10 diff --git a/release-note.md b/release-note.md index 24455c40acd..0b5e8845729 100644 --- a/release-note.md +++ b/release-note.md @@ -56,6 +56,7 @@ - [Connector-v2] [File] Inject FileSystem to OrcWriteStrategy - [Connector-v2] [File] Support assign encoding for file source/sink (#5973) - [Connector-v2] [Mongodb] Support to convert to double from numeric type that mongodb saved it as numeric internally (#6997) +- [Connector-v2] [Redis] Using scan replace keys operation command,support batchWrite in single mode(#7030,#7085) ### Zeta(ST-Engine) diff --git a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisClient.java b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisClient.java new file mode 100644 index 00000000000..5730838ccae --- /dev/null +++ b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisClient.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.redis.client; + +import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisDataType; +import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisParameters; + +import redis.clients.jedis.Jedis; +import redis.clients.jedis.params.ScanParams; +import redis.clients.jedis.resps.ScanResult; + +import java.util.List; +import java.util.Map; +import java.util.Set; + +public abstract class RedisClient extends Jedis { + + protected final RedisParameters redisParameters; + + protected final int batchSize; + + protected final Jedis jedis; + + protected RedisClient(RedisParameters redisParameters, Jedis jedis) { + this.redisParameters = redisParameters; + this.batchSize = redisParameters.getBatchSize(); + this.jedis = jedis; + } + + public ScanResult scanKeys( + String cursor, int batchSize, String keysPattern, RedisDataType type) { + ScanParams scanParams = new ScanParams(); + scanParams.match(keysPattern); + scanParams.count(batchSize); + return jedis.scan(cursor, scanParams, type.name()); + } + + public abstract List batchGetString(List keys); + + public abstract List> batchGetList(List keys); + + public abstract List> batchGetSet(List keys); + + public abstract List> batchGetHash(List keys); + + public abstract List> 
batchGetZset(List keys); + + public abstract void batchWriteString( + List keys, List values, long expireSeconds); + + public abstract void batchWriteList( + List keyBuffer, List valueBuffer, long expireSeconds); + + public abstract void batchWriteSet( + List keyBuffer, List valueBuffer, long expireSeconds); + + public abstract void batchWriteHash( + List keyBuffer, List valueBuffer, long expireSeconds); + + public abstract void batchWriteZset( + List keyBuffer, List valueBuffer, long expireSeconds); +} diff --git a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisClusterClient.java b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisClusterClient.java new file mode 100644 index 00000000000..13acc89defe --- /dev/null +++ b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisClusterClient.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.redis.client; + +import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisDataType; +import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisParameters; + +import org.apache.commons.collections4.CollectionUtils; + +import redis.clients.jedis.Jedis; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class RedisClusterClient extends RedisClient { + public RedisClusterClient(RedisParameters redisParameters, Jedis jedis) { + super(redisParameters, jedis); + } + + @Override + public List batchGetString(List keys) { + if (CollectionUtils.isEmpty(keys)) { + return new ArrayList<>(); + } + List result = new ArrayList<>(keys.size()); + for (String key : keys) { + result.add(jedis.get(key)); + } + return result; + } + + @Override + public List> batchGetList(List keys) { + if (CollectionUtils.isEmpty(keys)) { + return new ArrayList<>(); + } + List> result = new ArrayList<>(keys.size()); + for (String key : keys) { + result.add(jedis.lrange(key, 0, -1)); + } + return result; + } + + @Override + public List> batchGetSet(List keys) { + if (CollectionUtils.isEmpty(keys)) { + return new ArrayList<>(); + } + List> result = new ArrayList<>(keys.size()); + for (String key : keys) { + result.add(jedis.smembers(key)); + } + return result; + } + + @Override + public List> batchGetHash(List keys) { + if (CollectionUtils.isEmpty(keys)) { + return new ArrayList<>(); + } + List> result = new ArrayList<>(keys.size()); + for (String key : keys) { + Map map = jedis.hgetAll(key); + map.put("hash_key", key); + result.add(map); + } + return result; + } + + @Override + public List> batchGetZset(List keys) { + if (CollectionUtils.isEmpty(keys)) { + return new ArrayList<>(); + } + List> result = new ArrayList<>(keys.size()); + for (String key : keys) { + result.add(jedis.zrange(key, 0, -1)); + } + return result; + } + + @Override + public void 
batchWriteString(List keys, List values, long expireSeconds) { + int size = keys.size(); + for (int i = 0; i < size; i++) { + RedisDataType.STRING.set(this, keys.get(i), values.get(i), expireSeconds); + } + } + + @Override + public void batchWriteList(List keys, List values, long expireSeconds) { + int size = keys.size(); + for (int i = 0; i < size; i++) { + RedisDataType.LIST.set(this, keys.get(i), values.get(i), expireSeconds); + } + } + + @Override + public void batchWriteSet(List keys, List values, long expireSeconds) { + int size = keys.size(); + for (int i = 0; i < size; i++) { + RedisDataType.SET.set(this, keys.get(i), values.get(i), expireSeconds); + } + } + + @Override + public void batchWriteHash(List keys, List values, long expireSeconds) { + int size = keys.size(); + for (int i = 0; i < size; i++) { + RedisDataType.HASH.set(this, keys.get(i), values.get(i), expireSeconds); + } + } + + @Override + public void batchWriteZset(List keys, List values, long expireSeconds) { + int size = keys.size(); + for (int i = 0; i < size; i++) { + RedisDataType.ZSET.set(this, keys.get(i), values.get(i), expireSeconds); + } + } +} diff --git a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisSingleClient.java b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisSingleClient.java new file mode 100644 index 00000000000..99bae5e7333 --- /dev/null +++ b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisSingleClient.java @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.redis.client; + +import org.apache.seatunnel.common.utils.JsonUtils; +import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisParameters; + +import org.apache.commons.collections4.CollectionUtils; + +import redis.clients.jedis.Jedis; +import redis.clients.jedis.Pipeline; +import redis.clients.jedis.Response; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; + +// In standalone mode, pipeline can be used to improve batch read performance +public class RedisSingleClient extends RedisClient { + + public RedisSingleClient(RedisParameters redisParameters, Jedis jedis) { + super(redisParameters, jedis); + } + + @Override + public List batchGetString(List keys) { + if (CollectionUtils.isEmpty(keys)) { + return new ArrayList<>(); + } + String[] keyArr = keys.toArray(new String[0]); + return jedis.mget(keyArr); + } + + @Override + public List> batchGetList(List keys) { + if (CollectionUtils.isEmpty(keys)) { + return new ArrayList<>(); + } + Pipeline pipeline = jedis.pipelined(); + List>> responses = new ArrayList<>(keys.size()); + + for (String key : keys) { + responses.add(pipeline.lrange(key, 0, -1)); + } + + pipeline.sync(); + + List> resultList = new ArrayList<>(keys.size()); + for (Response> response : responses) { + resultList.add(response.get()); + } + + return resultList; 
+ } + + @Override + public List> batchGetSet(List keys) { + if (CollectionUtils.isEmpty(keys)) { + return new ArrayList<>(); + } + Pipeline pipeline = jedis.pipelined(); + List>> responses = new ArrayList<>(keys.size()); + + for (String key : keys) { + responses.add(pipeline.smembers(key)); + } + + pipeline.sync(); + + List> resultList = new ArrayList<>(keys.size()); + for (Response> response : responses) { + resultList.add(response.get()); + } + + return resultList; + } + + @Override + public List> batchGetHash(List keys) { + if (CollectionUtils.isEmpty(keys)) { + return new ArrayList<>(); + } + Pipeline pipeline = jedis.pipelined(); + List>> responses = new ArrayList<>(keys.size()); + + for (String key : keys) { + Response> response = pipeline.hgetAll(key); + responses.add(response); + } + + pipeline.sync(); + + List> resultList = new ArrayList<>(keys.size()); + for (int i = 0; i < keys.size(); i++) { + Response> response = responses.get(i); + Map map = response.get(); + if (map != null) { + map.put("hash_key", keys.get(i)); + } + resultList.add(map); + } + + return resultList; + } + + @Override + public List> batchGetZset(List keys) { + if (CollectionUtils.isEmpty(keys)) { + return new ArrayList<>(); + } + List>> responses = new ArrayList<>(keys.size()); + Pipeline pipelined = jedis.pipelined(); + for (String key : keys) { + Response> response = pipelined.zrange(key, 0, -1); + responses.add(response); + } + pipelined.sync(); + List> resultlist = new ArrayList<>(keys.size()); + for (Response> response : responses) { + resultlist.add(response.get()); + } + return resultlist; + } + + @Override + public void batchWriteString(List keys, List values, long expireSeconds) { + Pipeline pipelined = jedis.pipelined(); + int size = keys.size(); + for (int i = 0; i < size; i++) { + String key = keys.get(i); + String value = values.get(i); + pipelined.set(key, value); + if (expireSeconds > 0) { + pipelined.expire(key, expireSeconds); + } + } + pipelined.sync(); + } + + 
@Override + public void batchWriteList(List keys, List values, long expireSeconds) { + Pipeline pipelined = jedis.pipelined(); + int size = keys.size(); + for (int i = 0; i < size; i++) { + String key = keys.get(i); + String value = values.get(i); + pipelined.lpush(key, value); + if (expireSeconds > 0) { + pipelined.expire(key, expireSeconds); + } + } + pipelined.sync(); + } + + @Override + public void batchWriteSet(List keys, List values, long expireSeconds) { + Pipeline pipelined = jedis.pipelined(); + int size = keys.size(); + for (int i = 0; i < size; i++) { + String key = keys.get(i); + String value = values.get(i); + pipelined.sadd(key, value); + if (expireSeconds > 0) { + pipelined.expire(key, expireSeconds); + } + } + pipelined.sync(); + } + + @Override + public void batchWriteHash(List keys, List values, long expireSeconds) { + Pipeline pipelined = jedis.pipelined(); + int size = keys.size(); + for (int i = 0; i < size; i++) { + String key = keys.get(i); + String value = values.get(i); + Map fieldsMap = JsonUtils.toMap(value); + pipelined.hset(key, fieldsMap); + if (expireSeconds > 0) { + pipelined.expire(key, expireSeconds); + } + } + pipelined.sync(); + } + + @Override + public void batchWriteZset(List keys, List values, long expireSeconds) { + Pipeline pipelined = jedis.pipelined(); + int size = keys.size(); + for (int i = 0; i < size; i++) { + String key = keys.get(i); + String value = values.get(i); + pipelined.zadd(key, 1, value); + if (expireSeconds > 0) { + pipelined.expire(key, expireSeconds); + } + } + pipelined.sync(); + } +} diff --git a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java index 073dbf8a8b0..3be5b39de99 100644 --- 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java +++ b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java @@ -119,12 +119,13 @@ public enum HashKeyParseMode { .defaultValue(-1L) .withDescription("Set redis expiration time."); - public static final Option SCAN_COUNT = - Options.key("scan_count") + public static final Option BATCH_SIZE = + Options.key("batch_size") .intType() .defaultValue(10) .withDescription( - "Indicates the number of keys to attempt to return per iteration.default 10"); + "batch_size is used to control the size of a batch of data during read and write operations" + + ",default 10"); public enum Format { JSON, diff --git a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java index 7a14748b509..ef1fe104d7a 100644 --- a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java +++ b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java @@ -19,6 +19,9 @@ import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated; +import org.apache.seatunnel.connectors.seatunnel.redis.client.RedisClient; +import org.apache.seatunnel.connectors.seatunnel.redis.client.RedisClusterClient; +import org.apache.seatunnel.connectors.seatunnel.redis.client.RedisSingleClient; import org.apache.seatunnel.connectors.seatunnel.redis.exception.RedisConnectorException; import org.apache.commons.lang3.StringUtils; @@ -48,8 +51,7 @@ public class RedisParameters implements Serializable { private 
RedisConfig.HashKeyParseMode hashKeyParseMode; private List redisNodes = Collections.emptyList(); private long expire = RedisConfig.EXPIRE.defaultValue(); - - private int scanCount = RedisConfig.SCAN_COUNT.defaultValue(); + private int batchSize = RedisConfig.BATCH_SIZE.defaultValue(); public void buildWithConfig(ReadonlyConfig config) { // set host @@ -87,7 +89,16 @@ public void buildWithConfig(ReadonlyConfig config) { // set redis data type verification factory createAndPrepareSource this.redisDataType = config.get(RedisConfig.DATA_TYPE); // Indicates the number of keys to attempt to return per iteration.default 10 - this.scanCount = config.get(RedisConfig.SCAN_COUNT); + this.batchSize = config.get(RedisConfig.BATCH_SIZE); + } + + public RedisClient buildRedisClient() { + Jedis jedis = this.buildJedis(); + if (mode.equals(RedisConfig.RedisMode.SINGLE)) { + return new RedisSingleClient(this, jedis); + } else { + return new RedisClusterClient(this, jedis); + } } public Jedis buildJedis() { diff --git a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriter.java b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriter.java index 23eda572029..f03c5c48c88 100644 --- a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriter.java +++ b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriter.java @@ -21,24 +21,30 @@ import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.common.exception.CommonErrorCode; import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter; +import 
org.apache.seatunnel.connectors.seatunnel.redis.client.RedisClient; import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisDataType; import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisParameters; +import org.apache.seatunnel.connectors.seatunnel.redis.exception.RedisConnectorException; import org.apache.seatunnel.format.json.JsonSerializationSchema; -import redis.clients.jedis.Jedis; - import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import java.util.Objects; public class RedisSinkWriter extends AbstractSinkWriter implements SupportMultiTableSinkWriter { private final SeaTunnelRowType seaTunnelRowType; private final RedisParameters redisParameters; private final SerializationSchema serializationSchema; - private final Jedis jedis; + private final RedisClient redisClient; + + private final int batchSize; + + private final List keyBuffer; + private final List valueBuffer; public RedisSinkWriter(SeaTunnelRowType seaTunnelRowType, RedisParameters redisParameters) { this.seaTunnelRowType = seaTunnelRowType; @@ -46,13 +52,15 @@ public RedisSinkWriter(SeaTunnelRowType seaTunnelRowType, RedisParameters redisP // TODO according to format to initialize serializationSchema // Now temporary using json serializationSchema this.serializationSchema = new JsonSerializationSchema(seaTunnelRowType); - this.jedis = redisParameters.buildJedis(); + this.redisClient = redisParameters.buildRedisClient(); + this.batchSize = redisParameters.getBatchSize(); + this.keyBuffer = new ArrayList<>(batchSize); + this.valueBuffer = new ArrayList<>(batchSize); } @Override public void write(SeaTunnelRow element) throws IOException { String data = new String(serializationSchema.serialize(element)); - RedisDataType redisDataType = redisParameters.getRedisDataType(); String keyField = redisParameters.getKeyField(); List fields = Arrays.asList(seaTunnelRowType.getFieldNames()); String key; @@ -61,14 +69,51 @@ public void 
write(SeaTunnelRow element) throws IOException { } else { key = keyField; } - long expire = redisParameters.getExpire(); - redisDataType.set(jedis, key, data, expire); + keyBuffer.add(key); + valueBuffer.add(data); + if (keyBuffer.size() >= batchSize) { + doBatchWrite(); + clearBuffer(); + } + } + + private void clearBuffer() { + keyBuffer.clear(); + valueBuffer.clear(); + } + + private void doBatchWrite() { + RedisDataType redisDataType = redisParameters.getRedisDataType(); + if (RedisDataType.KEY.equals(redisDataType) || RedisDataType.STRING.equals(redisDataType)) { + redisClient.batchWriteString(keyBuffer, valueBuffer, redisParameters.getExpire()); + return; + } + if (RedisDataType.LIST.equals(redisDataType)) { + redisClient.batchWriteList(keyBuffer, valueBuffer, redisParameters.getExpire()); + return; + } + if (RedisDataType.SET.equals(redisDataType)) { + redisClient.batchWriteSet(keyBuffer, valueBuffer, redisParameters.getExpire()); + return; + } + if (RedisDataType.HASH.equals(redisDataType)) { + redisClient.batchWriteHash(keyBuffer, valueBuffer, redisParameters.getExpire()); + return; + } + if (RedisDataType.ZSET.equals(redisDataType)) { + redisClient.batchWriteZset(keyBuffer, valueBuffer, redisParameters.getExpire()); + return; + } + throw new RedisConnectorException( + CommonErrorCode.UNSUPPORTED_DATA_TYPE, + "UnSupport redisDataType,only support string,list,hash,set,zset"); } @Override public void close() throws IOException { - if (Objects.nonNull(jedis)) { - jedis.close(); + if (!keyBuffer.isEmpty()) { + doBatchWrite(); + clearBuffer(); } } } diff --git a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSourceReader.java b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSourceReader.java index f4d392dd824..10825be8bc6 100644 --- 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSourceReader.java +++ b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSourceReader.java @@ -19,17 +19,19 @@ import org.apache.seatunnel.api.serialization.DeserializationSchema; import org.apache.seatunnel.api.source.Collector; -import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRow; -import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.common.exception.CommonErrorCode; import org.apache.seatunnel.common.utils.JsonUtils; import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader; import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext; +import org.apache.seatunnel.connectors.seatunnel.redis.client.RedisClient; import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisConfig; import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisDataType; import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisParameters; +import org.apache.seatunnel.connectors.seatunnel.redis.exception.RedisConnectorException; + +import org.apache.commons.collections4.CollectionUtils; -import redis.clients.jedis.Jedis; import redis.clients.jedis.params.ScanParams; import redis.clients.jedis.resps.ScanResult; @@ -37,73 +39,51 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; public class RedisSourceReader extends AbstractSingleSplitReader { private final RedisParameters redisParameters; private final SingleSplitReaderContext context; private final DeserializationSchema deserializationSchema; - private Jedis jedis; - private final ScanParams scanParams; - - private final String scanType; + private RedisClient redisClient; public RedisSourceReader( RedisParameters redisParameters, 
SingleSplitReaderContext context, DeserializationSchema deserializationSchema) { this.redisParameters = redisParameters; - this.scanParams = buildScanParams(redisParameters); - this.scanType = resolveScanType(redisParameters.getRedisDataType()); this.context = context; this.deserializationSchema = deserializationSchema; } @Override public void open() throws Exception { - this.jedis = redisParameters.buildJedis(); + this.redisClient = redisParameters.buildRedisClient(); } @Override public void close() throws IOException { - if (Objects.nonNull(jedis)) { - jedis.close(); + if (Objects.nonNull(redisClient)) { + redisClient.close(); } } @Override public void internalPollNext(Collector output) throws Exception { - RedisDataType redisDataType = redisParameters.getRedisDataType(); + RedisDataType redisDataType = resolveScanType(redisParameters.getRedisDataType()); String cursor = ScanParams.SCAN_POINTER_START; + String keysPattern = redisParameters.getKeysPattern(); + int batchSize = redisParameters.getBatchSize(); while (true) { - ScanResult scanResult = jedis.scan(cursor, scanParams, scanType); + // String cursor, int batchSize, String keysPattern, RedisType type + ScanResult scanResult = + redisClient.scanKeys(cursor, batchSize, keysPattern, redisDataType); cursor = scanResult.getCursor(); - List values = scanResult.getResult(); - if (values == null || values.isEmpty()) { + List keys = scanResult.getResult(); + if (CollectionUtils.isEmpty(keys)) { break; } - for (String value : values) { - if (deserializationSchema == null) { - output.collect(new SeaTunnelRow(new Object[] {value})); - } else { - if (redisParameters.getHashKeyParseMode() == RedisConfig.HashKeyParseMode.KV - && redisDataType == RedisDataType.HASH) { - // Treat each key-value pair in the hash-key as one piece of data - Map recordsMap = JsonUtils.toMap(value); - for (Map.Entry entry : recordsMap.entrySet()) { - String k = entry.getKey(); - String v = entry.getValue(); - Map valuesMap = JsonUtils.toMap(v); 
- SeaTunnelDataType seaTunnelRowType = - deserializationSchema.getProducedType(); - valuesMap.put(((SeaTunnelRowType) seaTunnelRowType).getFieldName(0), k); - deserializationSchema.deserialize( - JsonUtils.toJsonString(valuesMap).getBytes(), output); - } - } else { - deserializationSchema.deserialize(value.getBytes(), output); - } - } - } + pollNext(keys, redisDataType, output); // when cursor return "0", scan end if (ScanParams.SCAN_POINTER_START.equals(cursor)) { break; @@ -112,17 +92,104 @@ public void internalPollNext(Collector output) throws Exception { context.signalNoMoreElement(); } - private ScanParams buildScanParams(RedisParameters redisParameters) { - ScanParams params = new ScanParams(); - params.count(redisParameters.getScanCount()); - params.match(redisParameters.getKeysPattern()); - return params; + private void pollNext(List keys, RedisDataType dataType, Collector output) + throws IOException { + if (RedisDataType.HASH.equals(dataType)) { + pollHashMapToNext(keys, output); + return; + } + if (RedisDataType.STRING.equals(dataType) || RedisDataType.KEY.equals(dataType)) { + pollStringToNext(keys, output); + return; + } + if (RedisDataType.LIST.equals(dataType)) { + pollListToNext(keys, output); + return; + } + if (RedisDataType.SET.equals(dataType)) { + pollSetToNext(keys, output); + return; + } + if (RedisDataType.ZSET.equals(dataType)) { + pollZsetToNext(keys, output); + return; + } + throw new RedisConnectorException( + CommonErrorCode.UNSUPPORTED_DATA_TYPE, + "UnSupport redisDataType,only support string,list,hash,set,zset"); + } + + private void pollZsetToNext(List keys, Collector output) + throws IOException { + List> zSetList = redisClient.batchGetZset(keys); + for (List values : zSetList) { + for (String value : values) { + pollValueToNext(value, output); + } + } + } + + private void pollSetToNext(List keys, Collector output) + throws IOException { + List> setList = redisClient.batchGetSet(keys); + for (Set values : setList) { + for (String 
value : values) { + pollValueToNext(value, output); + } + } + } + + private void pollListToNext(List keys, Collector output) + throws IOException { + List> valueList = redisClient.batchGetList(keys); + for (List values : valueList) { + for (String value : values) { + pollValueToNext(value, output); + } + } + } + + private void pollStringToNext(List keys, Collector output) + throws IOException { + List values = redisClient.batchGetString(keys); + for (String value : values) { + pollValueToNext(value, output); + } + } + + private void pollValueToNext(String value, Collector output) throws IOException { + if (deserializationSchema == null) { + output.collect(new SeaTunnelRow(new Object[] {value})); + } else { + deserializationSchema.deserialize(value.getBytes(), output); + } + } + + private void pollHashMapToNext(List keys, Collector output) + throws IOException { + List> values = redisClient.batchGetHash(keys); + if (deserializationSchema == null) { + for (Map value : values) { + output.collect(new SeaTunnelRow(new Object[] {JsonUtils.toJsonString(value)})); + } + return; + } + for (Map recordsMap : values) { + if (redisParameters.getHashKeyParseMode() == RedisConfig.HashKeyParseMode.KV) { + deserializationSchema.deserialize( + JsonUtils.toJsonString(recordsMap).getBytes(), output); + } else { + SeaTunnelRow seaTunnelRow = + new SeaTunnelRow(new Object[] {JsonUtils.toJsonString(recordsMap)}); + output.collect(seaTunnelRow); + } + } } - private String resolveScanType(RedisDataType redisDataType) { - if (RedisDataType.KEY.equals(redisDataType)) { - return "STRING"; + private RedisDataType resolveScanType(RedisDataType dataType) { + if (RedisDataType.KEY.equals(dataType)) { + return RedisDataType.STRING; } - return redisDataType.name(); + return dataType; } } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisIT.java 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisIT.java index 7b03818c0bf..97d8b5777b2 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisIT.java @@ -48,6 +48,7 @@ import lombok.extern.slf4j.Slf4j; import redis.clients.jedis.Jedis; +import redis.clients.jedis.Pipeline; import java.io.IOException; import java.math.BigDecimal; @@ -56,7 +57,9 @@ import java.time.LocalDateTime; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.stream.Stream; @@ -223,6 +226,118 @@ public void testRedisDbNum(TestContainer container) throws IOException, Interrup jedis.select(0); } + @TestTemplate + public void testScanStringTypeWriteRedis(TestContainer container) + throws IOException, InterruptedException { + String keyPrefix = "string_test"; + for (int i = 0; i < 1000; i++) { + jedis.set(keyPrefix + i, "val"); + } + Container.ExecResult execResult = container.executeJob("/scan-string-to-redis.conf"); + Assertions.assertEquals(0, execResult.getExitCode()); + List list = jedis.lrange("string_test_list", 0, -1); + Assertions.assertEquals(1000, list.size()); + jedis.del("string_test_list"); + for (int i = 0; i < 1000; i++) { + jedis.del(keyPrefix + i); + } + } + + @TestTemplate + public void testScanListTypeWriteRedis(TestContainer container) + throws IOException, InterruptedException { + String keyPrefix = "list-test-read"; + for (int i = 0; i < 100; i++) { + String list = keyPrefix + i; + for (int j = 0; j < 10; j++) { + jedis.lpush(list, "val" + j); + } + } + Container.ExecResult execResult = + container.executeJob("/scan-list-test-read-to-redis-list-test-check.conf"); + 
Assertions.assertEquals(0, execResult.getExitCode()); + List list = jedis.lrange("list-test-check", 0, -1); + Assertions.assertEquals(1000, list.size()); + jedis.del("list-test-check"); + for (int i = 0; i < 100; i++) { + String delKey = keyPrefix + i; + jedis.del(delKey); + } + } + + @TestTemplate + public void testScanSetTypeWriteRedis(TestContainer container) + throws IOException, InterruptedException { + String setKeyPrefix = "key-test-set"; + for (int i = 0; i < 100; i++) { + String setKey = setKeyPrefix + i; + for (int j = 0; j < 10; j++) { + jedis.sadd(setKey, j + ""); + } + } + Container.ExecResult execResult = + container.executeJob("/scan-set-to-redis-list-set-check.conf"); + Assertions.assertEquals(0, execResult.getExitCode()); + List list = jedis.lrange("list-set-check", 0, -1); + Assertions.assertEquals(1000, list.size()); + jedis.del("list-set-check"); + for (int i = 0; i < 100; i++) { + String setKey = setKeyPrefix + i; + jedis.del(setKey); + } + } + + @TestTemplate + public void testScanHashTypeWriteRedis(TestContainer container) + throws IOException, InterruptedException { + String hashKeyPrefix = "key-test-hash"; + for (int i = 0; i < 100; i++) { + String setKey = hashKeyPrefix + i; + Map map = new HashMap<>(); + map.put("name", "fuyoujie"); + jedis.hset(setKey, map); + } + Container.ExecResult execResult = + container.executeJob("/scan-hash-to-redis-list-hash-check.conf"); + Assertions.assertEquals(0, execResult.getExitCode()); + List list = jedis.lrange("list-hash-check", 0, -1); + Assertions.assertEquals(100, list.size()); + jedis.del("list-hash-check"); + Pipeline pipelined = jedis.pipelined(); + for (int i = 0; i < 100; i++) { + String hashKey = hashKeyPrefix + i; + pipelined.del(hashKey); + } + for (int i = 0; i < 100; i++) { + String hashKey = hashKeyPrefix + i; + for (int j = 0; j < 10; j++) { + jedis.del(hashKey); + } + } + } + + @TestTemplate + public void testScanZsetTypeWriteRedis(TestContainer container) + throws IOException, 
InterruptedException { + String zSetKeyPrefix = "key-test-zset"; + for (int i = 0; i < 100; i++) { + String key = zSetKeyPrefix + i; + for (int j = 0; j < 10; j++) { + jedis.zadd(key, 1, j + ""); + } + } + Container.ExecResult execResult = + container.executeJob("/scan-zset-to-redis-list-zset-check.conf"); + Assertions.assertEquals(0, execResult.getExitCode()); + List list = jedis.lrange("list-zset-check", 0, -1); + Assertions.assertEquals(1000, list.size()); + jedis.del("list-zset-check"); + for (int i = 0; i < 100; i++) { + String key = zSetKeyPrefix + i; + jedis.del(key); + } + } + @TestTemplate @DisabledOnContainer( value = {}, diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-by-db-num.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-by-db-num.conf index 5b46c141719..9c1dec0c367 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-by-db-num.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-by-db-num.conf @@ -36,6 +36,7 @@ source { keys = "key_test*" data_type = key db_num=1 + batch_size = 33 } } @@ -47,5 +48,6 @@ sink { key = "db_test" data_type = list db_num=2 + batch_size = 33 } } \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-expire.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-expire.conf index cd367ad872a..133773d799d 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-expire.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-expire.conf @@ -35,6 +35,7 @@ source { auth = "U2VhVHVubmVs" keys = "key_test*" data_type = key + batch_size = 33 } } diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis.conf index 17c5de5489c..20fee261f00 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis.conf @@ -34,7 +34,8 @@ source { port = 6379 auth = "U2VhVHVubmVs" keys = "key_test*" - data_type = key + data_type = string + batch_size = 33 } } @@ -45,5 +46,6 @@ sink { auth = "U2VhVHVubmVs" key = "key_list" data_type = list + batch_size = 33 } } \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/scan-hash-to-redis-list-hash-check.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/scan-hash-to-redis-list-hash-check.conf new file mode 100644 index 00000000000..ee76a0a8391 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/scan-hash-to-redis-list-hash-check.conf @@ -0,0 +1,51 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +env { + parallelism = 1 + job.mode = "BATCH" + shade.identifier = "base64" + + #spark config + spark.app.name = "SeaTunnel" + spark.executor.instances = 2 + spark.executor.cores = 1 + spark.executor.memory = "1g" + spark.master = local +} + +source { + Redis { + host = "redis-e2e" + port = 6379 + auth = "U2VhVHVubmVs" + keys = "key-test-hash*" + data_type = hash + batch_size = 33 + } +} + +sink { + Redis { + host = "redis-e2e" + port = 6379 + auth = "U2VhVHVubmVs" + key = "list-hash-check" + data_type = list + batch_size = 33 + } +} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/scan-list-test-read-to-redis-list-test-check.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/scan-list-test-read-to-redis-list-test-check.conf new file mode 100644 index 00000000000..62db2eaa166 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/scan-list-test-read-to-redis-list-test-check.conf @@ -0,0 +1,51 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +env { + parallelism = 1 + job.mode = "BATCH" + shade.identifier = "base64" + + #spark config + spark.app.name = "SeaTunnel" + spark.executor.instances = 2 + spark.executor.cores = 1 + spark.executor.memory = "1g" + spark.master = local +} + +source { + Redis { + host = "redis-e2e" + port = 6379 + auth = "U2VhVHVubmVs" + keys = "list-test-read*" + data_type = list + batch_size = 33 + } +} + +sink { + Redis { + host = "redis-e2e" + port = 6379 + auth = "U2VhVHVubmVs" + key = "list-test-check" + data_type = list + batch_size = 33 + } +} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/scan-set-to-redis-list-set-check.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/scan-set-to-redis-list-set-check.conf new file mode 100644 index 00000000000..4c64224863f --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/scan-set-to-redis-list-set-check.conf @@ -0,0 +1,51 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +env { + parallelism = 1 + job.mode = "BATCH" + shade.identifier = "base64" + + #spark config + spark.app.name = "SeaTunnel" + spark.executor.instances = 2 + spark.executor.cores = 1 + spark.executor.memory = "1g" + spark.master = local +} + +source { + Redis { + host = "redis-e2e" + port = 6379 + auth = "U2VhVHVubmVs" + keys = "key-test-set*" + data_type = set + batch_size = 33 + } +} + +sink { + Redis { + host = "redis-e2e" + port = 6379 + auth = "U2VhVHVubmVs" + key = "list-set-check" + data_type = list + batch_size = 33 + } +} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/scan-string-to-redis.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/scan-string-to-redis.conf new file mode 100644 index 00000000000..317e603978e --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/scan-string-to-redis.conf @@ -0,0 +1,51 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +env { + parallelism = 1 + job.mode = "BATCH" + shade.identifier = "base64" + + #spark config + spark.app.name = "SeaTunnel" + spark.executor.instances = 2 + spark.executor.cores = 1 + spark.executor.memory = "1g" + spark.master = local +} + +source { + Redis { + host = "redis-e2e" + port = 6379 + auth = "U2VhVHVubmVs" + keys = "string_test*" + data_type = string + batch_size = 33 + } +} + +sink { + Redis { + host = "redis-e2e" + port = 6379 + auth = "U2VhVHVubmVs" + key = "string_test_list" + data_type = list + batch_size = 33 + } +} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/scan-zset-to-redis-list-zset-check.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/scan-zset-to-redis-list-zset-check.conf new file mode 100644 index 00000000000..f7d2c05d37a --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/scan-zset-to-redis-list-zset-check.conf @@ -0,0 +1,51 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +env { + parallelism = 1 + job.mode = "BATCH" + shade.identifier = "base64" + + #spark config + spark.app.name = "SeaTunnel" + spark.executor.instances = 2 + spark.executor.cores = 1 + spark.executor.memory = "1g" + spark.master = local +} + +source { + Redis { + host = "redis-e2e" + port = 6379 + auth = "U2VhVHVubmVs" + keys = "key-test-zset*" + data_type = zset + batch_size = 33 + } +} + +sink { + Redis { + host = "redis-e2e" + port = 6379 + auth = "U2VhVHVubmVs" + key = "list-zset-check" + data_type = list + batch_size = 33 + } +} \ No newline at end of file From 6f9a93f075d56eb357383fef966026a34f19f886 Mon Sep 17 00:00:00 2001 From: FuYouJ <1247908487@qq.com> Date: Sun, 30 Jun 2024 21:07:33 +0800 Subject: [PATCH 4/5] [improve][redis]issue#7030,issue#7085,redis reader writer support batch --- .../org/apache/seatunnel/e2e/connector/redis/RedisIT.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisIT.java index 97d8b5777b2..0b9ed1bedbd 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisIT.java @@ -48,7 +48,6 @@ import lombok.extern.slf4j.Slf4j; import redis.clients.jedis.Jedis; -import redis.clients.jedis.Pipeline; import java.io.IOException; import java.math.BigDecimal; @@ -303,10 +302,9 @@ public void testScanHashTypeWriteRedis(TestContainer container) List list = jedis.lrange("list-hash-check", 0, -1); Assertions.assertEquals(100, list.size()); jedis.del("list-hash-check"); - Pipeline pipelined = jedis.pipelined(); for (int i = 0; i < 100; i++) { String hashKey = 
hashKeyPrefix + i; - pipelined.del(hashKey); + jedis.del(hashKey); } for (int i = 0; i < 100; i++) { String hashKey = hashKeyPrefix + i; From 632319f7681e08753531a1eaeef2534edeac7d56 Mon Sep 17 00:00:00 2001 From: FuYouJ <1247908487@qq.com> Date: Mon, 1 Jul 2024 22:06:55 +0800 Subject: [PATCH 5/5] [improve][redis]update docs --- docs/en/connector-v2/sink/Redis.md | 10 +++++----- docs/en/connector-v2/source/Redis.md | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/docs/en/connector-v2/sink/Redis.md b/docs/en/connector-v2/sink/Redis.md index e223a28b71f..ac4cd55cc4f 100644 --- a/docs/en/connector-v2/sink/Redis.md +++ b/docs/en/connector-v2/sink/Redis.md @@ -85,11 +85,11 @@ Redis data types, support `key` `hash` `list` `set` `zset` > Each data from upstream will be added to the configured zset key with a weight of 1. So the order of data in zset is based on the order of data consumption. > -> ### batch_size [int] -> -> Ensure the batch write size in single-machine mode; no guarantees in cluster mode. -> - ### user [string] + ### batch_size [int] + +Ensure the batch write size in single-machine mode; no guarantees in cluster mode. + +### user [string] redis authentication user, you need it when you connect to an encrypted cluster diff --git a/docs/en/connector-v2/source/Redis.md b/docs/en/connector-v2/source/Redis.md index a430b891681..9af103f8841 100644 --- a/docs/en/connector-v2/source/Redis.md +++ b/docs/en/connector-v2/source/Redis.md @@ -116,7 +116,7 @@ keys pattern ### batch_size [int] -indicates the number of keys to attempt to return per iteration.default 10 +Indicates the number of keys to attempt to return per iteration. Default: 10. **Tips:Redis source connector support fuzzy key matching, user needs to ensure that the matched keys are the same type**