Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Pass worker_spec_command to mpi plugin to support horovod #341

Merged
merged 4 commits into from
Apr 19, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions go/tasks/plugins/k8s/kfoperators/mpi/mpi.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package mpi
import (
"context"
"fmt"
"strings"
"time"

"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/plugins"
Expand All @@ -21,6 +22,8 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
)

const workerSpecCommandKey = "worker_spec_command"

type mpiOperatorResourceHandler struct {
}

Expand All @@ -45,6 +48,7 @@ func (mpiOperatorResourceHandler) BuildIdentityResource(ctx context.Context, tas
// Defines a func to create the full resource object that will be posted to k8s.
func (mpiOperatorResourceHandler) BuildResource(ctx context.Context, taskCtx pluginsCore.TaskExecutionContext) (client.Object, error) {
taskTemplate, err := taskCtx.TaskReader().Read(ctx)
taskTemplateConfig := taskTemplate.GetConfig()

if err != nil {
return nil, flyteerr.Errorf(flyteerr.BadTaskSpecification, "unable to fetch task specification [%v]", err.Error())
Expand All @@ -69,11 +73,19 @@ func (mpiOperatorResourceHandler) BuildResource(ctx context.Context, taskCtx plu
common.OverrideDefaultContainerName(taskCtx, podSpec, kubeflowv1.MPIJobDefaultContainerName)

// workersPodSpec is deepCopy of podSpec submitted by flyte
// WorkerPodSpec doesn't need any Argument & command. It will be trigger from launcher pod
workersPodSpec := podSpec.DeepCopy()

// If users don't specify "worker_spec_command" in the task config, the command/args are empty.
// However, in some cases, the workers need command/args.
// For example, in horovod tasks, each worker runs a command launching ssh daemon.

workerSpecCommand := []string{}
if val, ok := taskTemplateConfig[workerSpecCommandKey]; ok {
workerSpecCommand = strings.Split(val, " ")
}

for k := range workersPodSpec.Containers {
workersPodSpec.Containers[k].Args = []string{}
workersPodSpec.Containers[k].Args = workerSpecCommand
workersPodSpec.Containers[k].Command = []string{}
Comment on lines +88 to 89
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we use workerSpecCommand for container's command? If not, I think we could rename it to workerSpecArgs

Suggested change
workersPodSpec.Containers[k].Args = workerSpecCommand
workersPodSpec.Containers[k].Command = []string{}
workersPodSpec.Containers[k].Args = []string{}
workersPodSpec.Containers[k].Command = workerSpecCommand

Copy link
Contributor Author

@ByronHsu ByronHsu Apr 4, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I followed the same pattern as _get_container of PythonAutoContainer, where we put get_command in args of the container.

I think on the user-facing side (flytekit), we can call them commands, but on the backend side (flytepropeller), we can put the command in args.

Putting either in command or args of the container both work I guess. I just want to make it consistent with other cases

}

Expand Down