Skip to content

Commit

Permalink
feat: Add Argo variable {{retries}} to track retry attempt (#2911)
Browse files Browse the repository at this point in the history
  • Loading branch information
seddonm1 authored May 8, 2020
1 parent 14b7a45 commit d8cb66e
Show file tree
Hide file tree
Showing 6 changed files with 124 additions and 6 deletions.
1 change: 1 addition & 0 deletions docs/variables.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ The following variables are made available to reference various metadata of a wo
| Variable | Description|
|----------|------------|
| `pod.name` | Pod name of the container/script |
| `retries` | The retry attempt number of the container/script (0 for the first attempt) if `retryStrategy` is specified |
| `inputs.artifacts.<NAME>.path` | Local path of the input artifact |
| `outputs.artifacts.<NAME>.path` | Local path of the output artifact |
| `outputs.parameters.<NAME>.path` | Local path of the output parameter |
Expand Down
2 changes: 1 addition & 1 deletion examples/retry-with-steps.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,4 @@ spec:
image: python:alpine3.6
command: [python, -c]
# fail with a 66% probability
args: ["import random; import sys; exit_code = random.choice([0, 1, 1]); sys.exit(exit_code)"]
args: ["import random; import sys; print('retries: {{retries}}'); exit_code = random.choice([0, 1, 1]); sys.exit(exit_code)"]
2 changes: 2 additions & 0 deletions workflow/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,8 @@ const (
GlobalVarWorkflowParameters = "workflow.parameters"
// LocalVarPodName is a step level variable that references the name of the pod
LocalVarPodName = "pod.name"
// LocalVarRetries is a step level variable that references the retry attempt number if retryStrategy is specified
LocalVarRetries = "retries"

KubeConfigDefaultMountPath = "/kube/config"
KubeConfigDefaultVolumeName = "kubeconfig"
Expand Down
14 changes: 10 additions & 4 deletions workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1421,6 +1421,7 @@ func (woc *wfOperationCtx) executeTemplate(nodeName string, orgTmpl wfv1.Templat
if resolvedTmpl.IsPodType() && resolvedTmpl.RetryStrategy == nil {
localParams[common.LocalVarPodName] = woc.wf.NodeID(nodeName)
}

// Inputs has been processed with arguments already, so pass empty arguments.
processedTmpl, err := common.ProcessArgs(resolvedTmpl, &args, woc.globalParams, localParams, false)
if err != nil {
Expand Down Expand Up @@ -1470,12 +1471,17 @@ func (woc *wfOperationCtx) executeTemplate(nodeName string, orgTmpl wfv1.Templat
woc.addChildNode(retryNodeName, nodeName)
node = nil

localParams := make(map[string]string)
// Change the `pod.name` variable to the new retry node name
if processedTmpl.IsPodType() {
processedTmpl, err = common.SubstituteParams(processedTmpl, map[string]string{}, map[string]string{common.LocalVarPodName: woc.wf.NodeID(nodeName)})
if err != nil {
return woc.initializeNodeOrMarkError(node, nodeName, wfv1.NodeTypeSkipped, templateScope, orgTmpl, opts.boundaryID, err), err
}
localParams[common.LocalVarPodName] = woc.wf.NodeID(nodeName)
}
// Inject the retryAttempt number
localParams[common.LocalVarRetries] = strconv.Itoa(len(retryParentNode.Children))

processedTmpl, err = common.SubstituteParams(processedTmpl, map[string]string{}, localParams)
if err != nil {
return woc.initializeNodeOrMarkError(node, nodeName, wfv1.NodeTypeSkipped, templateScope, orgTmpl, opts.boundaryID, err), err
}
}
}
Expand Down
107 changes: 106 additions & 1 deletion workflow/controller/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -561,6 +561,111 @@ func TestBackoffMessage(t *testing.T) {
assert.Equal(t, "", newRetryNode.Message)
}

// retriesVariableTemplate is a Workflow manifest with a single container
// template that declares a retryStrategy (limit 10) and passes the
// {{retries}} variable to the container args, so each retry pod's args
// reveal the value substituted for that attempt.
var retriesVariableTemplate = `
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  name: whalesay
spec:
  entrypoint: whalesay
  templates:
  - name: whalesay
    retryStrategy:
      limit: 10
    container:
      image: docker/whalesay:latest
      command: [sh, -c]
      args: ["cowsay {{retries}}"]
`

// TestRetriesVariable operates a workflow whose container template has a
// retryStrategy and uses {{retries}} in its args, then checks that the i-th
// listed pod was created with "cowsay <i>" as its first argument.
//
// Every precondition is guarded: testify's assert functions only record a
// failure and return false, so without the early returns a failed call would
// be followed by a nil dereference or an index-out-of-range panic that hides
// the real failure.
func TestRetriesVariable(t *testing.T) {
	cancel, controller := newController()
	defer cancel()
	wfcset := controller.wfclientset.ArgoprojV1alpha1().Workflows("")
	wf := unmarshalWF(retriesVariableTemplate)
	wf, err := wfcset.Create(wf)
	if !assert.NoError(t, err) {
		return
	}
	wf, err = wfcset.Get(wf.ObjectMeta.Name, metav1.GetOptions{})
	if !assert.NoError(t, err) {
		return
	}

	const iterations = 5
	for attempt := 0; attempt < iterations; attempt++ {
		if attempt > 0 {
			// Presumably marks the pods created so far as failed so that the
			// next operate() starts another retry — confirm against makePodsPhase.
			makePodsPhase(t, apiv1.PodFailed, controller.kubeclientset, wf.ObjectMeta.Namespace)
			wf, err = wfcset.Get(wf.ObjectMeta.Name, metav1.GetOptions{})
			if !assert.NoError(t, err) {
				return
			}
		}
		woc := newWorkflowOperationCtx(wf, controller)
		woc.operate()
	}

	pods, err := controller.kubeclientset.CoreV1().Pods("").List(metav1.ListOptions{})
	if !assert.NoError(t, err) {
		return
	}
	// Stop here on a count mismatch; indexing pods.Items below would panic.
	if !assert.Len(t, pods.Items, iterations) {
		return
	}
	// NOTE(review): assumes List returns pods in creation order — confirm for
	// the clientset in use.
	for i := range pods.Items {
		containers := pods.Items[i].Spec.Containers
		// The args are read from the second container of each pod.
		if !assert.True(t, len(containers) > 1 && len(containers[1].Args) > 0, "pod %d: second container has no args", i) {
			continue
		}
		assert.Equal(t, fmt.Sprintf("cowsay %d", i), containers[1].Args[0])
	}
}

// stepsRetriesVariableTemplate is a Workflow manifest whose entrypoint is a
// steps template with a retryStrategy (limit 10). Its single step forwards
// {{retries}} as the "retries" input parameter of the whalesay container
// template, which passes it to the container args.
var stepsRetriesVariableTemplate = `
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  name: whalesay
spec:
  entrypoint: step-retry
  templates:
  - name: step-retry
    retryStrategy:
      limit: 10
    steps:
    - - name: whalesay-success
        arguments:
          parameters:
          - name: retries
            value: "{{retries}}"
        template: whalesay
  - name: whalesay
    inputs:
      parameters:
      - name: retries
    container:
      image: docker/whalesay:latest
      command: [sh, -c]
      args: ["cowsay {{inputs.parameters.retries}}"]
`

// TestStepsRetriesVariable operates a workflow whose steps template has a
// retryStrategy and forwards {{retries}} to a child container template as an
// input parameter, then checks that the i-th listed pod was created with
// "cowsay <i>" as its first argument.
//
// Every precondition is guarded: testify's assert functions only record a
// failure and return false, so without the early returns a failed call would
// be followed by a nil dereference or an index-out-of-range panic that hides
// the real failure.
func TestStepsRetriesVariable(t *testing.T) {
	cancel, controller := newController()
	defer cancel()
	wfcset := controller.wfclientset.ArgoprojV1alpha1().Workflows("")
	wf := unmarshalWF(stepsRetriesVariableTemplate)
	wf, err := wfcset.Create(wf)
	if !assert.NoError(t, err) {
		return
	}
	wf, err = wfcset.Get(wf.ObjectMeta.Name, metav1.GetOptions{})
	if !assert.NoError(t, err) {
		return
	}

	const iterations = 5
	for attempt := 0; attempt < iterations; attempt++ {
		if attempt > 0 {
			// Presumably marks the pods created so far as failed so that the
			// next operate() moves on to the next retry of the steps
			// template — confirm against makePodsPhase.
			makePodsPhase(t, apiv1.PodFailed, controller.kubeclientset, wf.ObjectMeta.Namespace)
			wf, err = wfcset.Get(wf.ObjectMeta.Name, metav1.GetOptions{})
			if !assert.NoError(t, err) {
				return
			}
		}
		woc := newWorkflowOperationCtx(wf, controller)
		woc.operate()
	}

	pods, err := controller.kubeclientset.CoreV1().Pods("").List(metav1.ListOptions{})
	if !assert.NoError(t, err) {
		return
	}
	// Stop here on a count mismatch; indexing pods.Items below would panic.
	if !assert.Len(t, pods.Items, iterations) {
		return
	}
	// NOTE(review): assumes List returns pods in creation order — confirm for
	// the clientset in use.
	for i := range pods.Items {
		containers := pods.Items[i].Spec.Containers
		// The args are read from the second container of each pod.
		if !assert.True(t, len(containers) > 1 && len(containers[1].Args) > 0, "pod %d: second container has no args", i) {
			continue
		}
		assert.Equal(t, fmt.Sprintf("cowsay %d", i), containers[1].Args[0])
	}
}

func TestAssessNodeStatus(t *testing.T) {
daemoned := true
tests := []struct {
Expand Down Expand Up @@ -2686,7 +2791,7 @@ metadata:
name: artifact-repo-config-ref
spec:
entrypoint: whalesay
poddisruptionbudget:
poddisruptionbudget:
minavailable: 100%
templates:
- name: whalesay
Expand Down
4 changes: 4 additions & 0 deletions workflow/validate/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,10 @@ func (ctx *templateValidationCtx) validateTemplate(tmpl *wfv1.Template, tmplCtx
localParams[common.LocalVarPodName] = placeholderGenerator.NextPlaceholder()
scope[common.LocalVarPodName] = placeholderGenerator.NextPlaceholder()
}
if tmpl.RetryStrategy != nil {
localParams[common.LocalVarRetries] = placeholderGenerator.NextPlaceholder()
scope[common.LocalVarRetries] = placeholderGenerator.NextPlaceholder()
}
if tmpl.IsLeaf() {
for _, art := range tmpl.Outputs.Artifacts {
if art.Path != "" {
Expand Down

0 comments on commit d8cb66e

Please sign in to comment.