From ce41995614dda93ad09fa4d76f900269594db6bd Mon Sep 17 00:00:00 2001 From: Sascha Grunert Date: Wed, 16 Oct 2024 09:35:02 +0200 Subject: [PATCH] Use errorUtils for exec goroutine aggregation This functionality is already used within the project instead of the custom map function. Signed-off-by: Sascha Grunert --- cmd/crictl/exec.go | 80 ++++++++++++++++------------------------------ 1 file changed, 27 insertions(+), 53 deletions(-) diff --git a/cmd/crictl/exec.go b/cmd/crictl/exec.go index 27c70f0e2d..4259758752 100644 --- a/cmd/crictl/exec.go +++ b/cmd/crictl/exec.go @@ -21,13 +21,12 @@ import ( "errors" "fmt" "net/url" - "runtime" - "sync" "time" mobyterm "github.com/moby/term" "github.com/sirupsen/logrus" "github.com/urfave/cli/v2" + errorUtils "k8s.io/apimachinery/pkg/util/errors" "k8s.io/client-go/rest" remoteclient "k8s.io/client-go/tools/remotecommand" internalapi "k8s.io/cri-api/pkg/apis" @@ -201,39 +200,37 @@ var runtimeExecCommand = &cli.Command{ transport: c.String(transportFlag), } - maxParallel := 1 - if c.Bool("parallel") { - maxParallel = runtime.NumCPU() - } - - results := mapParallel(ids, maxParallel, func(id string) error { - optsCopy := *&opts - optsCopy.id = id + funcs := []func() error{} + for _, id := range ids { + funcs = append(funcs, func() error { + optsCopy := *&opts + optsCopy.id = id - if outputContainerID { - fmt.Println(id + ":") - } - if c.Bool("sync") { - exitCode, err := ExecSync(runtimeClient, optsCopy) - if err != nil { - return fmt.Errorf("execing command in container %s synchronously: %w", id, err) - } - if exitCode != 0 { - return cli.Exit("non-zero exit code", exitCode) + if outputContainerID { + fmt.Println(id + ":") } - } else { - ctx, cancel := context.WithCancel(c.Context) - defer cancel() - err = Exec(ctx, runtimeClient, optsCopy) - if err != nil { - return fmt.Errorf("execing command in container %s: %w", id, err) + if c.Bool("sync") { + exitCode, err := ExecSync(runtimeClient, optsCopy) + if err != nil { + return fmt.Errorf("execing command in container %s synchronously: %w", id, err) + } + if exitCode != 0 { + return cli.Exit("non-zero exit code", exitCode) + } + } else { + ctx, cancel := context.WithCancel(c.Context) + defer cancel() + err = Exec(ctx, runtimeClient, optsCopy) + if err != nil { + return fmt.Errorf("execing command in container %s: %w", id, err) + } } - } - return nil - }) + return nil + }) + } - errs := errors.Join(results...) + errs := errorUtils.AggregateGoroutines(funcs...) if ignoreErrors { logrus.Debugf("Ignoring errors: %v", errs) @@ -244,29 +241,6 @@ var runtimeExecCommand = &cli.Command{ }, } -func mapParallel[T1 any, T2 any](input []T1, maxParallel int, fn func(T1) T2) []T2 { - wg := &sync.WaitGroup{} - wg.Add(len(input)) - - results := make([]T2, len(input)) - maxParallelChan := make(chan struct{}, maxParallel) - - for i := range input { - maxParallelChan <- struct{}{} - go func(index int, x T1) { - defer wg.Done() - - result := fn(x) - results[index] = result - - <-maxParallelChan - }(i, input[i]) - } - - wg.Wait() - return results -} - // ExecSync sends an ExecSyncRequest to the server, and parses // the returned ExecSyncResponse. The function returns the corresponding exit // code beside an general error.