From c43095311eac11433ae26cf516627b313ce2b892 Mon Sep 17 00:00:00 2001 From: Ketan Umare Date: Mon, 16 Mar 2020 20:13:40 -0700 Subject: [PATCH 1/8] FlytePropeller uses DataOutputSandboxes --- go.mod | 2 +- go.sum | 2 + pkg/controller/config/config.go | 39 ++++++++++--------- pkg/controller/config/config_flags.go | 1 + pkg/controller/config/config_flags_test.go | 22 +++++++++++ pkg/controller/controller.go | 6 ++- pkg/controller/nodes/executor.go | 8 +++- .../handler/mocks/node_execution_context.go | 32 +++++++++++++++ .../nodes/handler/node_exec_context.go | 8 +++- pkg/controller/nodes/node_exec_context.go | 16 ++++++-- pkg/controller/nodes/task/handler.go | 1 + pkg/controller/nodes/task/taskexec_context.go | 7 +++- 12 files changed, 115 insertions(+), 29 deletions(-) diff --git a/go.mod b/go.mod index 34b42ee69..e7864adbf 100644 --- a/go.mod +++ b/go.mod @@ -22,7 +22,7 @@ require ( github.com/imdario/mergo v0.3.8 // indirect github.com/lyft/datacatalog v0.2.1 github.com/lyft/flyteidl v0.17.8 - github.com/lyft/flyteplugins v0.3.11 + github.com/lyft/flyteplugins v0.3.12-0.20200317002119-cce3a0006bd2 github.com/lyft/flytestdlib v0.3.2 github.com/magiconair/properties v1.8.1 github.com/mattn/go-colorable v0.1.6 // indirect diff --git a/go.sum b/go.sum index 531ebb2ed..09e5bd7b9 100644 --- a/go.sum +++ b/go.sum @@ -389,6 +389,8 @@ github.com/lyft/flyteplugins v0.3.10 h1:AwGv0TKl/RTOESY0+P5v9gyjO67LeHXHZqiMVaa+ github.com/lyft/flyteplugins v0.3.10/go.mod h1:FOSo04q4EheU6lm0oZFvfYAWgjrum/BDUK+mUT7qDFA= github.com/lyft/flyteplugins v0.3.11 h1:E6BX5BU283BLMP48QJQsecqdxeLKLaiA+2+VVS8VYoc= github.com/lyft/flyteplugins v0.3.11/go.mod h1:FOSo04q4EheU6lm0oZFvfYAWgjrum/BDUK+mUT7qDFA= +github.com/lyft/flyteplugins v0.3.12-0.20200317002119-cce3a0006bd2 h1:QYE15eK48RcM6RP1X+uMEXWKACagk0gFDp8BNrPuKQY= +github.com/lyft/flyteplugins v0.3.12-0.20200317002119-cce3a0006bd2/go.mod h1:FOSo04q4EheU6lm0oZFvfYAWgjrum/BDUK+mUT7qDFA= github.com/lyft/flytestdlib v0.3.0/go.mod h1:LJPPJlkFj+wwVWMrQT3K5JZgNhZi2mULsCG4ZYhinhU= github.com/lyft/flytestdlib v0.3.2 h1:bY6Y+Fg6Jdc7zY4GAYuR7t2hjWwynIdmRvtLcRNaGnw= github.com/lyft/flytestdlib v0.3.2/go.mod h1:LJPPJlkFj+wwVWMrQT3K5JZgNhZi2mULsCG4ZYhinhU= diff --git a/pkg/controller/config/config.go b/pkg/controller/config/config.go index be5aa72c3..ef65de29d 100644 --- a/pkg/controller/config/config.go +++ b/pkg/controller/config/config.go @@ -53,25 +53,26 @@ var ( // Config that uses the flytestdlib Config module to generate commandline and load config files. This configuration is // the base configuration to start propeller type Config struct { - KubeConfigPath string `json:"kube-config" pflag:",Path to kubernetes client config file."` - MasterURL string `json:"master"` - Workers int `json:"workers" pflag:"2,Number of threads to process workflows"` - WorkflowReEval config.Duration `json:"workflow-reeval-duration" pflag:"\"30s\",Frequency of re-evaluating workflows"` - DownstreamEval config.Duration `json:"downstream-eval-duration" pflag:"\"60s\",Frequency of re-evaluating downstream tasks"` - LimitNamespace string `json:"limit-namespace" pflag:"\"all\",Namespaces to watch for this propeller"` - ProfilerPort config.Port `json:"prof-port" pflag:"\"10254\",Profiler port"` - MetadataPrefix string `json:"metadata-prefix,omitempty" pflag:",MetadataPrefix should be used if all the metadata for Flyte executions should be stored under a specific prefix in CloudStorage. 
If not specified, the data will be stored in the base container directly."` - Queue CompositeQueueConfig `json:"queue,omitempty" pflag:",Workflow workqueue configuration, affects the way the work is consumed from the queue."` - MetricsPrefix string `json:"metrics-prefix" pflag:"\"flyte:\",An optional prefix for all published metrics."` - EnableAdminLauncher bool `json:"enable-admin-launcher" pflag:"false, Enable remote Workflow launcher to Admin"` - MaxWorkflowRetries int `json:"max-workflow-retries" pflag:"50,Maximum number of retries per workflow"` - MaxTTLInHours int `json:"max-ttl-hours" pflag:"23,Maximum number of hours a completed workflow should be retained. Number between 1-23 hours"` - GCInterval config.Duration `json:"gc-interval" pflag:"\"30m\",Run periodic GC every 30 minutes"` - LeaderElection LeaderElectionConfig `json:"leader-election,omitempty" pflag:",Config for leader election."` - PublishK8sEvents bool `json:"publish-k8s-events" pflag:",Enable events publishing to K8s events API."` - MaxDatasetSizeBytes int64 `json:"max-output-size-bytes" pflag:",Maximum size of outputs per task"` - KubeConfig KubeClientConfig `json:"kube-client-config" pflag:",Configuration to control the Kubernetes client"` - NodeConfig NodeConfig `json:"node-config,omitempty" pflag:",config for a workflow node"` + KubeConfigPath string `json:"kube-config" pflag:",Path to kubernetes client config file."` + MasterURL string `json:"master"` + Workers int `json:"workers" pflag:"2,Number of threads to process workflows"` + WorkflowReEval config.Duration `json:"workflow-reeval-duration" pflag:"\"30s\",Frequency of re-evaluating workflows"` + DownstreamEval config.Duration `json:"downstream-eval-duration" pflag:"\"60s\",Frequency of re-evaluating downstream tasks"` + LimitNamespace string `json:"limit-namespace" pflag:"\"all\",Namespaces to watch for this propeller"` + ProfilerPort config.Port `json:"prof-port" pflag:"\"10254\",Profiler port"` + MetadataPrefix string `json:"metadata-prefix,omitempty" pflag:",MetadataPrefix should be used if all the metadata for Flyte executions should be stored under a specific prefix in CloudStorage. If not specified, the data will be stored in the base container directly."` + DefaultDataSandboxPrefix string `json:"default-datasandbox" pflag:",a fully qualified storage path of the form s3://flyte/abc/..., where all data sandboxes should be stored."` + Queue CompositeQueueConfig `json:"queue,omitempty" pflag:",Workflow workqueue configuration, affects the way the work is consumed from the queue."` + MetricsPrefix string `json:"metrics-prefix" pflag:"\"flyte:\",An optional prefix for all published metrics."` + EnableAdminLauncher bool `json:"enable-admin-launcher" pflag:"false, Enable remote Workflow launcher to Admin"` + MaxWorkflowRetries int `json:"max-workflow-retries" pflag:"50,Maximum number of retries per workflow"` + MaxTTLInHours int `json:"max-ttl-hours" pflag:"23,Maximum number of hours a completed workflow should be retained. 
Number between 1-23 hours"` + GCInterval config.Duration `json:"gc-interval" pflag:"\"30m\",Run periodic GC every 30 minutes"` + LeaderElection LeaderElectionConfig `json:"leader-election,omitempty" pflag:",Config for leader election."` + PublishK8sEvents bool `json:"publish-k8s-events" pflag:",Enable events publishing to K8s events API."` + MaxDatasetSizeBytes int64 `json:"max-output-size-bytes" pflag:",Maximum size of outputs per task"` + KubeConfig KubeClientConfig `json:"kube-client-config" pflag:",Configuration to control the Kubernetes client"` + NodeConfig NodeConfig `json:"node-config,omitempty" pflag:",config for a workflow node"` } type KubeClientConfig struct { diff --git a/pkg/controller/config/config_flags.go b/pkg/controller/config/config_flags.go index ca64ef171..6060be406 100755 --- a/pkg/controller/config/config_flags.go +++ b/pkg/controller/config/config_flags.go @@ -49,6 +49,7 @@ func (cfg Config) GetPFlagSet(prefix string) *pflag.FlagSet { cmdFlags.String(fmt.Sprintf("%v%v", prefix, "limit-namespace"), defaultConfig.LimitNamespace, "Namespaces to watch for this propeller") cmdFlags.String(fmt.Sprintf("%v%v", prefix, "prof-port"), defaultConfig.ProfilerPort.String(), "Profiler port") cmdFlags.String(fmt.Sprintf("%v%v", prefix, "metadata-prefix"), defaultConfig.MetadataPrefix, "MetadataPrefix should be used if all the metadata for Flyte executions should be stored under a specific prefix in CloudStorage. If not specified, the data will be stored in the base container directly.") + cmdFlags.String(fmt.Sprintf("%v%v", prefix, "default-datasandbox"), defaultConfig.DefaultDataSandboxPrefix, "a fully qualified storage path of the form s3://flyte/abc/..., where all data sandboxes should be stored.") cmdFlags.String(fmt.Sprintf("%v%v", prefix, "queue.type"), defaultConfig.Queue.Type, "Type of composite queue to use for the WorkQueue") cmdFlags.String(fmt.Sprintf("%v%v", prefix, "queue.queue.type"), defaultConfig.Queue.Queue.Type, "Type of RateLimiter to use for the WorkQueue") cmdFlags.String(fmt.Sprintf("%v%v", prefix, "queue.queue.base-delay"), defaultConfig.Queue.Queue.BaseDelay.String(), "base backoff delay for failure") diff --git a/pkg/controller/config/config_flags_test.go b/pkg/controller/config/config_flags_test.go index 7c1081b36..f5ade89b7 100755 --- a/pkg/controller/config/config_flags_test.go +++ b/pkg/controller/config/config_flags_test.go @@ -275,6 +275,28 @@ func TestConfig_SetFlags(t *testing.T) { } }) }) + t.Run("Test_default-datasandbox", func(t *testing.T) { + t.Run("DefaultValue", func(t *testing.T) { + // Test that default value is set properly + if vString, err := cmdFlags.GetString("default-datasandbox"); err == nil { + assert.Equal(t, string(defaultConfig.DefaultDataSandboxPrefix), vString) + } else { + assert.FailNow(t, err.Error()) + } + }) + + t.Run("Override", func(t *testing.T) { + testValue := "1" + + cmdFlags.Set("default-datasandbox", testValue) + if vString, err := cmdFlags.GetString("default-datasandbox"); err == nil { + testDecodeJson_Config(t, fmt.Sprintf("%v", vString), &actual.DefaultDataSandboxPrefix) + + } else { + assert.FailNow(t, err.Error()) + } + }) + }) t.Run("Test_queue.type", func(t *testing.T) { t.Run("DefaultValue", func(t *testing.T) { // Test that default value is set properly diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 5a0d6a82b..da2a53968 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -3,9 +3,10 @@ package controller import ( "context" - errors3 
"github.com/lyft/flytepropeller/pkg/controller/nodes/errors" stdErrs "github.com/lyft/flytestdlib/errors" + errors3 "github.com/lyft/flytepropeller/pkg/controller/nodes/errors" + "github.com/lyft/flytepropeller/pkg/controller/executors" "github.com/lyft/flytepropeller/pkg/controller/nodes/task/catalog" @@ -297,7 +298,8 @@ func New(ctx context.Context, cfg *config.Config, kubeclientset kubernetes.Inter return nil, stdErrs.Wrapf(errors3.CausedByError, err, "failed to initialize workflow store") } - nodeExecutor, err := nodes.NewExecutor(ctx, cfg.NodeConfig, store, controller.enqueueWorkflowForNodeUpdates, eventSink, wfLauncher, cfg.MaxDatasetSizeBytes, kubeClient, catalogClient, scope) + nodeExecutor, err := nodes.NewExecutor(ctx, cfg.NodeConfig, store, controller.enqueueWorkflowForNodeUpdates, eventSink, wfLauncher, cfg.MaxDatasetSizeBytes, + storage.DataReference(cfg.DefaultDataSandboxPrefix), kubeClient, catalogClient, scope) if err != nil { return nil, errors.Wrapf(err, "Failed to create Controller.") } diff --git a/pkg/controller/nodes/executor.go b/pkg/controller/nodes/executor.go index fca1f1a56..67c0017e4 100644 --- a/pkg/controller/nodes/executor.go +++ b/pkg/controller/nodes/executor.go @@ -12,7 +12,6 @@ import ( eventsErr "github.com/lyft/flyteidl/clients/go/events/errors" "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core" "github.com/lyft/flyteidl/gen/pb-go/flyteidl/event" - "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/catalog" "github.com/lyft/flytestdlib/contextutils" "github.com/lyft/flytestdlib/logger" "github.com/lyft/flytestdlib/promutils" @@ -20,6 +19,8 @@ import ( "github.com/lyft/flytestdlib/storage" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/catalog" + "github.com/lyft/flytepropeller/pkg/controller/config" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -60,6 +61,7 @@ type nodeExecutor struct { defaultExecutionDeadline time.Duration defaultActiveDeadline time.Duration maxNodeRetriesForSystemFailures uint32 + defaultDataSandbox storage.DataReference } func (c *nodeExecutor) RecordTransitionLatency(ctx context.Context, w v1alpha1.ExecutableWorkflow, node v1alpha1.ExecutableNode, nodeStatus v1alpha1.ExecutableNodeStatus) { @@ -710,7 +712,8 @@ func (c *nodeExecutor) Initialize(ctx context.Context) error { return c.nodeHandlerFactory.Setup(ctx, s) } -func NewExecutor(ctx context.Context, nodeConfig config.NodeConfig, store *storage.DataStore, enQWorkflow v1alpha1.EnqueueWorkflow, eventSink events.EventSink, workflowLauncher launchplan.Executor, maxDatasetSize int64, kubeClient executors.Client, catalogClient catalog.Client, scope promutils.Scope) (executors.Node, error) { +func NewExecutor(ctx context.Context, nodeConfig config.NodeConfig, store *storage.DataStore, enQWorkflow v1alpha1.EnqueueWorkflow, eventSink events.EventSink, + workflowLauncher launchplan.Executor, maxDatasetSize int64, defaultDataSandboxPath storage.DataReference, kubeClient executors.Client, catalogClient catalog.Client, scope promutils.Scope) (executors.Node, error) { nodeScope := scope.NewSubScope("node") exec := &nodeExecutor{ @@ -735,6 +738,7 @@ func NewExecutor(ctx context.Context, nodeConfig config.NodeConfig, store *stora defaultExecutionDeadline: nodeConfig.DefaultDeadlines.DefaultNodeExecutionDeadline.Duration, defaultActiveDeadline: nodeConfig.DefaultDeadlines.DefaultNodeActiveDeadline.Duration, maxNodeRetriesForSystemFailures: uint32(nodeConfig.MaxNodeRetriesForSystemFailures), + defaultDataSandbox: defaultDataSandboxPath, } 
nodeHandlerFactory, err := NewHandlerFactory(ctx, exec, workflowLauncher, kubeClient, catalogClient, nodeScope) exec.nodeHandlerFactory = nodeHandlerFactory diff --git a/pkg/controller/nodes/handler/mocks/node_execution_context.go b/pkg/controller/nodes/handler/mocks/node_execution_context.go index db5219d40..9f5cb26fa 100644 --- a/pkg/controller/nodes/handler/mocks/node_execution_context.go +++ b/pkg/controller/nodes/handler/mocks/node_execution_context.go @@ -421,6 +421,38 @@ func (_m *NodeExecutionContext) NodeStatus() v1alpha1.ExecutableNodeStatus { return r0 } +type NodeExecutionContext_OutputDataSandboxBasePath struct { + *mock.Call +} + +func (_m NodeExecutionContext_OutputDataSandboxBasePath) Return(_a0 storage.DataReference) *NodeExecutionContext_OutputDataSandboxBasePath { + return &NodeExecutionContext_OutputDataSandboxBasePath{Call: _m.Call.Return(_a0)} +} + +func (_m *NodeExecutionContext) OnOutputDataSandboxBasePath() *NodeExecutionContext_OutputDataSandboxBasePath { + c := _m.On("OutputDataSandboxBasePath") + return &NodeExecutionContext_OutputDataSandboxBasePath{Call: c} +} + +func (_m *NodeExecutionContext) OnOutputDataSandboxBasePathMatch(matchers ...interface{}) *NodeExecutionContext_OutputDataSandboxBasePath { + c := _m.On("OutputDataSandboxBasePath", matchers...) + return &NodeExecutionContext_OutputDataSandboxBasePath{Call: c} +} + +// OutputDataSandboxBasePath provides a mock function with given fields: +func (_m *NodeExecutionContext) OutputDataSandboxBasePath() storage.DataReference { + ret := _m.Called() + + var r0 storage.DataReference + if rf, ok := ret.Get(0).(func() storage.DataReference); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(storage.DataReference) + } + + return r0 +} + type NodeExecutionContext_TaskReader struct { *mock.Call } diff --git a/pkg/controller/nodes/handler/node_exec_context.go b/pkg/controller/nodes/handler/node_exec_context.go index 41a6aed11..3bf9cacb8 100644 --- a/pkg/controller/nodes/handler/node_exec_context.go +++ b/pkg/controller/nodes/handler/node_exec_context.go @@ -5,12 +5,13 @@ import ( "github.com/lyft/flyteidl/clients/go/events" "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core" - "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/io" "github.com/lyft/flytestdlib/promutils" "github.com/lyft/flytestdlib/storage" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/io" + "github.com/lyft/flytepropeller/pkg/apis/flyteworkflow/v1alpha1" ) @@ -38,6 +39,11 @@ type NodeExecutionMetadata interface { } type NodeExecutionContext interface { + // This path is never read by propeller, but allows using some container or prefix in a specific container for all output from tasks + // Sandboxes provide exactly once execution semantics and only the successful sandbox wins. Ideally a sandbox should be a path that is + // available to the task at High Bandwidth (for example the base path of a sharded s3 bucket. 
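+	// For illustration only, outputs under a hypothetical base path s3://my-raw-bucket
+	// would land at s3://my-raw-bucket/<shard>/<unique-execution-id>/.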
+	// This, combined with a prefix-based sharding strategy, can improve the throughput from S3 many-fold.)
+	OutputDataSandboxBasePath() storage.DataReference
 	DataStore() *storage.DataStore
 	InputReader() io.InputReader
 	EventsRecorder() events.TaskEventRecorder
diff --git a/pkg/controller/nodes/node_exec_context.go b/pkg/controller/nodes/node_exec_context.go
index 7a5294630..a8e91bb60 100644
--- a/pkg/controller/nodes/node_exec_context.go
+++ b/pkg/controller/nodes/node_exec_context.go
@@ -5,11 +5,12 @@ import (
 	"fmt"
 
 	"github.com/lyft/flyteidl/clients/go/events"
-	"github.com/lyft/flyteplugins/go/tasks/pluginmachinery/io"
-	"github.com/lyft/flyteplugins/go/tasks/pluginmachinery/ioutils"
 	"github.com/lyft/flytestdlib/storage"
 	"k8s.io/apimachinery/pkg/types"
 
+	"github.com/lyft/flyteplugins/go/tasks/pluginmachinery/io"
+	"github.com/lyft/flyteplugins/go/tasks/pluginmachinery/ioutils"
+
 	"github.com/lyft/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
 	"github.com/lyft/flytepropeller/pkg/controller/nodes/handler"
 	"github.com/lyft/flytepropeller/pkg/utils"
@@ -47,6 +48,11 @@ type execContext struct {
 	nsm                 *nodeStateManager
 	enqueueOwner        func() error
 	w                   v1alpha1.ExecutableWorkflow
+	outputDataSandbox   storage.DataReference
+}
+
+func (e execContext) OutputDataSandboxBasePath() storage.DataReference {
+	return e.outputDataSandbox
 }
 
 func (e execContext) EnqueueOwnerFunc() func() error {
@@ -105,7 +111,7 @@ func (e execContext) MaxDatasetSizeBytes() int64 {
 	return e.maxDatasetSizeBytes
 }
 
-func newNodeExecContext(_ context.Context, store *storage.DataStore, w v1alpha1.ExecutableWorkflow, node v1alpha1.ExecutableNode, nodeStatus v1alpha1.ExecutableNodeStatus, inputs io.InputReader, maxDatasetSize int64, er events.TaskEventRecorder, tr handler.TaskReader, nsm *nodeStateManager, enqueueOwner func() error) *execContext {
+func newNodeExecContext(_ context.Context, store *storage.DataStore, w v1alpha1.ExecutableWorkflow, node v1alpha1.ExecutableNode, nodeStatus v1alpha1.ExecutableNodeStatus, inputs io.InputReader, maxDatasetSize int64, er events.TaskEventRecorder, tr handler.TaskReader, nsm *nodeStateManager, enqueueOwner func() error, outputSandbox storage.DataReference) *execContext {
 	md := execMetadata{WorkflowMeta: w}
 
 	// Copying the labels before updating it for this node
@@ -131,6 +137,7 @@
nsm: nsm, enqueueOwner: enqueueOwner, w: w, + outputDataSandbox: outputSandbox, } } @@ -170,5 +177,8 @@ func (c *nodeExecutor) newNodeExecContextDefault(ctx context.Context, w v1alpha1 tr, newNodeStateManager(ctx, s), workflowEnqueuer, + // Eventually we want to replace this with per workflow sandboxes + // https://github.com/lyft/flyte/issues/211 + c.defaultDataSandbox, ), nil } diff --git a/pkg/controller/nodes/task/handler.go b/pkg/controller/nodes/task/handler.go index 97a1a476d..2b8a5c430 100644 --- a/pkg/controller/nodes/task/handler.go +++ b/pkg/controller/nodes/task/handler.go @@ -140,6 +140,7 @@ type Handler struct { secretManager pluginCore.SecretManager resourceManager resourcemanager.BaseResourceManager barrierCache *barrier + sharder ioutils.ShardSelector cfg *config.Config } diff --git a/pkg/controller/nodes/task/taskexec_context.go b/pkg/controller/nodes/task/taskexec_context.go index cee86ad56..3365dfbae 100644 --- a/pkg/controller/nodes/task/taskexec_context.go +++ b/pkg/controller/nodes/task/taskexec_context.go @@ -10,6 +10,7 @@ import ( "github.com/lyft/flytestdlib/logger" "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core" + pluginCatalog "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/catalog" pluginCore "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/core" "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/io" @@ -123,7 +124,11 @@ func (t *Handler) newTaskExecutionContext(ctx context.Context, nCtx handler.Node return nil, err } - ow := ioutils.NewBufferedOutputWriter(ctx, ioutils.NewRemoteFileOutputPaths(ctx, nCtx.DataStore(), nCtx.NodeStatus().GetOutputDir())) + outputSandbox, err := ioutils.NewRandomPrefixShardedOutputSandbox(ctx, t.sharder, nCtx.OutputDataSandboxBasePath(), nCtx.NodeStatus().GetOutputDir(), nCtx.DataStore()) + if err != nil { + return nil, errors.Wrapf(errors.StorageError, nCtx.NodeID(), err, "failed to create output sandbox for node execution") + } + ow := ioutils.NewBufferedOutputWriter(ctx, ioutils.NewRemoteFileOutputPaths(ctx, nCtx.DataStore(), nCtx.NodeStatus().GetOutputDir(), outputSandbox)) ts := nCtx.NodeStateReader().GetTaskNodeState() var b *bytes.Buffer if ts.PluginState != nil { From 529e1afbcbfdf2216f7afa3af12b6e604a72150e Mon Sep 17 00:00:00 2001 From: Ketan Umare Date: Mon, 16 Mar 2020 21:07:19 -0700 Subject: [PATCH 2/8] updated go.mod --- go.mod | 2 +- go.sum | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/go.mod b/go.mod index e7864adbf..4a1bf01d3 100644 --- a/go.mod +++ b/go.mod @@ -22,7 +22,7 @@ require ( github.com/imdario/mergo v0.3.8 // indirect github.com/lyft/datacatalog v0.2.1 github.com/lyft/flyteidl v0.17.8 - github.com/lyft/flyteplugins v0.3.12-0.20200317002119-cce3a0006bd2 + github.com/lyft/flyteplugins v0.3.12-0.20200317040629-c9c9d9ace20c github.com/lyft/flytestdlib v0.3.2 github.com/magiconair/properties v1.8.1 github.com/mattn/go-colorable v0.1.6 // indirect diff --git a/go.sum b/go.sum index 09e5bd7b9..58feeda0f 100644 --- a/go.sum +++ b/go.sum @@ -391,6 +391,8 @@ github.com/lyft/flyteplugins v0.3.11 h1:E6BX5BU283BLMP48QJQsecqdxeLKLaiA+2+VVS8V github.com/lyft/flyteplugins v0.3.11/go.mod h1:FOSo04q4EheU6lm0oZFvfYAWgjrum/BDUK+mUT7qDFA= github.com/lyft/flyteplugins v0.3.12-0.20200317002119-cce3a0006bd2 h1:QYE15eK48RcM6RP1X+uMEXWKACagk0gFDp8BNrPuKQY= github.com/lyft/flyteplugins v0.3.12-0.20200317002119-cce3a0006bd2/go.mod h1:FOSo04q4EheU6lm0oZFvfYAWgjrum/BDUK+mUT7qDFA= +github.com/lyft/flyteplugins v0.3.12-0.20200317040629-c9c9d9ace20c 
h1:HSXs5qQNlElLooutHllRnX6Cy5cWd0Ew0mMcMHrAAN0= +github.com/lyft/flyteplugins v0.3.12-0.20200317040629-c9c9d9ace20c/go.mod h1:FOSo04q4EheU6lm0oZFvfYAWgjrum/BDUK+mUT7qDFA= github.com/lyft/flytestdlib v0.3.0/go.mod h1:LJPPJlkFj+wwVWMrQT3K5JZgNhZi2mULsCG4ZYhinhU= github.com/lyft/flytestdlib v0.3.2 h1:bY6Y+Fg6Jdc7zY4GAYuR7t2hjWwynIdmRvtLcRNaGnw= github.com/lyft/flytestdlib v0.3.2/go.mod h1:LJPPJlkFj+wwVWMrQT3K5JZgNhZi2mULsCG4ZYhinhU= From f1f4b6d3b437171241c117d9fd6b8faa6370be92 Mon Sep 17 00:00:00 2001 From: Ketan Umare Date: Mon, 16 Mar 2020 23:42:56 -0700 Subject: [PATCH 3/8] Working unit tests --- go.mod | 2 +- go.sum | 2 ++ pkg/controller/nodes/dynamic/handler.go | 2 +- pkg/controller/nodes/executor.go | 9 +++++ pkg/controller/nodes/executor_test.go | 26 +++++++------- .../handler/mocks/node_execution_context.go | 36 +++++++++++++++++++ .../nodes/handler/node_exec_context.go | 5 +++ pkg/controller/nodes/node_exec_context.go | 9 ++++- .../nodes/node_exec_context_test.go | 3 +- pkg/controller/nodes/task/handler.go | 1 - pkg/controller/nodes/task/handler_test.go | 17 +++++++++ pkg/controller/nodes/task/taskexec_context.go | 2 +- .../nodes/task/taskexec_context_test.go | 3 ++ pkg/controller/workflow/executor_test.go | 12 +++---- 14 files changed, 104 insertions(+), 25 deletions(-) diff --git a/go.mod b/go.mod index 4a1bf01d3..8bf1104fe 100644 --- a/go.mod +++ b/go.mod @@ -22,7 +22,7 @@ require ( github.com/imdario/mergo v0.3.8 // indirect github.com/lyft/datacatalog v0.2.1 github.com/lyft/flyteidl v0.17.8 - github.com/lyft/flyteplugins v0.3.12-0.20200317040629-c9c9d9ace20c + github.com/lyft/flyteplugins v0.3.12-0.20200317045100-833b62529b0e github.com/lyft/flytestdlib v0.3.2 github.com/magiconair/properties v1.8.1 github.com/mattn/go-colorable v0.1.6 // indirect diff --git a/go.sum b/go.sum index 58feeda0f..68c9af662 100644 --- a/go.sum +++ b/go.sum @@ -393,6 +393,8 @@ github.com/lyft/flyteplugins v0.3.12-0.20200317002119-cce3a0006bd2 h1:QYE15eK48R github.com/lyft/flyteplugins v0.3.12-0.20200317002119-cce3a0006bd2/go.mod h1:FOSo04q4EheU6lm0oZFvfYAWgjrum/BDUK+mUT7qDFA= github.com/lyft/flyteplugins v0.3.12-0.20200317040629-c9c9d9ace20c h1:HSXs5qQNlElLooutHllRnX6Cy5cWd0Ew0mMcMHrAAN0= github.com/lyft/flyteplugins v0.3.12-0.20200317040629-c9c9d9ace20c/go.mod h1:FOSo04q4EheU6lm0oZFvfYAWgjrum/BDUK+mUT7qDFA= +github.com/lyft/flyteplugins v0.3.12-0.20200317045100-833b62529b0e h1:XBXcwqJOkQYFQKuV4cVj67nT80nvsDgHn8uHbnbVuic= +github.com/lyft/flyteplugins v0.3.12-0.20200317045100-833b62529b0e/go.mod h1:FOSo04q4EheU6lm0oZFvfYAWgjrum/BDUK+mUT7qDFA= github.com/lyft/flytestdlib v0.3.0/go.mod h1:LJPPJlkFj+wwVWMrQT3K5JZgNhZi2mULsCG4ZYhinhU= github.com/lyft/flytestdlib v0.3.2 h1:bY6Y+Fg6Jdc7zY4GAYuR7t2hjWwynIdmRvtLcRNaGnw= github.com/lyft/flytestdlib v0.3.2/go.mod h1:LJPPJlkFj+wwVWMrQT3K5JZgNhZi2mULsCG4ZYhinhU= diff --git a/pkg/controller/nodes/dynamic/handler.go b/pkg/controller/nodes/dynamic/handler.go index 9f3cfa666..c3cf74de1 100644 --- a/pkg/controller/nodes/dynamic/handler.go +++ b/pkg/controller/nodes/dynamic/handler.go @@ -106,7 +106,7 @@ func (d dynamicNodeTaskNodeHandler) handleDynamicSubNodes(ctx context.Context, n if trns.Info().GetPhase() == handler.EPhaseSuccess { logger.Infof(ctx, "dynamic workflow node has succeeded, will call on success handler for parent node [%s]", nCtx.NodeID()) - outputPaths := ioutils.NewRemoteFileOutputPaths(ctx, nCtx.DataStore(), nCtx.NodeStatus().GetOutputDir()) + outputPaths := ioutils.NewRemoteFileOutputPaths(ctx, nCtx.DataStore(), nCtx.NodeStatus().GetOutputDir(), 
nil) execID := task.GetTaskExecutionIdentifier(nCtx) outputReader := ioutils.NewRemoteFileOutputReader(ctx, nCtx.DataStore(), outputPaths, nCtx.MaxDatasetSizeBytes()) ee, err := d.TaskNodeHandler.ValidateOutputAndCacheAdd(ctx, nCtx.NodeID(), nCtx.InputReader(), outputReader, nil, nCtx.TaskReader(), catalog.Metadata{ diff --git a/pkg/controller/nodes/executor.go b/pkg/controller/nodes/executor.go index 67c0017e4..1535163bb 100644 --- a/pkg/controller/nodes/executor.go +++ b/pkg/controller/nodes/executor.go @@ -5,6 +5,7 @@ import ( "fmt" "time" + "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/ioutils" errors2 "github.com/lyft/flytestdlib/errors" "github.com/golang/protobuf/ptypes" @@ -62,6 +63,7 @@ type nodeExecutor struct { defaultActiveDeadline time.Duration maxNodeRetriesForSystemFailures uint32 defaultDataSandbox storage.DataReference + shardSelector ioutils.ShardSelector } func (c *nodeExecutor) RecordTransitionLatency(ctx context.Context, w v1alpha1.ExecutableWorkflow, node v1alpha1.ExecutableNode, nodeStatus v1alpha1.ExecutableNodeStatus) { @@ -715,6 +717,12 @@ func (c *nodeExecutor) Initialize(ctx context.Context) error { func NewExecutor(ctx context.Context, nodeConfig config.NodeConfig, store *storage.DataStore, enQWorkflow v1alpha1.EnqueueWorkflow, eventSink events.EventSink, workflowLauncher launchplan.Executor, maxDatasetSize int64, defaultDataSandboxPath storage.DataReference, kubeClient executors.Client, catalogClient catalog.Client, scope promutils.Scope) (executors.Node, error) { + // TODO we may want to make this configurable. + shardSelector, err := ioutils.NewBase36PrefixShardSelector(ctx) + if err != nil { + return nil, err + } + nodeScope := scope.NewSubScope("node") exec := &nodeExecutor{ store: store, @@ -739,6 +747,7 @@ func NewExecutor(ctx context.Context, nodeConfig config.NodeConfig, store *stora defaultActiveDeadline: nodeConfig.DefaultDeadlines.DefaultNodeActiveDeadline.Duration, maxNodeRetriesForSystemFailures: uint32(nodeConfig.MaxNodeRetriesForSystemFailures), defaultDataSandbox: defaultDataSandboxPath, + shardSelector: shardSelector, } nodeHandlerFactory, err := NewHandlerFactory(ctx, exec, workflowLauncher, kubeClient, catalogClient, nodeScope) exec.nodeHandlerFactory = nodeHandlerFactory diff --git a/pkg/controller/nodes/executor_test.go b/pkg/controller/nodes/executor_test.go index bcef2bd90..d44062456 100644 --- a/pkg/controller/nodes/executor_test.go +++ b/pkg/controller/nodes/executor_test.go @@ -45,7 +45,7 @@ func TestSetInputsForStartNode(t *testing.T) { mockStorage := createInmemoryDataStore(t, testScope.NewSubScope("f")) enQWf := func(workflowID v1alpha1.WorkflowID) {} - exec, err := NewExecutor(ctx, config.GetConfig().NodeConfig, mockStorage, enQWf, events.NewMockEventSink(), launchplan.NewFailFastLaunchPlanExecutor(), 10, fakeKubeClient, catalogClient, promutils.NewTestScope()) + exec, err := NewExecutor(ctx, config.GetConfig().NodeConfig, mockStorage, enQWf, events.NewMockEventSink(), launchplan.NewFailFastLaunchPlanExecutor(), 10, "s3://bucket/", fakeKubeClient, catalogClient, promutils.NewTestScope()) assert.NoError(t, err) inputs := &core.LiteralMap{ Literals: map[string]*core.Literal{ @@ -91,7 +91,7 @@ func TestSetInputsForStartNode(t *testing.T) { }) failStorage := createFailingDatastore(t, testScope.NewSubScope("failing")) - execFail, err := NewExecutor(ctx, config.GetConfig().NodeConfig, failStorage, enQWf, events.NewMockEventSink(), launchplan.NewFailFastLaunchPlanExecutor(), 10, fakeKubeClient, catalogClient, 
promutils.NewTestScope()) + execFail, err := NewExecutor(ctx, config.GetConfig().NodeConfig, failStorage, enQWf, events.NewMockEventSink(), launchplan.NewFailFastLaunchPlanExecutor(), 10, "s3://bucket", fakeKubeClient, catalogClient, promutils.NewTestScope()) assert.NoError(t, err) t.Run("StorageFailure", func(t *testing.T) { w := createDummyBaseWorkflow(mockStorage) @@ -115,7 +115,7 @@ func TestNodeExecutor_Initialize(t *testing.T) { assert.NoError(t, err) t.Run("happy", func(t *testing.T) { - execIface, err := NewExecutor(ctx, config.GetConfig().NodeConfig, memStore, enQWf, mockEventSink, launchplan.NewFailFastLaunchPlanExecutor(), 10, fakeKubeClient, catalogClient, promutils.NewTestScope()) + execIface, err := NewExecutor(ctx, config.GetConfig().NodeConfig, memStore, enQWf, mockEventSink, launchplan.NewFailFastLaunchPlanExecutor(), 10, "s3://bucket", fakeKubeClient, catalogClient, promutils.NewTestScope()) assert.NoError(t, err) exec := execIface.(*nodeExecutor) @@ -128,7 +128,7 @@ func TestNodeExecutor_Initialize(t *testing.T) { }) t.Run("error", func(t *testing.T) { - execIface, err := NewExecutor(ctx, config.GetConfig().NodeConfig, memStore, enQWf, mockEventSink, launchplan.NewFailFastLaunchPlanExecutor(), 10, fakeKubeClient, catalogClient, promutils.NewTestScope()) + execIface, err := NewExecutor(ctx, config.GetConfig().NodeConfig, memStore, enQWf, mockEventSink, launchplan.NewFailFastLaunchPlanExecutor(), 10, "s3://bucket", fakeKubeClient, catalogClient, promutils.NewTestScope()) assert.NoError(t, err) exec := execIface.(*nodeExecutor) @@ -149,7 +149,7 @@ func TestNodeExecutor_RecursiveNodeHandler_RecurseStartNodes(t *testing.T) { store := createInmemoryDataStore(t, promutils.NewTestScope()) - execIface, err := NewExecutor(ctx, config.GetConfig().NodeConfig, store, enQWf, mockEventSink, launchplan.NewFailFastLaunchPlanExecutor(), 10, fakeKubeClient, catalogClient, promutils.NewTestScope()) + execIface, err := NewExecutor(ctx, config.GetConfig().NodeConfig, store, enQWf, mockEventSink, launchplan.NewFailFastLaunchPlanExecutor(), 10, "s3://bucket", fakeKubeClient, catalogClient, promutils.NewTestScope()) assert.NoError(t, err) exec := execIface.(*nodeExecutor) @@ -247,7 +247,7 @@ func TestNodeExecutor_RecursiveNodeHandler_RecurseEndNode(t *testing.T) { store := createInmemoryDataStore(t, promutils.NewTestScope()) - execIface, err := NewExecutor(ctx, config.GetConfig().NodeConfig, store, enQWf, mockEventSink, launchplan.NewFailFastLaunchPlanExecutor(), 10, fakeKubeClient, catalogClient, promutils.NewTestScope()) + execIface, err := NewExecutor(ctx, config.GetConfig().NodeConfig, store, enQWf, mockEventSink, launchplan.NewFailFastLaunchPlanExecutor(), 10, "s3://bucket", fakeKubeClient, catalogClient, promutils.NewTestScope()) assert.NoError(t, err) exec := execIface.(*nodeExecutor) @@ -597,7 +597,7 @@ func TestNodeExecutor_RecursiveNodeHandler_Recurse(t *testing.T) { startNode := mockWf.StartNode() store := createInmemoryDataStore(t, promutils.NewTestScope()) - execIface, err := NewExecutor(ctx, config.GetConfig().NodeConfig, store, enQWf, mockEventSink, launchplan.NewFailFastLaunchPlanExecutor(), 10, fakeKubeClient, catalogClient, promutils.NewTestScope()) + execIface, err := NewExecutor(ctx, config.GetConfig().NodeConfig, store, enQWf, mockEventSink, launchplan.NewFailFastLaunchPlanExecutor(), 10, "s3://bucket", fakeKubeClient, catalogClient, promutils.NewTestScope()) assert.NoError(t, err) exec := execIface.(*nodeExecutor) exec.nodeHandlerFactory = hf @@ -669,7 +669,7 @@ func 
TestNodeExecutor_RecursiveNodeHandler_Recurse(t *testing.T) { hf := &mocks2.HandlerFactory{} store := createInmemoryDataStore(t, promutils.NewTestScope()) - execIface, err := NewExecutor(ctx, config.GetConfig().NodeConfig, store, enQWf, mockEventSink, launchplan.NewFailFastLaunchPlanExecutor(), 10, fakeKubeClient, catalogClient, promutils.NewTestScope()) + execIface, err := NewExecutor(ctx, config.GetConfig().NodeConfig, store, enQWf, mockEventSink, launchplan.NewFailFastLaunchPlanExecutor(), 10, "s3://bucket", fakeKubeClient, catalogClient, promutils.NewTestScope()) assert.NoError(t, err) exec := execIface.(*nodeExecutor) exec.nodeHandlerFactory = hf @@ -767,7 +767,7 @@ func TestNodeExecutor_RecursiveNodeHandler_Recurse(t *testing.T) { t.Run(test.name, func(t *testing.T) { hf := &mocks2.HandlerFactory{} store := createInmemoryDataStore(t, promutils.NewTestScope()) - execIface, err := NewExecutor(ctx, config.GetConfig().NodeConfig, store, enQWf, mockEventSink, launchplan.NewFailFastLaunchPlanExecutor(), 10, fakeKubeClient, catalogClient, promutils.NewTestScope()) + execIface, err := NewExecutor(ctx, config.GetConfig().NodeConfig, store, enQWf, mockEventSink, launchplan.NewFailFastLaunchPlanExecutor(), 10, "s3://bucket", fakeKubeClient, catalogClient, promutils.NewTestScope()) assert.NoError(t, err) exec := execIface.(*nodeExecutor) exec.nodeHandlerFactory = hf @@ -817,7 +817,7 @@ func TestNodeExecutor_RecursiveNodeHandler_Recurse(t *testing.T) { t.Run("retries-exhausted", func(t *testing.T) { hf := &mocks2.HandlerFactory{} store := createInmemoryDataStore(t, promutils.NewTestScope()) - execIface, err := NewExecutor(ctx, config.GetConfig().NodeConfig, store, enQWf, mockEventSink, launchplan.NewFailFastLaunchPlanExecutor(), 10, fakeKubeClient, catalogClient, promutils.NewTestScope()) + execIface, err := NewExecutor(ctx, config.GetConfig().NodeConfig, store, enQWf, mockEventSink, launchplan.NewFailFastLaunchPlanExecutor(), 10, "s3://bucket", fakeKubeClient, catalogClient, promutils.NewTestScope()) assert.NoError(t, err) exec := execIface.(*nodeExecutor) exec.nodeHandlerFactory = hf @@ -844,7 +844,7 @@ func TestNodeExecutor_RecursiveNodeHandler_Recurse(t *testing.T) { t.Run("retries-remaining", func(t *testing.T) { hf := &mocks2.HandlerFactory{} store := createInmemoryDataStore(t, promutils.NewTestScope()) - execIface, err := NewExecutor(ctx, config.GetConfig().NodeConfig, store, enQWf, mockEventSink, launchplan.NewFailFastLaunchPlanExecutor(), 10, fakeKubeClient, catalogClient, promutils.NewTestScope()) + execIface, err := NewExecutor(ctx, config.GetConfig().NodeConfig, store, enQWf, mockEventSink, launchplan.NewFailFastLaunchPlanExecutor(), 10, "s3://bucket", fakeKubeClient, catalogClient, promutils.NewTestScope()) assert.NoError(t, err) exec := execIface.(*nodeExecutor) exec.nodeHandlerFactory = hf @@ -876,7 +876,7 @@ func TestNodeExecutor_RecursiveNodeHandler_NoDownstream(t *testing.T) { store := createInmemoryDataStore(t, promutils.NewTestScope()) - execIface, err := NewExecutor(ctx, config.GetConfig().NodeConfig, store, enQWf, mockEventSink, launchplan.NewFailFastLaunchPlanExecutor(), 10, fakeKubeClient, catalogClient, promutils.NewTestScope()) + execIface, err := NewExecutor(ctx, config.GetConfig().NodeConfig, store, enQWf, mockEventSink, launchplan.NewFailFastLaunchPlanExecutor(), 10, "s3://bucket", fakeKubeClient, catalogClient, promutils.NewTestScope()) assert.NoError(t, err) exec := execIface.(*nodeExecutor) @@ -984,7 +984,7 @@ func 
TestNodeExecutor_RecursiveNodeHandler_UpstreamNotReady(t *testing.T) { store := createInmemoryDataStore(t, promutils.NewTestScope()) - execIface, err := NewExecutor(ctx, config.GetConfig().NodeConfig, store, enQWf, mockEventSink, launchplan.NewFailFastLaunchPlanExecutor(), 10, fakeKubeClient, catalogClient, promutils.NewTestScope()) + execIface, err := NewExecutor(ctx, config.GetConfig().NodeConfig, store, enQWf, mockEventSink, launchplan.NewFailFastLaunchPlanExecutor(), 10, "s3://bucket", fakeKubeClient, catalogClient, promutils.NewTestScope()) assert.NoError(t, err) exec := execIface.(*nodeExecutor) diff --git a/pkg/controller/nodes/handler/mocks/node_execution_context.go b/pkg/controller/nodes/handler/mocks/node_execution_context.go index 9f5cb26fa..890a5a764 100644 --- a/pkg/controller/nodes/handler/mocks/node_execution_context.go +++ b/pkg/controller/nodes/handler/mocks/node_execution_context.go @@ -7,6 +7,8 @@ import ( io "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/io" handler "github.com/lyft/flytepropeller/pkg/controller/nodes/handler" + ioutils "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/ioutils" + mock "github.com/stretchr/testify/mock" storage "github.com/lyft/flytestdlib/storage" @@ -453,6 +455,40 @@ func (_m *NodeExecutionContext) OutputDataSandboxBasePath() storage.DataReferenc return r0 } +type NodeExecutionContext_OutputShardSelector struct { + *mock.Call +} + +func (_m NodeExecutionContext_OutputShardSelector) Return(_a0 ioutils.ShardSelector) *NodeExecutionContext_OutputShardSelector { + return &NodeExecutionContext_OutputShardSelector{Call: _m.Call.Return(_a0)} +} + +func (_m *NodeExecutionContext) OnOutputShardSelector() *NodeExecutionContext_OutputShardSelector { + c := _m.On("OutputShardSelector") + return &NodeExecutionContext_OutputShardSelector{Call: c} +} + +func (_m *NodeExecutionContext) OnOutputShardSelectorMatch(matchers ...interface{}) *NodeExecutionContext_OutputShardSelector { + c := _m.On("OutputShardSelector", matchers...) + return &NodeExecutionContext_OutputShardSelector{Call: c} +} + +// OutputShardSelector provides a mock function with given fields: +func (_m *NodeExecutionContext) OutputShardSelector() ioutils.ShardSelector { + ret := _m.Called() + + var r0 ioutils.ShardSelector + if rf, ok := ret.Get(0).(func() ioutils.ShardSelector); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(ioutils.ShardSelector) + } + } + + return r0 +} + type NodeExecutionContext_TaskReader struct { *mock.Call } diff --git a/pkg/controller/nodes/handler/node_exec_context.go b/pkg/controller/nodes/handler/node_exec_context.go index 3bf9cacb8..5077bb068 100644 --- a/pkg/controller/nodes/handler/node_exec_context.go +++ b/pkg/controller/nodes/handler/node_exec_context.go @@ -5,6 +5,7 @@ import ( "github.com/lyft/flyteidl/clients/go/events" "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core" + "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/ioutils" "github.com/lyft/flytestdlib/promutils" "github.com/lyft/flytestdlib/storage" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -44,6 +45,10 @@ type NodeExecutionContext interface { // available to the task at High Bandwidth (for example the base path of a sharded s3 bucket. // This with a prefix based sharded strategy, could improve the throughput from S3 manifold) OutputDataSandboxBasePath() storage.DataReference + + // Sharding strategy for the output data for this node execution. 
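+	// For illustration (hypothetical sketch, not part of the production wiring):
+	//
+	//   // pin every output to the single shard prefix "x", as the unit tests do
+	//   selector := ioutils.NewConstantShardSelector([]string{"x"})
+	//   shard, err := selector.GetShardPrefix(ctx, []byte(nodeID))
+	//
+	// whereas the base36 prefix selector used by the node executor spreads outputs
+	// across many prefixes to avoid hot-spotting a single S3 key range.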
+ OutputShardSelector() ioutils.ShardSelector + DataStore() *storage.DataStore InputReader() io.InputReader EventsRecorder() events.TaskEventRecorder diff --git a/pkg/controller/nodes/node_exec_context.go b/pkg/controller/nodes/node_exec_context.go index a8e91bb60..4e3353fa3 100644 --- a/pkg/controller/nodes/node_exec_context.go +++ b/pkg/controller/nodes/node_exec_context.go @@ -49,6 +49,11 @@ type execContext struct { enqueueOwner func() error w v1alpha1.ExecutableWorkflow outputDataSandbox storage.DataReference + shardSelector ioutils.ShardSelector +} + +func (e execContext) OutputShardSelector() ioutils.ShardSelector { + return e.shardSelector } func (e execContext) OutputDataSandboxBasePath() storage.DataReference { @@ -111,7 +116,7 @@ func (e execContext) MaxDatasetSizeBytes() int64 { return e.maxDatasetSizeBytes } -func newNodeExecContext(_ context.Context, store *storage.DataStore, w v1alpha1.ExecutableWorkflow, node v1alpha1.ExecutableNode, nodeStatus v1alpha1.ExecutableNodeStatus, inputs io.InputReader, maxDatasetSize int64, er events.TaskEventRecorder, tr handler.TaskReader, nsm *nodeStateManager, enqueueOwner func() error, outputSandbox storage.DataReference) *execContext { +func newNodeExecContext(_ context.Context, store *storage.DataStore, w v1alpha1.ExecutableWorkflow, node v1alpha1.ExecutableNode, nodeStatus v1alpha1.ExecutableNodeStatus, inputs io.InputReader, maxDatasetSize int64, er events.TaskEventRecorder, tr handler.TaskReader, nsm *nodeStateManager, enqueueOwner func() error, outputSandbox storage.DataReference, outputShardSelector ioutils.ShardSelector) *execContext { md := execMetadata{WorkflowMeta: w} // Copying the labels before updating it for this node @@ -138,6 +143,7 @@ func newNodeExecContext(_ context.Context, store *storage.DataStore, w v1alpha1. 
enqueueOwner: enqueueOwner, w: w, outputDataSandbox: outputSandbox, + shardSelector: outputShardSelector, } } @@ -180,5 +186,6 @@ func (c *nodeExecutor) newNodeExecContextDefault(ctx context.Context, w v1alpha1 // Eventually we want to replace this with per workflow sandboxes // https://github.com/lyft/flyte/issues/211 c.defaultDataSandbox, + c.shardSelector, ), nil } diff --git a/pkg/controller/nodes/node_exec_context_test.go b/pkg/controller/nodes/node_exec_context_test.go index bf250c4e0..37a6efcc1 100644 --- a/pkg/controller/nodes/node_exec_context_test.go +++ b/pkg/controller/nodes/node_exec_context_test.go @@ -7,6 +7,7 @@ import ( "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core" "github.com/lyft/flytepropeller/pkg/apis/flyteworkflow/v1alpha1" "github.com/lyft/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/mocks" + "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/ioutils" "github.com/lyft/flytestdlib/promutils" "github.com/lyft/flytestdlib/storage" "github.com/stretchr/testify/assert" @@ -45,7 +46,7 @@ func Test_NodeContext(t *testing.T) { Kind: v1alpha1.NodeKindTask, } s, _ := storage.NewDataStore(&storage.Config{Type: storage.TypeMemory}, promutils.NewTestScope()) - nCtx := newNodeExecContext(context.TODO(), s, w1, n, nil, nil, 0, nil, TaskReader{}, nil, nil) + nCtx := newNodeExecContext(context.TODO(), s, w1, n, nil, nil, 0, nil, TaskReader{}, nil, nil, "s3://bucket", ioutils.NewConstantShardSelector([]string{"x"})) assert.Equal(t, nCtx.NodeExecutionMetadata().GetLabels()["node-id"], "id") assert.Equal(t, nCtx.NodeExecutionMetadata().GetLabels()["task-name"], "task-name") } diff --git a/pkg/controller/nodes/task/handler.go b/pkg/controller/nodes/task/handler.go index 2b8a5c430..97a1a476d 100644 --- a/pkg/controller/nodes/task/handler.go +++ b/pkg/controller/nodes/task/handler.go @@ -140,7 +140,6 @@ type Handler struct { secretManager pluginCore.SecretManager resourceManager resourcemanager.BaseResourceManager barrierCache *barrier - sharder ioutils.ShardSelector cfg *config.Config } diff --git a/pkg/controller/nodes/task/handler_test.go b/pkg/controller/nodes/task/handler_test.go index 06e2a3221..34cb77d6d 100644 --- a/pkg/controller/nodes/task/handler_test.go +++ b/pkg/controller/nodes/task/handler_test.go @@ -7,6 +7,8 @@ import ( "testing" "time" + "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/ioutils" + "github.com/lyft/flytepropeller/pkg/controller/nodes/task/resourcemanager" "github.com/lyft/flytestdlib/contextutils" @@ -381,6 +383,9 @@ func Test_task_Handle_NoCatalog(t *testing.T) { nCtx.On("EventsRecorder").Return(recorder) nCtx.On("EnqueueOwner").Return(nil) + nCtx.OnOutputDataSandboxBasePath().Return("s3://sandbox/") + nCtx.OnOutputShardSelector().Return(ioutils.NewConstantShardSelector([]string{"x"})) + st := bytes.NewBuffer([]byte{}) cod := codex.GobStateCodec{} assert.NoError(t, cod.Encode(pluginResp, st)) @@ -682,6 +687,9 @@ func Test_task_Handle_Catalog(t *testing.T) { nCtx.On("EventsRecorder").Return(recorder) nCtx.On("EnqueueOwner").Return(nil) + nCtx.OnOutputDataSandboxBasePath().Return("s3://sandbox/") + nCtx.OnOutputShardSelector().Return(ioutils.NewConstantShardSelector([]string{"x"})) + st := bytes.NewBuffer([]byte{}) cod := codex.GobStateCodec{} assert.NoError(t, cod.Encode(&fakeplugins.NextPhaseState{ @@ -884,6 +892,9 @@ func Test_task_Handle_Barrier(t *testing.T) { nCtx.On("EventsRecorder").Return(recorder) nCtx.On("EnqueueOwner").Return(nil) + nCtx.OnOutputDataSandboxBasePath().Return("s3://sandbox/") + 
nCtx.OnOutputShardSelector().Return(ioutils.NewConstantShardSelector([]string{"x"})) + st := bytes.NewBuffer([]byte{}) cod := codex.GobStateCodec{} assert.NoError(t, cod.Encode(&fakeplugins.NextPhaseState{ @@ -1137,6 +1148,9 @@ func Test_task_Abort(t *testing.T) { nCtx.On("EnqueueOwner").Return(nil) nCtx.On("EventsRecorder").Return(ev) + nCtx.OnOutputDataSandboxBasePath().Return("s3://sandbox/") + nCtx.OnOutputShardSelector().Return(ioutils.NewConstantShardSelector([]string{"x"})) + st := bytes.NewBuffer([]byte{}) a := 45 type test struct { @@ -1258,6 +1272,9 @@ func Test_task_Finalize(t *testing.T) { nCtx.On("EventsRecorder").Return(nil) nCtx.On("EnqueueOwner").Return(nil) + nCtx.OnOutputDataSandboxBasePath().Return("s3://sandbox/") + nCtx.OnOutputShardSelector().Return(ioutils.NewConstantShardSelector([]string{"x"})) + noopRm := CreateNoopResourceManager(context.TODO(), promutils.NewTestScope()) st := bytes.NewBuffer([]byte{}) diff --git a/pkg/controller/nodes/task/taskexec_context.go b/pkg/controller/nodes/task/taskexec_context.go index 3365dfbae..b7415a939 100644 --- a/pkg/controller/nodes/task/taskexec_context.go +++ b/pkg/controller/nodes/task/taskexec_context.go @@ -124,7 +124,7 @@ func (t *Handler) newTaskExecutionContext(ctx context.Context, nCtx handler.Node return nil, err } - outputSandbox, err := ioutils.NewRandomPrefixShardedOutputSandbox(ctx, t.sharder, nCtx.OutputDataSandboxBasePath(), nCtx.NodeStatus().GetOutputDir(), nCtx.DataStore()) + outputSandbox, err := ioutils.NewShardedOutputSandbox(ctx, nCtx.OutputShardSelector(), nCtx.OutputDataSandboxBasePath(), uniqueID, nCtx.DataStore()) if err != nil { return nil, errors.Wrapf(errors.StorageError, nCtx.NodeID(), err, "failed to create output sandbox for node execution") } diff --git a/pkg/controller/nodes/task/taskexec_context_test.go b/pkg/controller/nodes/task/taskexec_context_test.go index ed0c4bae7..bc9084f64 100644 --- a/pkg/controller/nodes/task/taskexec_context_test.go +++ b/pkg/controller/nodes/task/taskexec_context_test.go @@ -8,6 +8,7 @@ import ( "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core" "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/catalog/mocks" ioMocks "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/io/mocks" + "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/ioutils" "github.com/lyft/flytestdlib/promutils" "github.com/lyft/flytestdlib/storage" "github.com/stretchr/testify/assert" @@ -82,6 +83,8 @@ func TestHandler_newTaskExecutionContext(t *testing.T) { PluginState: st.Bytes(), }) nCtx.On("NodeStateReader").Return(nr) + nCtx.OnOutputDataSandboxBasePath().Return("s3://sandbox/") + nCtx.OnOutputShardSelector().Return(ioutils.NewConstantShardSelector([]string{"x"})) noopRm := CreateNoopResourceManager(context.TODO(), promutils.NewTestScope()) diff --git a/pkg/controller/workflow/executor_test.go b/pkg/controller/workflow/executor_test.go index c7a968f0b..90bdff4a1 100644 --- a/pkg/controller/workflow/executor_test.go +++ b/pkg/controller/workflow/executor_test.go @@ -226,7 +226,7 @@ func TestWorkflowExecutor_HandleFlyteWorkflow_Error(t *testing.T) { catalogClient, err := catalog.NewCatalogClient(ctx) assert.NoError(t, err) - nodeExec, err := nodes.NewExecutor(ctx, config.GetConfig().NodeConfig, store, enqueueWorkflow, eventSink, launchplan.NewFailFastLaunchPlanExecutor(), maxOutputSize, fakeKubeClient, catalogClient, promutils.NewTestScope()) + nodeExec, err := nodes.NewExecutor(ctx, config.GetConfig().NodeConfig, store, enqueueWorkflow, eventSink, 
launchplan.NewFailFastLaunchPlanExecutor(), maxOutputSize, "s3://bucket", fakeKubeClient, catalogClient, promutils.NewTestScope()) assert.NoError(t, err) executor, err := NewExecutor(ctx, store, enqueueWorkflow, eventSink, recorder, "", nodeExec, promutils.NewTestScope()) assert.NoError(t, err) @@ -301,7 +301,7 @@ func TestWorkflowExecutor_HandleFlyteWorkflow(t *testing.T) { catalogClient, err := catalog.NewCatalogClient(ctx) assert.NoError(t, err) - nodeExec, err := nodes.NewExecutor(ctx, config.GetConfig().NodeConfig, store, enqueueWorkflow, eventSink, launchplan.NewFailFastLaunchPlanExecutor(), maxOutputSize, fakeKubeClient, catalogClient, promutils.NewTestScope()) + nodeExec, err := nodes.NewExecutor(ctx, config.GetConfig().NodeConfig, store, enqueueWorkflow, eventSink, launchplan.NewFailFastLaunchPlanExecutor(), maxOutputSize, "s3://bucket", fakeKubeClient, catalogClient, promutils.NewTestScope()) assert.NoError(t, err) executor, err := NewExecutor(ctx, store, enqueueWorkflow, eventSink, recorder, "", nodeExec, promutils.NewTestScope()) @@ -359,7 +359,7 @@ func BenchmarkWorkflowExecutor(b *testing.B) { eventSink := events.NewMockEventSink() catalogClient, err := catalog.NewCatalogClient(ctx) assert.NoError(b, err) - nodeExec, err := nodes.NewExecutor(ctx, config.GetConfig().NodeConfig, store, enqueueWorkflow, eventSink, launchplan.NewFailFastLaunchPlanExecutor(), maxOutputSize, fakeKubeClient, catalogClient, scope) + nodeExec, err := nodes.NewExecutor(ctx, config.GetConfig().NodeConfig, store, enqueueWorkflow, eventSink, launchplan.NewFailFastLaunchPlanExecutor(), maxOutputSize, "s3://bucket", fakeKubeClient, catalogClient, scope) assert.NoError(b, err) executor, err := NewExecutor(ctx, store, enqueueWorkflow, eventSink, recorder, "", nodeExec, promutils.NewTestScope()) @@ -444,7 +444,7 @@ func TestWorkflowExecutor_HandleFlyteWorkflow_Failing(t *testing.T) { } catalogClient, err := catalog.NewCatalogClient(ctx) assert.NoError(t, err) - nodeExec, err := nodes.NewExecutor(ctx, config.GetConfig().NodeConfig, store, enqueueWorkflow, eventSink, launchplan.NewFailFastLaunchPlanExecutor(), maxOutputSize, fakeKubeClient, catalogClient, promutils.NewTestScope()) + nodeExec, err := nodes.NewExecutor(ctx, config.GetConfig().NodeConfig, store, enqueueWorkflow, eventSink, launchplan.NewFailFastLaunchPlanExecutor(), maxOutputSize, "s3://bucket", fakeKubeClient, catalogClient, promutils.NewTestScope()) assert.NoError(t, err) executor, err := NewExecutor(ctx, store, enqueueWorkflow, eventSink, recorder, "", nodeExec, promutils.NewTestScope()) assert.NoError(t, err) @@ -532,7 +532,7 @@ func TestWorkflowExecutor_HandleFlyteWorkflow_Events(t *testing.T) { } catalogClient, err := catalog.NewCatalogClient(ctx) assert.NoError(t, err) - nodeExec, err := nodes.NewExecutor(ctx, config.GetConfig().NodeConfig, store, enqueueWorkflow, eventSink, launchplan.NewFailFastLaunchPlanExecutor(), maxOutputSize, fakeKubeClient, catalogClient, promutils.NewTestScope()) + nodeExec, err := nodes.NewExecutor(ctx, config.GetConfig().NodeConfig, store, enqueueWorkflow, eventSink, launchplan.NewFailFastLaunchPlanExecutor(), maxOutputSize, "s3://bucket", fakeKubeClient, catalogClient, promutils.NewTestScope()) assert.NoError(t, err) executor, err := NewExecutor(ctx, store, enqueueWorkflow, eventSink, recorder, "metadata", nodeExec, promutils.NewTestScope()) assert.NoError(t, err) @@ -584,7 +584,7 @@ func TestWorkflowExecutor_HandleFlyteWorkflow_EventFailure(t *testing.T) { catalogClient, err := catalog.NewCatalogClient(ctx) 
assert.NoError(t, err) - nodeExec, err := nodes.NewExecutor(ctx, config.GetConfig().NodeConfig, store, enqueueWorkflow, nodeEventSink, launchplan.NewFailFastLaunchPlanExecutor(), maxOutputSize, fakeKubeClient, catalogClient, promutils.NewTestScope()) + nodeExec, err := nodes.NewExecutor(ctx, config.GetConfig().NodeConfig, store, enqueueWorkflow, nodeEventSink, launchplan.NewFailFastLaunchPlanExecutor(), maxOutputSize, "s3://bucket", fakeKubeClient, catalogClient, promutils.NewTestScope()) assert.NoError(t, err) t.Run("EventAlreadyInTerminalStateError", func(t *testing.T) { From 0891958ac79dd76f0449946e0bc99a70357d7dc5 Mon Sep 17 00:00:00 2001 From: Ketan Umare Date: Tue, 17 Mar 2020 20:49:11 -0700 Subject: [PATCH 4/8] updated name from sandbox -> rawoutputpath --- go.mod | 2 +- go.sum | 2 + pkg/controller/config/config.go | 38 ++++++------ pkg/controller/config/config_flags.go | 2 +- pkg/controller/config/config_flags_test.go | 12 ++-- pkg/controller/controller.go | 2 +- pkg/controller/nodes/executor.go | 4 +- .../handler/mocks/node_execution_context.go | 60 +++++++++---------- .../nodes/handler/node_exec_context.go | 2 +- pkg/controller/nodes/node_exec_context.go | 10 ++-- pkg/controller/nodes/task/handler_test.go | 10 ++-- pkg/controller/nodes/task/taskexec_context.go | 2 +- .../nodes/task/taskexec_context_test.go | 2 +- 13 files changed, 75 insertions(+), 73 deletions(-) diff --git a/go.mod b/go.mod index 8bf1104fe..6c0c0ee87 100644 --- a/go.mod +++ b/go.mod @@ -22,7 +22,7 @@ require ( github.com/imdario/mergo v0.3.8 // indirect github.com/lyft/datacatalog v0.2.1 github.com/lyft/flyteidl v0.17.8 - github.com/lyft/flyteplugins v0.3.12-0.20200317045100-833b62529b0e + github.com/lyft/flyteplugins v0.3.12-0.20200318014325-ea4280769ab8 github.com/lyft/flytestdlib v0.3.2 github.com/magiconair/properties v1.8.1 github.com/mattn/go-colorable v0.1.6 // indirect diff --git a/go.sum b/go.sum index 68c9af662..3f3210c55 100644 --- a/go.sum +++ b/go.sum @@ -395,6 +395,8 @@ github.com/lyft/flyteplugins v0.3.12-0.20200317040629-c9c9d9ace20c h1:HSXs5qQNlE github.com/lyft/flyteplugins v0.3.12-0.20200317040629-c9c9d9ace20c/go.mod h1:FOSo04q4EheU6lm0oZFvfYAWgjrum/BDUK+mUT7qDFA= github.com/lyft/flyteplugins v0.3.12-0.20200317045100-833b62529b0e h1:XBXcwqJOkQYFQKuV4cVj67nT80nvsDgHn8uHbnbVuic= github.com/lyft/flyteplugins v0.3.12-0.20200317045100-833b62529b0e/go.mod h1:FOSo04q4EheU6lm0oZFvfYAWgjrum/BDUK+mUT7qDFA= +github.com/lyft/flyteplugins v0.3.12-0.20200318014325-ea4280769ab8 h1:B9c+6aKzqA/3KDn42nSLqBloIApxrTv14DM9T4BWMXo= +github.com/lyft/flyteplugins v0.3.12-0.20200318014325-ea4280769ab8/go.mod h1:FOSo04q4EheU6lm0oZFvfYAWgjrum/BDUK+mUT7qDFA= github.com/lyft/flytestdlib v0.3.0/go.mod h1:LJPPJlkFj+wwVWMrQT3K5JZgNhZi2mULsCG4ZYhinhU= github.com/lyft/flytestdlib v0.3.2 h1:bY6Y+Fg6Jdc7zY4GAYuR7t2hjWwynIdmRvtLcRNaGnw= github.com/lyft/flytestdlib v0.3.2/go.mod h1:LJPPJlkFj+wwVWMrQT3K5JZgNhZi2mULsCG4ZYhinhU= diff --git a/pkg/controller/config/config.go b/pkg/controller/config/config.go index ef65de29d..46096d41e 100644 --- a/pkg/controller/config/config.go +++ b/pkg/controller/config/config.go @@ -53,25 +53,25 @@ var ( // Config that uses the flytestdlib Config module to generate commandline and load config files. 
This configuration is // the base configuration to start propeller type Config struct { - KubeConfigPath string `json:"kube-config" pflag:",Path to kubernetes client config file."` - MasterURL string `json:"master"` - Workers int `json:"workers" pflag:"2,Number of threads to process workflows"` - WorkflowReEval config.Duration `json:"workflow-reeval-duration" pflag:"\"30s\",Frequency of re-evaluating workflows"` - DownstreamEval config.Duration `json:"downstream-eval-duration" pflag:"\"60s\",Frequency of re-evaluating downstream tasks"` - LimitNamespace string `json:"limit-namespace" pflag:"\"all\",Namespaces to watch for this propeller"` - ProfilerPort config.Port `json:"prof-port" pflag:"\"10254\",Profiler port"` - MetadataPrefix string `json:"metadata-prefix,omitempty" pflag:",MetadataPrefix should be used if all the metadata for Flyte executions should be stored under a specific prefix in CloudStorage. If not specified, the data will be stored in the base container directly."` - DefaultDataSandboxPrefix string `json:"default-datasandbox" pflag:",a fully qualified storage path of the form s3://flyte/abc/..., where all data sandboxes should be stored."` - Queue CompositeQueueConfig `json:"queue,omitempty" pflag:",Workflow workqueue configuration, affects the way the work is consumed from the queue."` - MetricsPrefix string `json:"metrics-prefix" pflag:"\"flyte:\",An optional prefix for all published metrics."` - EnableAdminLauncher bool `json:"enable-admin-launcher" pflag:"false, Enable remote Workflow launcher to Admin"` - MaxWorkflowRetries int `json:"max-workflow-retries" pflag:"50,Maximum number of retries per workflow"` - MaxTTLInHours int `json:"max-ttl-hours" pflag:"23,Maximum number of hours a completed workflow should be retained. Number between 1-23 hours"` - GCInterval config.Duration `json:"gc-interval" pflag:"\"30m\",Run periodic GC every 30 minutes"` - LeaderElection LeaderElectionConfig `json:"leader-election,omitempty" pflag:",Config for leader election."` - PublishK8sEvents bool `json:"publish-k8s-events" pflag:",Enable events publishing to K8s events API."` - MaxDatasetSizeBytes int64 `json:"max-output-size-bytes" pflag:",Maximum size of outputs per task"` - KubeConfig KubeClientConfig `json:"kube-client-config" pflag:",Configuration to control the Kubernetes client"` + KubeConfigPath string `json:"kube-config" pflag:",Path to kubernetes client config file."` + MasterURL string `json:"master"` + Workers int `json:"workers" pflag:"2,Number of threads to process workflows"` + WorkflowReEval config.Duration `json:"workflow-reeval-duration" pflag:"\"30s\",Frequency of re-evaluating workflows"` + DownstreamEval config.Duration `json:"downstream-eval-duration" pflag:"\"60s\",Frequency of re-evaluating downstream tasks"` + LimitNamespace string `json:"limit-namespace" pflag:"\"all\",Namespaces to watch for this propeller"` + ProfilerPort config.Port `json:"prof-port" pflag:"\"10254\",Profiler port"` + MetadataPrefix string `json:"metadata-prefix,omitempty" pflag:",MetadataPrefix should be used if all the metadata for Flyte executions should be stored under a specific prefix in CloudStorage. 
If not specified, the data will be stored in the base container directly."` + DefaultRawOutputPrefix string `json:"rawoutput-prefix" pflag:",a fully qualified storage path of the form s3://flyte/abc/..., where all data sandboxes should be stored."` + Queue CompositeQueueConfig `json:"queue,omitempty" pflag:",Workflow workqueue configuration, affects the way the work is consumed from the queue."` + MetricsPrefix string `json:"metrics-prefix" pflag:"\"flyte:\",An optional prefix for all published metrics."` + EnableAdminLauncher bool `json:"enable-admin-launcher" pflag:"false, Enable remote Workflow launcher to Admin"` + MaxWorkflowRetries int `json:"max-workflow-retries" pflag:"50,Maximum number of retries per workflow"` + MaxTTLInHours int `json:"max-ttl-hours" pflag:"23,Maximum number of hours a completed workflow should be retained. Number between 1-23 hours"` + GCInterval config.Duration `json:"gc-interval" pflag:"\"30m\",Run periodic GC every 30 minutes"` + LeaderElection LeaderElectionConfig `json:"leader-election,omitempty" pflag:",Config for leader election."` + PublishK8sEvents bool `json:"publish-k8s-events" pflag:",Enable events publishing to K8s events API."` + MaxDatasetSizeBytes int64 `json:"max-output-size-bytes" pflag:",Maximum size of outputs per task"` + KubeConfig KubeClientConfig `json:"kube-client-config" pflag:",Configuration to control the Kubernetes client"` NodeConfig NodeConfig `json:"node-config,omitempty" pflag:",config for a workflow node"` } diff --git a/pkg/controller/config/config_flags.go b/pkg/controller/config/config_flags.go index 6060be406..1802aeb78 100755 --- a/pkg/controller/config/config_flags.go +++ b/pkg/controller/config/config_flags.go @@ -49,7 +49,7 @@ func (cfg Config) GetPFlagSet(prefix string) *pflag.FlagSet { cmdFlags.String(fmt.Sprintf("%v%v", prefix, "limit-namespace"), defaultConfig.LimitNamespace, "Namespaces to watch for this propeller") cmdFlags.String(fmt.Sprintf("%v%v", prefix, "prof-port"), defaultConfig.ProfilerPort.String(), "Profiler port") cmdFlags.String(fmt.Sprintf("%v%v", prefix, "metadata-prefix"), defaultConfig.MetadataPrefix, "MetadataPrefix should be used if all the metadata for Flyte executions should be stored under a specific prefix in CloudStorage. 
If not specified, the data will be stored in the base container directly.") - cmdFlags.String(fmt.Sprintf("%v%v", prefix, "default-datasandbox"), defaultConfig.DefaultDataSandboxPrefix, "a fully qualified storage path of the form s3://flyte/abc/..., where all data sandboxes should be stored.") + cmdFlags.String(fmt.Sprintf("%v%v", prefix, "rawoutput-prefix"), defaultConfig.DefaultRawOutputPrefix, "a fully qualified storage path of the form s3://flyte/abc/..., where all data sandboxes should be stored.") cmdFlags.String(fmt.Sprintf("%v%v", prefix, "queue.type"), defaultConfig.Queue.Type, "Type of composite queue to use for the WorkQueue") cmdFlags.String(fmt.Sprintf("%v%v", prefix, "queue.queue.type"), defaultConfig.Queue.Queue.Type, "Type of RateLimiter to use for the WorkQueue") cmdFlags.String(fmt.Sprintf("%v%v", prefix, "queue.queue.base-delay"), defaultConfig.Queue.Queue.BaseDelay.String(), "base backoff delay for failure") diff --git a/pkg/controller/config/config_flags_test.go b/pkg/controller/config/config_flags_test.go index f5ade89b7..839fc56ca 100755 --- a/pkg/controller/config/config_flags_test.go +++ b/pkg/controller/config/config_flags_test.go @@ -275,11 +275,11 @@ func TestConfig_SetFlags(t *testing.T) { } }) }) - t.Run("Test_default-datasandbox", func(t *testing.T) { + t.Run("Test_rawoutput-prefix", func(t *testing.T) { t.Run("DefaultValue", func(t *testing.T) { // Test that default value is set properly - if vString, err := cmdFlags.GetString("default-datasandbox"); err == nil { - assert.Equal(t, string(defaultConfig.DefaultDataSandboxPrefix), vString) + if vString, err := cmdFlags.GetString("rawoutput-prefix"); err == nil { + assert.Equal(t, string(defaultConfig.DefaultRawOutputPrefix), vString) } else { assert.FailNow(t, err.Error()) } @@ -288,9 +288,9 @@ func TestConfig_SetFlags(t *testing.T) { t.Run("Override", func(t *testing.T) { testValue := "1" - cmdFlags.Set("default-datasandbox", testValue) - if vString, err := cmdFlags.GetString("default-datasandbox"); err == nil { - testDecodeJson_Config(t, fmt.Sprintf("%v", vString), &actual.DefaultDataSandboxPrefix) + cmdFlags.Set("rawoutput-prefix", testValue) + if vString, err := cmdFlags.GetString("rawoutput-prefix"); err == nil { + testDecodeJson_Config(t, fmt.Sprintf("%v", vString), &actual.DefaultRawOutputPrefix) } else { assert.FailNow(t, err.Error()) diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index da2a53968..093157be6 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -299,7 +299,7 @@ func New(ctx context.Context, cfg *config.Config, kubeclientset kubernetes.Inter } nodeExecutor, err := nodes.NewExecutor(ctx, cfg.NodeConfig, store, controller.enqueueWorkflowForNodeUpdates, eventSink, wfLauncher, cfg.MaxDatasetSizeBytes, - storage.DataReference(cfg.DefaultDataSandboxPrefix), kubeClient, catalogClient, scope) + storage.DataReference(cfg.DefaultRawOutputPrefix), kubeClient, catalogClient, scope) if err != nil { return nil, errors.Wrapf(err, "Failed to create Controller.") } diff --git a/pkg/controller/nodes/executor.go b/pkg/controller/nodes/executor.go index 1535163bb..b0e1af1d9 100644 --- a/pkg/controller/nodes/executor.go +++ b/pkg/controller/nodes/executor.go @@ -715,7 +715,7 @@ func (c *nodeExecutor) Initialize(ctx context.Context) error { } func NewExecutor(ctx context.Context, nodeConfig config.NodeConfig, store *storage.DataStore, enQWorkflow v1alpha1.EnqueueWorkflow, eventSink events.EventSink, - workflowLauncher launchplan.Executor, maxDatasetSize 
int64, defaultDataSandboxPath storage.DataReference, kubeClient executors.Client, catalogClient catalog.Client, scope promutils.Scope) (executors.Node, error) { + workflowLauncher launchplan.Executor, maxDatasetSize int64, defaultRawOutputPrefix storage.DataReference, kubeClient executors.Client, catalogClient catalog.Client, scope promutils.Scope) (executors.Node, error) { // TODO we may want to make this configurable. shardSelector, err := ioutils.NewBase36PrefixShardSelector(ctx) @@ -746,7 +746,7 @@ func NewExecutor(ctx context.Context, nodeConfig config.NodeConfig, store *stora defaultExecutionDeadline: nodeConfig.DefaultDeadlines.DefaultNodeExecutionDeadline.Duration, defaultActiveDeadline: nodeConfig.DefaultDeadlines.DefaultNodeActiveDeadline.Duration, maxNodeRetriesForSystemFailures: uint32(nodeConfig.MaxNodeRetriesForSystemFailures), - defaultDataSandbox: defaultDataSandboxPath, + defaultDataSandbox: defaultRawOutputPrefix, shardSelector: shardSelector, } nodeHandlerFactory, err := NewHandlerFactory(ctx, exec, workflowLauncher, kubeClient, catalogClient, nodeScope) diff --git a/pkg/controller/nodes/handler/mocks/node_execution_context.go b/pkg/controller/nodes/handler/mocks/node_execution_context.go index 890a5a764..59456d50e 100644 --- a/pkg/controller/nodes/handler/mocks/node_execution_context.go +++ b/pkg/controller/nodes/handler/mocks/node_execution_context.go @@ -423,67 +423,67 @@ func (_m *NodeExecutionContext) NodeStatus() v1alpha1.ExecutableNodeStatus { return r0 } -type NodeExecutionContext_OutputDataSandboxBasePath struct { +type NodeExecutionContext_OutputShardSelector struct { *mock.Call } -func (_m NodeExecutionContext_OutputDataSandboxBasePath) Return(_a0 storage.DataReference) *NodeExecutionContext_OutputDataSandboxBasePath { - return &NodeExecutionContext_OutputDataSandboxBasePath{Call: _m.Call.Return(_a0)} +func (_m NodeExecutionContext_OutputShardSelector) Return(_a0 ioutils.ShardSelector) *NodeExecutionContext_OutputShardSelector { + return &NodeExecutionContext_OutputShardSelector{Call: _m.Call.Return(_a0)} } -func (_m *NodeExecutionContext) OnOutputDataSandboxBasePath() *NodeExecutionContext_OutputDataSandboxBasePath { - c := _m.On("OutputDataSandboxBasePath") - return &NodeExecutionContext_OutputDataSandboxBasePath{Call: c} +func (_m *NodeExecutionContext) OnOutputShardSelector() *NodeExecutionContext_OutputShardSelector { + c := _m.On("OutputShardSelector") + return &NodeExecutionContext_OutputShardSelector{Call: c} } -func (_m *NodeExecutionContext) OnOutputDataSandboxBasePathMatch(matchers ...interface{}) *NodeExecutionContext_OutputDataSandboxBasePath { - c := _m.On("OutputDataSandboxBasePath", matchers...) - return &NodeExecutionContext_OutputDataSandboxBasePath{Call: c} +func (_m *NodeExecutionContext) OnOutputShardSelectorMatch(matchers ...interface{}) *NodeExecutionContext_OutputShardSelector { + c := _m.On("OutputShardSelector", matchers...) 
+ return &NodeExecutionContext_OutputShardSelector{Call: c} } -// OutputDataSandboxBasePath provides a mock function with given fields: -func (_m *NodeExecutionContext) OutputDataSandboxBasePath() storage.DataReference { +// OutputShardSelector provides a mock function with given fields: +func (_m *NodeExecutionContext) OutputShardSelector() ioutils.ShardSelector { ret := _m.Called() - var r0 storage.DataReference - if rf, ok := ret.Get(0).(func() storage.DataReference); ok { + var r0 ioutils.ShardSelector + if rf, ok := ret.Get(0).(func() ioutils.ShardSelector); ok { r0 = rf() } else { - r0 = ret.Get(0).(storage.DataReference) + if ret.Get(0) != nil { + r0 = ret.Get(0).(ioutils.ShardSelector) + } } return r0 } -type NodeExecutionContext_OutputShardSelector struct { +type NodeExecutionContext_RawOutputPrefix struct { *mock.Call } -func (_m NodeExecutionContext_OutputShardSelector) Return(_a0 ioutils.ShardSelector) *NodeExecutionContext_OutputShardSelector { - return &NodeExecutionContext_OutputShardSelector{Call: _m.Call.Return(_a0)} +func (_m NodeExecutionContext_RawOutputPrefix) Return(_a0 storage.DataReference) *NodeExecutionContext_RawOutputPrefix { + return &NodeExecutionContext_RawOutputPrefix{Call: _m.Call.Return(_a0)} } -func (_m *NodeExecutionContext) OnOutputShardSelector() *NodeExecutionContext_OutputShardSelector { - c := _m.On("OutputShardSelector") - return &NodeExecutionContext_OutputShardSelector{Call: c} +func (_m *NodeExecutionContext) OnRawOutputPrefix() *NodeExecutionContext_RawOutputPrefix { + c := _m.On("RawOutputPrefix") + return &NodeExecutionContext_RawOutputPrefix{Call: c} } -func (_m *NodeExecutionContext) OnOutputShardSelectorMatch(matchers ...interface{}) *NodeExecutionContext_OutputShardSelector { - c := _m.On("OutputShardSelector", matchers...) - return &NodeExecutionContext_OutputShardSelector{Call: c} +func (_m *NodeExecutionContext) OnRawOutputPrefixMatch(matchers ...interface{}) *NodeExecutionContext_RawOutputPrefix { - c := _m.On("RawOutputPrefix", matchers...) + return &NodeExecutionContext_RawOutputPrefix{Call: c} } -// OutputShardSelector provides a mock function with given fields: -func (_m *NodeExecutionContext) OutputShardSelector() ioutils.ShardSelector { +// RawOutputPrefix provides a mock function with given fields: +func (_m *NodeExecutionContext) RawOutputPrefix() storage.DataReference { ret := _m.Called() - var r0 ioutils.ShardSelector - if rf, ok := ret.Get(0).(func() ioutils.ShardSelector); ok { + var r0 storage.DataReference + if rf, ok := ret.Get(0).(func() storage.DataReference); ok { r0 = rf() } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(ioutils.ShardSelector) - } + r0 = ret.Get(0).(storage.DataReference) } return r0 diff --git a/pkg/controller/nodes/handler/node_exec_context.go b/pkg/controller/nodes/handler/node_exec_context.go index 5077bb068..48100ce74 100644 --- a/pkg/controller/nodes/handler/node_exec_context.go +++ b/pkg/controller/nodes/handler/node_exec_context.go @@ -44,7 +44,7 @@ type NodeExecutionContext interface { // Sandboxes provide exactly-once execution semantics, and only the successful sandbox wins. Ideally a sandbox should be a path that is // available to the task at high bandwidth (for example, the base path of a sharded S3 bucket). // Combined with a prefix-based sharding strategy, this could improve throughput from S3 many-fold. - OutputDataSandboxBasePath() storage.DataReference + RawOutputPrefix() storage.DataReference // Sharding strategy for the output data for this node execution.
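// For example, the unit tests in this series pin the shard with ioutils.NewConstantShardSelector([]string{"x"}), // while NewExecutor defaults to ioutils.NewBase36PrefixShardSelector to spread outputs across key prefixes.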
OutputShardSelector() ioutils.ShardSelector diff --git a/pkg/controller/nodes/node_exec_context.go b/pkg/controller/nodes/node_exec_context.go index 4e3353fa3..892659c03 100644 --- a/pkg/controller/nodes/node_exec_context.go +++ b/pkg/controller/nodes/node_exec_context.go @@ -48,7 +48,7 @@ type execContext struct { nsm *nodeStateManager enqueueOwner func() error w v1alpha1.ExecutableWorkflow - outputDataSandbox storage.DataReference + rawOutputPrefix storage.DataReference shardSelector ioutils.ShardSelector } @@ -56,8 +56,8 @@ func (e execContext) OutputShardSelector() ioutils.ShardSelector { return e.shardSelector } -func (e execContext) OutputDataSandboxBasePath() storage.DataReference { - return e.outputDataSandbox +func (e execContext) RawOutputPrefix() storage.DataReference { + return e.rawOutputPrefix } func (e execContext) EnqueueOwnerFunc() func() error { @@ -116,7 +116,7 @@ func (e execContext) MaxDatasetSizeBytes() int64 { return e.maxDatasetSizeBytes } -func newNodeExecContext(_ context.Context, store *storage.DataStore, w v1alpha1.ExecutableWorkflow, node v1alpha1.ExecutableNode, nodeStatus v1alpha1.ExecutableNodeStatus, inputs io.InputReader, maxDatasetSize int64, er events.TaskEventRecorder, tr handler.TaskReader, nsm *nodeStateManager, enqueueOwner func() error, outputSandbox storage.DataReference, outputShardSelector ioutils.ShardSelector) *execContext { +func newNodeExecContext(_ context.Context, store *storage.DataStore, w v1alpha1.ExecutableWorkflow, node v1alpha1.ExecutableNode, nodeStatus v1alpha1.ExecutableNodeStatus, inputs io.InputReader, maxDatasetSize int64, er events.TaskEventRecorder, tr handler.TaskReader, nsm *nodeStateManager, enqueueOwner func() error, rawOutputPrefix storage.DataReference, outputShardSelector ioutils.ShardSelector) *execContext { md := execMetadata{WorkflowMeta: w} // Copying the labels before updating it for this node @@ -142,7 +142,7 @@ func newNodeExecContext(_ context.Context, store *storage.DataStore, w v1alpha1. 
nsm: nsm, enqueueOwner: enqueueOwner, w: w, - outputDataSandbox: outputSandbox, + rawOutputPrefix: rawOutputPrefix, shardSelector: outputShardSelector, } } diff --git a/pkg/controller/nodes/task/handler_test.go b/pkg/controller/nodes/task/handler_test.go index 34cb77d6d..4fe39d4c0 100644 --- a/pkg/controller/nodes/task/handler_test.go +++ b/pkg/controller/nodes/task/handler_test.go @@ -383,7 +383,7 @@ func Test_task_Handle_NoCatalog(t *testing.T) { nCtx.On("EventsRecorder").Return(recorder) nCtx.On("EnqueueOwner").Return(nil) - nCtx.OnOutputDataSandboxBasePath().Return("s3://sandbox/") + nCtx.OnRawOutputPrefix().Return("s3://sandbox/") nCtx.OnOutputShardSelector().Return(ioutils.NewConstantShardSelector([]string{"x"})) st := bytes.NewBuffer([]byte{}) @@ -687,7 +687,7 @@ func Test_task_Handle_Catalog(t *testing.T) { nCtx.On("EventsRecorder").Return(recorder) nCtx.On("EnqueueOwner").Return(nil) - nCtx.OnOutputDataSandboxBasePath().Return("s3://sandbox/") + nCtx.OnRawOutputPrefix().Return("s3://sandbox/") nCtx.OnOutputShardSelector().Return(ioutils.NewConstantShardSelector([]string{"x"})) st := bytes.NewBuffer([]byte{}) @@ -892,7 +892,7 @@ func Test_task_Handle_Barrier(t *testing.T) { nCtx.On("EventsRecorder").Return(recorder) nCtx.On("EnqueueOwner").Return(nil) - nCtx.OnOutputDataSandboxBasePath().Return("s3://sandbox/") + nCtx.OnRawOutputPrefix().Return("s3://sandbox/") nCtx.OnOutputShardSelector().Return(ioutils.NewConstantShardSelector([]string{"x"})) st := bytes.NewBuffer([]byte{}) @@ -1148,7 +1148,7 @@ func Test_task_Abort(t *testing.T) { nCtx.On("EnqueueOwner").Return(nil) nCtx.On("EventsRecorder").Return(ev) - nCtx.OnOutputDataSandboxBasePath().Return("s3://sandbox/") + nCtx.OnRawOutputPrefix().Return("s3://sandbox/") nCtx.OnOutputShardSelector().Return(ioutils.NewConstantShardSelector([]string{"x"})) st := bytes.NewBuffer([]byte{}) @@ -1272,7 +1272,7 @@ func Test_task_Finalize(t *testing.T) { nCtx.On("EventsRecorder").Return(nil) nCtx.On("EnqueueOwner").Return(nil) - nCtx.OnOutputDataSandboxBasePath().Return("s3://sandbox/") + nCtx.OnRawOutputPrefix().Return("s3://sandbox/") nCtx.OnOutputShardSelector().Return(ioutils.NewConstantShardSelector([]string{"x"})) noopRm := CreateNoopResourceManager(context.TODO(), promutils.NewTestScope()) diff --git a/pkg/controller/nodes/task/taskexec_context.go b/pkg/controller/nodes/task/taskexec_context.go index b7415a939..a9d7d54eb 100644 --- a/pkg/controller/nodes/task/taskexec_context.go +++ b/pkg/controller/nodes/task/taskexec_context.go @@ -124,7 +124,7 @@ func (t *Handler) newTaskExecutionContext(ctx context.Context, nCtx handler.Node return nil, err } - outputSandbox, err := ioutils.NewShardedOutputSandbox(ctx, nCtx.OutputShardSelector(), nCtx.OutputDataSandboxBasePath(), uniqueID, nCtx.DataStore()) + outputSandbox, err := ioutils.NewShardedRawOutputPath(ctx, nCtx.OutputShardSelector(), nCtx.RawOutputPrefix(), uniqueID, nCtx.DataStore()) if err != nil { return nil, errors.Wrapf(errors.StorageError, nCtx.NodeID(), err, "failed to create output sandbox for node execution") } diff --git a/pkg/controller/nodes/task/taskexec_context_test.go b/pkg/controller/nodes/task/taskexec_context_test.go index bc9084f64..356bdfe13 100644 --- a/pkg/controller/nodes/task/taskexec_context_test.go +++ b/pkg/controller/nodes/task/taskexec_context_test.go @@ -83,7 +83,7 @@ func TestHandler_newTaskExecutionContext(t *testing.T) { PluginState: st.Bytes(), }) nCtx.On("NodeStateReader").Return(nr) - nCtx.OnOutputDataSandboxBasePath().Return("s3://sandbox/") + 
nCtx.OnRawOutputPrefix().Return("s3://sandbox/") nCtx.OnOutputShardSelector().Return(ioutils.NewConstantShardSelector([]string{"x"})) noopRm := CreateNoopResourceManager(context.TODO(), promutils.NewTestScope()) From 5af24bfdb4a2e396de6ca6ebee277d63eee1ed8b Mon Sep 17 00:00:00 2001 From: Ketan Umare Date: Tue, 17 Mar 2020 20:58:04 -0700 Subject: [PATCH 5/8] linter fix --- pkg/controller/config/config.go | 2 +- pkg/controller/nodes/node_exec_context_test.go | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/pkg/controller/config/config.go b/pkg/controller/config/config.go index 46096d41e..4f559181f 100644 --- a/pkg/controller/config/config.go +++ b/pkg/controller/config/config.go @@ -72,7 +72,7 @@ type Config struct { PublishK8sEvents bool `json:"publish-k8s-events" pflag:",Enable events publishing to K8s events API."` MaxDatasetSizeBytes int64 `json:"max-output-size-bytes" pflag:",Maximum size of outputs per task"` KubeConfig KubeClientConfig `json:"kube-client-config" pflag:",Configuration to control the Kubernetes client"` - NodeConfig NodeConfig `json:"node-config,omitempty" pflag:",config for a workflow node"` + NodeConfig NodeConfig `json:"node-config,omitempty" pflag:",config for a workflow node"` } type KubeClientConfig struct { diff --git a/pkg/controller/nodes/node_exec_context_test.go b/pkg/controller/nodes/node_exec_context_test.go index 37a6efcc1..5b7c20dc3 100644 --- a/pkg/controller/nodes/node_exec_context_test.go +++ b/pkg/controller/nodes/node_exec_context_test.go @@ -5,12 +5,13 @@ import ( "testing" "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core" - "github.com/lyft/flytepropeller/pkg/apis/flyteworkflow/v1alpha1" - "github.com/lyft/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/mocks" "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/ioutils" "github.com/lyft/flytestdlib/promutils" "github.com/lyft/flytestdlib/storage" "github.com/stretchr/testify/assert" + + "github.com/lyft/flytepropeller/pkg/apis/flyteworkflow/v1alpha1" + "github.com/lyft/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/mocks" ) type TaskReader struct{} From 303dfea0be7c35d5a18ef8200e80b75a414acf33 Mon Sep 17 00:00:00 2001 From: Ketan Umare Date: Mon, 23 Mar 2020 21:43:19 -0700 Subject: [PATCH 6/8] Working and merged from master --- go.mod | 4 +-- go.sum | 5 ++++ pkg/controller/config/config_flags.go | 2 +- pkg/controller/config/config_flags_test.go | 27 ++++++++++++++----- pkg/controller/nodes/node_exec_context.go | 2 +- .../nodes/node_exec_context_test.go | 2 +- 6 files changed, 30 insertions(+), 12 deletions(-) diff --git a/go.mod b/go.mod index 6c0c0ee87..b2d2c4da4 100644 --- a/go.mod +++ b/go.mod @@ -22,8 +22,8 @@ require ( github.com/imdario/mergo v0.3.8 // indirect github.com/lyft/datacatalog v0.2.1 github.com/lyft/flyteidl v0.17.8 - github.com/lyft/flyteplugins v0.3.12-0.20200318014325-ea4280769ab8 - github.com/lyft/flytestdlib v0.3.2 + github.com/lyft/flyteplugins v0.3.14-0.20200324043344-d4df89dada43 + github.com/lyft/flytestdlib v0.3.3 github.com/magiconair/properties v1.8.1 github.com/mattn/go-colorable v0.1.6 // indirect github.com/mitchellh/mapstructure v1.1.2 diff --git a/go.sum b/go.sum index 31c3b8052..a891fe3d7 100644 --- a/go.sum +++ b/go.sum @@ -386,9 +386,14 @@ github.com/lyft/flyteplugins v0.3.10 h1:AwGv0TKl/RTOESY0+P5v9gyjO67LeHXHZqiMVaa+ github.com/lyft/flyteplugins v0.3.10/go.mod h1:FOSo04q4EheU6lm0oZFvfYAWgjrum/BDUK+mUT7qDFA= github.com/lyft/flyteplugins v0.3.11 h1:E6BX5BU283BLMP48QJQsecqdxeLKLaiA+2+VVS8VYoc= github.com/lyft/flyteplugins v0.3.11/go.mod 
h1:FOSo04q4EheU6lm0oZFvfYAWgjrum/BDUK+mUT7qDFA= +github.com/lyft/flyteplugins v0.3.12-0.20200318014325-ea4280769ab8/go.mod h1:FOSo04q4EheU6lm0oZFvfYAWgjrum/BDUK+mUT7qDFA= +github.com/lyft/flyteplugins v0.3.14-0.20200324043344-d4df89dada43 h1:hpyBrWo2HrEdNG5TNdU5+90D/T8wu6FmmLgpeyJRN30= +github.com/lyft/flyteplugins v0.3.14-0.20200324043344-d4df89dada43/go.mod h1:heTJLryE8EE4Vcd+W3EkQ3fyF41YyquavCLQv1YfnEA= github.com/lyft/flytestdlib v0.3.0/go.mod h1:LJPPJlkFj+wwVWMrQT3K5JZgNhZi2mULsCG4ZYhinhU= github.com/lyft/flytestdlib v0.3.2 h1:bY6Y+Fg6Jdc7zY4GAYuR7t2hjWwynIdmRvtLcRNaGnw= github.com/lyft/flytestdlib v0.3.2/go.mod h1:LJPPJlkFj+wwVWMrQT3K5JZgNhZi2mULsCG4ZYhinhU= +github.com/lyft/flytestdlib v0.3.3 h1:MkWXPkwQinh6MR3Yf5siZhmRSt9r4YmsF+5kvVVVedE= +github.com/lyft/flytestdlib v0.3.3/go.mod h1:LJPPJlkFj+wwVWMrQT3K5JZgNhZi2mULsCG4ZYhinhU= github.com/lyft/spark-on-k8s-operator v0.1.3 h1:rmke8lR2Oy8mvKXRhloKuEu7fgGuXepDxiBNiorVUFI= github.com/lyft/spark-on-k8s-operator v0.1.3/go.mod h1:hkRqdqAsdNnxT/Zst6MNMRbTAoiCZ0JRw7svRgAYb0A= github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= diff --git a/pkg/controller/config/config_flags.go b/pkg/controller/config/config_flags.go index b1168fddb..4b52f673b 100755 --- a/pkg/controller/config/config_flags.go +++ b/pkg/controller/config/config_flags.go @@ -82,6 +82,6 @@ func (cfg Config) GetPFlagSet(prefix string) *pflag.FlagSet { cmdFlags.String(fmt.Sprintf("%v%v", prefix, "node-config.default-deadlines.node-active-deadline"), defaultConfig.NodeConfig.DefaultDeadlines.DefaultNodeActiveDeadline.String(), "Default value of node timeout") cmdFlags.String(fmt.Sprintf("%v%v", prefix, "node-config.default-deadlines.workflow-active-deadline"), defaultConfig.NodeConfig.DefaultDeadlines.DefaultWorkflowActiveDeadline.String(), "Default value of workflow timeout") cmdFlags.Int64(fmt.Sprintf("%v%v", prefix, "node-config.max-node-retries-system-failures"), defaultConfig.NodeConfig.MaxNodeRetriesOnSystemFailures, "Maximum number of retries per node for node failure due to infra issues") - cmdFlags.Int64(fmt.Sprintf("%v%v", prefix, "node-config.interruptible-failure-threshold"), defaultConfig.NodeConfig.InterruptibleFailureThreshold, " number of failures for a node to be still considered interruptible'") + cmdFlags.Int64(fmt.Sprintf("%v%v", prefix, "node-config.interruptible-failure-threshold"), defaultConfig.NodeConfig.InterruptibleFailureThreshold, "number of failures for a node to be still considered interruptible'") return cmdFlags } diff --git a/pkg/controller/config/config_flags_test.go b/pkg/controller/config/config_flags_test.go index 4c8e8c955..1d3db8cb0 100755 --- a/pkg/controller/config/config_flags_test.go +++ b/pkg/controller/config/config_flags_test.go @@ -1001,13 +1001,26 @@ func TestConfig_SetFlags(t *testing.T) { } }) }) - t.Run("Test_node-config.interruptible-failure-threshold", func(t *testing.T) { - // Test that default value is set properly - if vInt64, err := cmdFlags.GetInt64("node-config.interruptible-failure-threshold"); err == nil { - assert.Equal(t, int64(defaultConfig.NodeConfig.InterruptibleFailureThreshold), vInt64) - } else { - assert.FailNow(t, err.Error()) - } + t.Run("DefaultValue", func(t *testing.T) { + // Test that default value is set properly + if vInt64, err := cmdFlags.GetInt64("node-config.interruptible-failure-threshold"); err == nil { + assert.Equal(t, int64(defaultConfig.NodeConfig.InterruptibleFailureThreshold), vInt64) + } else { + assert.FailNow(t, err.Error()) + } + }) + + 
t.Run("Override", func(t *testing.T) { + testValue := "1" + + cmdFlags.Set("node-config.interruptible-failure-threshold", testValue) + if vInt64, err := cmdFlags.GetInt64("node-config.interruptible-failure-threshold"); err == nil { + testDecodeJson_Config(t, fmt.Sprintf("%v", vInt64), &actual.NodeConfig.InterruptibleFailureThreshold) + + } else { + assert.FailNow(t, err.Error()) + } + }) }) } diff --git a/pkg/controller/nodes/node_exec_context.go b/pkg/controller/nodes/node_exec_context.go index b532c279e..6099573f8 100644 --- a/pkg/controller/nodes/node_exec_context.go +++ b/pkg/controller/nodes/node_exec_context.go @@ -123,7 +123,7 @@ func (e execContext) MaxDatasetSizeBytes() int64 { return e.maxDatasetSizeBytes } -func newNodeExecContext(_ context.Context, store *storage.DataStore, w v1alpha1.ExecutableWorkflow, node v1alpha1.ExecutableNode, nodeStatus v1alpha1.ExecutableNodeStatus, inputs io.InputReader, maxDatasetSize int64, er events.TaskEventRecorder, tr handler.TaskReader, nsm *nodeStateManager, enqueueOwner func() error, rawOutputPrefix storage.DataReference, outputShardSelector ioutils.ShardSelector) *execContext { +func newNodeExecContext(_ context.Context, store *storage.DataStore, w v1alpha1.ExecutableWorkflow, node v1alpha1.ExecutableNode, nodeStatus v1alpha1.ExecutableNodeStatus, inputs io.InputReader, interruptible bool, maxDatasetSize int64, er events.TaskEventRecorder, tr handler.TaskReader, nsm *nodeStateManager, enqueueOwner func() error, rawOutputPrefix storage.DataReference, outputShardSelector ioutils.ShardSelector) *execContext { md := execMetadata{WorkflowMeta: w, interrutptible: interruptible} // Copy the wf labels before adding node specific labels. diff --git a/pkg/controller/nodes/node_exec_context_test.go b/pkg/controller/nodes/node_exec_context_test.go index bdc37ded2..dc42eeedb 100644 --- a/pkg/controller/nodes/node_exec_context_test.go +++ b/pkg/controller/nodes/node_exec_context_test.go @@ -47,7 +47,7 @@ func Test_NodeContext(t *testing.T) { Kind: v1alpha1.NodeKindTask, } s, _ := storage.NewDataStore(&storage.Config{Type: storage.TypeMemory}, promutils.NewTestScope()) - nCtx := newNodeExecContext(context.TODO(), s, w1, n, nil, nil, 0, nil, TaskReader{}, nil, nil, "s3://bucket", ioutils.NewConstantShardSelector([]string{"x"})) + nCtx := newNodeExecContext(context.TODO(), s, w1, n, nil, nil, false, 0, nil, TaskReader{}, nil, nil, "s3://bucket", ioutils.NewConstantShardSelector([]string{"x"})) assert.Equal(t, "id", nCtx.NodeExecutionMetadata().GetLabels()["node-id"]) assert.Equal(t, "false", nCtx.NodeExecutionMetadata().GetLabels()["interruptible"]) assert.Equal(t, "task-name", nCtx.NodeExecutionMetadata().GetLabels()["task-name"]) From 2ba20d28ed86cff4dab7af74c372adbaebb721e6 Mon Sep 17 00:00:00 2001 From: Ketan Umare Date: Wed, 25 Mar 2020 16:56:04 -0700 Subject: [PATCH 7/8] updated flyteplugins --- go.mod | 4 ++-- go.sum | 4 ++++ 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/go.mod b/go.mod index b2d2c4da4..cf43cd47f 100644 --- a/go.mod +++ b/go.mod @@ -21,8 +21,8 @@ require ( github.com/grpc-ecosystem/grpc-gateway v1.14.2 // indirect github.com/imdario/mergo v0.3.8 // indirect github.com/lyft/datacatalog v0.2.1 - github.com/lyft/flyteidl v0.17.8 - github.com/lyft/flyteplugins v0.3.14-0.20200324043344-d4df89dada43 + github.com/lyft/flyteidl v0.17.9 + github.com/lyft/flyteplugins v0.3.15 github.com/lyft/flytestdlib v0.3.3 github.com/magiconair/properties v1.8.1 github.com/mattn/go-colorable v0.1.6 // indirect diff --git a/go.sum b/go.sum 
index a891fe3d7..25748404e 100644 --- a/go.sum +++ b/go.sum @@ -382,6 +382,8 @@ github.com/lyft/flyteidl v0.17.6 h1:O0qpT6ya45e/92+E84uGOYa0ZsaFoE5ZfPoyJ6e1bEQ= github.com/lyft/flyteidl v0.17.6/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20= github.com/lyft/flyteidl v0.17.8 h1:/bZS1K3FO45EMamNrs4Eo6WYQf1TO5bNyNTIUO6cXM0= github.com/lyft/flyteidl v0.17.8/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20= +github.com/lyft/flyteidl v0.17.9 h1:JXT9PovHqS9V3YN74x9zWT0kvIEL48c2uNoujF1KMes= +github.com/lyft/flyteidl v0.17.9/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20= github.com/lyft/flyteplugins v0.3.10 h1:AwGv0TKl/RTOESY0+P5v9gyjO67LeHXHZqiMVaa+X4w= github.com/lyft/flyteplugins v0.3.10/go.mod h1:FOSo04q4EheU6lm0oZFvfYAWgjrum/BDUK+mUT7qDFA= github.com/lyft/flyteplugins v0.3.11 h1:E6BX5BU283BLMP48QJQsecqdxeLKLaiA+2+VVS8VYoc= @@ -389,6 +391,8 @@ github.com/lyft/flyteplugins v0.3.11/go.mod h1:FOSo04q4EheU6lm0oZFvfYAWgjrum/BDU github.com/lyft/flyteplugins v0.3.12-0.20200318014325-ea4280769ab8/go.mod h1:FOSo04q4EheU6lm0oZFvfYAWgjrum/BDUK+mUT7qDFA= github.com/lyft/flyteplugins v0.3.14-0.20200324043344-d4df89dada43 h1:hpyBrWo2HrEdNG5TNdU5+90D/T8wu6FmmLgpeyJRN30= github.com/lyft/flyteplugins v0.3.14-0.20200324043344-d4df89dada43/go.mod h1:heTJLryE8EE4Vcd+W3EkQ3fyF41YyquavCLQv1YfnEA= +github.com/lyft/flyteplugins v0.3.15 h1:chDrm8maK3dCSy7UM8ElfmzTUBn1fiF7UnmP4px4sVI= +github.com/lyft/flyteplugins v0.3.15/go.mod h1:NDhdkOAn2q6p7YLq9a0/lxyS0dburoAEgipOY5TiO8A= github.com/lyft/flytestdlib v0.3.0/go.mod h1:LJPPJlkFj+wwVWMrQT3K5JZgNhZi2mULsCG4ZYhinhU= github.com/lyft/flytestdlib v0.3.2 h1:bY6Y+Fg6Jdc7zY4GAYuR7t2hjWwynIdmRvtLcRNaGnw= github.com/lyft/flytestdlib v0.3.2/go.mod h1:LJPPJlkFj+wwVWMrQT3K5JZgNhZi2mULsCG4ZYhinhU= From 513c308d16e66b7b4c48403398bd6a50860b5d2d Mon Sep 17 00:00:00 2001 From: Ketan Umare Date: Wed, 25 Mar 2020 17:04:37 -0700 Subject: [PATCH 8/8] Fix linter and comment --- pkg/controller/nodes/dynamic/handler.go | 2 ++ pkg/controller/nodes/executor.go | 4 ++-- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/pkg/controller/nodes/dynamic/handler.go b/pkg/controller/nodes/dynamic/handler.go index d98eddc5c..853e0c974 100644 --- a/pkg/controller/nodes/dynamic/handler.go +++ b/pkg/controller/nodes/dynamic/handler.go @@ -106,6 +106,8 @@ func (d dynamicNodeTaskNodeHandler) handleDynamicSubNodes(ctx context.Context, n if trns.Info().GetPhase() == handler.EPhaseSuccess { logger.Infof(ctx, "dynamic workflow node has succeeded, will call on success handler for parent node [%s]", nCtx.NodeID()) + // These outputPaths only read the output metadata, so the sandbox is completely optional here and hence it is nil. + // Sandbox creation can be expensive because it uses hashing, so we skip that expense.
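+ // Passing nil as the sandbox argument to NewRemoteFileOutputPaths below is what skips that work; + // only the outputs metadata under the node's output directory is consulted.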
outputPaths := ioutils.NewRemoteFileOutputPaths(ctx, nCtx.DataStore(), nCtx.NodeStatus().GetOutputDir(), nil) execID := task.GetTaskExecutionIdentifier(nCtx) outputReader := ioutils.NewRemoteFileOutputReader(ctx, nCtx.DataStore(), outputPaths, nCtx.MaxDatasetSizeBytes()) diff --git a/pkg/controller/nodes/executor.go b/pkg/controller/nodes/executor.go index 5d8c356d8..a1d7d1a7b 100644 --- a/pkg/controller/nodes/executor.go +++ b/pkg/controller/nodes/executor.go @@ -67,7 +67,7 @@ type nodeExecutor struct { defaultExecutionDeadline time.Duration defaultActiveDeadline time.Duration maxNodeRetriesForSystemFailures uint32 - interruptibleFailureThreshold uint32 + interruptibleFailureThreshold uint32 defaultDataSandbox storage.DataReference shardSelector ioutils.ShardSelector } @@ -777,7 +777,7 @@ func NewExecutor(ctx context.Context, nodeConfig config.NodeConfig, store *stora maxNodeRetriesForSystemFailures: uint32(nodeConfig.MaxNodeRetriesOnSystemFailures), interruptibleFailureThreshold: uint32(nodeConfig.InterruptibleFailureThreshold), defaultDataSandbox: defaultRawOutputPrefix, - shardSelector: shardSelector, + shardSelector: shardSelector, } nodeHandlerFactory, err := NewHandlerFactory(ctx, exec, workflowLauncher, kubeClient, catalogClient, nodeScope) exec.nodeHandlerFactory = nodeHandlerFactory
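For reviewers: a minimal sketch of how the renamed pieces compose end to end, using only the APIs touched in this series. The prefix value, the unique ID, and the package-main scaffolding are illustrative, not part of the patch:

package main

import (
	"context"
	"fmt"

	"github.com/lyft/flyteplugins/go/tasks/pluginmachinery/ioutils"
	"github.com/lyft/flytestdlib/promutils"
	"github.com/lyft/flytestdlib/storage"
)

func main() {
	ctx := context.TODO()

	// The controller converts the configured rawoutput-prefix string into a typed
	// reference, as controller.go does with storage.DataReference(cfg.DefaultRawOutputPrefix).
	rawOutputPrefix := storage.DataReference("s3://flyte/raw") // illustrative prefix

	// The unit tests in this series pin the shard key with a constant selector;
	// NewExecutor itself defaults to ioutils.NewBase36PrefixShardSelector.
	shardSelector := ioutils.NewConstantShardSelector([]string{"x"})

	// An in-memory data store, as used by the tests in this series.
	store, err := storage.NewDataStore(&storage.Config{Type: storage.TypeMemory}, promutils.NewTestScope())
	if err != nil {
		panic(err)
	}

	// taskexec_context.go combines the shard selector, the raw output prefix, and the
	// execution's unique ID into the sharded raw output path handed to plugins.
	rawPath, err := ioutils.NewShardedRawOutputPath(ctx, shardSelector, rawOutputPrefix, "example-unique-id", store)
	if err != nil {
		panic(err)
	}
	fmt.Println(rawPath)
}

Running this prints the sharded location derived from the prefix, the selected shard key, and the unique ID; the exact layout is whatever NewShardedRawOutputPath chooses.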