Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

RawDataOutput directory for every task execution #92

Merged
merged 10 commits into from
Mar 26, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ require (
github.com/grpc-ecosystem/grpc-gateway v1.14.2 // indirect
github.com/imdario/mergo v0.3.8 // indirect
github.com/lyft/datacatalog v0.2.1
github.com/lyft/flyteidl v0.17.8
github.com/lyft/flyteplugins v0.3.12
github.com/lyft/flytestdlib v0.3.2
github.com/lyft/flyteidl v0.17.9
github.com/lyft/flyteplugins v0.3.15
github.com/lyft/flytestdlib v0.3.3
github.com/magiconair/properties v1.8.1
github.com/mattn/go-colorable v0.1.6 // indirect
github.com/mitchellh/mapstructure v1.1.2
Expand Down
15 changes: 13 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -382,11 +382,22 @@ github.com/lyft/flyteidl v0.17.6 h1:O0qpT6ya45e/92+E84uGOYa0ZsaFoE5ZfPoyJ6e1bEQ=
github.com/lyft/flyteidl v0.17.6/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20=
github.com/lyft/flyteidl v0.17.8 h1:/bZS1K3FO45EMamNrs4Eo6WYQf1TO5bNyNTIUO6cXM0=
github.com/lyft/flyteidl v0.17.8/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20=
github.com/lyft/flyteplugins v0.3.12 h1:ZfuHCwwZm5F6cII5X6Z/evBxJS+sZp9i/jkYySujIa0=
github.com/lyft/flyteplugins v0.3.12/go.mod h1:2fMH+Le0rlMlSOq5Z6utnkvDmw8AyYk7lxuAnYhlAI8=
github.com/lyft/flyteidl v0.17.9 h1:JXT9PovHqS9V3YN74x9zWT0kvIEL48c2uNoujF1KMes=
github.com/lyft/flyteidl v0.17.9/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20=
github.com/lyft/flyteplugins v0.3.10 h1:AwGv0TKl/RTOESY0+P5v9gyjO67LeHXHZqiMVaa+X4w=
github.com/lyft/flyteplugins v0.3.10/go.mod h1:FOSo04q4EheU6lm0oZFvfYAWgjrum/BDUK+mUT7qDFA=
github.com/lyft/flyteplugins v0.3.11 h1:E6BX5BU283BLMP48QJQsecqdxeLKLaiA+2+VVS8VYoc=
github.com/lyft/flyteplugins v0.3.11/go.mod h1:FOSo04q4EheU6lm0oZFvfYAWgjrum/BDUK+mUT7qDFA=
github.com/lyft/flyteplugins v0.3.12-0.20200318014325-ea4280769ab8/go.mod h1:FOSo04q4EheU6lm0oZFvfYAWgjrum/BDUK+mUT7qDFA=
github.com/lyft/flyteplugins v0.3.14-0.20200324043344-d4df89dada43 h1:hpyBrWo2HrEdNG5TNdU5+90D/T8wu6FmmLgpeyJRN30=
github.com/lyft/flyteplugins v0.3.14-0.20200324043344-d4df89dada43/go.mod h1:heTJLryE8EE4Vcd+W3EkQ3fyF41YyquavCLQv1YfnEA=
github.com/lyft/flyteplugins v0.3.15 h1:chDrm8maK3dCSy7UM8ElfmzTUBn1fiF7UnmP4px4sVI=
github.com/lyft/flyteplugins v0.3.15/go.mod h1:NDhdkOAn2q6p7YLq9a0/lxyS0dburoAEgipOY5TiO8A=
github.com/lyft/flytestdlib v0.3.0/go.mod h1:LJPPJlkFj+wwVWMrQT3K5JZgNhZi2mULsCG4ZYhinhU=
github.com/lyft/flytestdlib v0.3.2 h1:bY6Y+Fg6Jdc7zY4GAYuR7t2hjWwynIdmRvtLcRNaGnw=
github.com/lyft/flytestdlib v0.3.2/go.mod h1:LJPPJlkFj+wwVWMrQT3K5JZgNhZi2mULsCG4ZYhinhU=
github.com/lyft/flytestdlib v0.3.3 h1:MkWXPkwQinh6MR3Yf5siZhmRSt9r4YmsF+5kvVVVedE=
github.com/lyft/flytestdlib v0.3.3/go.mod h1:LJPPJlkFj+wwVWMrQT3K5JZgNhZi2mULsCG4ZYhinhU=
github.com/lyft/spark-on-k8s-operator v0.1.3 h1:rmke8lR2Oy8mvKXRhloKuEu7fgGuXepDxiBNiorVUFI=
github.com/lyft/spark-on-k8s-operator v0.1.3/go.mod h1:hkRqdqAsdNnxT/Zst6MNMRbTAoiCZ0JRw7svRgAYb0A=
github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
Expand Down
39 changes: 20 additions & 19 deletions pkg/controller/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,25 +54,26 @@ var (
// Config that uses the flytestdlib Config module to generate commandline and load config files. This configuration is
// the base configuration to start propeller
type Config struct {
KubeConfigPath string `json:"kube-config" pflag:",Path to kubernetes client config file."`
MasterURL string `json:"master"`
Workers int `json:"workers" pflag:"2,Number of threads to process workflows"`
WorkflowReEval config.Duration `json:"workflow-reeval-duration" pflag:"\"30s\",Frequency of re-evaluating workflows"`
DownstreamEval config.Duration `json:"downstream-eval-duration" pflag:"\"60s\",Frequency of re-evaluating downstream tasks"`
LimitNamespace string `json:"limit-namespace" pflag:"\"all\",Namespaces to watch for this propeller"`
ProfilerPort config.Port `json:"prof-port" pflag:"\"10254\",Profiler port"`
MetadataPrefix string `json:"metadata-prefix,omitempty" pflag:",MetadataPrefix should be used if all the metadata for Flyte executions should be stored under a specific prefix in CloudStorage. If not specified, the data will be stored in the base container directly."`
Queue CompositeQueueConfig `json:"queue,omitempty" pflag:",Workflow workqueue configuration, affects the way the work is consumed from the queue."`
MetricsPrefix string `json:"metrics-prefix" pflag:"\"flyte:\",An optional prefix for all published metrics."`
EnableAdminLauncher bool `json:"enable-admin-launcher" pflag:"false, Enable remote Workflow launcher to Admin"`
MaxWorkflowRetries int `json:"max-workflow-retries" pflag:"50,Maximum number of retries per workflow"`
MaxTTLInHours int `json:"max-ttl-hours" pflag:"23,Maximum number of hours a completed workflow should be retained. Number between 1-23 hours"`
GCInterval config.Duration `json:"gc-interval" pflag:"\"30m\",Run periodic GC every 30 minutes"`
LeaderElection LeaderElectionConfig `json:"leader-election,omitempty" pflag:",Config for leader election."`
PublishK8sEvents bool `json:"publish-k8s-events" pflag:",Enable events publishing to K8s events API."`
MaxDatasetSizeBytes int64 `json:"max-output-size-bytes" pflag:",Maximum size of outputs per task"`
KubeConfig KubeClientConfig `json:"kube-client-config" pflag:",Configuration to control the Kubernetes client"`
NodeConfig NodeConfig `json:"node-config,omitempty" pflag:",config for a workflow node"`
KubeConfigPath string `json:"kube-config" pflag:",Path to kubernetes client config file."`
MasterURL string `json:"master"`
Workers int `json:"workers" pflag:"2,Number of threads to process workflows"`
WorkflowReEval config.Duration `json:"workflow-reeval-duration" pflag:"\"30s\",Frequency of re-evaluating workflows"`
DownstreamEval config.Duration `json:"downstream-eval-duration" pflag:"\"60s\",Frequency of re-evaluating downstream tasks"`
LimitNamespace string `json:"limit-namespace" pflag:"\"all\",Namespaces to watch for this propeller"`
ProfilerPort config.Port `json:"prof-port" pflag:"\"10254\",Profiler port"`
MetadataPrefix string `json:"metadata-prefix,omitempty" pflag:",MetadataPrefix should be used if all the metadata for Flyte executions should be stored under a specific prefix in CloudStorage. If not specified, the data will be stored in the base container directly."`
DefaultRawOutputPrefix string `json:"rawoutput-prefix" pflag:",a fully qualified storage path of the form s3://flyte/abc/..., where all data sandboxes should be stored."`
Queue CompositeQueueConfig `json:"queue,omitempty" pflag:",Workflow workqueue configuration, affects the way the work is consumed from the queue."`
MetricsPrefix string `json:"metrics-prefix" pflag:"\"flyte:\",An optional prefix for all published metrics."`
EnableAdminLauncher bool `json:"enable-admin-launcher" pflag:"false, Enable remote Workflow launcher to Admin"`
MaxWorkflowRetries int `json:"max-workflow-retries" pflag:"50,Maximum number of retries per workflow"`
MaxTTLInHours int `json:"max-ttl-hours" pflag:"23,Maximum number of hours a completed workflow should be retained. Number between 1-23 hours"`
GCInterval config.Duration `json:"gc-interval" pflag:"\"30m\",Run periodic GC every 30 minutes"`
LeaderElection LeaderElectionConfig `json:"leader-election,omitempty" pflag:",Config for leader election."`
PublishK8sEvents bool `json:"publish-k8s-events" pflag:",Enable events publishing to K8s events API."`
MaxDatasetSizeBytes int64 `json:"max-output-size-bytes" pflag:",Maximum size of outputs per task"`
KubeConfig KubeClientConfig `json:"kube-client-config" pflag:",Configuration to control the Kubernetes client"`
NodeConfig NodeConfig `json:"node-config,omitempty" pflag:",config for a workflow node"`
}

type KubeClientConfig struct {
Expand Down
3 changes: 2 additions & 1 deletion pkg/controller/config/config_flags.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

49 changes: 42 additions & 7 deletions pkg/controller/config/config_flags_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ package controller
import (
"context"

errors3 "github.com/lyft/flytepropeller/pkg/controller/nodes/errors"
stdErrs "github.com/lyft/flytestdlib/errors"

errors3 "github.com/lyft/flytepropeller/pkg/controller/nodes/errors"

"github.com/lyft/flytepropeller/pkg/controller/executors"
"github.com/lyft/flytepropeller/pkg/controller/nodes/task/catalog"

Expand Down Expand Up @@ -297,7 +298,8 @@ func New(ctx context.Context, cfg *config.Config, kubeclientset kubernetes.Inter
return nil, stdErrs.Wrapf(errors3.CausedByError, err, "failed to initialize workflow store")
}

nodeExecutor, err := nodes.NewExecutor(ctx, cfg.NodeConfig, store, controller.enqueueWorkflowForNodeUpdates, eventSink, wfLauncher, cfg.MaxDatasetSizeBytes, kubeClient, catalogClient, scope)
nodeExecutor, err := nodes.NewExecutor(ctx, cfg.NodeConfig, store, controller.enqueueWorkflowForNodeUpdates, eventSink, wfLauncher, cfg.MaxDatasetSizeBytes,
storage.DataReference(cfg.DefaultRawOutputPrefix), kubeClient, catalogClient, scope)
if err != nil {
return nil, errors.Wrapf(err, "Failed to create Controller.")
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/controller/nodes/dynamic/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,9 @@ func (d dynamicNodeTaskNodeHandler) handleDynamicSubNodes(ctx context.Context, n

if trns.Info().GetPhase() == handler.EPhaseSuccess {
logger.Infof(ctx, "dynamic workflow node has succeeded, will call on success handler for parent node [%s]", nCtx.NodeID())
outputPaths := ioutils.NewRemoteFileOutputPaths(ctx, nCtx.DataStore(), nCtx.NodeStatus().GetOutputDir())
// These outputPaths only reads the output metadata. So the sandbox is completely optional here and hence it is nil.
// The sandbox creation as it uses hashing can be expensive and we skip that expense.
outputPaths := ioutils.NewRemoteFileOutputPaths(ctx, nCtx.DataStore(), nCtx.NodeStatus().GetOutputDir(), nil)
EngHabu marked this conversation as resolved.
Show resolved Hide resolved
execID := task.GetTaskExecutionIdentifier(nCtx)
outputReader := ioutils.NewRemoteFileOutputReader(ctx, nCtx.DataStore(), outputPaths, nCtx.MaxDatasetSizeBytes())
ee, err := d.TaskNodeHandler.ValidateOutputAndCacheAdd(ctx, nCtx.NodeID(), nCtx.InputReader(), outputReader, nil, nCtx.TaskReader(), catalog.Metadata{
Expand Down
17 changes: 15 additions & 2 deletions pkg/controller/nodes/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,23 @@ import (
"fmt"
"time"

"github.com/lyft/flyteplugins/go/tasks/pluginmachinery/ioutils"
errors2 "github.com/lyft/flytestdlib/errors"

"github.com/golang/protobuf/ptypes"
"github.com/lyft/flyteidl/clients/go/events"
eventsErr "github.com/lyft/flyteidl/clients/go/events/errors"
"github.com/lyft/flyteidl/gen/pb-go/flyteidl/core"
"github.com/lyft/flyteidl/gen/pb-go/flyteidl/event"
"github.com/lyft/flyteplugins/go/tasks/pluginmachinery/catalog"
"github.com/lyft/flytestdlib/contextutils"
"github.com/lyft/flytestdlib/logger"
"github.com/lyft/flytestdlib/promutils"
"github.com/lyft/flytestdlib/promutils/labeled"
"github.com/lyft/flytestdlib/storage"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/lyft/flyteplugins/go/tasks/pluginmachinery/catalog"

"github.com/lyft/flytepropeller/pkg/controller/config"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -66,6 +68,8 @@ type nodeExecutor struct {
defaultActiveDeadline time.Duration
maxNodeRetriesForSystemFailures uint32
interruptibleFailureThreshold uint32
defaultDataSandbox storage.DataReference
shardSelector ioutils.ShardSelector
}

func (c *nodeExecutor) RecordTransitionLatency(ctx context.Context, w v1alpha1.ExecutableWorkflow, node v1alpha1.ExecutableNode, nodeStatus v1alpha1.ExecutableNodeStatus) {
Expand Down Expand Up @@ -735,7 +739,14 @@ func (c *nodeExecutor) Initialize(ctx context.Context) error {
return c.nodeHandlerFactory.Setup(ctx, s)
}

func NewExecutor(ctx context.Context, nodeConfig config.NodeConfig, store *storage.DataStore, enQWorkflow v1alpha1.EnqueueWorkflow, eventSink events.EventSink, workflowLauncher launchplan.Executor, maxDatasetSize int64, kubeClient executors.Client, catalogClient catalog.Client, scope promutils.Scope) (executors.Node, error) {
func NewExecutor(ctx context.Context, nodeConfig config.NodeConfig, store *storage.DataStore, enQWorkflow v1alpha1.EnqueueWorkflow, eventSink events.EventSink,
workflowLauncher launchplan.Executor, maxDatasetSize int64, defaultRawOutputPrefix storage.DataReference, kubeClient executors.Client, catalogClient catalog.Client, scope promutils.Scope) (executors.Node, error) {

// TODO we may want to make this configurable.
shardSelector, err := ioutils.NewBase36PrefixShardSelector(ctx)
if err != nil {
return nil, err
}

nodeScope := scope.NewSubScope("node")
exec := &nodeExecutor{
Expand Down Expand Up @@ -765,6 +776,8 @@ func NewExecutor(ctx context.Context, nodeConfig config.NodeConfig, store *stora
defaultActiveDeadline: nodeConfig.DefaultDeadlines.DefaultNodeActiveDeadline.Duration,
maxNodeRetriesForSystemFailures: uint32(nodeConfig.MaxNodeRetriesOnSystemFailures),
interruptibleFailureThreshold: uint32(nodeConfig.InterruptibleFailureThreshold),
defaultDataSandbox: defaultRawOutputPrefix,
shardSelector: shardSelector,
}
nodeHandlerFactory, err := NewHandlerFactory(ctx, exec, workflowLauncher, kubeClient, catalogClient, nodeScope)
exec.nodeHandlerFactory = nodeHandlerFactory
Expand Down
Loading