From 0626e818dcf8f2cceccfad8c77fad502550aec8a Mon Sep 17 00:00:00 2001 From: Surinder Singh Date: Fri, 6 Dec 2019 12:23:08 -0800 Subject: [PATCH] Adding workflow name to workflow crd labels (#39) * Adding workflow name to workflow crd labels * node id * test * cr feedback * lint * adding taskname label * bogus --- .../pkg/compiler/transformers/k8s/workflow.go | 13 ++++++++++- .../transformers/k8s/workflow_test.go | 1 + .../pkg/controller/nodes/executor_test.go | 1 + .../pkg/controller/nodes/node_exec_context.go | 22 ++++++++++++++++++- flytepropeller/pkg/utils/k8s.go | 17 ++++++++++++++ 5 files changed, 52 insertions(+), 2 deletions(-) diff --git a/flytepropeller/pkg/compiler/transformers/k8s/workflow.go b/flytepropeller/pkg/compiler/transformers/k8s/workflow.go index 9f15a04980..3bc3edddb4 100644 --- a/flytepropeller/pkg/compiler/transformers/k8s/workflow.go +++ b/flytepropeller/pkg/compiler/transformers/k8s/workflow.go @@ -9,11 +9,12 @@ import ( "github.com/lyft/flytepropeller/pkg/apis/flyteworkflow/v1alpha1" "github.com/lyft/flytepropeller/pkg/compiler/common" "github.com/lyft/flytepropeller/pkg/compiler/errors" + "github.com/lyft/flytepropeller/pkg/utils" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) const ExecutionIDLabel = "execution-id" -const WorkflowIDLabel = "workflow-id" +const WorkflowNameLabel = "workflow-name" func requiresInputs(w *core.WorkflowTemplate) bool { if w == nil || w.GetInterface() == nil || w.GetInterface().GetInputs() == nil || @@ -25,6 +26,7 @@ func requiresInputs(w *core.WorkflowTemplate) bool { return len(w.GetInterface().GetInputs().Variables) > 0 } +// Note: Update WorkflowNameFromID for any change made to WorkflowIDAsString func WorkflowIDAsString(id *core.Identifier) string { b := strings.Builder{} _, err := b.WriteString(id.Project) @@ -55,6 +57,14 @@ func WorkflowIDAsString(id *core.Identifier) string { return b.String() } +func WorkflowNameFromID(id string) string { + tokens := strings.Split(id, ":") + if len(tokens) != 3 { + return "" + } + return tokens[2] +} + func buildFlyteWorkflowSpec(wf *core.CompiledWorkflow, tasks []*core.CompiledTask, errs errors.CompileErrors) ( spec *v1alpha1.WorkflowSpec) { var failureN *v1alpha1.NodeSpec @@ -171,6 +181,7 @@ func BuildFlyteWorkflow(wfClosure *core.CompiledWorkflowClosure, inputs *core.Li if err != nil { errs.Collect(errors.NewWorkflowBuildError(err)) } + obj.ObjectMeta.Labels[WorkflowNameLabel] = utils.SanitizeLabelValue(WorkflowNameFromID(primarySpec.ID)) if obj.Nodes == nil || obj.Connections.DownstreamEdges == nil { // If we come here, we'd better have an error generated earlier. Otherwise, add one to make sure build fails. diff --git a/flytepropeller/pkg/compiler/transformers/k8s/workflow_test.go b/flytepropeller/pkg/compiler/transformers/k8s/workflow_test.go index 098dab245a..149e674048 100644 --- a/flytepropeller/pkg/compiler/transformers/k8s/workflow_test.go +++ b/flytepropeller/pkg/compiler/transformers/k8s/workflow_test.go @@ -106,6 +106,7 @@ func TestBuildFlyteWorkflow(t *testing.T) { }, }, nil, nil, "") + assert.Equal(t, "wf-1", wf.Labels[WorkflowNameLabel]) assert.NoError(t, err) assert.NotNil(t, wf) errors.SetConfig(errors.Config{}) diff --git a/flytepropeller/pkg/controller/nodes/executor_test.go b/flytepropeller/pkg/controller/nodes/executor_test.go index 048a98c8d7..f85eca845a 100644 --- a/flytepropeller/pkg/controller/nodes/executor_test.go +++ b/flytepropeller/pkg/controller/nodes/executor_test.go @@ -547,6 +547,7 @@ func TestNodeExecutor_RecursiveNodeHandler_Recurse(t *testing.T) { mockWf.On("GetExecutionStatus").Return(mockWfStatus) mockWf.On("GetTask", taskID0).Return(tk, nil) mockWf.On("GetTask", taskID).Return(tk, nil) + mockWf.On("GetLabels").Return(make(map[string]string)) mockWfStatus.On("GetDataDir").Return(storage.DataReference("x")) return mockWf, mockN2Status } diff --git a/flytepropeller/pkg/controller/nodes/node_exec_context.go b/flytepropeller/pkg/controller/nodes/node_exec_context.go index aa8d29105b..6908321894 100644 --- a/flytepropeller/pkg/controller/nodes/node_exec_context.go +++ b/flytepropeller/pkg/controller/nodes/node_exec_context.go @@ -12,8 +12,12 @@ import ( "github.com/lyft/flytepropeller/pkg/apis/flyteworkflow/v1alpha1" "github.com/lyft/flytepropeller/pkg/controller/nodes/handler" + "github.com/lyft/flytepropeller/pkg/utils" ) +const NodeIDLabel = "node-id" +const TaskNameLabel = "task-name" + type execMetadata struct { v1alpha1.WorkflowMeta } @@ -38,6 +42,7 @@ type execContext struct { nsm *nodeStateManager enqueueOwner func() error w v1alpha1.ExecutableWorkflow + nodeLabels map[string]string } func (e execContext) EnqueueOwnerFunc() func() error { @@ -96,9 +101,23 @@ func (e execContext) MaxDatasetSizeBytes() int64 { return e.maxDatasetSizeBytes } +func (e execContext) GetLabels() map[string]string { + return e.nodeLabels +} + func newNodeExecContext(_ context.Context, store *storage.DataStore, w v1alpha1.ExecutableWorkflow, node v1alpha1.ExecutableNode, nodeStatus v1alpha1.ExecutableNodeStatus, inputs io.InputReader, maxDatasetSize int64, er events.TaskEventRecorder, tr handler.TaskReader, nsm *nodeStateManager, enqueueOwner func() error) *execContext { + md := execMetadata{WorkflowMeta: w} + nodeLabels := md.GetLabels() + if nodeLabels == nil { + nodeLabels = make(map[string]string) + } + nodeLabels[NodeIDLabel] = utils.SanitizeLabelValue(node.GetID()) + if tr != nil && tr.GetTaskID() != nil { + nodeLabels[TaskNameLabel] = utils.SanitizeLabelValue(tr.GetTaskID().Name) + } + return &execContext{ - md: execMetadata{WorkflowMeta: w}, + md: md, store: store, node: node, nodeStatus: nodeStatus, @@ -109,6 +128,7 @@ func newNodeExecContext(_ context.Context, store *storage.DataStore, w v1alpha1. nsm: nsm, enqueueOwner: enqueueOwner, w: w, + nodeLabels: nodeLabels, } } diff --git a/flytepropeller/pkg/utils/k8s.go b/flytepropeller/pkg/utils/k8s.go index 5d45ac8689..b0bf36122b 100644 --- a/flytepropeller/pkg/utils/k8s.go +++ b/flytepropeller/pkg/utils/k8s.go @@ -1,6 +1,9 @@ package utils import ( + "regexp" + "strings" + "github.com/golang/protobuf/ptypes" "github.com/golang/protobuf/ptypes/timestamp" "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core" @@ -9,6 +12,7 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/validation" ) var NotTheOwnerError = errors.Errorf("FlytePropeller is not the owner") @@ -16,6 +20,8 @@ var NotTheOwnerError = errors.Errorf("FlytePropeller is not the owner") // ResourceNvidiaGPU is the name of the Nvidia GPU resource. const ResourceNvidiaGPU = "nvidia.com/gpu" +var invalidDNS1123Characters = regexp.MustCompile("[^-a-z0-9]+") + func ToK8sEnvVar(env []*core.KeyValuePair) []v1.EnvVar { envVars := make([]v1.EnvVar, 0, len(env)) for _, kv := range env { @@ -94,6 +100,7 @@ func GetWorkflowIDFromOwner(reference *metav1.OwnerReference, namespace string) } return "", NotTheOwnerError } + func GetProtoTime(t *metav1.Time) *timestamp.Timestamp { if t != nil { pTime, err := ptypes.TimestampProto(t.Time) @@ -103,3 +110,13 @@ func GetProtoTime(t *metav1.Time) *timestamp.Timestamp { } return ptypes.TimestampNow() } + +// SanitizeLabelValue ensures that the label value is a valid DNS-1123 string +func SanitizeLabelValue(name string) string { + name = strings.ToLower(name) + name = invalidDNS1123Characters.ReplaceAllString(name, "-") + if len(name) > validation.DNS1123LabelMaxLength { + name = name[0:validation.DNS1123LabelMaxLength] + } + return strings.Trim(name, "-") +}