From 2073a1c96ccd5c48e06f3a41db69e41984b77fb4 Mon Sep 17 00:00:00 2001 From: Filipe Regadas Date: Wed, 29 Sep 2021 13:39:33 +0100 Subject: [PATCH] Fix Flink job tracking --- config/rbac/role.yaml | 1 + controllers/flinkcluster_controller.go | 37 ++++++++------- controllers/flinkcluster_observer.go | 31 +++++++------ controllers/flinkcluster_util.go | 63 +++++++++++++++++--------- controllers/flinkcluster_util_test.go | 49 +++++++------------- go.mod | 2 +- main.go | 7 ++- 7 files changed, 105 insertions(+), 85 deletions(-) diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml index c6bc8d37..0db56756 100644 --- a/config/rbac/role.yaml +++ b/config/rbac/role.yaml @@ -125,6 +125,7 @@ rules: - "" resources: - pods/status + - pods/log verbs: - get - apiGroups: diff --git a/controllers/flinkcluster_controller.go b/controllers/flinkcluster_controller.go index 06885eee..e5f6e914 100644 --- a/controllers/flinkcluster_controller.go +++ b/controllers/flinkcluster_controller.go @@ -30,6 +30,7 @@ import ( appsv1 "k8s.io/api/apps/v1" batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" + "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/record" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" @@ -40,9 +41,10 @@ var controllerKind = v1beta1.GroupVersion.WithKind("FlinkCluster") // FlinkClusterReconciler reconciles a FlinkCluster object type FlinkClusterReconciler struct { - Client client.Client - Log logr.Logger - Mgr ctrl.Manager + Client client.Client + Clientset *kubernetes.Clientset + Log logr.Logger + Mgr ctrl.Manager } // +kubebuilder:rbac:groups=flinkoperator.k8s.io,resources=flinkclusters,verbs=get;list;watch;create;update;patch;delete @@ -70,13 +72,14 @@ func (reconciler *FlinkClusterReconciler) Reconcile(ctx context.Context, var log = reconciler.Log.WithValues( "cluster", request.NamespacedName) var handler = FlinkClusterHandler{ - k8sClient: reconciler.Client, - flinkClient: flink.NewDefaultClient(log), - request: request, - context: context.Background(), - log: log, - recorder: reconciler.Mgr.GetEventRecorderFor("FlinkOperator"), - observed: ObservedClusterState{}, + k8sClient: reconciler.Client, + k8sClientset: reconciler.Clientset, + flinkClient: flink.NewDefaultClient(log), + request: request, + context: context.Background(), + log: log, + recorder: reconciler.Mgr.GetEventRecorderFor("FlinkOperator"), + observed: ObservedClusterState{}, } return handler.reconcile(ctx, request) } @@ -99,6 +102,7 @@ func (reconciler *FlinkClusterReconciler) SetupWithManager( // reconcile request. type FlinkClusterHandler struct { k8sClient client.Client + k8sClientset *kubernetes.Clientset flinkClient *flink.Client request ctrl.Request context context.Context @@ -127,12 +131,13 @@ func (handler *FlinkClusterHandler) reconcile(ctx context.Context, log.Info("---------- 1. Observe the current state ----------") var observer = ClusterStateObserver{ - k8sClient: k8sClient, - flinkClient: flinkClient, - request: request, - context: context, - log: log, - history: history, + k8sClient: k8sClient, + k8sClientset: handler.k8sClientset, + flinkClient: flinkClient, + request: request, + context: context, + log: log, + history: history, } err = observer.observe(observed) if err != nil { diff --git a/controllers/flinkcluster_observer.go b/controllers/flinkcluster_observer.go index e9791432..b4b2bdd1 100644 --- a/controllers/flinkcluster_observer.go +++ b/controllers/flinkcluster_observer.go @@ -33,18 +33,20 @@ import ( extensionsv1beta1 "k8s.io/api/extensions/v1beta1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/kubernetes" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" ) // ClusterStateObserver gets the observed state of the cluster. type ClusterStateObserver struct { - k8sClient client.Client - flinkClient *flink.Client - request ctrl.Request - context context.Context - log logr.Logger - history history.Interface + k8sClient client.Client + k8sClientset *kubernetes.Clientset + flinkClient *flink.Client + request ctrl.Request + context context.Context + log logr.Logger + history history.Interface } // ObservedClusterState holds observed state of a cluster. @@ -245,7 +247,7 @@ func (observer *ClusterStateObserver) observeJob( observed.job = observedJob // Get job submitter pod resource. - var observedJobPod *corev1.Pod = new(corev1.Pod) + observedJobPod := new(corev1.Pod) err = observer.observeJobPod(observedJobPod) if err != nil { log.Error(err, "Failed to get job pod") @@ -253,9 +255,9 @@ func (observer *ClusterStateObserver) observeJob( observed.jobPod = observedJobPod // Extract submit result. - observedFlinkJobSubmitLog, err = getFlinkJobSubmitLog(observedJobPod) + observedFlinkJobSubmitLog, err = getFlinkJobSubmitLog(observer.k8sClientset, observedJobPod) if err != nil { - log.Error(err, "Failed to extract job submit result") + log.Info("Failed to extract job submit result", "error", err.Error()) } observed.flinkJobSubmitLog = observedFlinkJobSubmitLog @@ -294,13 +296,16 @@ func (observer *ClusterStateObserver) observeFlinkJobStatus(observed *ObservedCl flinkJobStatus.flinkJobList = flinkJobList // Check running jobs - if len(flinkJobList.Jobs) < 1 { + if len(flinkJobList.Jobs) < 1 && observed.flinkJobSubmitLog == nil { return } - flinkJobStatus.flinkJob = &flinkJobList.Jobs[0] - for _, job := range flinkJobList.Jobs[1:] { - flinkJobStatus.flinkJobsUnexpected = append(flinkJobStatus.flinkJobsUnexpected, job.Id) + for _, job := range flinkJobList.Jobs { + if observed.flinkJobSubmitLog.JobID == job.Id { + flinkJobStatus.flinkJob = &job + } else if getFlinkJobDeploymentState(job.State) == v1beta1.JobStateRunning { + flinkJobStatus.flinkJobsUnexpected = append(flinkJobStatus.flinkJobsUnexpected, job.Id) + } } flinkJobExceptions, err := observer.flinkClient.GetJobExceptions(flinkAPIBaseURL, flinkJobStatus.flinkJob.Id) diff --git a/controllers/flinkcluster_util.go b/controllers/flinkcluster_util.go index 20c1a0ab..82bab4aa 100644 --- a/controllers/flinkcluster_util.go +++ b/controllers/flinkcluster_util.go @@ -18,16 +18,18 @@ package controllers import ( "bytes" + "context" "encoding/json" "fmt" + "io" "os" + "regexp" "strconv" "strings" "time" v1beta1 "github.com/spotify/flink-on-k8s-operator/api/v1beta1" "github.com/spotify/flink-on-k8s-operator/controllers/history" - "gopkg.in/yaml.v2" appsv1 "k8s.io/api/apps/v1" batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" @@ -35,6 +37,7 @@ import ( "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/kubernetes" ) const ( @@ -52,6 +55,10 @@ const ( SavepointRequestRetryIntervalSec = 10 ) +var ( + jobIdRegexp = regexp.MustCompile("JobID (.*)\n") +) + type UpdateState string const ( @@ -556,29 +563,43 @@ func getFlinkJobDeploymentState(flinkJobState string) string { } } +func getPodLogs(clientset *kubernetes.Clientset, pod *corev1.Pod) (string, error) { + if pod == nil { + return "", fmt.Errorf("no job pod found, even though submission completed") + } + pods := clientset.CoreV1().Pods(pod.Namespace) + + req := pods.GetLogs(pod.Name, &corev1.PodLogOptions{}) + podLogs, err := req.Stream(context.TODO()) + if err != nil { + return "", fmt.Errorf("Failed to get logs for pod %s: %v", pod.Name, err) + } + defer podLogs.Close() + + buf := new(bytes.Buffer) + _, err = io.Copy(buf, podLogs) + if err != nil { + return "", fmt.Errorf("error in copy information from pod logs to buf") + } + str := buf.String() + + return str, nil +} + // getFlinkJobSubmitLog extract submit result from the pod termination log. -func getFlinkJobSubmitLog(observedPod *corev1.Pod) (*FlinkJobSubmitLog, error) { - if observedPod == nil { - return nil, fmt.Errorf("no job pod found, even though submission completed") - } - var containerStatuses = observedPod.Status.ContainerStatuses - if len(containerStatuses) == 0 || - containerStatuses[0].State.Terminated == nil || - containerStatuses[0].State.Terminated.Message == "" { - return nil, fmt.Errorf("job pod found, but no termination log found even though submission completed") - } - - // The job submission script writes the submission log to the pod termination log at the end of execution. - // If the job submission is successful, the extracted job ID is also included. - // The job submit script writes the submission result in YAML format, - // so parse it here to get the ID - if available - and log. - // Note: https://kubernetes.io/docs/tasks/debug-application-cluster/determine-reason-pod-failure/ - var rawJobSubmitResult = containerStatuses[0].State.Terminated.Message - var result = new(FlinkJobSubmitLog) - var err = yaml.Unmarshal([]byte(rawJobSubmitResult), result) +func getFlinkJobSubmitLog(clientset *kubernetes.Clientset, observedPod *corev1.Pod) (*FlinkJobSubmitLog, error) { + log, err := getPodLogs(clientset, observedPod) if err != nil { return nil, err } - return result, nil + return getFlinkJobSubmitLogFromString(log) +} + +func getFlinkJobSubmitLogFromString(podLog string) (*FlinkJobSubmitLog, error) { + if result := jobIdRegexp.FindStringSubmatch(podLog); len(result) > 0 { + return &FlinkJobSubmitLog{JobID: result[1], Message: podLog}, nil + } else { + return nil, fmt.Errorf("no job id found") + } } diff --git a/controllers/flinkcluster_util_test.go b/controllers/flinkcluster_util_test.go index 401334de..f3008d6c 100644 --- a/controllers/flinkcluster_util_test.go +++ b/controllers/flinkcluster_util_test.go @@ -576,52 +576,37 @@ func TestGetNonLiveHistory(t *testing.T) { assert.Equal(t, len(nonLiveHistory), 0) } -func TestGetFlinkJobDeploymentState(t *testing.T) { - var pod corev1.Pod +func TestGetFlinkJobSubmitLog(t *testing.T) { var submit, expected *FlinkJobSubmitLog var err error // success - termMsg := ` -jobID: ec74209eb4e3db8ae72db00bd7a830aa -message: | - Successfully submitted! + log := ` /opt/flink/bin/flink run --jobmanager flinkjobcluster-sample-jobmanager:8081 --class org.apache.flink.streaming.examples.wordcount.WordCount --parallelism 2 --detached ./examples/streaming/WordCount.jar --input ./README.txt Starting execution of program Printing result to stdout. Use --output to specify output path. Job has been submitted with JobID ec74209eb4e3db8ae72db00bd7a830aa + Program execution finished + Job with JobID ec74209eb4e3db8ae72db00bd7a830aa has finished. + Job Runtime: 333688 ms ` expected = &FlinkJobSubmitLog{ JobID: "ec74209eb4e3db8ae72db00bd7a830aa", - Message: `Successfully submitted! -/opt/flink/bin/flink run --jobmanager flinkjobcluster-sample-jobmanager:8081 --class org.apache.flink.streaming.examples.wordcount.WordCount --parallelism 2 --detached ./examples/streaming/WordCount.jar --input ./README.txt -Starting execution of program -Printing result to stdout. Use --output to specify output path. -Job has been submitted with JobID ec74209eb4e3db8ae72db00bd7a830aa + Message: ` + /opt/flink/bin/flink run --jobmanager flinkjobcluster-sample-jobmanager:8081 --class org.apache.flink.streaming.examples.wordcount.WordCount --parallelism 2 --detached ./examples/streaming/WordCount.jar --input ./README.txt + Starting execution of program + Printing result to stdout. Use --output to specify output path. + Job has been submitted with JobID ec74209eb4e3db8ae72db00bd7a830aa + Program execution finished + Job with JobID ec74209eb4e3db8ae72db00bd7a830aa has finished. + Job Runtime: 333688 ms `, } - pod = corev1.Pod{ - Status: corev1.PodStatus{ - ContainerStatuses: []corev1.ContainerStatus{{ - State: corev1.ContainerState{ - Terminated: &corev1.ContainerStateTerminated{ - Message: termMsg, - }}}}}} - submit, _ = getFlinkJobSubmitLog(&pod) + + submit, _ = getFlinkJobSubmitLogFromString(log) assert.DeepEqual(t, *submit, *expected) // failed: pod not found - _, err = getFlinkJobSubmitLog(nil) - assert.Error(t, err, "no job pod found, even though submission completed") - - // failed: message not found - pod = corev1.Pod{ - Status: corev1.PodStatus{ - ContainerStatuses: []corev1.ContainerStatus{{ - State: corev1.ContainerState{ - Terminated: &corev1.ContainerStateTerminated{ - Message: "", - }}}}}} - _, err = getFlinkJobSubmitLog(&pod) - assert.Error(t, err, "job pod found, but no termination log found even though submission completed") + _, err = getFlinkJobSubmitLogFromString("") + assert.Error(t, err, "no job id found") } diff --git a/go.mod b/go.mod index 7358d7ce..ec714ba9 100644 --- a/go.mod +++ b/go.mod @@ -12,7 +12,6 @@ require ( github.com/onsi/gomega v1.16.0 golang.org/x/net v0.0.0-20210825183410-e898025ed96a golang.org/x/tools v0.1.5 // indirect - gopkg.in/yaml.v2 v2.4.0 gotest.tools v2.2.0+incompatible k8s.io/api v0.22.1 k8s.io/apimachinery v0.22.1 @@ -60,6 +59,7 @@ require ( google.golang.org/protobuf v1.26.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect + gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect k8s.io/apiextensions-apiserver v0.21.3 // indirect k8s.io/component-base v0.21.3 // indirect diff --git a/main.go b/main.go index c7b0d5ec..20bee8d3 100644 --- a/main.go +++ b/main.go @@ -25,6 +25,7 @@ import ( corev1 "k8s.io/api/core/v1" extensionsv1beta1 "k8s.io/api/extensions/v1beta1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/kubernetes" _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/log/zap" @@ -84,9 +85,11 @@ func main() { os.Exit(1) } + cs, err := kubernetes.NewForConfig(mgr.GetConfig()) err = (&controllers.FlinkClusterReconciler{ - Client: mgr.GetClient(), - Log: ctrl.Log.WithName("controllers").WithName("FlinkCluster"), + Client: mgr.GetClient(), + Clientset: cs, + Log: ctrl.Log.WithName("controllers").WithName("FlinkCluster"), }).SetupWithManager(mgr) if err != nil { setupLog.Error(err, "Unable to create controller", "controller", "FlinkCluster")