Skip to content

Commit

Permalink
Merge pull request #1638 from saschagrunert/exec-aggregate
Browse files Browse the repository at this point in the history
Use errorUtils for exec goroutine aggregation
  • Loading branch information
k8s-ci-robot authored Oct 16, 2024
2 parents 7abeb52 + ce41995 commit 956e121
Showing 1 changed file with 27 additions and 53 deletions.
80 changes: 27 additions & 53 deletions cmd/crictl/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,12 @@ import (
"errors"
"fmt"
"net/url"
"runtime"
"sync"
"time"

mobyterm "github.com/moby/term"
"github.com/sirupsen/logrus"
"github.com/urfave/cli/v2"
errorUtils "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/client-go/rest"
remoteclient "k8s.io/client-go/tools/remotecommand"
internalapi "k8s.io/cri-api/pkg/apis"
Expand Down Expand Up @@ -201,39 +200,37 @@ var runtimeExecCommand = &cli.Command{
transport: c.String(transportFlag),
}

maxParallel := 1
if c.Bool("parallel") {
maxParallel = runtime.NumCPU()
}

results := mapParallel(ids, maxParallel, func(id string) error {
optsCopy := *&opts
optsCopy.id = id
funcs := []func() error{}
for _, id := range ids {
funcs = append(funcs, func() error {
optsCopy := *&opts
optsCopy.id = id

if outputContainerID {
fmt.Println(id + ":")
}
if c.Bool("sync") {
exitCode, err := ExecSync(runtimeClient, optsCopy)
if err != nil {
return fmt.Errorf("execing command in container %s synchronously: %w", id, err)
}
if exitCode != 0 {
return cli.Exit("non-zero exit code", exitCode)
if outputContainerID {
fmt.Println(id + ":")
}
} else {
ctx, cancel := context.WithCancel(c.Context)
defer cancel()
err = Exec(ctx, runtimeClient, optsCopy)
if err != nil {
return fmt.Errorf("execing command in container %s: %w", id, err)
if c.Bool("sync") {
exitCode, err := ExecSync(runtimeClient, optsCopy)
if err != nil {
return fmt.Errorf("execing command in container %s synchronously: %w", id, err)
}
if exitCode != 0 {
return cli.Exit("non-zero exit code", exitCode)
}
} else {
ctx, cancel := context.WithCancel(c.Context)
defer cancel()
err = Exec(ctx, runtimeClient, optsCopy)
if err != nil {
return fmt.Errorf("execing command in container %s: %w", id, err)
}
}
}

return nil
})
return nil
})
}

errs := errors.Join(results...)
errs := errorUtils.AggregateGoroutines(funcs...)

if ignoreErrors {
logrus.Debugf("Ignoring errors: %v", errs)
Expand All @@ -244,29 +241,6 @@ var runtimeExecCommand = &cli.Command{
},
}

func mapParallel[T1 any, T2 any](input []T1, maxParallel int, fn func(T1) T2) []T2 {
wg := &sync.WaitGroup{}
wg.Add(len(input))

results := make([]T2, len(input))
maxParallelChan := make(chan struct{}, maxParallel)

for i := range input {
maxParallelChan <- struct{}{}
go func(index int, x T1) {
defer wg.Done()

result := fn(x)
results[index] = result

<-maxParallelChan
}(i, input[i])
}

wg.Wait()
return results
}

// ExecSync sends an ExecSyncRequest to the server, and parses
// the returned ExecSyncResponse. The function returns the corresponding exit
// code beside an general error.
Expand Down

0 comments on commit 956e121

Please sign in to comment.