feat: move delete job logic to k8sjob utils and make k8s custom actions stop logging for failed actions
renzodavid9 committed May 19, 2023
1 parent ef227c4 commit c65794c
Showing 4 changed files with 89 additions and 126 deletions.
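
The refactor in one sentence: the get/delete/pod-cleanup sequence that was duplicated in pkg/skaffold/actions/k8sjob/task.go and pkg/skaffold/verify/k8sjob/verify.go now lives in pkg/skaffold/k8sjob/util.go as ForceJobDelete (built on the relocated retry wrapper WithRetryablePoll), and custom actions now cancel a failed job's logger before tearing the job down. A minimal usage sketch of the shared helper, matching the signature added below; the cleanupJob wrapper and job name are illustrative, not part of this commit:

package cleanup

import (
	"context"

	typesbatchv1 "k8s.io/client-go/kubernetes/typed/batch/v1"

	"github.com/GoogleContainerTools/skaffold/v2/pkg/skaffold/deploy/kubectl"
	k8sjobutil "github.com/GoogleContainerTools/skaffold/v2/pkg/skaffold/k8sjob"
)

// cleanupJob shows how any caller with a typed batch/v1 jobs client and a
// kubectl CLI can now force-delete a job through the shared helper instead
// of keeping a per-package copy of the deletion logic.
func cleanupJob(ctx context.Context, jobs typesbatchv1.JobInterface, cli *kubectl.CLI) error {
	// ForceJobDelete is a no-op when the job no longer exists; otherwise it
	// deletes the job with grace period 0 and foreground propagation, then
	// force-deletes the job's pods via the kubectl CLI.
	return k8sjobutil.ForceJobDelete(ctx, "my-verify-job", jobs, cli)
}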
83 changes: 7 additions & 76 deletions pkg/skaffold/actions/k8sjob/task.go
@@ -20,29 +20,24 @@ import (
"context"
"fmt"
"io"
"time"

"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
apierrs "k8s.io/apimachinery/pkg/api/errors"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
apiwatch "k8s.io/apimachinery/pkg/watch"
typesbatchv1 "k8s.io/client-go/kubernetes/typed/batch/v1"

"github.com/GoogleContainerTools/skaffold/v2/pkg/diag/validator"
"github.com/GoogleContainerTools/skaffold/v2/pkg/skaffold/deploy/kubectl"
"github.com/GoogleContainerTools/skaffold/v2/pkg/skaffold/graph"
k8sjobutil "github.com/GoogleContainerTools/skaffold/v2/pkg/skaffold/k8sjob"
"github.com/GoogleContainerTools/skaffold/v2/pkg/skaffold/k8sjob/tracker"
kubernetesclient "github.com/GoogleContainerTools/skaffold/v2/pkg/skaffold/kubernetes/client"
"github.com/GoogleContainerTools/skaffold/v2/pkg/skaffold/schema/latest"
"github.com/GoogleContainerTools/skaffold/v2/pkg/skaffold/util"
)

type checkK8sRetryableErr func(error) bool

type Task struct {
// Unique task name, used as the container name.
name string
@@ -62,9 +57,6 @@ type Task struct {
// Manifest object to use to deploy the k8s job.
jobManifest batchv1.Job

// Slice of functions that check if a given k8s error is a retryable error or not.
retryableErrChecks []checkK8sRetryableErr

// Global env variables to be injected into the pod.
envVars []corev1.EnvVar

@@ -82,11 +74,6 @@ func NewTask(c latest.VerifyContainer, kubectl *kubectl.CLI, namespace string, a
jobManifest: jobManifest,
envVars: execEnv.envVars,
execEnv: execEnv,
retryableErrChecks: []checkK8sRetryableErr{
apierrs.IsServerTimeout,
apierrs.IsTimeout,
apierrs.IsTooManyRequests,
},
}
}

@@ -103,7 +90,7 @@ func (t Task) Exec(ctx context.Context, out io.Writer) error {
c := t.getContainerToDeploy()
t.setManifestValues(c)

if err := t.deleteJob(ctx, t.jobManifest.Name, jm); err != nil {
if err := k8sjobutil.ForceJobDelete(ctx, t.jobManifest.Name, jm, t.kubectl); err != nil {
return errors.Wrap(err, fmt.Sprintf("preparing job %v for execution", t.jobManifest.Name))
}

@@ -117,7 +104,8 @@ }
}

if err = t.watchStatus(ctx, t.jobManifest, jm); err != nil {
t.deleteJob(context.TODO(), t.jobManifest.Name, jm)
t.execEnv.logger.CancelJobLogger(t.jobManifest.Name)
k8sjobutil.ForceJobDelete(context.TODO(), t.jobManifest.Name, jm, t.kubectl)
}

return err
@@ -129,7 +117,7 @@ func (t Task) Cleanup(ctx context.Context, out io.Writer) error {
return err
}

return t.deleteJob(ctx, t.Name(), jm)
return k8sjobutil.ForceJobDelete(ctx, t.Name(), jm, t.kubectl)
}

func (t Task) jobsManager() (typesbatchv1.JobInterface, error) {
@@ -164,50 +152,12 @@ func (t *Task) setManifestValues(c corev1.Container) {
}

func (t Task) deployJob(ctx context.Context, jobManifest batchv1.Job, jobsManager typesbatchv1.JobInterface) error {
return t.withRetryablePoll(ctx, func(ctx context.Context) error {
return k8sjobutil.WithRetryablePoll(ctx, func(ctx context.Context) error {
_, err := jobsManager.Create(ctx, &jobManifest, v1.CreateOptions{})
return err
})
}

func (t Task) deleteJob(ctx context.Context, jobName string, jobsManager typesbatchv1.JobInterface) error {
err := t.withRetryablePoll(ctx, func(ctx context.Context) error {
_, err := jobsManager.Get(ctx, jobName, v1.GetOptions{})
return err
})

if apierrs.IsNotFound(err) {
return nil
}

if err != nil {
return errors.Wrap(err, fmt.Sprintf("deleting %v job", jobName))
}

if err = t.deleteJobPod(ctx, jobName); err != nil {
return err
}

err = t.withRetryablePoll(ctx, func(ctx context.Context) error {
return jobsManager.Delete(ctx, jobName, v1.DeleteOptions{
GracePeriodSeconds: util.Ptr[int64](0),
PropagationPolicy: util.Ptr(v1.DeletePropagationForeground),
})
})

if apierrs.IsNotFound(err) {
err = nil
}

return err
}

func (t Task) deleteJobPod(ctx context.Context, jobName string) error {
// We execute the Pods delete with the kubectl CLI client to be able to force the deletion.
_, err := t.kubectl.RunOut(ctx, "delete", "pod", "--force", "--grace-period", "0", "--wait=true", "--selector", fmt.Sprintf("job-name=%v", jobName))
return err
}

func (t Task) watchStatus(ctx context.Context, jobManifest batchv1.Job, jobsManager typesbatchv1.JobInterface) error {
g, gCtx := errgroup.WithContext(ctx)
withCancel, cancel := context.WithCancel(gCtx)
@@ -311,7 +261,7 @@ func (t Task) checkIsPullImgErr(waitingReason string) bool {

func (t Task) isJobErr(ctx context.Context, jobName string, jobsManager typesbatchv1.JobInterface) bool {
var jobState *batchv1.Job
err := t.withRetryablePoll(ctx, func(ctx context.Context) error {
err := k8sjobutil.WithRetryablePoll(ctx, func(ctx context.Context) error {
job, err := jobsManager.Get(ctx, jobName, v1.GetOptions{})
jobState = job
return err
@@ -323,22 +273,3 @@ func (t Task) isJobErr(ctx context.Context, jobName string, jobsManager typesbat

return jobState.Status.Failed > 0
}

func (t Task) withRetryablePoll(ctx context.Context, execF func(context.Context) error) error {
return wait.PollImmediateWithContext(ctx, 100*time.Millisecond, 10*time.Second, func(ctx context.Context) (bool, error) {
err := execF(ctx)
if t.isRetryableErr(err) {
return false, nil
}

return true, err
})
}

func (t Task) isRetryableErr(k8sErr error) bool {
isRetryable := false
for _, checkIsRetryableErr := range t.retryableErrChecks {
isRetryable = isRetryable || checkIsRetryableErr(k8sErr)
}
return isRetryable
}
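
One behavioral note on the Exec change above: when watchStatus reports a failure, the task now cancels the per-job logger before force-deleting the job, so a failed action stops streaming logs instead of racing its own teardown. A compact sketch of that ordering with illustrative stand-in types (the real calls are CancelJobLogger and k8sjobutil.ForceJobDelete):

package failpath

import "context"

// logger and deleter are stand-ins for the real logger and helper; only the
// cancel-before-delete ordering is the point here.
type logger interface{ CancelJobLogger(jobName string) }
type deleter func(ctx context.Context, jobName string) error

// handleWatchFailure mirrors the new failure path in Task.Exec: stop the log
// stream first, then tear the job down. context.TODO() is used because the
// watch context is typically already canceled by the time this runs.
func handleWatchFailure(jobName string, log logger, forceDelete deleter) {
	log.CancelJobLogger(jobName)
	_ = forceDelete(context.TODO(), jobName)
}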
18 changes: 5 additions & 13 deletions pkg/skaffold/k8sjob/logger/log.go
@@ -50,9 +50,7 @@ type Logger struct {
muted int32
kubeContext string
// Map to store cancel functions per each job.
jobLoggerCancelers map[string]func()
// Function to cancel any in progress logger through a context.
cancelThreadLoggers func()
jobLoggerCancelers sync.Map
}

type AtomicBool struct{ flag int32 }
@@ -78,7 +76,6 @@ func NewLogger(ctx context.Context, tracker *tracker.JobTracker, labeller *label
kubeContext: kubeContext,
tracker: tracker,
labeller: labeller,
jobLoggerCancelers: make(map[string]func()),
}
}

@@ -126,18 +123,15 @@ func (l *Logger) Start(ctx context.Context, out io.Writer) error {
}
l.out = out

allCancelCtx, allCancel := context.WithCancel(ctx)
l.cancelThreadLoggers = allCancel

go func() {
for {
select {
case <-ctx.Done():
return
case info := <-l.tracker.Notifier():
id, namespace := info[0], info[1]
jobCancelCtx, jobLogCancel := context.WithCancel(allCancelCtx)
l.jobLoggerCancelers[id] = jobLogCancel
jobCancelCtx, jobLogCancel := context.WithCancel(ctx)
l.jobLoggerCancelers.Store(id, jobLogCancel)
go l.streamLogsFromKubernetesJob(jobCancelCtx, id, namespace, false)
}
}
@@ -230,8 +224,6 @@ func (l *Logger) Stop() {
return
}
l.childThreadEmitLogs.Set(false)
// Cancel any in progress logger from previous created threads.
l.cancelThreadLoggers()
l.wg.Wait()

l.hadLogsOutput.Range(func(key, value interface{}) bool {
@@ -278,7 +270,7 @@ func (l *Logger) SetSince(time.Time) {
}

func (l *Logger) CancelJobLogger(jobID string) {
if cancelJobLogger, found := l.jobLoggerCancelers[jobID]; found {
cancelJobLogger()
if cancelJobLogger, found := l.jobLoggerCancelers.Load(jobID); found {
cancelJobLogger.(context.CancelFunc)()
}
}
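
The logger change above swaps the single cancelThreadLoggers context, which could only stop every stream at once, for one context.CancelFunc per job stored in a sync.Map, so streams can be registered and canceled concurrently without extra locking and a failed job can silence only its own output. A self-contained sketch of the pattern, with illustrative names:

package loggers

import (
	"context"
	"sync"
)

// perJobCancelers mirrors the jobLoggerCancelers sync.Map: the notifier
// goroutine registers a canceler when a job's log stream starts, and any
// caller can later stop exactly that stream by job ID.
type perJobCancelers struct{ m sync.Map }

func (p *perJobCancelers) start(ctx context.Context, jobID string, stream func(context.Context)) {
	jobCtx, cancel := context.WithCancel(ctx)
	p.m.Store(jobID, cancel)
	go stream(jobCtx)
}

func (p *perJobCancelers) cancel(jobID string) {
	// sync.Map stores interface{} values, so the canceler is asserted back
	// to a context.CancelFunc before being invoked — the same assertion
	// CancelJobLogger performs above.
	if c, ok := p.m.Load(jobID); ok {
		c.(context.CancelFunc)()
	}
}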
74 changes: 74 additions & 0 deletions pkg/skaffold/k8sjob/util.go
@@ -17,15 +17,35 @@ limitations under the License.
package k8sjob

import (
"context"
"fmt"
"io/ioutil"
"time"

jsonpatch "github.com/evanphx/json-patch"
"github.com/pkg/errors"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
apierrs "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
typesbatchv1 "k8s.io/client-go/kubernetes/typed/batch/v1"
"k8s.io/kubectl/pkg/scheme"

"github.com/GoogleContainerTools/skaffold/v2/pkg/skaffold/deploy/kubectl"
"github.com/GoogleContainerTools/skaffold/v2/pkg/skaffold/util"
)

type checkK8sRetryableErr func(error) bool

var (
// Slice of functions that check if a given k8s error is a retryable error or not.
retryableErrChecks []checkK8sRetryableErr = []checkK8sRetryableErr{
apierrs.IsServerTimeout,
apierrs.IsTimeout,
apierrs.IsTooManyRequests,
}
)

func ApplyOverrides(obj runtime.Object, overrides string) (runtime.Object, error) {
@@ -112,3 +132,57 @@ func GetGenericJob() *batchv1.Job {
},
}
}

func ForceJobDelete(ctx context.Context, jobName string, jobsManager typesbatchv1.JobInterface, kubectl *kubectl.CLI) error {
err := WithRetryablePoll(ctx, func(ctx context.Context) error {
_, err := jobsManager.Get(ctx, jobName, metav1.GetOptions{})
return err
})

if apierrs.IsNotFound(err) {
return nil
}

if err != nil {
return errors.Wrap(err, fmt.Sprintf("deleting %v job", jobName))
}
err = WithRetryablePoll(ctx, func(ctx context.Context) error {
return jobsManager.Delete(ctx, jobName, metav1.DeleteOptions{
GracePeriodSeconds: util.Ptr[int64](0),
PropagationPolicy: util.Ptr(metav1.DeletePropagationForeground),
})
})

if err != nil && !apierrs.IsNotFound(err) {
return err
}

err = deleteJobPod(ctx, jobName, kubectl)

return err
}

func deleteJobPod(ctx context.Context, jobName string, kubectl *kubectl.CLI) error {
// We execute the Pods delete with the kubectl CLI client to be able to force the deletion.
_, err := kubectl.RunOut(ctx, "delete", "pod", "--force", "--grace-period", "0", "--wait=true", "--selector", fmt.Sprintf("job-name=%v", jobName))
return err
}

func WithRetryablePoll(ctx context.Context, execF func(context.Context) error) error {
return wait.PollImmediateWithContext(ctx, 100*time.Millisecond, 10*time.Second, func(ctx context.Context) (bool, error) {
err := execF(ctx)
if isRetryableErr(err) {
return false, nil
}

return true, err
})
}

func isRetryableErr(k8sErr error) bool {
isRetryable := false
for _, checkIsRetryableErr := range retryableErrChecks {
isRetryable = isRetryable || checkIsRetryableErr(k8sErr)
}
return isRetryable
}
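
WithRetryablePoll retries the wrapped call every 100ms for up to 10 seconds, but only while the error matches one of the registered checks (ServerTimeout, Timeout, TooManyRequests); any other result, including success, ends the poll immediately. A usage sketch against the exported helper; getJobWithRetry and its wiring are illustrative:

package pollusage

import (
	"context"

	batchv1 "k8s.io/api/batch/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	typesbatchv1 "k8s.io/client-go/kubernetes/typed/batch/v1"

	k8sjobutil "github.com/GoogleContainerTools/skaffold/v2/pkg/skaffold/k8sjob"
)

// getJobWithRetry wraps a client-go Get so that transient API-server
// pressure is absorbed by the poll; any non-retryable error is returned
// as-is after the first attempt.
func getJobWithRetry(ctx context.Context, jobs typesbatchv1.JobInterface, name string) (*batchv1.Job, error) {
	var job *batchv1.Job
	err := k8sjobutil.WithRetryablePoll(ctx, func(ctx context.Context) error {
		j, err := jobs.Get(ctx, name, metav1.GetOptions{})
		job = j
		return err
	})
	return job, err
}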
40 changes: 3 additions & 37 deletions pkg/skaffold/verify/k8sjob/verify.go
@@ -29,7 +29,6 @@ import (
"go.opentelemetry.io/otel/trace"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
apierrs "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/wait"
@@ -251,7 +250,7 @@ func (v *Verifier) createAndRunJob(ctx context.Context, tc latest.VerifyTestCase
case <-v.timeout(timeoutDuration):
execErr = errors.New(fmt.Sprintf("%q running k8s job timed out after : %v", tc.Name, *timeoutDuration))
v.logger.CancelJobLogger(job.Name)
if err := v.forceJobDelete(ctx, clientset, job); err != nil {
if err := k8sjobutil.ForceJobDelete(ctx, job.Name, clientset.BatchV1().Jobs(job.Namespace), &v.kubectl); err != nil {
execErr = errors.Wrap(execErr, err.Error())
}
eventV2.VerifyFailed(tc.Name, execErr)
@@ -316,48 +315,15 @@ func (v *Verifier) Cleanup(ctx context.Context, out io.Writer, dryRun bool) erro
for _, job := range v.tracker.DeployedJobs() {
// assumes the job namespace is set and not "" which is the case as createJob
// & createJobFromManifestPath set the namespace in the created Job
if err := v.forceJobDelete(ctx, clientset, job); err != nil {
namespace := job.Namespace
if err := k8sjobutil.ForceJobDelete(ctx, job.Name, clientset.BatchV1().Jobs(namespace), &v.kubectl); err != nil {
// TODO(aaron-prindle): replace with actionable error
return errors.Wrap(err, "cleaning up deployed job")
}
}
return nil
}

func (v *Verifier) forceJobDelete(ctx context.Context, clientset k8sclient.Interface, job *batchv1.Job) error {
_, err := clientset.BatchV1().Jobs(job.Namespace).Get(ctx, job.Name, metav1.GetOptions{})

if apierrs.IsNotFound(err) {
return nil
}

if err != nil {
return errors.Wrap(err, fmt.Sprintf("deleting %v job", job.Name))
}

if err = v.deleteJobPod(ctx, job.Name); err != nil {
return err
}

if err = v.deleteJob(ctx, clientset, job.Name, job.Namespace); apierrs.IsNotFound(err) {
err = nil
}

return err
}

func (v *Verifier) deleteJob(ctx context.Context, clientset k8sclient.Interface, jobName, namespace string) error {
return clientset.BatchV1().Jobs(namespace).Delete(ctx, jobName, metav1.DeleteOptions{
PropagationPolicy: util.Ptr(metav1.DeletePropagationForeground),
})
}

func (v *Verifier) deleteJobPod(ctx context.Context, jobName string) error {
// We execute the Pods delete with the kubectl CLI client to be able to force the deletion.
_, err := v.kubectl.RunOut(ctx, "delete", "pod", "--force", "--grace-period", "0", "--wait=true", "--selector", fmt.Sprintf("job-name=%v", jobName))
return err
}

// Dependencies lists all the files that describe what needs to be verified.
func (v *Verifier) Dependencies() ([]string, error) {
return []string{}, nil
