diff --git a/pkg/scalers/redis_scaler.go b/pkg/scalers/redis_scaler.go index 5324bd7b345..bcbb7c4c375 100644 --- a/pkg/scalers/redis_scaler.go +++ b/pkg/scalers/redis_scaler.go @@ -73,38 +73,39 @@ func parseRedisMetadata(metadata, resolvedEnv, authParams map[string]string) (*r return nil, fmt.Errorf("no list name given") } - address := defaultRedisAddress - host := defaultHost - port := defaultPort - if val, ok := metadata["address"]; ok && val != "" { - address = val - } else { - if val, ok := metadata["host"]; ok && val != "" { - host = val - } else { - return nil, fmt.Errorf("no address or host given. address should be in the format of host:port or you should set the host/port values") + if val, ok := authParams["address"]; ok { + meta.address = val + } else if addressEnvName, ok := metadata["address"]; ok && addressEnvName != "" { + if val, ok := resolvedEnv[addressEnvName]; ok { + meta.address = val } - if val, ok := metadata["port"]; ok && val != "" { - port = val - } else { - return nil, fmt.Errorf("no address or port given. address should be in the format of host:port or you should set the host/port values") + if meta.address == "" { + return nil, fmt.Errorf("no address given") } - } - - if val, ok := resolvedEnv[address]; ok { - meta.address = val } else { - if val, ok := resolvedEnv[host]; ok { + if val, ok := authParams["host"]; ok { meta.host = val + } else if hostEnvName, ok := metadata["host"]; ok && hostEnvName != "" { + if val, ok := resolvedEnv[hostEnvName]; ok { + meta.host = val + } else { + return nil, fmt.Errorf("no host given. Address should be in the format of host:port or you should provide both host and port") + } } else { - return nil, fmt.Errorf("no address given or host given. Address should be in the format of host:port or you should provide both host and port") + return nil, fmt.Errorf("no host given. address should be in the format of host:port or you should set the host/port values") } - - if val, ok := resolvedEnv[port]; ok { + if val, ok := authParams["port"]; ok { meta.port = val + } else if portEnvName, ok := metadata["port"]; ok && portEnvName != "" { + if val, ok := resolvedEnv[portEnvName]; ok { + meta.port = val + } else { + return nil, fmt.Errorf("no port given. Address should be in the format of host:port or you should provide both host and port") + } } else { - return nil, fmt.Errorf("no address or port given. Address should be in the format of host:port or you should provide both host and port") + return nil, fmt.Errorf("no port given. address should be in the format of host:port or you should set the host/port values") } + meta.address = fmt.Sprintf("%s:%s", meta.host, meta.port) } diff --git a/pkg/scalers/redis_scaler_test.go b/pkg/scalers/redis_scaler_test.go index a19e9686091..610e7256180 100644 --- a/pkg/scalers/redis_scaler_test.go +++ b/pkg/scalers/redis_scaler_test.go @@ -1,6 +1,7 @@ package scalers import ( + "fmt" "testing" ) @@ -35,16 +36,23 @@ var testRedisMetadata = []parseRedisMetadataTestData{ {map[string]string{"listName": "mylist", "listLength": "0", "address": "REDIS_WRONG", "password": ""}, true, map[string]string{}}, // password is defined in the authParams {map[string]string{"listName": "mylist", "listLength": "0", "address": "REDIS_WRONG"}, true, map[string]string{"password": ""}}, -} + // address is defined in the authParams + {map[string]string{"listName": "mylist", "listLength": "0"}, false, map[string]string{"address": "localhost:6379"}}, + // host and port is defined in the authParams + {map[string]string{"listName": "mylist", "listLength": "0"}, false, map[string]string{"host": "localhost", "port": "6379"}}, + // host only is defined in the authParams + {map[string]string{"listName": "mylist", "listLength": "0"}, true, map[string]string{"host": "localhost"}}} func TestRedisParseMetadata(t *testing.T) { + testCaseNum := 1 for _, testData := range testRedisMetadata { _, err := parseRedisMetadata(testData.metadata, testRedisResolvedEnv, testData.authParams) if err != nil && !testData.isError { - t.Error("Expected success but got error", err) + t.Error(fmt.Sprintf("Expected success but got error for unit test # %v", testCaseNum), err) } if testData.isError && err == nil { - t.Error("Expected error but got success") + t.Error(fmt.Sprintf("Expected error but got success for unit test #%v", testCaseNum)) } + testCaseNum++ } } diff --git a/tests/scalers/redis-lists.test.ts b/tests/scalers/redis-lists.test.ts index 25bfb5ef462..05537c47a85 100644 --- a/tests/scalers/redis-lists.test.ts +++ b/tests/scalers/redis-lists.test.ts @@ -12,12 +12,15 @@ const redisPort = 6379 const redisAddress = `${redisHost}:${redisPort}` const listNameForHostPortRef = 'my-test-list-host-port-ref' const listNameForAddressRef = 'my-test-list-address-ref' +const listNameForHostPortTriggerAuth = 'my-test-list-host-port-trigger' const redisWorkerHostPortRefDeploymentName = 'redis-worker-test-hostport' const redisWorkerAddressRefDeploymentName = 'redis-worker-test-address' +const redisWorkerHostPortRefTriggerAuthDeploymentName = 'redis-worker-test-hostport-triggerauth' const itemsToWrite = 200 const deploymentContainerImage = 'kedacore/tests-redis-lists:824031e' const writeJobNameForHostPortRef = 'redis-writer-host-port-ref' const writeJobNameForAddressRef = 'redis-writer-address-ref' +const writeJobNameForHostPortInTriggerAuth = 'redis-writer-host-port-trigger-auth' test.before(t => { // setup Redis @@ -50,6 +53,20 @@ test.before(t => { 'creating trigger auth should work..' ) + const triggerAuthHostPortTmpFile = tmp.fileSync() + + fs.writeFileSync(triggerAuthHostPortTmpFile.name, + scaledObjectTriggerAuthHostPortYaml.replace('{{REDIS_PASSWORD}}', base64Password) + .replace('{{REDIS_HOST}}', Buffer.from(redisHost).toString('base64')) + .replace('{{REDIS_PORT}}', Buffer.from(redisPort.toString()).toString('base64')) + ) + + t.is( + 0, + sh.exec(`kubectl apply -f ${triggerAuthHostPortTmpFile.name} --namespace ${testNamespace}`).code, + 'creating trigger auth with host port should work..' + ) + const deploymentHostPortRefTmpFile = tmp.fileSync() fs.writeFileSync(deploymentHostPortRefTmpFile.name, redisRedisListDeployHostPortYaml.replace(/{{REDIS_PASSWORD}}/g, redisPassword) @@ -68,7 +85,7 @@ test.before(t => { const deploymentAddressRefTmpFile = tmp.fileSync() - fs.writeFileSync(deploymentAddressRefTmpFile.name, redisRedisListDeployAddressYaml.replace(/{{REDIS_PASSWORD}}/g, redisPassword) + fs.writeFileSync(deploymentAddressRefTmpFile.name, redisListDeployAddressYaml.replace(/{{REDIS_PASSWORD}}/g, redisPassword) .replace(/{{REDIS_ADDRESS}}/g, redisAddress) .replace(/{{LIST_NAME}}/g, listNameForAddressRef) .replace(/{{DEPLOYMENT_NAME}}/g, redisWorkerAddressRefDeploymentName) @@ -80,6 +97,23 @@ test.before(t => { sh.exec(`kubectl apply -f ${deploymentAddressRefTmpFile.name} --namespace ${testNamespace}`).code, 'creating a deployment using redis address var should work..' ) + + + const deploymentHostPortRefTriggerAuthTmpFile = tmp.fileSync() + + fs.writeFileSync(deploymentHostPortRefTriggerAuthTmpFile.name, redisListDeployHostPortInTriggerAuhYaml.replace(/{{REDIS_PASSWORD}}/g, redisPassword) + .replace(/{{REDIS_HOST}}/g, redisHost) + .replace(/{{REDIS_PORT}}/g, redisPort.toString()) + .replace(/{{LIST_NAME}}/g, listNameForHostPortTriggerAuth) + .replace(/{{DEPLOYMENT_NAME}}/g, redisWorkerHostPortRefTriggerAuthDeploymentName) + .replace(/{{CONTAINER_IMAGE}}/g, deploymentContainerImage) + ) + + t.is( + 0, + sh.exec(`kubectl apply -f ${deploymentHostPortRefTriggerAuthTmpFile.name} --namespace ${testNamespace}`).code, + 'creating a deployment using redis host port in trigger auth should work..' + ) }) test.serial('Deployment for redis host and port env vars should have 0 replica on start', t => { @@ -92,29 +126,8 @@ test.serial('Deployment for redis host and port env vars should have 0 replica o test.serial(`Deployment using redis host port env vars should max and scale to 5 with ${itemsToWrite} items written to list and back to 0`, t => { - // write to list - const tmpFile = tmp.fileSync() - fs.writeFileSync(tmpFile.name, writeJobYaml.replace('{{REDIS_ADDRESS}}', redisAddress).replace('{{REDIS_PASSWORD}}', redisPassword) - .replace('{{LIST_NAME}}', listNameForHostPortRef) - .replace('{{NUMBER_OF_ITEMS_TO_WRITE}}',itemsToWrite.toString()) - .replace('{{CONTAINER_IMAGE}}', deploymentContainerImage) - .replace('{{JOB_NAME}}', writeJobNameForHostPortRef) - ) - - t.is( - 0, - sh.exec(`kubectl apply -f ${tmpFile.name} --namespace ${testNamespace}`).code, - 'list writer job should apply.' - ) - - // wait for the write job to complete - for (let i = 0; i < 20; i++) { - const succeeded = sh.exec(`kubectl get job ${writeJobNameForHostPortRef} --namespace ${testNamespace} -o jsonpath='{.items[0].status.succeeded}'`).stdout - if (succeeded == '1') { - break - } - sh.exec('sleep 1s') - } + + runWriteJob(t, writeJobNameForHostPortRef, listNameForHostPortRef) let replicaCount = '0' for (let i = 0; i < 20 && replicaCount !== '5'; i++) { @@ -153,34 +166,53 @@ test.serial('Deployment for redis address env var should have 0 replica on start test.serial(`Deployment using redis address env var should max and scale to 5 with ${itemsToWrite} items written to list and back to 0`, t => { - // write to list - const tmpFile = tmp.fileSync() - fs.writeFileSync(tmpFile.name, writeJobYaml.replace('{{REDIS_ADDRESS}}', redisAddress).replace('{{REDIS_PASSWORD}}', redisPassword) - .replace('{{LIST_NAME}}', listNameForAddressRef) - .replace('{{NUMBER_OF_ITEMS_TO_WRITE}}',itemsToWrite.toString()) - .replace('{{CONTAINER_IMAGE}}', deploymentContainerImage) - .replace('{{JOB_NAME}}', writeJobNameForAddressRef) - ) + + runWriteJob(t, writeJobNameForAddressRef, listNameForAddressRef) - t.is( - 0, - sh.exec(`kubectl apply -f ${tmpFile.name} --namespace ${testNamespace}`).code, - 'list writer job should apply.' - ) + let replicaCount = '0' + for (let i = 0; i < 20 && replicaCount !== '5'; i++) { + replicaCount = sh.exec( + `kubectl get deployment/${redisWorkerAddressRefDeploymentName} --namespace ${testNamespace} -o jsonpath="{.spec.replicas}"` + ).stdout + t.log('(scale up) replica count is:' + replicaCount) + if (replicaCount !== '5') { + sh.exec('sleep 3s') + } + } - // wait for the write job to complete - for (let i = 0; i < 20; i++) { - const succeeded = sh.exec(`kubectl get job ${writeJobNameForAddressRef} --namespace ${testNamespace} -o jsonpath='{.items[0].status.succeeded}'`).stdout - if (succeeded == '1') { - break + t.is('5', replicaCount, 'Replica count should be 5 within 60 seconds') + + for (let i = 0; i < 12 && replicaCount !== '0'; i++) { + replicaCount = sh.exec( + `kubectl get deployment/${redisWorkerAddressRefDeploymentName} --namespace ${testNamespace} -o jsonpath="{.spec.replicas}"` + ).stdout + t.log('(scale down) replica count is:' + replicaCount) + if (replicaCount !== '0') { + sh.exec('sleep 10s') } - sh.exec('sleep 1s') } + t.is('0', replicaCount, 'Replica count should be 0 within 2 minutes') +}) + + +test.serial('Deployment for redis host and port in the trigger auth should have 0 replica on start', t => { + + const replicaCount = sh.exec( + `kubectl get deployment/${redisWorkerHostPortRefTriggerAuthDeploymentName} --namespace ${testNamespace} -o jsonpath="{.spec.replicas}"` + ).stdout + t.is(replicaCount, '0', 'replica count should start out as 0') +}) + + +test.serial(`Deployment using redis host port in triggerAuth should max and scale to 5 with ${itemsToWrite} items written to list and back to 0`, t => { + + runWriteJob(t, writeJobNameForHostPortInTriggerAuth, listNameForHostPortTriggerAuth) + let replicaCount = '0' for (let i = 0; i < 20 && replicaCount !== '5'; i++) { replicaCount = sh.exec( - `kubectl get deployment/${redisWorkerAddressRefDeploymentName} --namespace ${testNamespace} -o jsonpath="{.spec.replicas}"` + `kubectl get deployment/${redisWorkerHostPortRefTriggerAuthDeploymentName} --namespace ${testNamespace} -o jsonpath="{.spec.replicas}"` ).stdout t.log('(scale up) replica count is:' + replicaCount) if (replicaCount !== '5') { @@ -192,7 +224,7 @@ test.serial(`Deployment using redis address env var should max and scale to 5 wi for (let i = 0; i < 12 && replicaCount !== '0'; i++) { replicaCount = sh.exec( - `kubectl get deployment/${redisWorkerAddressRefDeploymentName} --namespace ${testNamespace} -o jsonpath="{.spec.replicas}"` + `kubectl get deployment/${redisWorkerHostPortRefTriggerAuthDeploymentName} --namespace ${testNamespace} -o jsonpath="{.spec.replicas}"` ).stdout t.log('(scale down) replica count is:' + replicaCount) if (replicaCount !== '0') { @@ -206,14 +238,18 @@ test.serial(`Deployment using redis address env var should max and scale to 5 wi test.after.always.cb('clean up deployment', t => { const resources = [ - 'secret/redis-password', `job/${writeJobNameForHostPortRef}`, `job/${writeJobNameForAddressRef}`, - `deployment/${redisWorkerHostPortRefDeploymentName}`, + `job/${writeJobNameForHostPortInTriggerAuth}`, `scaledobject.keda.k8s.io/${redisWorkerHostPortRefDeploymentName}`, - `deployment/${redisRedisListDeployHostPortYaml}`, - `scaledobject.keda.k8s.io/${redisRedisListDeployHostPortYaml}`, - 'triggerauthentications.keda.k8s.io/keda-redis-list-triggerauth' + `scaledobject.keda.k8s.io/${redisWorkerAddressRefDeploymentName}`, + `scaledobject.keda.k8s.io/${redisWorkerHostPortRefTriggerAuthDeploymentName}`, + 'triggerauthentications.keda.k8s.io/keda-redis-list-triggerauth', + 'triggerauthentications.keda.k8s.io/keda-redis-list-triggerauth-host-port', + `deployment/${redisWorkerAddressRefDeploymentName}`, + `deployment/${redisWorkerHostPortRefTriggerAuthDeploymentName}`, + `deployment/${redisWorkerHostPortRefDeploymentName}`, + 'secret/redis-password', ] for (const resource of resources) { @@ -225,6 +261,32 @@ test.after.always.cb('clean up deployment', t => { t.end() }) +function runWriteJob(t, jobName, listName) { + // write to list + const tmpFile = tmp.fileSync() + fs.writeFileSync(tmpFile.name, writeJobYaml.replace('{{REDIS_ADDRESS}}', redisAddress).replace('{{REDIS_PASSWORD}}', redisPassword) + .replace('{{LIST_NAME}}', listName) + .replace('{{NUMBER_OF_ITEMS_TO_WRITE}}', itemsToWrite.toString()) + .replace('{{CONTAINER_IMAGE}}', deploymentContainerImage) + .replace('{{JOB_NAME}}', jobName) + ) + + t.is( + 0, + sh.exec(`kubectl apply -f ${tmpFile.name} --namespace ${testNamespace}`).code, + 'list writer job should apply.' + ) + + // wait for the write job to complete + for (let i = 0; i < 20; i++) { + const succeeded = sh.exec(`kubectl get job ${writeJobNameForHostPortRef} --namespace ${testNamespace} -o jsonpath='{.items[0].status.succeeded}'`).stdout + if (succeeded == '1') { + break + } + sh.exec('sleep 1s') + } +} + const redisDeployYaml = `apiVersion: apps/v1 kind: Deployment metadata: @@ -320,7 +382,7 @@ spec: ` -const redisRedisListDeployAddressYaml = `apiVersion: apps/v1 +const redisListDeployAddressYaml = `apiVersion: apps/v1 kind: Deployment metadata: name: {{DEPLOYMENT_NAME}} @@ -374,6 +436,87 @@ spec: name: keda-redis-list-triggerauth ` +const redisListDeployHostPortInTriggerAuhYaml = `apiVersion: apps/v1 +kind: Deployment +metadata: + name: {{DEPLOYMENT_NAME}} + labels: + app: {{DEPLOYMENT_NAME}} +spec: + replicas: 0 + selector: + matchLabels: + app: {{DEPLOYMENT_NAME}} + template: + metadata: + labels: + app: {{DEPLOYMENT_NAME}} + spec: + containers: + - name: redis-worker + image: {{CONTAINER_IMAGE}} + imagePullPolicy: IfNotPresent + args: ["read"] + env: + - name: REDIS_HOST + value: {{REDIS_HOST}} + - name: REDIS_PORT + value: "{{REDIS_PORT}}" + - name: LIST_NAME + value: {{LIST_NAME}} + - name: REDIS_PASSWORD + value: {{REDIS_PASSWORD}} + - name: READ_PROCESS_TIME + value: "200" +--- +apiVersion: keda.k8s.io/v1alpha1 +kind: ScaledObject +metadata: + name: {{DEPLOYMENT_NAME}} + labels: + deploymentName: {{DEPLOYMENT_NAME}} +spec: + scaleTargetRef: + deploymentName: {{DEPLOYMENT_NAME}} + pollingInterval: 5 + cooldownPeriod: 30 + minReplicaCount: 0 + maxReplicaCount: 5 + triggers: + - type: redis + metadata: + listName: {{LIST_NAME}} + listLength: "5" + authenticationRef: + name: keda-redis-list-triggerauth-host-port +` + +const scaledObjectTriggerAuthHostPortYaml = `apiVersion: v1 +kind: Secret +metadata: + name: redis-config +type: Opaque +data: + password: {{REDIS_PASSWORD}} + redisHost: {{REDIS_HOST}} + redisPort: {{REDIS_PORT}} +--- +apiVersion: keda.k8s.io/v1alpha1 +kind: TriggerAuthentication +metadata: + name: keda-redis-list-triggerauth-host-port +spec: + secretTargetRef: + - parameter: password + name: redis-config + key: password + - parameter: host + name: redis-config + key: redisHost + - parameter: port + name: redis-config + key: redisPort +` const scaledObjectTriggerAuthYaml = `apiVersion: v1 kind: Secret