Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Flink job tracking #120

Merged
merged 1 commit into from
Oct 7, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions config/rbac/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ rules:
- ""
resources:
- pods/status
- pods/log
verbs:
- get
- apiGroups:
Expand Down
37 changes: 21 additions & 16 deletions controllers/flinkcluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
appsv1 "k8s.io/api/apps/v1"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand All @@ -40,9 +41,10 @@ var controllerKind = v1beta1.GroupVersion.WithKind("FlinkCluster")

// FlinkClusterReconciler reconciles a FlinkCluster object
type FlinkClusterReconciler struct {
Client client.Client
Log logr.Logger
Mgr ctrl.Manager
Client client.Client
Clientset *kubernetes.Clientset
Log logr.Logger
Mgr ctrl.Manager
}

// +kubebuilder:rbac:groups=flinkoperator.k8s.io,resources=flinkclusters,verbs=get;list;watch;create;update;patch;delete
Expand Down Expand Up @@ -70,13 +72,14 @@ func (reconciler *FlinkClusterReconciler) Reconcile(ctx context.Context,
var log = reconciler.Log.WithValues(
"cluster", request.NamespacedName)
var handler = FlinkClusterHandler{
k8sClient: reconciler.Client,
flinkClient: flink.NewDefaultClient(log),
request: request,
context: context.Background(),
log: log,
recorder: reconciler.Mgr.GetEventRecorderFor("FlinkOperator"),
observed: ObservedClusterState{},
k8sClient: reconciler.Client,
k8sClientset: reconciler.Clientset,
flinkClient: flink.NewDefaultClient(log),
request: request,
context: context.Background(),
log: log,
recorder: reconciler.Mgr.GetEventRecorderFor("FlinkOperator"),
observed: ObservedClusterState{},
}
return handler.reconcile(ctx, request)
}
Expand All @@ -99,6 +102,7 @@ func (reconciler *FlinkClusterReconciler) SetupWithManager(
// reconcile request.
type FlinkClusterHandler struct {
k8sClient client.Client
k8sClientset *kubernetes.Clientset
flinkClient *flink.Client
request ctrl.Request
context context.Context
Expand Down Expand Up @@ -127,12 +131,13 @@ func (handler *FlinkClusterHandler) reconcile(ctx context.Context,
log.Info("---------- 1. Observe the current state ----------")

var observer = ClusterStateObserver{
k8sClient: k8sClient,
flinkClient: flinkClient,
request: request,
context: context,
log: log,
history: history,
k8sClient: k8sClient,
k8sClientset: handler.k8sClientset,
flinkClient: flinkClient,
request: request,
context: context,
log: log,
history: history,
}
err = observer.observe(observed)
if err != nil {
Expand Down
31 changes: 18 additions & 13 deletions controllers/flinkcluster_observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,20 @@ import (
extensionsv1beta1 "k8s.io/api/extensions/v1beta1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
)

// ClusterStateObserver gets the observed state of the cluster.
type ClusterStateObserver struct {
k8sClient client.Client
flinkClient *flink.Client
request ctrl.Request
context context.Context
log logr.Logger
history history.Interface
k8sClient client.Client
k8sClientset *kubernetes.Clientset
flinkClient *flink.Client
request ctrl.Request
context context.Context
log logr.Logger
history history.Interface
}

// ObservedClusterState holds observed state of a cluster.
Expand Down Expand Up @@ -245,17 +247,17 @@ func (observer *ClusterStateObserver) observeJob(
observed.job = observedJob

// Get job submitter pod resource.
var observedJobPod *corev1.Pod = new(corev1.Pod)
observedJobPod := new(corev1.Pod)
err = observer.observeJobPod(observedJobPod)
if err != nil {
log.Error(err, "Failed to get job pod")
}
observed.jobPod = observedJobPod

// Extract submit result.
observedFlinkJobSubmitLog, err = getFlinkJobSubmitLog(observedJobPod)
observedFlinkJobSubmitLog, err = getFlinkJobSubmitLog(observer.k8sClientset, observedJobPod)
if err != nil {
log.Error(err, "Failed to extract job submit result")
log.Info("Failed to extract job submit result", "error", err.Error())
}
observed.flinkJobSubmitLog = observedFlinkJobSubmitLog

Expand Down Expand Up @@ -294,13 +296,16 @@ func (observer *ClusterStateObserver) observeFlinkJobStatus(observed *ObservedCl
flinkJobStatus.flinkJobList = flinkJobList

// Check running jobs
if len(flinkJobList.Jobs) < 1 {
if len(flinkJobList.Jobs) < 1 && observed.flinkJobSubmitLog == nil {
return
}

flinkJobStatus.flinkJob = &flinkJobList.Jobs[0]
for _, job := range flinkJobList.Jobs[1:] {
flinkJobStatus.flinkJobsUnexpected = append(flinkJobStatus.flinkJobsUnexpected, job.Id)
for _, job := range flinkJobList.Jobs {
if observed.flinkJobSubmitLog.JobID == job.Id {
flinkJobStatus.flinkJob = &job
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@regadas null pointer exception can happen here when the job pod is lost

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice catch! indeed it misses a check previously.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, this should be fine because this is not referring to the job pod; Also if the job pod is lost then we don't have submit log which is checked previously.

} else if getFlinkJobDeploymentState(job.State) == v1beta1.JobStateRunning {
flinkJobStatus.flinkJobsUnexpected = append(flinkJobStatus.flinkJobsUnexpected, job.Id)
}
}

flinkJobExceptions, err := observer.flinkClient.GetJobExceptions(flinkAPIBaseURL, flinkJobStatus.flinkJob.Id)
Expand Down
63 changes: 42 additions & 21 deletions controllers/flinkcluster_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,26 @@ package controllers

import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"os"
"regexp"
"strconv"
"strings"
"time"

v1beta1 "github.com/spotify/flink-on-k8s-operator/api/v1beta1"
"github.com/spotify/flink-on-k8s-operator/controllers/history"
"gopkg.in/yaml.v2"
appsv1 "k8s.io/api/apps/v1"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
extensionsv1beta1 "k8s.io/api/extensions/v1beta1"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
)

const (
Expand All @@ -52,6 +55,10 @@ const (
SavepointRequestRetryIntervalSec = 10
)

var (
jobIdRegexp = regexp.MustCompile("JobID (.*)\n")
)

type UpdateState string

const (
Expand Down Expand Up @@ -556,29 +563,43 @@ func getFlinkJobDeploymentState(flinkJobState string) string {
}
}

func getPodLogs(clientset *kubernetes.Clientset, pod *corev1.Pod) (string, error) {
if pod == nil {
return "", fmt.Errorf("no job pod found, even though submission completed")
}
pods := clientset.CoreV1().Pods(pod.Namespace)

req := pods.GetLogs(pod.Name, &corev1.PodLogOptions{})
podLogs, err := req.Stream(context.TODO())
if err != nil {
return "", fmt.Errorf("Failed to get logs for pod %s: %v", pod.Name, err)
}
defer podLogs.Close()

buf := new(bytes.Buffer)
_, err = io.Copy(buf, podLogs)
if err != nil {
return "", fmt.Errorf("error in copy information from pod logs to buf")
}
str := buf.String()

return str, nil
}

// getFlinkJobSubmitLog extract submit result from the pod termination log.
func getFlinkJobSubmitLog(observedPod *corev1.Pod) (*FlinkJobSubmitLog, error) {
if observedPod == nil {
return nil, fmt.Errorf("no job pod found, even though submission completed")
}
var containerStatuses = observedPod.Status.ContainerStatuses
if len(containerStatuses) == 0 ||
containerStatuses[0].State.Terminated == nil ||
containerStatuses[0].State.Terminated.Message == "" {
return nil, fmt.Errorf("job pod found, but no termination log found even though submission completed")
}

// The job submission script writes the submission log to the pod termination log at the end of execution.
// If the job submission is successful, the extracted job ID is also included.
// The job submit script writes the submission result in YAML format,
// so parse it here to get the ID - if available - and log.
// Note: https://kubernetes.io/docs/tasks/debug-application-cluster/determine-reason-pod-failure/
var rawJobSubmitResult = containerStatuses[0].State.Terminated.Message
var result = new(FlinkJobSubmitLog)
var err = yaml.Unmarshal([]byte(rawJobSubmitResult), result)
func getFlinkJobSubmitLog(clientset *kubernetes.Clientset, observedPod *corev1.Pod) (*FlinkJobSubmitLog, error) {
log, err := getPodLogs(clientset, observedPod)
if err != nil {
return nil, err
}

return result, nil
return getFlinkJobSubmitLogFromString(log)
}

func getFlinkJobSubmitLogFromString(podLog string) (*FlinkJobSubmitLog, error) {
if result := jobIdRegexp.FindStringSubmatch(podLog); len(result) > 0 {
return &FlinkJobSubmitLog{JobID: result[1], Message: podLog}, nil
} else {
return nil, fmt.Errorf("no job id found")
}
}
49 changes: 17 additions & 32 deletions controllers/flinkcluster_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -576,52 +576,37 @@ func TestGetNonLiveHistory(t *testing.T) {
assert.Equal(t, len(nonLiveHistory), 0)
}

func TestGetFlinkJobDeploymentState(t *testing.T) {
var pod corev1.Pod
func TestGetFlinkJobSubmitLog(t *testing.T) {
var submit, expected *FlinkJobSubmitLog
var err error

// success
termMsg := `
jobID: ec74209eb4e3db8ae72db00bd7a830aa
message: |
Successfully submitted!
log := `
/opt/flink/bin/flink run --jobmanager flinkjobcluster-sample-jobmanager:8081 --class org.apache.flink.streaming.examples.wordcount.WordCount --parallelism 2 --detached ./examples/streaming/WordCount.jar --input ./README.txt
Starting execution of program
Printing result to stdout. Use --output to specify output path.
Job has been submitted with JobID ec74209eb4e3db8ae72db00bd7a830aa
Program execution finished
Job with JobID ec74209eb4e3db8ae72db00bd7a830aa has finished.
Job Runtime: 333688 ms
`
expected = &FlinkJobSubmitLog{
JobID: "ec74209eb4e3db8ae72db00bd7a830aa",
Message: `Successfully submitted!
/opt/flink/bin/flink run --jobmanager flinkjobcluster-sample-jobmanager:8081 --class org.apache.flink.streaming.examples.wordcount.WordCount --parallelism 2 --detached ./examples/streaming/WordCount.jar --input ./README.txt
Starting execution of program
Printing result to stdout. Use --output to specify output path.
Job has been submitted with JobID ec74209eb4e3db8ae72db00bd7a830aa
Message: `
/opt/flink/bin/flink run --jobmanager flinkjobcluster-sample-jobmanager:8081 --class org.apache.flink.streaming.examples.wordcount.WordCount --parallelism 2 --detached ./examples/streaming/WordCount.jar --input ./README.txt
Starting execution of program
Printing result to stdout. Use --output to specify output path.
Job has been submitted with JobID ec74209eb4e3db8ae72db00bd7a830aa
Program execution finished
Job with JobID ec74209eb4e3db8ae72db00bd7a830aa has finished.
Job Runtime: 333688 ms
`,
}
pod = corev1.Pod{
Status: corev1.PodStatus{
ContainerStatuses: []corev1.ContainerStatus{{
State: corev1.ContainerState{
Terminated: &corev1.ContainerStateTerminated{
Message: termMsg,
}}}}}}
submit, _ = getFlinkJobSubmitLog(&pod)

submit, _ = getFlinkJobSubmitLogFromString(log)
assert.DeepEqual(t, *submit, *expected)

// failed: pod not found
_, err = getFlinkJobSubmitLog(nil)
assert.Error(t, err, "no job pod found, even though submission completed")

// failed: message not found
pod = corev1.Pod{
Status: corev1.PodStatus{
ContainerStatuses: []corev1.ContainerStatus{{
State: corev1.ContainerState{
Terminated: &corev1.ContainerStateTerminated{
Message: "",
}}}}}}
_, err = getFlinkJobSubmitLog(&pod)
assert.Error(t, err, "job pod found, but no termination log found even though submission completed")
_, err = getFlinkJobSubmitLogFromString("")
assert.Error(t, err, "no job id found")
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ require (
github.com/onsi/gomega v1.16.0
golang.org/x/net v0.0.0-20210825183410-e898025ed96a
golang.org/x/tools v0.1.5 // indirect
gopkg.in/yaml.v2 v2.4.0
gotest.tools v2.2.0+incompatible
k8s.io/api v0.22.1
k8s.io/apimachinery v0.22.1
Expand Down Expand Up @@ -60,6 +59,7 @@ require (
google.golang.org/protobuf v1.26.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
k8s.io/apiextensions-apiserver v0.21.3 // indirect
k8s.io/component-base v0.21.3 // indirect
Expand Down
7 changes: 5 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
corev1 "k8s.io/api/core/v1"
extensionsv1beta1 "k8s.io/api/extensions/v1beta1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
Expand Down Expand Up @@ -84,9 +85,11 @@ func main() {
os.Exit(1)
}

cs, err := kubernetes.NewForConfig(mgr.GetConfig())
err = (&controllers.FlinkClusterReconciler{
Client: mgr.GetClient(),
Log: ctrl.Log.WithName("controllers").WithName("FlinkCluster"),
Client: mgr.GetClient(),
Clientset: cs,
Log: ctrl.Log.WithName("controllers").WithName("FlinkCluster"),
}).SetupWithManager(mgr)
if err != nil {
setupLog.Error(err, "Unable to create controller", "controller", "FlinkCluster")
Expand Down