Skip to content

Commit

Permalink
Update kafka test to use strimzi operator v0.23.0
Browse files Browse the repository at this point in the history
Update to use strimzi operator to v0.23.0 and Kafka 2.6.0 in order to properly
work on Kubernetes 1.21 and up due to deprecation of beta CRD api
Also refactor common deploy and status checks to use internal methods

Signed-off-by: Ram Cohen <[email protected]>
  • Loading branch information
RamCohen committed Mar 23, 2022
1 parent 2ee5ea3 commit 41b5d81
Showing 1 changed file with 65 additions and 148 deletions.
213 changes: 65 additions & 148 deletions tests/scalers/kafka.test.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
import * as fs from 'fs'
import * as sh from 'shelljs'
import * as tmp from 'tmp'
import test from 'ava';
import { createNamespace } from './helpers';
import test, { Assertions } from 'ava';
import { createNamespace, waitForDeploymentReplicaCount } from './helpers';

const defaultNamespace = 'kafka-test'
const defaultCluster = 'kafka-cluster'
const timeToWait = 300
const defaultTopic = 'kafka-topic'
const defaultTopic2 = 'kafka-topic-2'
const defaultKafkaClient = 'kafka-client'
const strimziOperatorVersion = '0.18.0'
const commandToCheckReplicas = `kubectl get deployments/kafka-consumer --namespace ${defaultNamespace} -o jsonpath="{.spec.replicas}"`
const strimziOperatorVersion = '0.23.0'

const strimziOperatorYamlFile = tmp.fileSync()
const kafkaClusterYamlFile = tmp.fileSync()
Expand All @@ -24,123 +23,61 @@ const scaledObjectEarliestYamlFile = tmp.fileSync()
const scaledObjectLatestYamlFile = tmp.fileSync()
const scaledObjectMultipleTopicsYamlFile = tmp.fileSync()

