Skip to content

Commit 32f8087

Browse files
committed
Added Redis Cluster support
1 parent af2278b commit 32f8087

11 files changed

+320
-44
lines changed

head-node/src/data/mutex.go

+9-12
Original file line numberDiff line numberDiff line change
@@ -7,31 +7,28 @@ import (
77
"github.com/go-redis/redis/v7"
88
)
99

10+
type MutexClient interface {
11+
Get(key string) *redis.StringCmd
12+
Set(key string, value interface{}, expiration time.Duration) *redis.StatusCmd
13+
Del(keys ...string) *redis.IntCmd
14+
}
15+
1016
type Mutex interface {
1117
Lock(string)
1218
UnLock(string)
1319
Wait(string)
1420
}
1521

1622
type mutex struct {
17-
client *redis.Client
23+
client MutexClient
1824
keyPrefix string
1925
}
2026

21-
func NewMutex(address string) (Mutex, error) {
22-
client := redis.NewClient(&redis.Options{
23-
Addr: address,
24-
})
25-
26-
_, err := client.Ping().Result()
27-
if err != nil {
28-
return nil, err
29-
}
30-
27+
func NewMutex(client MutexClient) Mutex {
3128
return &mutex{
3229
client: client,
3330
keyPrefix: "lock",
34-
}, nil
31+
}
3532
}
3633

