Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding FlinkDeployment CRD to supported third party resource customizations #5023

Merged
merged 4 commits into from
Jul 1, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
apiVersion: config.karmada.io/v1alpha1
kind: ResourceInterpreterCustomization
metadata:
name: declarative-configuration-flinkdeployment
spec:
target:
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
customizations:
healthInterpretation:
luaScript: >
function InterpretHealth(observedObj)
if observedObj.status ~= nil and observedObj.status.jobStatus ~= nil then
return observedObj.status.jobStatus.state ~= 'CREATED' and observedObj.status.jobStatus.state ~= 'RECONCILING'
mszacillo marked this conversation as resolved.
Show resolved Hide resolved
end
return false
end
replicaResource:
mszacillo marked this conversation as resolved.
Show resolved Hide resolved
luaScript: >
local kube = require("kube")

local function isempty(s)
return s == nil or s == ''
end

function GetReplicas(observedObj)
-- FlinkDeployments presently will not be subdivided among clusters, replica should be 1
replica = 1
requires = {
resourceRequest = {},
}
-- Add jobmanager resources into replica requirement

jm_replicas = observedObj.spec.jobManager.replicas
if isempty(jm_replicas) then
jm_replicas = 1
end

for i = 1, jm_replicas do
requires.resourceRequest.cpu = kube.resourceAdd(requires.resourceRequest.cpu, tostring(observedObj.spec.jobManager.resource.cpu))
requires.resourceRequest.memory = kube.resourceAdd(requires.resourceRequest.memory, observedObj.spec.jobManager.resource.memory)
end

-- Add task manager resources into replica requirement

parallelism = observedObj.spec.job.parallelism
tms = math.ceil(parallelism / observedObj.spec.flinkConfiguration['taskmanager.numberOfTaskSlots'])

for i = 1, tms do
requires.resourceRequest.cpu = kube.resourceAdd(requires.resourceRequest.cpu, tostring(observedObj.spec.taskManager.resource.cpu))
requires.resourceRequest.memory = kube.resourceAdd(requires.resourceRequest.memory, observedObj.spec.taskManager.resource.memory)
end

return replica, requires
end
mszacillo marked this conversation as resolved.
Show resolved Hide resolved
mszacillo marked this conversation as resolved.
Show resolved Hide resolved
statusAggregation:
luaScript: >
function AggregateStatus(desiredObj, statusItems)
if statusItems == nil then
return desiredObj
end
if desiredObj.status == nil then
desiredObj.status = {}
end
clusterInfo = {}
jobManagerDeploymentStatus = ''
jobStatus = {}
lifecycleState = ''
observedGeneration = 0
reconciliationStatus = {}
taskManager = {}

for i = 1, #statusItems do
currentStatus = statusItems[i].status
if currentStatus ~= nil then
clusterInfo = currentStatus.clusterInfo
jobManagerDeploymentStatus = currentStatus.jobManagerDeploymentStatus
jobStatus = currentStatus.jobStatus
observedGeneration = currentStatus.observedGeneration
lifecycleState = currentStatus.lifecycleState
reconciliationStatus = currentStatus.reconciliationStatus
taskManager = currentStatus.taskManager
end
end

