Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve Redis support #2181

Merged
merged 8 commits into from
Nov 1, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
- ScaledJob: introduce MultipleScalersCalculation ([#2016](https://github.com/kedacore/keda/pull/2016))
- Add Graphite Scaler ([#1628](https://github.com/kedacore/keda/pull/2092))
- Add Cassandra Scaler ([#2211](https://github.com/kedacore/keda/pull/2211))
- Improve Redis Scaler, upgrade library, add username and Sentinel support ([#2181](https://github.com/kedacore/keda/pull/2181))

### Improvements

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ require (
github.com/denisenkom/go-mssqldb v0.11.0
github.com/go-logr/logr v0.4.0
github.com/go-playground/assert/v2 v2.0.1
github.com/go-redis/redis v6.15.9+incompatible
github.com/go-redis/redis/v8 v8.11.4
github.com/go-sql-driver/mysql v1.6.0
github.com/gocql/gocql v0.0.0-20211015133455-b225f9b53fa1
github.com/golang/mock v1.6.0
Expand Down
9 changes: 6 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,9 @@ github.com/certifi/gocertifi v0.0.0-20191021191039-0944d244cd40/go.mod h1:sGbDF6
github.com/certifi/gocertifi v0.0.0-20200922220541-2c3bb06c6054/go.mod h1:sGbDF6GwGcLpkNXPUTkMRoywsNa/ol15pxFe6ERfguA=
github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE=
github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
Expand Down Expand Up @@ -245,6 +246,8 @@ github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZm
github.com/dgryski/go-gk v0.0.0-20140819190930-201884a44051/go.mod h1:qm+vckxRlDt0aOla0RYJJVeqHZlWfOm2UIxHaqPB46E=
github.com/dgryski/go-gk v0.0.0-20200319235926-a69029f61654/go.mod h1:qm+vckxRlDt0aOla0RYJJVeqHZlWfOm2UIxHaqPB46E=
github.com/dgryski/go-lttb v0.0.0-20180810165845-318fcdf10a77/go.mod h1:Va5MyIzkU0rAM92tn3hb3Anb7oz7KcnixF49+2wOMe4=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no=
github.com/dimchansky/utfbom v1.1.0/go.mod h1:rO41eb7gLfo8SF1jd9F8HplJm1Fewwi4mQvIirEdv+8=
github.com/dimchansky/utfbom v1.1.1 h1:vV6w1AhK4VMnhBno/TPVCoK9U/LP0PkLCS9tbxHdi/U=
Expand Down Expand Up @@ -353,8 +356,8 @@ github.com/go-playground/universal-translator v0.17.0 h1:icxd5fm+REJzpZx7ZfpaD87
github.com/go-playground/universal-translator v0.17.0/go.mod h1:UkSxE5sNxxRwHyU+Scu5vgOQjsIJAF8j9muTVoKLVtA=
github.com/go-playground/validator/v10 v10.2.0 h1:KgJ0snyC2R9VXYN2rneOtQcw5aHQB1Vv0sFl1UcHBOY=
github.com/go-playground/validator/v10 v10.2.0/go.mod h1:uOYAAleCW8F/7oMFd6aG0GOhaH6EGOAJShg8Id5JGkI=
github.com/go-redis/redis v6.15.9+incompatible h1:K0pv1D7EQUjfyoMql+r/jZqCLizCGKFlFgcHWWmHQjg=
github.com/go-redis/redis v6.15.9+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA=
github.com/go-redis/redis/v8 v8.11.4 h1:kHoYkfZP6+pe04aFTnhDH6GDROa5yJdHJVNxV3F46Tg=
github.com/go-redis/redis/v8 v8.11.4/go.mod h1:2Z2wHZXdQpCDXEGzqMockDpNyYvi2l4Pxt6RJr792+w=
github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
github.com/go-sql-driver/mysql v1.6.0 h1:BCTh4TKNUYmOmMUcQ3IipzF5prigylS7XXjEkfCHuOE=
github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
Expand Down
178 changes: 164 additions & 14 deletions pkg/scalers/redis_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"strconv"
"strings"

"github.com/go-redis/redis"
"github.com/go-redis/redis/v8"
v2beta2 "k8s.io/api/autoscaling/v2beta2"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -33,11 +33,15 @@ type redisScaler struct {
}

type redisConnectionInfo struct {
addresses []string
password string
hosts []string
ports []string
enableTLS bool
addresses []string
username string
password string
sentinelUsername string
sentinelPassword string
sentinelMaster string
hosts []string
ports []string
enableTLS bool
}

type redisMetadata struct {
Expand All @@ -51,7 +55,7 @@ type redisMetadata struct {
var redisLog = logf.Log.WithName("redis_scaler")

// NewRedisScaler creates a new redisScaler
func NewRedisScaler(ctx context.Context, isClustered bool, config *ScalerConfig) (Scaler, error) {
func NewRedisScaler(ctx context.Context, isClustered, isSentinel bool, config *ScalerConfig) (Scaler, error) {
luaScript := `
local listName = KEYS[1]
local listType = redis.call('type', listName).ok
Expand All @@ -71,7 +75,14 @@ func NewRedisScaler(ctx context.Context, isClustered bool, config *ScalerConfig)
return nil, fmt.Errorf("error parsing redis metadata: %s", err)
}
return createClusteredRedisScaler(ctx, meta, luaScript)
} else if isSentinel {
meta, err := parseRedisMetadata(config, parseRedisSentinelAddress)
if err != nil {
return nil, fmt.Errorf("error parsing redis metadata: %s", err)
}
return createSentinelRedisScaler(ctx, meta, luaScript)
}

meta, err := parseRedisMetadata(config, parseRedisAddress)
if err != nil {
return nil, fmt.Errorf("error parsing redis metadata: %s", err)
Expand All @@ -94,8 +105,37 @@ func createClusteredRedisScaler(ctx context.Context, meta *redisMetadata, script
}

listLengthFn := func(ctx context.Context) (int64, error) {
cl := client.WithContext(ctx)
cmd := cl.Eval(script, []string{meta.listName})
cmd := client.Eval(ctx, script, []string{meta.listName})
jerbob92 marked this conversation as resolved.
Show resolved Hide resolved
if cmd.Err() != nil {
return -1, cmd.Err()
}

return cmd.Int64()
}

return &redisScaler{
metadata: meta,
closeFn: closeFn,
getListLengthFn: listLengthFn,
}, nil
}

func createSentinelRedisScaler(ctx context.Context, meta *redisMetadata, script string) (Scaler, error) {
client, err := getRedisSentinelClient(ctx, meta.connectionInfo, meta.databaseIndex)
if err != nil {
return nil, fmt.Errorf("connection to redis sentinel failed: %s", err)
}

closeFn := func() error {
if err := client.Close(); err != nil {
redisLog.Error(err, "error closing redis client")
return err
}
return nil
}

listLengthFn := func(ctx context.Context) (int64, error) {
cmd := client.Eval(ctx, script, []string{meta.listName})
jerbob92 marked this conversation as resolved.
Show resolved Hide resolved
if cmd.Err() != nil {
return -1, cmd.Err()
}
Expand Down Expand Up @@ -125,8 +165,7 @@ func createRedisScaler(ctx context.Context, meta *redisMetadata, script string)
}

listLengthFn := func(ctx context.Context) (int64, error) {
cl := client.WithContext(ctx)
cmd := cl.Eval(script, []string{meta.listName})
cmd := client.Eval(ctx, script, []string{meta.listName})
if cmd.Err() != nil {
return -1, cmd.Err()
}
Expand Down Expand Up @@ -267,6 +306,15 @@ func parseRedisAddress(metadata, resolvedEnv, authParams map[string]string) (red
return info, fmt.Errorf("no address or host given. address should be in the format of host:port or you should set the host/port values")
}

switch {
case authParams["username"] != "":
info.username = authParams["username"]
case metadata["username"] != "":
info.username = metadata["username"]
case metadata["usernameFromEnv"] != "":
info.username = resolvedEnv[metadata["usernameFromEnv"]]
}

if authParams["password"] != "" {
info.password = authParams["password"]
} else if metadata["passwordFromEnv"] != "" {
Expand All @@ -285,7 +333,7 @@ func parseRedisAddress(metadata, resolvedEnv, authParams map[string]string) (red
return info, nil
}

func parseRedisClusterAddress(metadata, resolvedEnv, authParams map[string]string) (redisConnectionInfo, error) {
func parseRedisMultipleAddress(metadata, resolvedEnv, authParams map[string]string) (redisConnectionInfo, error) {
info := redisConnectionInfo{}
switch {
case authParams["addresses"] != "":
Expand Down Expand Up @@ -327,12 +375,87 @@ func parseRedisClusterAddress(metadata, resolvedEnv, authParams map[string]strin
return info, fmt.Errorf("no addresses or hosts given. address should be a comma separated list of host:port or set the host/port values")
}

return info, nil
}

func parseRedisClusterAddress(metadata, resolvedEnv, authParams map[string]string) (redisConnectionInfo, error) {
info, err := parseRedisMultipleAddress(metadata, resolvedEnv, authParams)
if err != nil {
return info, err
}

switch {
case authParams["username"] != "":
info.username = authParams["username"]
case metadata["username"] != "":
info.username = metadata["username"]
case metadata["usernameFromEnv"] != "":
info.username = resolvedEnv[metadata["usernameFromEnv"]]
}

if authParams["password"] != "" {
info.password = authParams["password"]
} else if metadata["passwordFromEnv"] != "" {
info.password = resolvedEnv[metadata["passwordFromEnv"]]
}

info.enableTLS = defaultEnableTLS
if val, ok := metadata["enableTLS"]; ok {
tls, err := strconv.ParseBool(val)
if err != nil {
return info, fmt.Errorf("enableTLS parsing error %s", err.Error())
}
info.enableTLS = tls
}

return info, nil
}

func parseRedisSentinelAddress(metadata, resolvedEnv, authParams map[string]string) (redisConnectionInfo, error) {
info, err := parseRedisMultipleAddress(metadata, resolvedEnv, authParams)
if err != nil {
return info, err
}

switch {
case authParams["username"] != "":
info.username = authParams["username"]
case metadata["username"] != "":
info.username = metadata["username"]
case metadata["usernameFromEnv"] != "":
info.username = resolvedEnv[metadata["usernameFromEnv"]]
}

if authParams["password"] != "" {
info.password = authParams["password"]
} else if metadata["passwordFromEnv"] != "" {
info.password = resolvedEnv[metadata["passwordFromEnv"]]
}

switch {
case authParams["sentinelUsername"] != "":
info.sentinelUsername = authParams["sentinelUsername"]
case metadata["sentinelUsername"] != "":
info.sentinelUsername = metadata["sentinelUsername"]
case metadata["sentinelUsernameFromEnv"] != "":
info.sentinelUsername = resolvedEnv[metadata["sentinelUsernameFromEnv"]]
}

if authParams["sentinelPassword"] != "" {
info.sentinelPassword = authParams["sentinelPassword"]
} else if metadata["sentinelPasswordFromEnv"] != "" {
info.sentinelPassword = resolvedEnv[metadata["sentinelPasswordFromEnv"]]
}

switch {
case authParams["sentinelMaster"] != "":
info.sentinelMaster = authParams["sentinelMaster"]
case metadata["sentinelMaster"] != "":
info.sentinelMaster = metadata["sentinelMaster"]
case metadata["sentinelMasterFromEnv"] != "":
info.sentinelMaster = resolvedEnv[metadata["sentinelMasterFromEnv"]]
}

info.enableTLS = defaultEnableTLS
if val, ok := metadata["enableTLS"]; ok {
tls, err := strconv.ParseBool(val)
Expand All @@ -348,6 +471,7 @@ func parseRedisClusterAddress(metadata, resolvedEnv, authParams map[string]strin
func getRedisClusterClient(ctx context.Context, info redisConnectionInfo) (*redis.ClusterClient, error) {
options := &redis.ClusterOptions{
Addrs: info.addresses,
Username: info.username,
Password: info.password,
}
if info.enableTLS {
Expand All @@ -358,7 +482,31 @@ func getRedisClusterClient(ctx context.Context, info redisConnectionInfo) (*redi

// confirm if connected
c := redis.NewClusterClient(options)
if err := c.WithContext(ctx).Ping().Err(); err != nil {
if err := c.Ping(ctx).Err(); err != nil {
return nil, err
}
return c, nil
}

func getRedisSentinelClient(ctx context.Context, info redisConnectionInfo, dbIndex int) (*redis.Client, error) {
options := &redis.FailoverOptions{
Username: info.username,
Password: info.password,
DB: dbIndex,
SentinelAddrs: info.addresses,
SentinelUsername: info.sentinelUsername,
SentinelPassword: info.sentinelPassword,
MasterName: info.sentinelMaster,
}
if info.enableTLS {
options.TLSConfig = &tls.Config{
InsecureSkipVerify: info.enableTLS,
}
}

// confirm if connected
c := redis.NewFailoverClient(options)
if err := c.Ping(ctx).Err(); err != nil {
return nil, err
}
return c, nil
Expand All @@ -367,6 +515,7 @@ func getRedisClusterClient(ctx context.Context, info redisConnectionInfo) (*redi
func getRedisClient(ctx context.Context, info redisConnectionInfo, dbIndex int) (*redis.Client, error) {
options := &redis.Options{
Addr: info.addresses[0],
Username: info.username,
Password: info.password,
DB: dbIndex,
}
Expand All @@ -378,7 +527,8 @@ func getRedisClient(ctx context.Context, info redisConnectionInfo, dbIndex int)

// confirm if connected
c := redis.NewClient(options)
if err := c.WithContext(ctx).Ping().Err(); err != nil {
err := c.Ping(ctx).Err()
if err != nil {
return nil, err
}
return c, nil
Expand Down
Loading