3734
func (m *mutex) key(name string) string {

head-node/src/data/mutex_cluster.go

+40
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
package data
2+
3+
import (
4+
"time"
5+
6+
"github.com/go-redis/redis/v7"
7+
)
8+
9+
type mutexCluster struct {
10+
cluster *redis.ClusterClient
11+
}
12+
13+
func NewMutexClusterClient(addresses []string) (MutexClient, error) {
14+
cluster := redis.NewClusterClient(&redis.ClusterOptions{
15+
Addrs: addresses,
16+
})
17+
18+
_, err := cluster.Ping().Result()
19+
if err != nil {
20+
return nil, err
21+
}
22+
23+
return &mutexCluster{
24+
cluster: cluster,
25+
}, nil
26+
}
27+
28+
func (m mutexCluster) Get(key string) *redis.StringCmd {
29+
return m.cluster.Get(key)
30+
}
31+
32+
func (m mutexCluster) Set(key string, value interface{}, expiration time.Duration) *redis.StatusCmd {
33+
return m.cluster.Set(key, value, expiration)
34+
}
35+
36+
func (m mutexCluster) Del(keys ...string) *redis.IntCmd {
37+
return m.cluster.Del(keys...)
38+
}
39+
40+
var _ MutexClient = &mutexCluster{}
+40
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
package data
2+
3+
import (
4+
"time"
5+
6+
"github.com/go-redis/redis/v7"
7+
)
8+
9+
type mutexStandalone struct {
10+
client *redis.Client
11+
}
12+
13+
func NewMutexStandaloneClient(address string) (MutexClient, error) {
14+
client := redis.NewClient(&redis.Options{
15+
Addr: address,
16+
})
17+
18+
_, err := client.Ping().Result()
19+
if err != nil {
20+
return nil, err
21+
}
22+
23+
return &mutexStandalone{
24+
client: client,
25+
}, nil
26+
}
27+
28+
func (m mutexStandalone) Get(key string) *redis.StringCmd {
29+
return m.client.Get(key)
30+
}
31+
32+
func (m mutexStandalone) Set(key string, value interface{}, expiration time.Duration) *redis.StatusCmd {
33+
return m.client.Set(key, value, expiration)
34+
}
35+
36+
func (m mutexStandalone) Del(keys ...string) *redis.IntCmd {
37+
return m.client.Del(keys...)
38+
}
39+
40+
var _ MutexClient = &mutexStandalone{}

head-node/src/main.go

+11-1
Original file line numberDiff line numberDiff line change
@@ -57,11 +57,21 @@ func main() {
5757
}
5858
fmt.Printf("INFO: REDIS_CONN: %s\n", redisConn)
5959

60-
mutex, err := data.NewMutex(redisConn)
60+
redisClusterMode := os.Getenv("REDIS_CLUSTER_MODE")
61+
fmt.Printf("INFO: REDIS_CLUSTER_MODE: %t\n", len(redisClusterMode) > 0)
62+
63+
var mutexClient data.MutexClient
64+
var err error
65+
if len(redisClusterMode) == 0 {
66+
mutexClient, err = data.NewMutexStandaloneClient(redisConn)
67+
} else {
68+
mutexClient, err = data.NewMutexClusterClient(strings.Split(redisConn, ","))
69+
}
6170
if err != nil {
6271
fmt.Printf("ERROR: Mutex Setup is failed. %s\n", err.Error())
6372
os.Exit(13)
6473
}
74+
mutex := data.NewMutex(mutexClient)
6575

6676
conn, err := data.NewConnection(mongoConn)
6777
if err != nil {

manager-node/src/data/index.go

+13-13
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,15 @@ import (
77
"github.com/go-redis/redis/v7"
88
)
99

10+
type IndexClient interface {
11+
Del(keys ...string) *redis.IntCmd
12+
HSet(key, field string, value interface{}) *redis.BoolCmd
13+
HGet(key, field string) *redis.StringCmd
14+
HDel(key string, fields ...string) *redis.IntCmd
15+
HGetAll(key string) *redis.StringStringMapCmd
16+
HMSet(key string, values ...interface{}) *redis.IntCmd
17+
}
18+
1019
type Index interface {
1120
Add(clusterId string, sha512Hex string) error
1221
Find(clusterIds []string, sha512Hex string) (string, error)
@@ -17,25 +26,16 @@ type Index interface {
1726

1827
type index struct {
1928
mutex Mutex
20-
client *redis.Client
29+
client IndexClient
2130
keyPrefix string
2231
}
2332

24-
func NewIndex(address string, keyPrefix string, mutex Mutex) (Index, error) {
25-
client := redis.NewClient(&redis.Options{
26-
Addr: address,
27-
})
28-
29-
_, err := client.Ping().Result()
30-
if err != nil {
31-
return nil, err
32-
}
33-
33+
func NewIndex(client IndexClient, keyPrefix string, mutex Mutex) Index {
3434
return &index{
35-
mutex: mutex,
3635
client: client,
3736
keyPrefix: keyPrefix,
38-
}, nil
37+
mutex: mutex,
38+
}
3939
}
4040

4141
func (i *index) key(name string) string {
+48
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package data
2+
3+
import "github.com/go-redis/redis/v7"
4+
5+
type indexCluster struct {
6+
cluster *redis.ClusterClient
7+
}
8+
9+
func NewIndexClusterClient(addresses []string) (IndexClient, error) {
10+
cluster := redis.NewClusterClient(&redis.ClusterOptions{
11+
Addrs: addresses,
12+
})
13+
14+
_, err := cluster.Ping().Result()
15+
if err != nil {
16+
return nil, err
17+
}
18+
19+
return &indexCluster{
20+
cluster: cluster,
21+
}, nil
22+
}
23+
24+
func (r indexCluster) Del(keys ...string) *redis.IntCmd {
25+
return r.cluster.Del(keys...)
26+
}
27+
28+
func (r indexCluster) HSet(key, field string, value interface{}) *redis.BoolCmd {
29+
return r.cluster.HSet(key, field, value)
30+
}
31+
32+
func (r indexCluster) HGet(key, field string) *redis.StringCmd {
33+
return r.cluster.HGet(key, field)
34+
}
35+
36+
func (r indexCluster) HDel(key string, fields ...string) *redis.IntCmd {
37+
return r.cluster.HDel(key, fields...)
38+
}
39+
40+
func (r indexCluster) HGetAll(key string) *redis.StringStringMapCmd {
41+
return r.cluster.HGetAll(key)
42+
}
43+
44+
func (r indexCluster) HMSet(key string, values ...interface{}) *redis.IntCmd {
45+
return r.cluster.HMSet(key, values...)
46+
}
47+
48+
var _ IndexClient = &indexCluster{}
+48
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package data
2+
3+
import "github.com/go-redis/redis/v7"
4+
5+
type indexStandalone struct {
6+
client *redis.Client
7+
}
8+
9+
func NewIndexStandaloneClient(address string) (IndexClient, error) {
10+
client := redis.NewClient(&redis.Options{
11+
Addr: address,
12+
})
13+
14+
_, err := client.Ping().Result()
15+
if err != nil {
16+
return nil, err
17+
}
18+
19+
return &indexStandalone{
20+
client: client,
21+
}, nil
22+
}
23+
24+
func (r indexStandalone) Del(keys ...string) *redis.IntCmd {
25+
return r.client.Del(keys...)
26+
}
27+
28+
func (r indexStandalone) HSet(key, field string, value interface{}) *redis.BoolCmd {
29+
return r.client.HSet(key, field, value)
30+
}
31+
32+
func (r indexStandalone) HGet(key, field string) *redis.StringCmd {
33+
return r.client.HGet(key, field)
34+
}
35+
36+
func (r indexStandalone) HDel(key string, fields ...string) *redis.IntCmd {
37+
return r.client.HDel(key, fields...)
38+
}
39+
40+
func (r indexStandalone) HGetAll(key string) *redis.StringStringMapCmd {
41+
return r.client.HGetAll(key)
42+
}
43+
44+
func (r indexStandalone) HMSet(key string, values ...interface{}) *redis.IntCmd {
45+
return r.client.HMSet(key, values...)
46+
}
47+
48+
var _ IndexClient = &indexStandalone{}

manager-node/src/data/mutex.go

+9-12
Original file line numberDiff line numberDiff line change
@@ -7,31 +7,28 @@ import (
77
"github.com/go-redis/redis/v7"
88
)
99

10+
type MutexClient interface {
11+
Get(key string) *redis.StringCmd
12+
Set(key string, value interface{}, expiration time.Duration) *redis.StatusCmd
13+
Del(keys ...string) *redis.IntCmd
14+
}
15+
1016
type Mutex interface {
1117
Lock(string)
1218
UnLock(string)
1319
Wait(string)
1420
}
1521

1622
type mutex struct {
17-
client *redis.Client
23+
client MutexClient
1824
keyPrefix string
1925
}
2026

21-
func NewMutex(address string) (Mutex, error) {
22-
client := redis.NewClient(&redis.Options{
23-
Addr: address,
24-
})
25-
26-
_, err := client.Ping().Result()
27-
if err != nil {
28-
return nil, err
29-
}
30-
27+
func NewMutex(client MutexClient) Mutex {
3128
return &mutex{
3229
client: client,
3330
keyPrefix: "lock",
34-
}, nil
31+
}
3532
}
3633

3734
func (m *mutex) key(name string) string {
+40
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
package data
2+
3+
import (
4+
"time"
5+
6+
"github.com/go-redis/redis/v7"
7+
)
8+
9+
type mutexCluster struct {
10+
cluster *redis.ClusterClient
11+
}
12+
13+
func NewMutexClusterClient(addresses []string) (MutexClient, error) {
14+
cluster := redis.NewClusterClient(&redis.ClusterOptions{
15+
Addrs: addresses,
16+
})
17+
18+
_, err := cluster.Ping().Result()
19+
if err != nil {
20+
return nil, err
21+
}
22+
23+
return &mutexCluster{
24+
cluster: cluster,
25+
}, nil
26+
}
27+
28+
func (m mutexCluster) Get(key string) *redis.StringCmd {
29+
return m.cluster.Get(key)
30+
}
31+
32+
func (m mutexCluster) Set(key string, value interface{}, expiration time.Duration) *redis.StatusCmd {
33+
return m.cluster.Set(key, value, expiration)
34+
}
35+
36+
func (m mutexCluster) Del(keys ...string) *redis.IntCmd {
37+
return m.cluster.Del(keys...)
38+
}
39+
40+
var _ MutexClient = &mutexCluster{}

0 commit comments

Comments
 (0)