test.before('Set up, create necessary resources.', t => {
sh.config.silent = true
// Writes the given YAML manifest to a temp file and applies it to the test
// namespace, failing the test if `kubectl apply` exits non-zero.
//   t        - ava assertion context used to report the apply result
//   filename - temp-file path to write the manifest to (from tmp.fileSync())
//   yaml     - manifest contents to deploy
//   name     - human-readable resource name used in log/assert messages
function deployFromYaml(t: Assertions, filename: string, yaml: string, name: string) {
    sh.exec(`echo Deploying ${name}`)
    fs.writeFileSync(filename, yaml)
    // Apply the file we just wrote; the original scrape showed `$(unknown)`
    // here, which could never resolve — it must reference the written file.
    t.is(0, sh.exec(`kubectl apply -f ${filename} --namespace ${defaultNamespace}`).code, `Deploying ${name} should work.`)
}

// Blocks until the given Kubernetes resource reports the requested condition
// (default 'Ready'), failing the test if `kubectl wait` times out.
//   app       - resource reference passed to kubectl, e.g. `kafka/my-cluster`
//   name      - human-readable name used in the assertion message
//   condition - status condition to wait for
function waitForReady(t: Assertions, app: string, name: string, condition: string = 'Ready') {
    sh.exec(`echo Waiting for ${app} for ${timeToWait} seconds to be ${condition}`)
    const waitCommand = `kubectl wait ${app} --for=condition=${condition} --timeout=${timeToWait}s --namespace ${defaultNamespace}`
    const exitCode = sh.exec(waitCommand).code
    t.is(0, exitCode, `${name} should be ready within given time limit.`)
}

test.before('Set up, create necessary resources.', async t => {
createNamespace(defaultNamespace)

sh.config.silent = true
const strimziOperatorYaml = sh.exec(`curl -L https://github.com/strimzi/strimzi-kafka-operator/releases/download/${strimziOperatorVersion}/strimzi-cluster-operator-${strimziOperatorVersion}.yaml`).stdout
fs.writeFileSync(strimziOperatorYamlFile.name, strimziOperatorYaml.replace(/myproject/g, `${defaultNamespace}`))
t.is(
0,
sh.exec(`kubectl apply -f ${strimziOperatorYamlFile.name} --namespace ${defaultNamespace}`).code,
'Deploying Strimzi operator should work.'
)
sh.config.silent = false

fs.writeFileSync(kafkaClusterYamlFile.name, kafkaClusterYaml)
t.is(
0,
sh.exec(`kubectl apply -f ${kafkaClusterYamlFile.name} --namespace ${defaultNamespace}`).code,
'Deploying Kafka cluster instance should work.'
)
t.is(
0,
sh.exec(`kubectl wait kafka/${defaultCluster} --for=condition=Ready --timeout=${timeToWait}s --namespace ${defaultNamespace}`).code,
'Kafka instance should be ready within given time limit.'
)
deployFromYaml(t, strimziOperatorYamlFile.name, strimziOperatorYaml.replace(/myproject/g, `${defaultNamespace}`), 'Strimzi operator')
deployFromYaml(t, kafkaClusterYamlFile.name, kafkaClusterYaml, 'Kafka cluster')
waitForReady(t, `kafka/${defaultCluster}`,'Kafka instance')

fs.writeFileSync(kafkaTopicYamlFile.name, kafkaTopicsYaml)
t.is(
0,
sh.exec(`kubectl apply -f ${kafkaTopicYamlFile.name} --namespace ${defaultNamespace}`).code,
'Deploying Kafka topic should work.'
)
t.is(
0,
sh.exec(`kubectl wait kafkatopic/${defaultTopic} --for=condition=Ready --timeout=${timeToWait}s --namespace ${defaultNamespace}`).code,
'Kafka topic should be ready within given time limit.'
)
t.is(
0,
sh.exec(`kubectl wait kafkatopic/${defaultTopic2} --for=condition=Ready --timeout=${timeToWait}s --namespace ${defaultNamespace}`).code,
'Kafka topic2 should be ready within given time limit.'
)
deployFromYaml(t, kafkaTopicYamlFile.name, kafkaTopicsYaml, 'Kafka topic')
waitForReady(t, `kafkatopic/${defaultTopic}`,'Kafka topic')
waitForReady(t, `kafkatopic/${defaultTopic2}`,'Kafka topic2')

fs.writeFileSync(kafkaClientYamlFile.name, kafkaClientYaml)
t.is(
0,
sh.exec(`kubectl apply -f ${kafkaClientYamlFile.name} --namespace ${defaultNamespace}`).code,
'Deploying Kafka client should work.'
)
t.is(
0,
sh.exec(`kubectl wait pod/${defaultKafkaClient} --for=condition=Ready --timeout=${timeToWait}s --namespace ${defaultNamespace}`).code,
'Kafka client should be ready within given time limit.'
)
deployFromYaml(t, kafkaClientYamlFile.name, kafkaClientYaml, 'Kafka client')
waitForReady(t, `pod/${defaultKafkaClient}`,'Kafka client')

fs.writeFileSync(kafkaApplicationEarliestYamlFile.name, kafkaApplicationEarliestYaml)
deployFromYaml(t, kafkaApplicationEarliestYamlFile.name, kafkaApplicationEarliestYaml, 'Kafka application')
deployFromYaml(t, scaledObjectEarliestYamlFile.name, scaledObjectEarliestYaml, 'Scaled Object')
waitForReady(t, 'deployment/kafka-consumer','Kafka application', 'Available')

t.is(
0,
sh.exec(`kubectl apply -f ${kafkaApplicationEarliestYamlFile.name} --namespace ${defaultNamespace}`).code,
'Deploying Kafka application should work.'
)
fs.writeFileSync(scaledObjectEarliestYamlFile.name, scaledObjectEarliestYaml)
t.is(
0,
sh.exec(`kubectl apply -f ${scaledObjectEarliestYamlFile.name} --namespace ${defaultNamespace}`).code,
'Deploying Scaled Object should work.'
)
t.is(
0,
sh.exec(`kubectl wait deployment/kafka-consumer --for=condition=Available --timeout=${timeToWait}s --namespace ${defaultNamespace}`).code,
'Kafka application should be ready within given time limit.'
)
waitForReplicaCount(0, commandToCheckReplicas)
t.is('0', sh.exec(commandToCheckReplicas).stdout, 'Replica count should be 0.')
t.true(await waitForDeploymentReplicaCount(0, 'kafka-consumer', defaultNamespace, 30, 2000), 'replica count should start out as 0')
});

// Polls the replica count reported by `commandToCheck` until it stabilizes at
// `desiredReplicaCount`, or gives up after 10 outer attempts. Returns without
// signalling failure either way — callers assert the final count themselves.
// (Superseded in this commit by the shared waitForDeploymentReplicaCount helper.)
function waitForReplicaCount(desiredReplicaCount: number, commandToCheck: string) {
let replicaCount = undefined
let changed = undefined
for (let i = 0; i < 10; i++) {
changed = false
// checks the replica count 3 times, it tends to fluctuate from the beginning
for (let j = 0; j < 3; j++) {
replicaCount = sh.exec(commandToCheck).stdout
if (replicaCount === desiredReplicaCount.toString()) {
// Count matches; pause briefly and re-check to confirm it is stable.
sh.exec('sleep 2s')
} else {
// Mismatch — mark this attempt as unstable and retry the outer loop.
changed = true
break
}
}
if (changed === false) {
// Three consecutive matching reads: consider the count settled.
return
} else {
// Back off before the next polling attempt.
sh.exec('sleep 3s')
}
}
}

test.serial('Scale application with kafka messages.', t => {
test.serial('Scale application with kafka messages.', async t => {
for (let r = 1; r <= 3; r++) {

sh.exec(`kubectl exec --namespace ${defaultNamespace} ${defaultKafkaClient} -- sh -c 'echo "{\"text\": \"foo\"}" | kafka-console-producer --broker-list ${defaultCluster}-kafka-bootstrap.${defaultNamespace}:9092 --topic ${defaultTopic}'`)
sh.exec(`sleep 5s`)

waitForReplicaCount(r, commandToCheckReplicas)

t.is(r.toString(), sh.exec(commandToCheckReplicas).stdout, `Replica count should be ${r}.`)
t.true(await waitForDeploymentReplicaCount(r, 'kafka-consumer', defaultNamespace, 30, 2000), `Replica count should be ${r}.`)
}
})

test.serial('Scale application beyond partition max.', t => {
test.serial('Scale application beyond partition max.', async t => {
sh.exec(`kubectl exec --namespace ${defaultNamespace} ${defaultKafkaClient} -- sh -c 'echo "{\"text\": \"foo\"}" | kafka-console-producer --broker-list ${defaultCluster}-kafka-bootstrap.${defaultNamespace}:9092 --topic ${defaultTopic}'`)
sh.exec(`sleep 5s`)

waitForReplicaCount(3, commandToCheckReplicas)

t.is('3', sh.exec(commandToCheckReplicas).stdout, `Replica count should be 3.`)
t.true(await waitForDeploymentReplicaCount(3, 'kafka-consumer', defaultNamespace, 30, 2000), `Replica count should be 3.`)
})

test.serial('cleanup after earliest policy test', t=> {
Expand All @@ -158,40 +95,27 @@ test.serial('cleanup after earliest policy test', t=> {
sh.exec(`sleep 30s`)
})

test.serial('Applying ScaledObject latest policy should not scale up pods', t => {
test.serial('Applying ScaledObject latest policy should not scale up pods', async t => {

//Make the consumer commit the first offset for each partition.
sh.exec(`kubectl exec --namespace ${defaultNamespace} ${defaultKafkaClient} -- sh -c 'kafka-console-consumer --bootstrap-server ${defaultCluster}-kafka-bootstrap.${defaultNamespace}:9092 --topic ${defaultTopic} --group latest --from-beginning --consumer-property enable.auto.commit=true --timeout-ms 15000'`)

fs.writeFileSync(kafkaApplicationLatestYamlFile.name, kafkaApplicationLatestYaml)
t.is(
0,
sh.exec(`kubectl apply -f ${kafkaApplicationLatestYamlFile.name} --namespace ${defaultNamespace}`).code,
'Deploying Kafka application should work.'
)
deployFromYaml(t, kafkaApplicationLatestYamlFile.name, kafkaApplicationLatestYaml, 'Kafka application')
sh.exec(`sleep 10s`)
fs.writeFileSync(scaledObjectLatestYamlFile.name, scaledObjectLatestYaml)
t.is(
0,
sh.exec(`kubectl apply -f ${scaledObjectLatestYamlFile.name} --namespace ${defaultNamespace}`).code,
'Deploying Scaled Object should work.'
)
deployFromYaml(t, scaledObjectLatestYamlFile.name, scaledObjectLatestYaml, 'Scaled Object')
sh.exec(`sleep 5s`)
waitForReplicaCount(1, commandToCheckReplicas)
t.is('0', sh.exec(commandToCheckReplicas).stdout, 'Replica count should be 0.')
t.true(await waitForDeploymentReplicaCount(0, 'kafka-consumer', defaultNamespace, 30, 2000), `Replica count should be 0.`)
})


test.serial('Latest Scale object should scale with new messages', t => {
test.serial('Latest Scale object should scale with new messages', async t => {

for (let r = 1; r <= 3; r++) {

sh.exec(`kubectl exec --namespace ${defaultNamespace} ${defaultKafkaClient} -- sh -c 'echo "{\"text\": \"foo\"}" | kafka-console-producer --broker-list ${defaultCluster}-kafka-bootstrap.${defaultNamespace}:9092 --topic ${defaultTopic}'`)
sh.exec(`sleep 5s`)

waitForReplicaCount(r, commandToCheckReplicas)

t.is(r.toString(), sh.exec(commandToCheckReplicas).stdout, `Replica count should be ${r}.`)
t.true(await waitForDeploymentReplicaCount(r, 'kafka-consumer', defaultNamespace, 30, 2000), `Replica count should be ${r}.`)
}
})

Expand All @@ -209,37 +133,24 @@ test.serial('Cleanup after latest policy test', t=> {
sh.exec(`sleep 10s`)
})

test.serial('Applying ScaledObject with multiple topics should scale up pods', t => {
test.serial('Applying ScaledObject with multiple topics should scale up pods', async t => {
// Make the consumer commit the all offsets for all topics in the group
sh.exec(`kubectl exec --namespace ${defaultNamespace} ${defaultKafkaClient} -- sh -c 'kafka-console-consumer --bootstrap-server "${defaultCluster}-kafka-bootstrap.${defaultNamespace}:9092" --topic ${defaultTopic} --group multiTopic --from-beginning --consumer-property enable.auto.commit=true --timeout-ms 15000'`)
sh.exec(`kubectl exec --namespace ${defaultNamespace} ${defaultKafkaClient} -- sh -c 'kafka-console-consumer --bootstrap-server "${defaultCluster}-kafka-bootstrap.${defaultNamespace}:9092" --topic ${defaultTopic2} --group multiTopic --from-beginning --consumer-property enable.auto.commit=true --timeout-ms 15000'`)

fs.writeFileSync(kafkaApplicationMultipleTopicsYamlFile.name, kafkaApplicationMultipleTopicsYaml)
t.is(
0,
sh.exec(`kubectl apply -f ${kafkaApplicationMultipleTopicsYamlFile.name} --namespace ${defaultNamespace}`).code,
'Deploying Kafka application should work.'
)
deployFromYaml(t, kafkaApplicationMultipleTopicsYamlFile.name, kafkaApplicationMultipleTopicsYaml, 'Kafka application')
sh.exec(`sleep 5s`)
fs.writeFileSync(scaledObjectMultipleTopicsYamlFile.name, scaledObjectMultipleTopicsYaml)

t.is(
0,
sh.exec(`kubectl apply -f ${scaledObjectMultipleTopicsYamlFile.name} --namespace ${defaultNamespace}`).code,
'Deploying Scaled Object should work.'
)
deployFromYaml(t, scaledObjectMultipleTopicsYamlFile.name, scaledObjectMultipleTopicsYaml, ' Scaled Object')
sh.exec(`sleep 5s`)

// when lag is 0, scaled object is not active, replica = 0
waitForReplicaCount(0, commandToCheckReplicas)
t.is('0', sh.exec(commandToCheckReplicas).stdout, 'Replica count should be 0.')
t.true(await waitForDeploymentReplicaCount(0, 'kafka-consumer', defaultNamespace, 30, 2000), `Replica count should be 0.`)

// produce a single msg to the default topic
// should turn scale object active, replica = 1
sh.exec(`kubectl exec --namespace ${defaultNamespace} ${defaultKafkaClient} -- sh -exc 'echo "{\"text\": \"foo\"}" | kafka-console-producer --broker-list ${defaultCluster}-kafka-bootstrap.${defaultNamespace}:9092 --topic ${defaultTopic}'`)
sh.exec(`sleep 5s`)
waitForReplicaCount(1, commandToCheckReplicas)
t.is('1', sh.exec(commandToCheckReplicas).stdout, 'Replica count should be 1.')
t.true(await waitForDeploymentReplicaCount(1, 'kafka-consumer', defaultNamespace, 30, 2000), `Replica count should be 1.`)

// produce one more msg to the different topic within the same group
// will turn total consumer group lag to 2.
Expand All @@ -248,14 +159,12 @@ test.serial('Applying ScaledObject with multiple topics should scale up pods', t
// as desiredReplicaCount = totalLag / avgThreshold
sh.exec(`kubectl exec --namespace ${defaultNamespace} ${defaultKafkaClient} -- sh -exc 'echo "{\"text\": \"foo\"}" | kafka-console-producer --broker-list ${defaultCluster}-kafka-bootstrap.${defaultNamespace}:9092 --topic ${defaultTopic2}'`)
sh.exec(`sleep 5s`)
waitForReplicaCount(2, commandToCheckReplicas)
t.is('2', sh.exec(commandToCheckReplicas).stdout, 'Replica count should be 2.')
t.true(await waitForDeploymentReplicaCount(2, 'kafka-consumer', defaultNamespace, 30, 2000), `Replica count should be 2.`)

// make it 3 cause why not?
sh.exec(`kubectl exec --namespace ${defaultNamespace} ${defaultKafkaClient} -- sh -exc 'echo "{\"text\": \"foo\"}" | kafka-console-producer --broker-list ${defaultCluster}-kafka-bootstrap.${defaultNamespace}:9092 --topic ${defaultTopic}'`)
sh.exec(`sleep 5s`)
waitForReplicaCount(3, commandToCheckReplicas)
t.is('3', sh.exec(commandToCheckReplicas).stdout, 'Replica count should be 3.')
t.true(await waitForDeploymentReplicaCount(3, 'kafka-consumer', defaultNamespace, 30, 2000), `Replica count should be 3.`)
})

test.serial('Cleanup after multiple topics test', t=> {
Expand Down Expand Up @@ -289,23 +198,31 @@ test.after.always('Clean up, delete created resources.', t => {
]

for (const resource of resources) {
sh.exec(`kubectl delete ${resource} --namespace ${defaultNamespace}`)
sh.exec(`echo Deleting resource from file ${resource}`)
sh.exec(`kubectl delete -f ${resource} --namespace ${defaultNamespace}`)
}
sh.exec(`echo Deleting namespace ${defaultNamespace}`)
sh.exec(`kubectl delete namespace ${defaultNamespace}`)
})

const kafkaClusterYaml = `apiVersion: kafka.strimzi.io/v1beta1
const kafkaClusterYaml = `apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
name: ${defaultCluster}
namespace: ${defaultNamespace}
spec:
kafka:
version: 2.5.0
version: "2.6.0"
replicas: 3
listeners:
plain: {}
tls: {}
- name: plain
port: 9092
type: internal
tls: false
- name: tls
port: 9093
type: internal
tls: true
config:
offsets.topic.replication.factor: 1
transaction.state.log.replication.factor: 1
Expand All @@ -321,7 +238,7 @@ spec:
topicOperator: {}
userOperator: {}`

const kafkaTopicsYaml = `apiVersion: kafka.strimzi.io/v1beta1
const kafkaTopicsYaml = `apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
name: ${defaultTopic}
Expand All @@ -335,7 +252,7 @@ spec:
retention.ms: 604800000
segment.bytes: 1073741824
---
apiVersion: kafka.strimzi.io/v1beta1
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
name: ${defaultTopic2}
Expand Down

0 comments on commit 41b5d81

Please sign in to comment.