Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Improve][Redis] Redis reader use scan cammnd instead of keys, single mode reader/writer support batch #7087

Merged
merged 6 commits into from
Jul 3, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions docs/en/connector-v2/sink/Redis.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ Used to write data to Redis.
| port | int | yes | - |
| key | string | yes | - |
| data_type | string | yes | - |
| batch_size | int | no | 10 |
| user | string | no | - |
| auth | string | no | - |
| db_num | int | no | 0 |
Expand Down Expand Up @@ -83,8 +84,12 @@ Redis data types, support `key` `hash` `list` `set` `zset`
- zset

> Each data from upstream will be added to the configured zset key with a weight of 1. So the order of data in zset is based on the order of data consumption.

### user [string]
>
> ### batch_size [int]
>
> Ensure the batch write size in single-machine mode; no guarantees in cluster mode.
>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
>
> ### batch_size [int]
>
> Ensure the batch write size in single-machine mode; no guarantees in cluster mode.
>
### batch_size [int]
Ensure the batch write size in single-machine mode; no guarantees in cluster mode.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you. I was careless here

### user [string]

redis authentication user, you need it when you connect to an encrypted cluster

Expand Down
5 changes: 5 additions & 0 deletions docs/en/connector-v2/source/Redis.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ Used to read data from Redis.
| host | string | yes | - |
| port | int | yes | - |
| keys | string | yes | - |
| batch_size | int | yes | 10 |
| data_type | string | yes | - |
| user | string | no | - |
| auth | string | no | - |
Expand Down Expand Up @@ -113,6 +114,10 @@ each kv that in hash key it will be treated as a row and send it to upstream.

keys pattern

### batch_size [int]

indicates the number of keys to attempt to return per iteration.default 10
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
indicates the number of keys to attempt to return per iteration.default 10
indicates the number of keys to attempt to return per iteration, default 10

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done


**Tips:Redis source connector support fuzzy key matching, user needs to ensure that the matched keys are the same type**

### data_type [string]
Expand Down
1 change: 1 addition & 0 deletions release-note.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
- [Connector-v2] [File] Inject FileSystem to OrcWriteStrategy
- [Connector-v2] [File] Support assign encoding for file source/sink (#5973)
- [Connector-v2] [Mongodb] Support to convert to double from numeric type that mongodb saved it as numeric internally (#6997)
- [Connector-v2] [Redis] Using scan replace keys operation command,support batchWrite in single mode(#7030,#7085)

### Zeta(ST-Engine)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.redis.client;

import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisDataType;
import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisParameters;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.params.ScanParams;
import redis.clients.jedis.resps.ScanResult;

import java.util.List;
import java.util.Map;
import java.util.Set;

public abstract class RedisClient extends Jedis {

protected final RedisParameters redisParameters;

protected final int batchSize;

protected final Jedis jedis;

protected RedisClient(RedisParameters redisParameters, Jedis jedis) {
this.redisParameters = redisParameters;
this.batchSize = redisParameters.getBatchSize();
this.jedis = jedis;
}

public ScanResult<String> scanKeys(
String cursor, int batchSize, String keysPattern, RedisDataType type) {
ScanParams scanParams = new ScanParams();
scanParams.match(keysPattern);
scanParams.count(batchSize);
return jedis.scan(cursor, scanParams, type.name());
}

public abstract List<String> batchGetString(List<String> keys);

public abstract List<List<String>> batchGetList(List<String> keys);

public abstract List<Set<String>> batchGetSet(List<String> keys);

public abstract List<Map<String, String>> batchGetHash(List<String> keys);

public abstract List<List<String>> batchGetZset(List<String> keys);

public abstract void batchWriteString(
List<String> keys, List<String> values, long expireSeconds);

public abstract void batchWriteList(
List<String> keyBuffer, List<String> valueBuffer, long expireSeconds);

public abstract void batchWriteSet(
List<String> keyBuffer, List<String> valueBuffer, long expireSeconds);

public abstract void batchWriteHash(
List<String> keyBuffer, List<String> valueBuffer, long expireSeconds);

public abstract void batchWriteZset(
List<String> keyBuffer, List<String> valueBuffer, long expireSeconds);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.redis.client;

import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisDataType;
import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisParameters;

import org.apache.commons.collections4.CollectionUtils;

import redis.clients.jedis.Jedis;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class RedisClusterClient extends RedisClient {
public RedisClusterClient(RedisParameters redisParameters, Jedis jedis) {
super(redisParameters, jedis);
}

@Override
public List<String> batchGetString(List<String> keys) {
if (CollectionUtils.isEmpty(keys)) {
return new ArrayList<>();
}
List<String> result = new ArrayList<>(keys.size());
for (String key : keys) {
result.add(jedis.get(key));
}
return result;
}

@Override
public List<List<String>> batchGetList(List<String> keys) {
if (CollectionUtils.isEmpty(keys)) {
return new ArrayList<>();
}
List<List<String>> result = new ArrayList<>(keys.size());
for (String key : keys) {
result.add(jedis.lrange(key, 0, -1));
}
return result;
}

@Override
public List<Set<String>> batchGetSet(List<String> keys) {
if (CollectionUtils.isEmpty(keys)) {
return new ArrayList<>();
}
List<Set<String>> result = new ArrayList<>(keys.size());
for (String key : keys) {
result.add(jedis.smembers(key));
}
return result;
}

@Override
public List<Map<String, String>> batchGetHash(List<String> keys) {
if (CollectionUtils.isEmpty(keys)) {
return new ArrayList<>();
}
List<Map<String, String>> result = new ArrayList<>(keys.size());
for (String key : keys) {
Map<String, String> map = jedis.hgetAll(key);
map.put("hash_key", key);
result.add(map);
}
return result;
}

@Override
public List<List<String>> batchGetZset(List<String> keys) {
if (CollectionUtils.isEmpty(keys)) {
return new ArrayList<>();
}
List<List<String>> result = new ArrayList<>(keys.size());
for (String key : keys) {
result.add(jedis.zrange(key, 0, -1));
}
return result;
}

@Override
public void batchWriteString(List<String> keys, List<String> values, long expireSeconds) {
int size = keys.size();
for (int i = 0; i < size; i++) {
RedisDataType.STRING.set(this, keys.get(i), values.get(i), expireSeconds);
}
}

@Override
public void batchWriteList(List<String> keys, List<String> values, long expireSeconds) {
int size = keys.size();
for (int i = 0; i < size; i++) {
RedisDataType.LIST.set(this, keys.get(i), values.get(i), expireSeconds);
}
}

@Override
public void batchWriteSet(List<String> keys, List<String> values, long expireSeconds) {
int size = keys.size();
for (int i = 0; i < size; i++) {
RedisDataType.SET.set(this, keys.get(i), values.get(i), expireSeconds);
}
}

@Override
public void batchWriteHash(List<String> keys, List<String> values, long expireSeconds) {
int size = keys.size();
for (int i = 0; i < size; i++) {
RedisDataType.HASH.set(this, keys.get(i), values.get(i), expireSeconds);
}
}

@Override
public void batchWriteZset(List<String> keys, List<String> values, long expireSeconds) {
int size = keys.size();
for (int i = 0; i < size; i++) {
RedisDataType.ZSET.set(this, keys.get(i), values.get(i), expireSeconds);
}
}
}
Loading
Loading