diff --git a/pkg/skaffold/actions/k8sjob/task.go b/pkg/skaffold/actions/k8sjob/task.go index 751e79c3c6c..f3d57f0176e 100644 --- a/pkg/skaffold/actions/k8sjob/task.go +++ b/pkg/skaffold/actions/k8sjob/task.go @@ -20,29 +20,24 @@ import ( "context" "fmt" "io" - "time" "github.com/pkg/errors" "golang.org/x/sync/errgroup" batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" - apierrs "k8s.io/apimachinery/pkg/api/errors" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/util/wait" apiwatch "k8s.io/apimachinery/pkg/watch" typesbatchv1 "k8s.io/client-go/kubernetes/typed/batch/v1" "github.com/GoogleContainerTools/skaffold/v2/pkg/diag/validator" "github.com/GoogleContainerTools/skaffold/v2/pkg/skaffold/deploy/kubectl" "github.com/GoogleContainerTools/skaffold/v2/pkg/skaffold/graph" + k8sjobutil "github.com/GoogleContainerTools/skaffold/v2/pkg/skaffold/k8sjob" "github.com/GoogleContainerTools/skaffold/v2/pkg/skaffold/k8sjob/tracker" kubernetesclient "github.com/GoogleContainerTools/skaffold/v2/pkg/skaffold/kubernetes/client" "github.com/GoogleContainerTools/skaffold/v2/pkg/skaffold/schema/latest" - "github.com/GoogleContainerTools/skaffold/v2/pkg/skaffold/util" ) -type checkK8sRetryableErr func(error) bool - type Task struct { // Unique task name, used as the container name. name string @@ -62,9 +57,6 @@ type Task struct { // Manifest objecto use to deploy the k8s job. jobManifest batchv1.Job - // Slice of functions that check if a given k8s error is a retryable error or not. - retryableErrChecks []checkK8sRetryableErr - // Global env variables to be injected into the pod. envVars []corev1.EnvVar @@ -82,11 +74,6 @@ func NewTask(c latest.VerifyContainer, kubectl *kubectl.CLI, namespace string, a jobManifest: jobManifest, envVars: execEnv.envVars, execEnv: execEnv, - retryableErrChecks: []checkK8sRetryableErr{ - apierrs.IsServerTimeout, - apierrs.IsTimeout, - apierrs.IsTooManyRequests, - }, } } @@ -103,7 +90,7 @@ func (t Task) Exec(ctx context.Context, out io.Writer) error { c := t.getContainerToDeploy() t.setManifestValues(c) - if err := t.deleteJob(ctx, t.jobManifest.Name, jm); err != nil { + if err := k8sjobutil.ForceJobDelete(ctx, t.jobManifest.Name, jm, t.kubectl); err != nil { return errors.Wrap(err, fmt.Sprintf("preparing job %v for execution", t.jobManifest.Name)) } @@ -117,7 +104,8 @@ func (t Task) Exec(ctx context.Context, out io.Writer) error { } if err = t.watchStatus(ctx, t.jobManifest, jm); err != nil { - t.deleteJob(context.TODO(), t.jobManifest.Name, jm) + t.execEnv.logger.CancelJobLogger(t.jobManifest.Name) + k8sjobutil.ForceJobDelete(context.TODO(), t.jobManifest.Name, jm, t.kubectl) } return err @@ -129,7 +117,7 @@ func (t Task) Cleanup(ctx context.Context, out io.Writer) error { return err } - return t.deleteJob(ctx, t.Name(), jm) + return k8sjobutil.ForceJobDelete(ctx, t.Name(), jm, t.kubectl) } func (t Task) jobsManager() (typesbatchv1.JobInterface, error) { @@ -164,50 +152,12 @@ func (t *Task) setManifestValues(c corev1.Container) { } func (t Task) deployJob(ctx context.Context, jobManifest batchv1.Job, jobsManager typesbatchv1.JobInterface) error { - return t.withRetryablePoll(ctx, func(ctx context.Context) error { + return k8sjobutil.WithRetryablePoll(ctx, func(ctx context.Context) error { _, err := jobsManager.Create(ctx, &jobManifest, v1.CreateOptions{}) return err }) } -func (t Task) deleteJob(ctx context.Context, jobName string, jobsManager typesbatchv1.JobInterface) error { - err := t.withRetryablePoll(ctx, func(ctx context.Context) error { - _, err := jobsManager.Get(ctx, jobName, v1.GetOptions{}) - return err - }) - - if apierrs.IsNotFound(err) { - return nil - } - - if err != nil { - return errors.Wrap(err, fmt.Sprintf("deleting %v job", jobName)) - } - - if err = t.deleteJobPod(ctx, jobName); err != nil { - return err - } - - err = t.withRetryablePoll(ctx, func(ctx context.Context) error { - return jobsManager.Delete(ctx, jobName, v1.DeleteOptions{ - GracePeriodSeconds: util.Ptr[int64](0), - PropagationPolicy: util.Ptr(v1.DeletePropagationForeground), - }) - }) - - if apierrs.IsNotFound(err) { - err = nil - } - - return err -} - -func (t Task) deleteJobPod(ctx context.Context, jobName string) error { - // We execute the Pods delete with the kubectl CLI client to be able to force the deletion. - _, err := t.kubectl.RunOut(ctx, "delete", "pod", "--force", "--grace-period", "0", "--wait=true", "--selector", fmt.Sprintf("job-name=%v", jobName)) - return err -} - func (t Task) watchStatus(ctx context.Context, jobManifest batchv1.Job, jobsManager typesbatchv1.JobInterface) error { g, gCtx := errgroup.WithContext(ctx) withCancel, cancel := context.WithCancel(gCtx) @@ -311,7 +261,7 @@ func (t Task) checkIsPullImgErr(waitingReason string) bool { func (t Task) isJobErr(ctx context.Context, jobName string, jobsManager typesbatchv1.JobInterface) bool { var jobState *batchv1.Job - err := t.withRetryablePoll(ctx, func(ctx context.Context) error { + err := k8sjobutil.WithRetryablePoll(ctx, func(ctx context.Context) error { job, err := jobsManager.Get(ctx, jobName, v1.GetOptions{}) jobState = job return err @@ -323,22 +273,3 @@ func (t Task) isJobErr(ctx context.Context, jobName string, jobsManager typesbat return jobState.Status.Failed > 0 } - -func (t Task) withRetryablePoll(ctx context.Context, execF func(context.Context) error) error { - return wait.PollImmediateWithContext(ctx, 100*time.Millisecond, 10*time.Second, func(ctx context.Context) (bool, error) { - err := execF(ctx) - if t.isRetryableErr(err) { - return false, nil - } - - return true, err - }) -} - -func (t Task) isRetryableErr(k8sErr error) bool { - isRetryable := false - for _, checkIsRetryableErr := range t.retryableErrChecks { - isRetryable = isRetryable || checkIsRetryableErr(k8sErr) - } - return isRetryable -} diff --git a/pkg/skaffold/k8sjob/logger/log.go b/pkg/skaffold/k8sjob/logger/log.go index 4008c4248d8..a1800550de8 100644 --- a/pkg/skaffold/k8sjob/logger/log.go +++ b/pkg/skaffold/k8sjob/logger/log.go @@ -50,9 +50,7 @@ type Logger struct { muted int32 kubeContext string // Map to store cancel functions per each job. - jobLoggerCancelers map[string]func() - // Function to cancel any in progress logger through a context. - cancelThreadLoggers func() + jobLoggerCancelers sync.Map } type AtomicBool struct{ flag int32 } @@ -78,7 +76,6 @@ func NewLogger(ctx context.Context, tracker *tracker.JobTracker, labeller *label kubeContext: kubeContext, tracker: tracker, labeller: labeller, - jobLoggerCancelers: make(map[string]func()), } } @@ -126,9 +123,6 @@ func (l *Logger) Start(ctx context.Context, out io.Writer) error { } l.out = out - allCancelCtx, allCancel := context.WithCancel(ctx) - l.cancelThreadLoggers = allCancel - go func() { for { select { @@ -136,8 +130,8 @@ func (l *Logger) Start(ctx context.Context, out io.Writer) error { return case info := <-l.tracker.Notifier(): id, namespace := info[0], info[1] - jobCancelCtx, jobLogCancel := context.WithCancel(allCancelCtx) - l.jobLoggerCancelers[id] = jobLogCancel + jobCancelCtx, jobLogCancel := context.WithCancel(ctx) + l.jobLoggerCancelers.Store(id, jobLogCancel) go l.streamLogsFromKubernetesJob(jobCancelCtx, id, namespace, false) } } @@ -230,8 +224,6 @@ func (l *Logger) Stop() { return } l.childThreadEmitLogs.Set(false) - // Cancel any in progress logger from previous created threads. - l.cancelThreadLoggers() l.wg.Wait() l.hadLogsOutput.Range(func(key, value interface{}) bool { @@ -278,7 +270,7 @@ func (l *Logger) SetSince(time.Time) { } func (l *Logger) CancelJobLogger(jobID string) { - if cancelJobLogger, found := l.jobLoggerCancelers[jobID]; found { - cancelJobLogger() + if cancelJobLogger, found := l.jobLoggerCancelers.Load(jobID); found { + cancelJobLogger.(context.CancelFunc)() } } diff --git a/pkg/skaffold/k8sjob/util.go b/pkg/skaffold/k8sjob/util.go index 26abe1d57d9..66941047d41 100644 --- a/pkg/skaffold/k8sjob/util.go +++ b/pkg/skaffold/k8sjob/util.go @@ -17,15 +17,35 @@ limitations under the License. package k8sjob import ( + "context" "fmt" "io/ioutil" + "time" jsonpatch "github.com/evanphx/json-patch" + "github.com/pkg/errors" batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" + apierrs "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/wait" + typesbatchv1 "k8s.io/client-go/kubernetes/typed/batch/v1" "k8s.io/kubectl/pkg/scheme" + + "github.com/GoogleContainerTools/skaffold/v2/pkg/skaffold/deploy/kubectl" + "github.com/GoogleContainerTools/skaffold/v2/pkg/skaffold/util" +) + +type checkK8sRetryableErr func(error) bool + +var ( + // Slice of functions that check if a given k8s error is a retryable error or not. + retryableErrChecks []checkK8sRetryableErr = []checkK8sRetryableErr{ + apierrs.IsServerTimeout, + apierrs.IsTimeout, + apierrs.IsTooManyRequests, + } ) func ApplyOverrides(obj runtime.Object, overrides string) (runtime.Object, error) { @@ -112,3 +132,57 @@ func GetGenericJob() *batchv1.Job { }, } } + +func ForceJobDelete(ctx context.Context, jobName string, jobsManager typesbatchv1.JobInterface, kubectl *kubectl.CLI) error { + err := WithRetryablePoll(ctx, func(ctx context.Context) error { + _, err := jobsManager.Get(ctx, jobName, metav1.GetOptions{}) + return err + }) + + if apierrs.IsNotFound(err) { + return nil + } + + if err != nil { + return errors.Wrap(err, fmt.Sprintf("deleting %v job", jobName)) + } + err = WithRetryablePoll(ctx, func(ctx context.Context) error { + return jobsManager.Delete(ctx, jobName, metav1.DeleteOptions{ + GracePeriodSeconds: util.Ptr[int64](0), + PropagationPolicy: util.Ptr(metav1.DeletePropagationForeground), + }) + }) + + if err != nil && !apierrs.IsNotFound(err) { + return err + } + + err = deleteJobPod(ctx, jobName, kubectl) + + return err +} + +func deleteJobPod(ctx context.Context, jobName string, kubectl *kubectl.CLI) error { + // We execute the Pods delete with the kubectl CLI client to be able to force the deletion. + _, err := kubectl.RunOut(ctx, "delete", "pod", "--force", "--grace-period", "0", "--wait=true", "--selector", fmt.Sprintf("job-name=%v", jobName)) + return err +} + +func WithRetryablePoll(ctx context.Context, execF func(context.Context) error) error { + return wait.PollImmediateWithContext(ctx, 100*time.Millisecond, 10*time.Second, func(ctx context.Context) (bool, error) { + err := execF(ctx) + if isRetryableErr(err) { + return false, nil + } + + return true, err + }) +} + +func isRetryableErr(k8sErr error) bool { + isRetryable := false + for _, checkIsRetryableErr := range retryableErrChecks { + isRetryable = isRetryable || checkIsRetryableErr(k8sErr) + } + return isRetryable +} diff --git a/pkg/skaffold/verify/k8sjob/verify.go b/pkg/skaffold/verify/k8sjob/verify.go index c1ecf6a9741..5b65cb6644c 100644 --- a/pkg/skaffold/verify/k8sjob/verify.go +++ b/pkg/skaffold/verify/k8sjob/verify.go @@ -29,7 +29,6 @@ import ( "go.opentelemetry.io/otel/trace" batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" - apierrs "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/util/wait" @@ -251,7 +250,7 @@ func (v *Verifier) createAndRunJob(ctx context.Context, tc latest.VerifyTestCase case <-v.timeout(timeoutDuration): execErr = errors.New(fmt.Sprintf("%q running k8s job timed out after : %v", tc.Name, *timeoutDuration)) v.logger.CancelJobLogger(job.Name) - if err := v.forceJobDelete(ctx, clientset, job); err != nil { + if err := k8sjobutil.ForceJobDelete(ctx, job.Name, clientset.BatchV1().Jobs(job.Namespace), &v.kubectl); err != nil { execErr = errors.Wrap(execErr, err.Error()) } eventV2.VerifyFailed(tc.Name, execErr) @@ -316,7 +315,8 @@ func (v *Verifier) Cleanup(ctx context.Context, out io.Writer, dryRun bool) erro for _, job := range v.tracker.DeployedJobs() { // assumes the job namespace is set and not "" which is the case as createJob // & createJobFromManifestPath set the namespace in the created Job - if err := v.forceJobDelete(ctx, clientset, job); err != nil { + namespace := job.Namespace + if err := k8sjobutil.ForceJobDelete(ctx, job.Name, clientset.BatchV1().Jobs(namespace), &v.kubectl); err != nil { // TODO(aaron-prindle): replace with actionable error return errors.Wrap(err, "cleaning up deployed job") } @@ -324,40 +324,6 @@ func (v *Verifier) Cleanup(ctx context.Context, out io.Writer, dryRun bool) erro return nil } -func (v *Verifier) forceJobDelete(ctx context.Context, clientset k8sclient.Interface, job *batchv1.Job) error { - _, err := clientset.BatchV1().Jobs(job.Namespace).Get(ctx, job.Name, metav1.GetOptions{}) - - if apierrs.IsNotFound(err) { - return nil - } - - if err != nil { - return errors.Wrap(err, fmt.Sprintf("deleting %v job", job.Name)) - } - - if err = v.deleteJobPod(ctx, job.Name); err != nil { - return err - } - - if err = v.deleteJob(ctx, clientset, job.Name, job.Namespace); apierrs.IsNotFound(err) { - err = nil - } - - return err -} - -func (v *Verifier) deleteJob(ctx context.Context, clientset k8sclient.Interface, jobName, namespace string) error { - return clientset.BatchV1().Jobs(namespace).Delete(ctx, jobName, metav1.DeleteOptions{ - PropagationPolicy: util.Ptr(metav1.DeletePropagationForeground), - }) -} - -func (v *Verifier) deleteJobPod(ctx context.Context, jobName string) error { - // We execute the Pods delete with the kubectl CLI client to be able to force the deletion. - _, err := v.kubectl.RunOut(ctx, "delete", "pod", "--force", "--grace-period", "0", "--wait=true", "--selector", fmt.Sprintf("job-name=%v", jobName)) - return err -} - // Dependencies lists all the files that describe what needs to be verified. func (v *Verifier) Dependencies() ([]string, error) { return []string{}, nil