desiredObj.status.clusterInfo = clusterInfo
desiredObj.status.jobManagerDeploymentStatus = jobManagerDeploymentStatus
desiredObj.status.jobStatus = jobStatus
desiredObj.status.lifecycleState = lifecycleState
desiredObj.status.observedGeneration = observedGeneration
desiredObj.status.reconciliationStatus = reconciliationStatus
desiredObj.status.taskManager = taskManager
return desiredObj
end
statusReflection:
luaScript: >
function ReflectStatus(observedObj)
status = {}
if observedObj == nil or observedObj.status == nil then
return status
end
status.clusterInfo = observedObj.status.clusterInfo
status.jobManagerDeploymentStatus = observedObj.status.jobManagerDeploymentStatus
status.jobStatus = observedObj.status.jobStatus
status.observedGeneration = observedObj.status.observedGeneration
status.lifecycleState = observedObj.status.lifecycleState
status.reconciliationStatus = observedObj.status.reconciliationStatus
status.taskManager = observedObj.status.taskManager
return status
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
tests:
- desiredInputPath: testdata/desired-flinkdeployment.yaml
statusInputPath: testdata/status-file.yaml
operation: AggregateStatus
- observedInputPath: testdata/observed-flinkdeployment.yaml
operation: InterpretReplica
- observedInputPath: testdata/observed-flinkdeployment.yaml
operation: InterpretHealth
- observedInputPath: testdata/observed-flinkdeployment.yaml
operation: InterpretStatus
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
name: basic-example
namespace: test-namespace
spec:
flinkConfiguration:
taskmanager.numberOfTaskSlots: "2"
flinkVersion: v1_17
image: flink:1.17
job:
jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
parallelism: 2
upgradeMode: stateless
jobManager:
replicas: 1
resource:
cpu: 1
memory: 2048m
mode: native
serviceAccount: flink
taskManager:
resource:
cpu: 1
memory: 2048m
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
creationTimestamp: "2024-06-05T14:52:28Z"
finalizers:
- flinkdeployments.flink.apache.org/finalizer
generation: 1
name: basic-example
namespace: test-namespace
resourceVersion: "5053661"
uid: 87ef77ca-7bf0-4998-b275-06f459872e03
spec:
flinkConfiguration:
taskmanager.numberOfTaskSlots: "2"
flinkVersion: v1_17
image: flink:1.17
job:
args: []
jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
parallelism: 2
state: running
upgradeMode: stateless
jobManager:
replicas: 1
resource:
cpu: 1
memory: 2048m
serviceAccount: flink
taskManager:
resource:
cpu: 1
memory: 2048m
status:
clusterInfo:
flink-revision: 2750d5c @ 2023-05-19T10:45:46+02:00
flink-version: 1.17.1
total-cpu: "2.0"
total-memory: "4294967296"
jobManagerDeploymentStatus: READY
jobStatus:
checkpointInfo:
lastPeriodicCheckpointTimestamp: 0
jobId: 44cc5573945d1d4925732d915c70b9ac
jobName: Minimal Spec Example
savepointInfo:
lastPeriodicSavepointTimestamp: 0
savepointHistory: []
startTime: "1717599166365"
state: RUNNING
updateTime: "1717599182544"
lifecycleState: STABLE
observedGeneration: 1
reconciliationStatus:
lastReconciledSpec: '{"spec":{"job":{"jarURI":"local:///opt/flink/examples/streaming/StateMachineExample.jar","parallelism":2,"entryClass":null,"args":[],"state":"running","savepointTriggerNonce":null,"initialSavepointPath":null,"checkpointTriggerNonce":null,"upgradeMode":"stateless","allowNonRestoredState":null,"savepointRedeployNonce":null},"restartNonce":null,"flinkConfiguration":{"taskmanager.numberOfTaskSlots":"2"},"image":"flink:1.17","imagePullPolicy":null,"serviceAccount":"flink","flinkVersion":"v1_17","ingress":null,"podTemplate":null,"jobManager":{"resource":{"cpu":1.0,"memory":"2048m","ephemeralStorage":null},"replicas":1,"podTemplate":null},"taskManager":{"resource":{"cpu":1.0,"memory":"2048m","ephemeralStorage":null},"replicas":null,"podTemplate":null},"logConfiguration":null,"mode":null},"resource_metadata":{"apiVersion":"flink.apache.org/v1beta1","metadata":{"generation":2},"firstDeployment":true}}'
lastStableSpec: '{"spec":{"job":{"jarURI":"local:///opt/flink/examples/streaming/StateMachineExample.jar","parallelism":2,"entryClass":null,"args":[],"state":"running","savepointTriggerNonce":null,"initialSavepointPath":null,"checkpointTriggerNonce":null,"upgradeMode":"stateless","allowNonRestoredState":null,"savepointRedeployNonce":null},"restartNonce":null,"flinkConfiguration":{"taskmanager.numberOfTaskSlots":"2"},"image":"flink:1.17","imagePullPolicy":null,"serviceAccount":"flink","flinkVersion":"v1_17","ingress":null,"podTemplate":null,"jobManager":{"resource":{"cpu":1.0,"memory":"2048m","ephemeralStorage":null},"replicas":1,"podTemplate":null},"taskManager":{"resource":{"cpu":1.0,"memory":"2048m","ephemeralStorage":null},"replicas":null,"podTemplate":null},"logConfiguration":null,"mode":null},"resource_metadata":{"apiVersion":"flink.apache.org/v1beta1","metadata":{"generation":2},"firstDeployment":true}}'
reconciliationTimestamp: 1717599148930
state: DEPLOYED
taskManager:
labelSelector: component=taskmanager,app=basic-example
replicas: 1
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
applied: true
clusterName: member1
health: Healthy
status:
clusterInfo:
flink-revision: 2750d5c @ 2023-05-19T10:45:46+02:00
flink-version: 1.17.1
total-cpu: "2.0"
total-memory: "4294967296"
jobManagerDeploymentStatus: READY
jobStatus:
checkpointInfo:
lastPeriodicCheckpointTimestamp: 0
jobId: 44cc5573945d1d4925732d915c70b9ac
jobName: Minimal Spec Example
savepointInfo:
lastPeriodicSavepointTimestamp: 0
savepointHistory: []
startTime: "1717599166365"
state: RUNNING
updateTime: "1717599182544"
lifecycleState: STABLE
observedGeneration: 1
reconciliationStatus:
lastReconciledSpec: '{"spec":{"job":{"jarURI":"local:///opt/flink/examples/streaming/StateMachineExample.jar","parallelism":2,"entryClass":null,"args":[],"state":"running","savepointTriggerNonce":null,"initialSavepointPath":null,"checkpointTriggerNonce":null,"upgradeMode":"stateless","allowNonRestoredState":null,"savepointRedeployNonce":null},"restartNonce":null,"flinkConfiguration":{"taskmanager.numberOfTaskSlots":"2"},"image":"flink:1.17","imagePullPolicy":null,"serviceAccount":"flink","flinkVersion":"v1_17","ingress":null,"podTemplate":null,"jobManager":{"resource":{"cpu":1.0,"memory":"2048m","ephemeralStorage":null},"replicas":1,"podTemplate":null},"taskManager":{"resource":{"cpu":1.0,"memory":"2048m","ephemeralStorage":null},"replicas":null,"podTemplate":null},"logConfiguration":null,"mode":null},"resource_metadata":{"apiVersion":"flink.apache.org/v1beta1","metadata":{"generation":2},"firstDeployment":true}}'
lastStableSpec: '{"spec":{"job":{"jarURI":"local:///opt/flink/examples/streaming/StateMachineExample.jar","parallelism":2,"entryClass":null,"args":[],"state":"running","savepointTriggerNonce":null,"initialSavepointPath":null,"checkpointTriggerNonce":null,"upgradeMode":"stateless","allowNonRestoredState":null,"savepointRedeployNonce":null},"restartNonce":null,"flinkConfiguration":{"taskmanager.numberOfTaskSlots":"2"},"image":"flink:1.17","imagePullPolicy":null,"serviceAccount":"flink","flinkVersion":"v1_17","ingress":null,"podTemplate":null,"jobManager":{"resource":{"cpu":1.0,"memory":"2048m","ephemeralStorage":null},"replicas":1,"podTemplate":null},"taskManager":{"resource":{"cpu":1.0,"memory":"2048m","ephemeralStorage":null},"replicas":null,"podTemplate":null},"logConfiguration":null,"mode":null},"resource_metadata":{"apiVersion":"flink.apache.org/v1beta1","metadata":{"generation":2},"firstDeployment":true}}'
reconciliationTimestamp: 1717599148930
state: DEPLOYED
taskManager:
labelSelector: component=taskmanager,app=basic-example
replicas: 1