From 2762f976278e4a0471fb2cf236c8d69a6df5f3a7 Mon Sep 17 00:00:00 2001 From: Yuvraj Date: Thu, 30 Dec 2021 03:02:20 +0530 Subject: [PATCH] Added support for pyflyte serialize fast register (#248) * Added support for pyflyte serialize fast register (#239) Signed-off-by: Yuvraj --- .../subcommand/register/files_config.go | 1 + .../subcommand/register/filesconfig_flags.go | 1 + .../register/filesconfig_flags_test.go | 14 ++++++++++++++ flytectl/cmd/register/files.go | 12 +++++++++--- flytectl/cmd/register/register_util.go | 19 ++++++++++++++++--- flytectl/cmd/register/register_util_test.go | 2 +- 6 files changed, 42 insertions(+), 7 deletions(-) diff --git a/flytectl/cmd/config/subcommand/register/files_config.go b/flytectl/cmd/config/subcommand/register/files_config.go index 0db9aa6102..ae618d3380 100644 --- a/flytectl/cmd/config/subcommand/register/files_config.go +++ b/flytectl/cmd/config/subcommand/register/files_config.go @@ -20,5 +20,6 @@ type FilesConfig struct { K8ServiceAccount string `json:"k8ServiceAccount" pflag:", deprecated. Please use --K8sServiceAccount"` OutputLocationPrefix string `json:"outputLocationPrefix" pflag:", custom output location prefix for offloaded types (files/schemas)."` SourceUploadPath string `json:"sourceUploadPath" pflag:", Location for source code in storage."` + DestinationDirectory string `json:"destinationDirectory" pflag:", Location of source code in container."` DryRun bool `json:"dryRun" pflag:",execute command without making any modifications."` } diff --git a/flytectl/cmd/config/subcommand/register/filesconfig_flags.go b/flytectl/cmd/config/subcommand/register/filesconfig_flags.go index 7e196e89d5..910245ee9f 100755 --- a/flytectl/cmd/config/subcommand/register/filesconfig_flags.go +++ b/flytectl/cmd/config/subcommand/register/filesconfig_flags.go @@ -59,6 +59,7 @@ func (cfg FilesConfig) GetPFlagSet(prefix string) *pflag.FlagSet { cmdFlags.StringVar(&DefaultFilesConfig.K8ServiceAccount, fmt.Sprintf("%v%v", prefix, "k8ServiceAccount"), DefaultFilesConfig.K8ServiceAccount, " deprecated. Please use --K8sServiceAccount") cmdFlags.StringVar(&DefaultFilesConfig.OutputLocationPrefix, fmt.Sprintf("%v%v", prefix, "outputLocationPrefix"), DefaultFilesConfig.OutputLocationPrefix, " custom output location prefix for offloaded types (files/schemas).") cmdFlags.StringVar(&DefaultFilesConfig.SourceUploadPath, fmt.Sprintf("%v%v", prefix, "sourceUploadPath"), DefaultFilesConfig.SourceUploadPath, " Location for source code in storage.") + cmdFlags.StringVar(&DefaultFilesConfig.DestinationDirectory, fmt.Sprintf("%v%v", prefix, "destinationDirectory"), DefaultFilesConfig.DestinationDirectory, " Location of source code in container.") cmdFlags.BoolVar(&DefaultFilesConfig.DryRun, fmt.Sprintf("%v%v", prefix, "dryRun"), DefaultFilesConfig.DryRun, "execute command without making any modifications.") return cmdFlags } diff --git a/flytectl/cmd/config/subcommand/register/filesconfig_flags_test.go b/flytectl/cmd/config/subcommand/register/filesconfig_flags_test.go index 024bb0783b..29c5494a67 100755 --- a/flytectl/cmd/config/subcommand/register/filesconfig_flags_test.go +++ b/flytectl/cmd/config/subcommand/register/filesconfig_flags_test.go @@ -225,6 +225,20 @@ func TestFilesConfig_SetFlags(t *testing.T) { } }) }) + t.Run("Test_destinationDirectory", func(t *testing.T) { + + t.Run("Override", func(t *testing.T) { + testValue := "1" + + cmdFlags.Set("destinationDirectory", testValue) + if vString, err := cmdFlags.GetString("destinationDirectory"); err == nil { + testDecodeJson_FilesConfig(t, fmt.Sprintf("%v", vString), &actual.DestinationDirectory) + + } else { + assert.FailNow(t, err.Error()) + } + }) + }) t.Run("Test_dryRun", func(t *testing.T) { t.Run("Override", func(t *testing.T) { diff --git a/flytectl/cmd/register/files.go b/flytectl/cmd/register/files.go index b2801d53ef..895d2f5aa5 100644 --- a/flytectl/cmd/register/files.go +++ b/flytectl/cmd/register/files.go @@ -72,19 +72,25 @@ Override IamRole during registration: :: - flytectl register file _pb_output/* -d development -p flytesnacks --continueOnError --version v2 -i "arn:aws:iam::123456789:role/dummy" + flytectl register file _pb_output/* -d development -p flytesnacks --continueOnError --version v2 --assumableIamRole "arn:aws:iam::123456789:role/dummy" Override Kubernetes service account during registration: :: - flytectl register file _pb_output/* -d development -p flytesnacks --continueOnError --version v2 -k "kubernetes-service-account" + flytectl register file _pb_output/* -d development -p flytesnacks --continueOnError --version v2 --k8sServiceAccount "kubernetes-service-account" Override Output location prefix during registration: :: - flytectl register file _pb_output/* -d development -p flytesnacks --continueOnError --version v2 -l "s3://dummy/prefix" + flytectl register file _pb_output/* -d development -p flytesnacks --continueOnError --version v2 --outputLocationPrefix "s3://dummy/prefix" + +Override Destination dir of source code in container during registration: + +:: + + flytectl register file _pb_output/* -d development -p flytesnacks --continueOnError --version v2 --destinationDirectory "/root" Usage ` diff --git a/flytectl/cmd/register/register_util.go b/flytectl/cmd/register/register_util.go index de2ce65de5..d8b11c8d08 100644 --- a/flytectl/cmd/register/register_util.go +++ b/flytectl/cmd/register/register_util.go @@ -45,6 +45,7 @@ const registrationVersionPattern = "{{ registration.version }}" // Additional variable define in fast serialized proto that needs to be replace in registration time const registrationRemotePackagePattern = "{{ .remote_package_path }}" +const registrationDestDirPattern = "{{ .dest_dir }}" // All supported extensions for compress var supportedExtensions = []string{".tar", ".tgz", ".tar.gz"} @@ -219,16 +220,22 @@ func hydrateIdentifier(identifier *core.Identifier, version string, force bool) } } -func hydrateTaskSpec(task *admin.TaskSpec, sourceCode string, sourceUploadPath string, version string) error { +func hydrateTaskSpec(task *admin.TaskSpec, sourceCode, sourceUploadPath, version, destinationDir string) error { if task.Template.GetContainer() != nil { for k := range task.Template.GetContainer().Args { - if task.Template.GetContainer().Args[k] == "" || task.Template.GetContainer().Args[k] == registrationRemotePackagePattern { + if task.Template.GetContainer().Args[k] == registrationRemotePackagePattern { remotePath, err := getRemoteStoragePath(context.Background(), Client, sourceUploadPath, sourceCode, version) if err != nil { return err } task.Template.GetContainer().Args[k] = string(remotePath) } + if task.Template.GetContainer().Args[k] == registrationDestDirPattern { + task.Template.GetContainer().Args[k] = "." + if len(destinationDir) > 0 { + task.Template.GetContainer().Args[k] = destinationDir + } + } } } else if task.Template.GetK8SPod() != nil && task.Template.GetK8SPod().PodSpec != nil { var podSpec = v1.PodSpec{} @@ -245,6 +252,12 @@ func hydrateTaskSpec(task *admin.TaskSpec, sourceCode string, sourceUploadPath s } podSpec.Containers[containerIdx].Args[argIdx] = string(remotePath) } + if arg == registrationDestDirPattern { + podSpec.Containers[containerIdx].Args[argIdx] = "." + if len(destinationDir) > 0 { + podSpec.Containers[containerIdx].Args[argIdx] = destinationDir + } + } } } podSpecStruct, err := utils.MarshalObjToStruct(podSpec) @@ -340,7 +353,7 @@ func hydrateSpec(message proto.Message, sourceCode string, config rconfig.FilesC taskSpec := message.(*admin.TaskSpec) hydrateIdentifier(taskSpec.Template.Id, config.Version, config.Force) // In case of fast serialize input proto also have on additional variable to substitute i.e destination bucket for source code - if err := hydrateTaskSpec(taskSpec, sourceCode, config.SourceUploadPath, config.Version); err != nil { + if err := hydrateTaskSpec(taskSpec, sourceCode, config.SourceUploadPath, config.Version, config.DestinationDirectory); err != nil { return err } diff --git a/flytectl/cmd/register/register_util_test.go b/flytectl/cmd/register/register_util_test.go index eb53ab32f7..f3af72c5ff 100644 --- a/flytectl/cmd/register/register_util_test.go +++ b/flytectl/cmd/register/register_util_test.go @@ -544,7 +544,7 @@ func TestHydrateTaskSpec(t *testing.T) { }, }, } - err = hydrateTaskSpec(task, "sourcey", rconfig.DefaultFilesConfig.SourceUploadPath, rconfig.DefaultFilesConfig.Version) + err = hydrateTaskSpec(task, "sourcey", rconfig.DefaultFilesConfig.SourceUploadPath, rconfig.DefaultFilesConfig.Version, "") assert.NoError(t, err) var hydratedPodSpec = v1.PodSpec{} err = utils.UnmarshalStructToObj(task.Template.GetK8SPod().PodSpec, &hydratedPodSpec)