Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Commit

Permalink
Merge 17f241e into f5f4182
Browse files Browse the repository at this point in the history
  • Loading branch information
ByronHsu authored Apr 19, 2023
2 parents f5f4182 + 17f241e commit 3c7a135
Showing 1 changed file with 14 additions and 2 deletions.
16 changes: 14 additions & 2 deletions go/tasks/plugins/k8s/kfoperators/mpi/mpi.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package mpi
import (
"context"
"fmt"
"strings"
"time"

"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/plugins"
Expand All @@ -21,6 +22,8 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
)

const workerSpecCommandKey = "worker_spec_command"

type mpiOperatorResourceHandler struct {
}

Expand All @@ -45,6 +48,7 @@ func (mpiOperatorResourceHandler) BuildIdentityResource(ctx context.Context, tas
// Defines a func to create the full resource object that will be posted to k8s.
func (mpiOperatorResourceHandler) BuildResource(ctx context.Context, taskCtx pluginsCore.TaskExecutionContext) (client.Object, error) {
taskTemplate, err := taskCtx.TaskReader().Read(ctx)
taskTemplateConfig := taskTemplate.GetConfig()

if err != nil {
return nil, flyteerr.Errorf(flyteerr.BadTaskSpecification, "unable to fetch task specification [%v]", err.Error())
Expand All @@ -69,11 +73,19 @@ func (mpiOperatorResourceHandler) BuildResource(ctx context.Context, taskCtx plu
common.OverridePrimaryContainerName(podSpec, primaryContainerName, kubeflowv1.MPIJobDefaultContainerName)

// workersPodSpec is deepCopy of podSpec submitted by flyte
// WorkerPodSpec doesn't need any Argument & command. It will be trigger from launcher pod
workersPodSpec := podSpec.DeepCopy()

// If users don't specify "worker_spec_command" in the task config, the command/args are empty.
// However, in some cases, the workers need command/args.
// For example, in horovod tasks, each worker runs a command launching ssh daemon.

workerSpecCommand := []string{}
if val, ok := taskTemplateConfig[workerSpecCommandKey]; ok {
workerSpecCommand = strings.Split(val, " ")
}

for k := range workersPodSpec.Containers {
workersPodSpec.Containers[k].Args = []string{}
workersPodSpec.Containers[k].Args = workerSpecCommand
workersPodSpec.Containers[k].Command = []string{}
}

Expand Down

0 comments on commit 3c7a135

Please sign in to comment.