Skip to content

Commit

Permalink
fix: m/s replication when using ipv6 (OT-CONTAINER-KIT#827)
Browse files Browse the repository at this point in the history
* Delete unnecessary brackets wrapping the IPv6 address

Signed-off-by: Jami Karvanen <[email protected]>

* Extract redis address formatting into separate getRedisServerAddress function

Signed-off-by: Jami Karvanen <[email protected]>

* Add tests for getRedisServerAddress

Signed-off-by: Jami Karvanen <[email protected]>

* Use getRedisServerAddress also in cluster-scaling.go

Signed-off-by: Jami Karvanen <[email protected]>

---------

Signed-off-by: Jami Karvanen <[email protected]>
  • Loading branch information
jmtsi authored Mar 15, 2024
1 parent 96c1b15 commit 1cf27bb
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 17 deletions.
16 changes: 8 additions & 8 deletions k8sutils/cluster-scaling.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func ReshardRedisCluster(client kubernetes.Interface, logger logr.Logger, cr *re
if *cr.Spec.ClusterVersion == "v7" {
cmd = append(cmd, getRedisHostname(transferPOD, cr, "leader")+fmt.Sprintf(":%d", *cr.Spec.Port))
} else {
cmd = append(cmd, getRedisServerIP(client, logger, transferPOD)+fmt.Sprintf(":%d", *cr.Spec.Port))
cmd = append(cmd, getRedisServerAddress(client, logger, transferPOD, *cr.Spec.Port))
}

if cr.Spec.KubernetesConfig.ExistingPasswordSecret != nil {
Expand Down Expand Up @@ -157,7 +157,7 @@ func RebalanceRedisClusterEmptyMasters(client kubernetes.Interface, logger logr.
if *cr.Spec.ClusterVersion == "v7" {
cmd = append(cmd, getRedisHostname(pod, cr, "leader")+fmt.Sprintf(":%d", *cr.Spec.Port))
} else {
cmd = append(cmd, getRedisServerIP(client, logger, pod)+fmt.Sprintf(":%d", *cr.Spec.Port))
cmd = append(cmd, getRedisServerAddress(client, logger, pod, *cr.Spec.Port))
}

cmd = append(cmd, "--cluster-use-empty-masters")
Expand Down Expand Up @@ -209,7 +209,7 @@ func RebalanceRedisCluster(client kubernetes.Interface, logger logr.Logger, cr *
if *cr.Spec.ClusterVersion == "v7" {
cmd = append(cmd, getRedisHostname(pod, cr, "leader")+fmt.Sprintf(":%d", *cr.Spec.Port))
} else {
cmd = append(cmd, getRedisServerIP(client, logger, pod)+fmt.Sprintf(":%d", *cr.Spec.Port))
cmd = append(cmd, getRedisServerAddress(client, logger, pod, *cr.Spec.Port))
}

if cr.Spec.KubernetesConfig.ExistingPasswordSecret != nil {
Expand Down Expand Up @@ -247,8 +247,8 @@ func AddRedisNodeToCluster(ctx context.Context, client kubernetes.Interface, log
cmd = append(cmd, getRedisHostname(newPod, cr, "leader")+fmt.Sprintf(":%d", *cr.Spec.Port))
cmd = append(cmd, getRedisHostname(existingPod, cr, "leader")+fmt.Sprintf(":%d", *cr.Spec.Port))
} else {
cmd = append(cmd, getRedisServerIP(client, logger, newPod)+fmt.Sprintf(":%d", *cr.Spec.Port))
cmd = append(cmd, getRedisServerIP(client, logger, existingPod)+fmt.Sprintf(":%d", *cr.Spec.Port))
cmd = append(cmd, getRedisServerAddress(client, logger, newPod, *cr.Spec.Port))
cmd = append(cmd, getRedisServerAddress(client, logger, existingPod, *cr.Spec.Port))
}

if cr.Spec.KubernetesConfig.ExistingPasswordSecret != nil {
Expand Down Expand Up @@ -326,7 +326,7 @@ func RemoveRedisFollowerNodesFromCluster(ctx context.Context, client kubernetes.
if *cr.Spec.ClusterVersion == "v7" {
cmd = append(cmd, getRedisHostname(existingPod, cr, "leader")+fmt.Sprintf(":%d", *cr.Spec.Port))
} else {
cmd = append(cmd, getRedisServerIP(client, logger, existingPod)+fmt.Sprintf(":%d", *cr.Spec.Port))
cmd = append(cmd, getRedisServerAddress(client, logger, existingPod, *cr.Spec.Port))
}

for _, followerNodeID := range followerNodeIDs {
Expand Down Expand Up @@ -356,7 +356,7 @@ func RemoveRedisNodeFromCluster(ctx context.Context, client kubernetes.Interface
if *cr.Spec.ClusterVersion == "v7" {
cmd = append(cmd, getRedisHostname(existingPod, cr, "leader")+fmt.Sprintf(":%d", *cr.Spec.Port))
} else {
cmd = append(cmd, getRedisServerIP(client, logger, existingPod)+fmt.Sprintf(":%d", *cr.Spec.Port))
cmd = append(cmd, getRedisServerAddress(client, logger, existingPod, *cr.Spec.Port))
}

removePodNodeID := getRedisNodeID(ctx, client, logger, cr, removePod)
Expand Down Expand Up @@ -417,7 +417,7 @@ func ClusterFailover(ctx context.Context, client kubernetes.Interface, logger lo
if *cr.Spec.ClusterVersion == "v7" {
cmd = append(cmd, getRedisHostname(pod, cr, "leader")+fmt.Sprintf(":%d", *cr.Spec.Port))
} else {
cmd = append(cmd, getRedisServerIP(client, logger, pod)+fmt.Sprintf(":%d", *cr.Spec.Port))
cmd = append(cmd, getRedisServerAddress(client, logger, pod, *cr.Spec.Port))
}

if cr.Spec.KubernetesConfig.ExistingPasswordSecret != nil {
Expand Down
27 changes: 19 additions & 8 deletions k8sutils/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,24 @@ func getRedisServerIP(client kubernetes.Interface, logger logr.Logger, redisInfo
// If we're NOT IPv4, assume we're IPv6..
if net.ParseIP(redisIP).To4() == nil {
logger.V(1).Info("Redis is using IPv6", "ip", redisIP)
redisIP = fmt.Sprintf("[%s]", redisIP)
}

logger.V(1).Info("Successfully got the IP for Redis", "ip", redisIP)
return redisIP
}

func getRedisServerAddress(client kubernetes.Interface, logger logr.Logger, rd RedisDetails, port int) string {
ip := getRedisServerIP(client, logger, rd)
format := "%s:%d"

// if ip is IPv6, wrap it in brackets
if net.ParseIP(ip).To4() == nil {
format = "[%s]:%d"
}

return fmt.Sprintf(format, ip, port)
}

// getRedisHostname will return the complete FQDN for redis
func getRedisHostname(redisInfo RedisDetails, cr *redisv1beta2.RedisCluster, role string) string {
fqdn := fmt.Sprintf("%s.%s-%s-headless.%s.svc", redisInfo.PodName, cr.ObjectMeta.Name, role, cr.Namespace)
Expand Down Expand Up @@ -85,7 +96,7 @@ func CreateMultipleLeaderRedisCommand(client kubernetes.Interface, logger logr.L
if cr.Spec.ClusterVersion != nil && *cr.Spec.ClusterVersion == "v7" {
address = getRedisHostname(RedisDetails{PodName: podName, Namespace: cr.Namespace}, cr, "leader") + fmt.Sprintf(":%d", *cr.Spec.Port)
} else {
address = getRedisServerIP(client, logger, RedisDetails{PodName: podName, Namespace: cr.Namespace}) + fmt.Sprintf(":%d", *cr.Spec.Port)
address = getRedisServerAddress(client, logger, RedisDetails{PodName: podName, Namespace: cr.Namespace}, *cr.Spec.Port)
}
cmd = append(cmd, address)
}
Expand Down Expand Up @@ -144,8 +155,8 @@ func createRedisReplicationCommand(client kubernetes.Interface, logger logr.Logg
followerAddress = getRedisHostname(followerPod, cr, "follower") + fmt.Sprintf(":%d", *cr.Spec.Port)
leaderAddress = getRedisHostname(leaderPod, cr, "leader") + fmt.Sprintf(":%d", *cr.Spec.Port)
} else {
followerAddress = getRedisServerIP(client, logger, followerPod) + fmt.Sprintf(":%d", *cr.Spec.Port)
leaderAddress = getRedisServerIP(client, logger, leaderPod) + fmt.Sprintf(":%d", *cr.Spec.Port)
followerAddress = getRedisServerAddress(client, logger, followerPod, *cr.Spec.Port)
leaderAddress = getRedisServerAddress(client, logger, leaderPod, *cr.Spec.Port)
}

cmd = append(cmd, followerAddress, leaderAddress, "--cluster-slave")
Expand Down Expand Up @@ -340,14 +351,14 @@ func configureRedisClient(client kubernetes.Interface, logger logr.Logger, cr *r
logger.Error(err, "Error in getting redis password")
}
redisClient = redis.NewClient(&redis.Options{
Addr: getRedisServerIP(client, logger, redisInfo) + fmt.Sprintf(":%d", *cr.Spec.Port),
Addr: getRedisServerAddress(client, logger, redisInfo, *cr.Spec.Port),
Password: pass,
DB: 0,
TLSConfig: getRedisTLSConfig(client, logger, cr, redisInfo),
})
} else {
redisClient = redis.NewClient(&redis.Options{
Addr: getRedisServerIP(client, logger, redisInfo) + fmt.Sprintf(":%d", *cr.Spec.Port),
Addr: getRedisServerAddress(client, logger, redisInfo, *cr.Spec.Port),
Password: "",
DB: 0,
TLSConfig: getRedisTLSConfig(client, logger, cr, redisInfo),
Expand Down Expand Up @@ -459,14 +470,14 @@ func configureRedisReplicationClient(client kubernetes.Interface, logger logr.Lo
logger.Error(err, "Error in getting redis password")
}
redisClient = redis.NewClient(&redis.Options{
Addr: getRedisServerIP(client, logger, redisInfo) + ":6379",
Addr: getRedisServerAddress(client, logger, redisInfo, 6379),
Password: pass,
DB: 0,
TLSConfig: getRedisReplicationTLSConfig(client, logger, cr, redisInfo),
})
} else {
redisClient = redis.NewClient(&redis.Options{
Addr: getRedisServerIP(client, logger, redisInfo) + ":6379",
Addr: getRedisServerAddress(client, logger, redisInfo, 6379),
Password: "",
DB: 0,
TLSConfig: getRedisReplicationTLSConfig(client, logger, cr, redisInfo),
Expand Down
67 changes: 66 additions & 1 deletion k8sutils/redis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func TestGetRedisServerIP(t *testing.T) {
PodName: "redis-pod",
Namespace: "default",
},
expectedIP: "[2001:0db8:85a3:0000:0000:8a2e:0370:7334]",
expectedIP: "2001:0db8:85a3:0000:0000:8a2e:0370:7334",
expectEmpty: false,
},
{
Expand Down Expand Up @@ -141,6 +141,71 @@ func TestGetRedisServerIP(t *testing.T) {
}
}

func TestGetRedisServerAddress(t *testing.T) {
tests := []struct {
name string
setup func() *k8sClientFake.Clientset
redisInfo RedisDetails
expectedAddr string
expectEmpty bool
}{
{
name: "Successfully retrieve IPv4 URI",
setup: func() *k8sClientFake.Clientset {
return k8sClientFake.NewSimpleClientset(&corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "redis-pod",
Namespace: "default",
},
Status: corev1.PodStatus{
PodIP: "192.168.1.1",
},
})
},
redisInfo: RedisDetails{
PodName: "redis-pod",
Namespace: "default",
},
expectedAddr: "192.168.1.1:6379",
expectEmpty: false,
},
{
name: "Successfully retrieve IPv6 URI",
setup: func() *k8sClientFake.Clientset {
return k8sClientFake.NewSimpleClientset(&corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "redis-pod",
Namespace: "default",
},
Status: corev1.PodStatus{
PodIP: "2001:0db8:85a3:0000:0000:8a2e:0370:7334",
},
})
},
redisInfo: RedisDetails{
PodName: "redis-pod",
Namespace: "default",
},
expectedAddr: "[2001:0db8:85a3:0000:0000:8a2e:0370:7334]:6379",
expectEmpty: false,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
client := tt.setup()
logger := testr.New(t)
redisIP := getRedisServerAddress(client, logger, tt.redisInfo, 6379)

if tt.expectEmpty {
assert.Empty(t, redisIP, "Expected an empty address")
} else {
assert.Equal(t, tt.expectedAddr, redisIP, "Expected and actual address do not match")
}
})
}
}

func TestGetRedisHostname(t *testing.T) {
tests := []struct {
name string
Expand Down

0 comments on commit 1cf27bb

Please sign in to comment.