diff --git a/tests/scalers/kafka.test.ts b/tests/scalers/kafka.test.ts index 102190c8148..45368034f34 100644 --- a/tests/scalers/kafka.test.ts +++ b/tests/scalers/kafka.test.ts @@ -1,8 +1,8 @@ import * as fs from 'fs' import * as sh from 'shelljs' import * as tmp from 'tmp' -import test from 'ava'; -import { createNamespace } from './helpers'; +import test, { Assertions } from 'ava'; +import { createNamespace, waitForDeploymentReplicaCount } from './helpers'; const defaultNamespace = 'kafka-test' const defaultCluster = 'kafka-cluster' @@ -10,8 +10,7 @@ const timeToWait = 300 const defaultTopic = 'kafka-topic' const defaultTopic2 = 'kafka-topic-2' const defaultKafkaClient = 'kafka-client' -const strimziOperatorVersion = '0.18.0' -const commandToCheckReplicas = `kubectl get deployments/kafka-consumer --namespace ${defaultNamespace} -o jsonpath="{.spec.replicas}"` +const strimziOperatorVersion = '0.23.0' const strimziOperatorYamlFile = tmp.fileSync() const kafkaClusterYamlFile = tmp.fileSync() @@ -24,123 +23,61 @@ const scaledObjectEarliestYamlFile = tmp.fileSync() const scaledObjectLatestYamlFile = tmp.fileSync() const scaledObjectMultipleTopicsYamlFile = tmp.fileSync() -test.before('Set up, create necessary resources.', t => { - sh.config.silent = true +function deployFromYaml(t: Assertions, filename: string, yaml: string, name: string) { + sh.exec(`echo Deploying ${name}`) + fs.writeFileSync(filename, yaml) + t.is(0, sh.exec(`kubectl apply -f ${filename} --namespace ${defaultNamespace}`).code, `Deploying ${name} should work.`) +} + +function waitForReady(t: Assertions, app: string, name: string, condition: string = 'Ready') { + sh.exec(`echo Waiting for ${app} for ${timeToWait} seconds to be ${condition}`) + t.is( + 0, + sh.exec(`kubectl wait ${app} --for=condition=${condition} --timeout=${timeToWait}s --namespace ${defaultNamespace}`).code, + `${name} should be ready within given time limit.` + ) +} + +test.before('Set up, create necessary resources.', async t => { createNamespace(defaultNamespace) + sh.config.silent = true const strimziOperatorYaml = sh.exec(`curl -L https://github.com/strimzi/strimzi-kafka-operator/releases/download/${strimziOperatorVersion}/strimzi-cluster-operator-${strimziOperatorVersion}.yaml`).stdout - fs.writeFileSync(strimziOperatorYamlFile.name, strimziOperatorYaml.replace(/myproject/g, `${defaultNamespace}`)) - t.is( - 0, - sh.exec(`kubectl apply -f ${strimziOperatorYamlFile.name} --namespace ${defaultNamespace}`).code, - 'Deploying Strimzi operator should work.' - ) + sh.config.silent = false - fs.writeFileSync(kafkaClusterYamlFile.name, kafkaClusterYaml) - t.is( - 0, - sh.exec(`kubectl apply -f ${kafkaClusterYamlFile.name} --namespace ${defaultNamespace}`).code, - 'Deploying Kafka cluster instance should work.' - ) - t.is( - 0, - sh.exec(`kubectl wait kafka/${defaultCluster} --for=condition=Ready --timeout=${timeToWait}s --namespace ${defaultNamespace}`).code, - 'Kafka instance should be ready within given time limit.' - ) + deployFromYaml(t, strimziOperatorYamlFile.name, strimziOperatorYaml.replace(/myproject/g, `${defaultNamespace}`), 'Strimzi operator') + deployFromYaml(t, kafkaClusterYamlFile.name, kafkaClusterYaml, 'Kafka cluster') + waitForReady(t, `kafka/${defaultCluster}`,'Kafka instance') - fs.writeFileSync(kafkaTopicYamlFile.name, kafkaTopicsYaml) - t.is( - 0, - sh.exec(`kubectl apply -f ${kafkaTopicYamlFile.name} --namespace ${defaultNamespace}`).code, - 'Deploying Kafka topic should work.' - ) - t.is( - 0, - sh.exec(`kubectl wait kafkatopic/${defaultTopic} --for=condition=Ready --timeout=${timeToWait}s --namespace ${defaultNamespace}`).code, - 'Kafka topic should be ready withlanguage-mattersin given time limit.' - ) - t.is( - 0, - sh.exec(`kubectl wait kafkatopic/${defaultTopic2} --for=condition=Ready --timeout=${timeToWait}s --namespace ${defaultNamespace}`).code, - 'Kafka topic2 should be ready within given time limit.' - ) + deployFromYaml(t, kafkaTopicYamlFile.name, kafkaTopicsYaml, 'Kafka topic') + waitForReady(t, `kafkatopic/${defaultTopic}`,'Kafka topic') + waitForReady(t, `kafkatopic/${defaultTopic2}`,'Kafka topic2') - fs.writeFileSync(kafkaClientYamlFile.name, kafkaClientYaml) - t.is( - 0, - sh.exec(`kubectl apply -f ${kafkaClientYamlFile.name} --namespace ${defaultNamespace}`).code, - 'Deploying Kafka client should work.' - ) - t.is( - 0, - sh.exec(`kubectl wait pod/${defaultKafkaClient} --for=condition=Ready --timeout=${timeToWait}s --namespace ${defaultNamespace}`).code, - 'Kafka client should be ready within given time limit.' - ) + deployFromYaml(t, kafkaClientYamlFile.name, kafkaClientYaml, 'Kafka client') + waitForReady(t, `pod/${defaultKafkaClient}`,'Kafka client') - fs.writeFileSync(kafkaApplicationEarliestYamlFile.name, kafkaApplicationEarliestYaml) + deployFromYaml(t, kafkaApplicationEarliestYamlFile.name, kafkaApplicationEarliestYaml, 'Kafka application') + deployFromYaml(t, scaledObjectEarliestYamlFile.name, scaledObjectEarliestYaml, 'Scaled Object') + waitForReady(t, 'deployment/kafka-consumer','Kafka application', 'Available') - t.is( - 0, - sh.exec(`kubectl apply -f ${kafkaApplicationEarliestYamlFile.name} --namespace ${defaultNamespace}`).code, - 'Deploying Kafka application should work.' - ) - fs.writeFileSync(scaledObjectEarliestYamlFile.name, scaledObjectEarliestYaml) - t.is( - 0, - sh.exec(`kubectl apply -f ${scaledObjectEarliestYamlFile.name} --namespace ${defaultNamespace}`).code, - 'Deploying Scaled Object should work.' - ) - t.is( - 0, - sh.exec(`kubectl wait deployment/kafka-consumer --for=condition=Available --timeout=${timeToWait}s --namespace ${defaultNamespace}`).code, - 'Kafka application should be ready within given time limit.' - ) - waitForReplicaCount(0, commandToCheckReplicas) - t.is('0', sh.exec(commandToCheckReplicas).stdout, 'Replica count should be 0.') + t.true(await waitForDeploymentReplicaCount(0, 'kafka-consumer', defaultNamespace, 30, 2000), 'replica count should start out as 0') }); -function waitForReplicaCount(desiredReplicaCount: number, commandToCheck: string) { - let replicaCount = undefined - let changed = undefined - for (let i = 0; i < 10; i++) { - changed = false - // checks the replica count 3 times, it tends to fluctuate from the beginning - for (let j = 0; j < 3; j++) { - replicaCount = sh.exec(commandToCheck).stdout - if (replicaCount === desiredReplicaCount.toString()) { - sh.exec('sleep 2s') - } else { - changed = true - break - } - } - if (changed === false) { - return - } else { - sh.exec('sleep 3s') - } - } -} - -test.serial('Scale application with kafka messages.', t => { +test.serial('Scale application with kafka messages.', async t => { for (let r = 1; r <= 3; r++) { sh.exec(`kubectl exec --namespace ${defaultNamespace} ${defaultKafkaClient} -- sh -c 'echo "{\"text\": \"foo\"}" | kafka-console-producer --broker-list ${defaultCluster}-kafka-bootstrap.${defaultNamespace}:9092 --topic ${defaultTopic}'`) sh.exec(`sleep 5s`) - waitForReplicaCount(r, commandToCheckReplicas) - - t.is(r.toString(), sh.exec(commandToCheckReplicas).stdout, `Replica count should be ${r}.`) + t.true(await waitForDeploymentReplicaCount(r, 'kafka-consumer', defaultNamespace, 30, 2000), `Replica count should be ${r}.`) } }) -test.serial('Scale application beyond partition max.', t => { +test.serial('Scale application beyond partition max.', async t => { sh.exec(`kubectl exec --namespace ${defaultNamespace} ${defaultKafkaClient} -- sh -c 'echo "{\"text\": \"foo\"}" | kafka-console-producer --broker-list ${defaultCluster}-kafka-bootstrap.${defaultNamespace}:9092 --topic ${defaultTopic}'`) sh.exec(`sleep 5s`) - waitForReplicaCount(3, commandToCheckReplicas) - - t.is('3', sh.exec(commandToCheckReplicas).stdout, `Replica count should be 3.`) + t.true(await waitForDeploymentReplicaCount(3, 'kafka-consumer', defaultNamespace, 30, 2000), `Replica count should be 3.`) }) test.serial('cleanup after earliest policy test', t=> { @@ -158,40 +95,27 @@ test.serial('cleanup after earliest policy test', t=> { sh.exec(`sleep 30s`) }) -test.serial('Applying ScaledObject latest policy should not scale up pods', t => { +test.serial('Applying ScaledObject latest policy should not scale up pods', async t => { //Make the consumer commit the first offset for each partition. sh.exec(`kubectl exec --namespace ${defaultNamespace} ${defaultKafkaClient} -- sh -c 'kafka-console-consumer --bootstrap-server ${defaultCluster}-kafka-bootstrap.${defaultNamespace}:9092 --topic ${defaultTopic} --group latest --from-beginning --consumer-property enable.auto.commit=true --timeout-ms 15000'`) - fs.writeFileSync(kafkaApplicationLatestYamlFile.name, kafkaApplicationLatestYaml) - t.is( - 0, - sh.exec(`kubectl apply -f ${kafkaApplicationLatestYamlFile.name} --namespace ${defaultNamespace}`).code, - 'Deploying Kafka application should work.' - ) + deployFromYaml(t, kafkaApplicationLatestYamlFile.name, kafkaApplicationLatestYaml, 'Kafka application') sh.exec(`sleep 10s`) - fs.writeFileSync(scaledObjectLatestYamlFile.name, scaledObjectLatestYaml) - t.is( - 0, - sh.exec(`kubectl apply -f ${scaledObjectLatestYamlFile.name} --namespace ${defaultNamespace}`).code, - 'Deploying Scaled Object should work.' - ) + deployFromYaml(t, scaledObjectLatestYamlFile.name, scaledObjectLatestYaml, 'Scaled Object') sh.exec(`sleep 5s`) - waitForReplicaCount(1, commandToCheckReplicas) - t.is('0', sh.exec(commandToCheckReplicas).stdout, 'Replica count should be 0.') + t.true(await waitForDeploymentReplicaCount(0, 'kafka-consumer', defaultNamespace, 30, 2000), `Replica count should be 0.`) }) -test.serial('Latest Scale object should scale with new messages', t => { +test.serial('Latest Scale object should scale with new messages', async t => { for (let r = 1; r <= 3; r++) { sh.exec(`kubectl exec --namespace ${defaultNamespace} ${defaultKafkaClient} -- sh -c 'echo "{\"text\": \"foo\"}" | kafka-console-producer --broker-list ${defaultCluster}-kafka-bootstrap.${defaultNamespace}:9092 --topic ${defaultTopic}'`) sh.exec(`sleep 5s`) - waitForReplicaCount(r, commandToCheckReplicas) - - t.is(r.toString(), sh.exec(commandToCheckReplicas).stdout, `Replica count should be ${r}.`) + t.true(await waitForDeploymentReplicaCount(r, 'kafka-consumer', defaultNamespace, 30, 2000), `Replica count should be ${r}.`) } }) @@ -209,37 +133,24 @@ test.serial('Cleanup after latest policy test', t=> { sh.exec(`sleep 10s`) }) -test.serial('Applying ScaledObject with multiple topics should scale up pods', t => { +test.serial('Applying ScaledObject with multiple topics should scale up pods', async t => { // Make the consumer commit the all offsets for all topics in the group sh.exec(`kubectl exec --namespace ${defaultNamespace} ${defaultKafkaClient} -- sh -c 'kafka-console-consumer --bootstrap-server "${defaultCluster}-kafka-bootstrap.${defaultNamespace}:9092" --topic ${defaultTopic} --group multiTopic --from-beginning --consumer-property enable.auto.commit=true --timeout-ms 15000'`) sh.exec(`kubectl exec --namespace ${defaultNamespace} ${defaultKafkaClient} -- sh -c 'kafka-console-consumer --bootstrap-server "${defaultCluster}-kafka-bootstrap.${defaultNamespace}:9092" --topic ${defaultTopic2} --group multiTopic --from-beginning --consumer-property enable.auto.commit=true --timeout-ms 15000'`) - fs.writeFileSync(kafkaApplicationMultipleTopicsYamlFile.name, kafkaApplicationMultipleTopicsYaml) - t.is( - 0, - sh.exec(`kubectl apply -f ${kafkaApplicationMultipleTopicsYamlFile.name} --namespace ${defaultNamespace}`).code, - 'Deploying Kafka application should work.' - ) + deployFromYaml(t, kafkaApplicationMultipleTopicsYamlFile.name, kafkaApplicationMultipleTopicsYaml, 'Kafka application') sh.exec(`sleep 5s`) - fs.writeFileSync(scaledObjectMultipleTopicsYamlFile.name, scaledObjectMultipleTopicsYaml) - - t.is( - 0, - sh.exec(`kubectl apply -f ${scaledObjectMultipleTopicsYamlFile.name} --namespace ${defaultNamespace}`).code, - 'Deploying Scaled Object should work.' - ) + deployFromYaml(t, scaledObjectMultipleTopicsYamlFile.name, scaledObjectMultipleTopicsYaml, ' Scaled Object') sh.exec(`sleep 5s`) // when lag is 0, scaled object is not active, replica = 0 - waitForReplicaCount(0, commandToCheckReplicas) - t.is('0', sh.exec(commandToCheckReplicas).stdout, 'Replica count should be 0.') + t.true(await waitForDeploymentReplicaCount(0, 'kafka-consumer', defaultNamespace, 30, 2000), `Replica count should be 0.`) // produce a single msg to the default topic // should turn scale object active, replica = 1 sh.exec(`kubectl exec --namespace ${defaultNamespace} ${defaultKafkaClient} -- sh -exc 'echo "{\"text\": \"foo\"}" | kafka-console-producer --broker-list ${defaultCluster}-kafka-bootstrap.${defaultNamespace}:9092 --topic ${defaultTopic}'`) sh.exec(`sleep 5s`) - waitForReplicaCount(1, commandToCheckReplicas) - t.is('1', sh.exec(commandToCheckReplicas).stdout, 'Replica count should be 1.') + t.true(await waitForDeploymentReplicaCount(1, 'kafka-consumer', defaultNamespace, 30, 2000), `Replica count should be 1.`) // produce one more msg to the different topic within the same group // will turn total consumer group lag to 2. @@ -248,14 +159,12 @@ test.serial('Applying ScaledObject with multiple topics should scale up pods', t // as desiredReplicaCount = totalLag / avgThreshold sh.exec(`kubectl exec --namespace ${defaultNamespace} ${defaultKafkaClient} -- sh -exc 'echo "{\"text\": \"foo\"}" | kafka-console-producer --broker-list ${defaultCluster}-kafka-bootstrap.${defaultNamespace}:9092 --topic ${defaultTopic2}'`) sh.exec(`sleep 5s`) - waitForReplicaCount(2, commandToCheckReplicas) - t.is('2', sh.exec(commandToCheckReplicas).stdout, 'Replica count should be 2.') + t.true(await waitForDeploymentReplicaCount(2, 'kafka-consumer', defaultNamespace, 30, 2000), `Replica count should be 2.`) // make it 3 cause why not? sh.exec(`kubectl exec --namespace ${defaultNamespace} ${defaultKafkaClient} -- sh -exc 'echo "{\"text\": \"foo\"}" | kafka-console-producer --broker-list ${defaultCluster}-kafka-bootstrap.${defaultNamespace}:9092 --topic ${defaultTopic}'`) sh.exec(`sleep 5s`) - waitForReplicaCount(3, commandToCheckReplicas) - t.is('3', sh.exec(commandToCheckReplicas).stdout, 'Replica count should be 3.') + t.true(await waitForDeploymentReplicaCount(3, 'kafka-consumer', defaultNamespace, 30, 2000), `Replica count should be 3.`) }) test.serial('Cleanup after multiple topics test', t=> { @@ -289,23 +198,31 @@ test.after.always('Clean up, delete created resources.', t => { ] for (const resource of resources) { - sh.exec(`kubectl delete ${resource} --namespace ${defaultNamespace}`) + sh.exec(`echo Deleting resource from file ${resource}`) + sh.exec(`kubectl delete -f ${resource} --namespace ${defaultNamespace}`) } + sh.exec(`echo Deleting namespace ${defaultNamespace}`) sh.exec(`kubectl delete namespace ${defaultNamespace}`) }) -const kafkaClusterYaml = `apiVersion: kafka.strimzi.io/v1beta1 +const kafkaClusterYaml = `apiVersion: kafka.strimzi.io/v1beta2 kind: Kafka metadata: name: ${defaultCluster} namespace: ${defaultNamespace} spec: kafka: - version: 2.5.0 + version: "2.6.0" replicas: 3 listeners: - plain: {} - tls: {} + - name: plain + port: 9092 + type: internal + tls: false + - name: tls + port: 9093 + type: internal + tls: true config: offsets.topic.replication.factor: 1 transaction.state.log.replication.factor: 1 @@ -321,7 +238,7 @@ spec: topicOperator: {} userOperator: {}` -const kafkaTopicsYaml = `apiVersion: kafka.strimzi.io/v1beta1 +const kafkaTopicsYaml = `apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaTopic metadata: name: ${defaultTopic} @@ -335,7 +252,7 @@ spec: retention.ms: 604800000 segment.bytes: 1073741824 --- -apiVersion: kafka.strimzi.io/v1beta1 +apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaTopic metadata: name: ${defaultTopic2}