From 1942027d0b666d4fd8e8845fb0b0af3318ee0e80 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Trigo=20Soares?= Date: Tue, 18 Jun 2019 07:20:01 +0100 Subject: [PATCH] added --timeout command line parameter and FLUX_TIMEOUT env variable --- cmd/fluxctl/await.go | 14 +++++++------- cmd/fluxctl/policy_cmd.go | 2 +- cmd/fluxctl/release_cmd.go | 4 ++-- cmd/fluxctl/root_cmd.go | 7 ++++++- cmd/fluxctl/sync_cmd.go | 4 ++-- 5 files changed, 18 insertions(+), 13 deletions(-) diff --git a/cmd/fluxctl/await.go b/cmd/fluxctl/await.go index 681e34691..bb1cc4bdd 100644 --- a/cmd/fluxctl/await.go +++ b/cmd/fluxctl/await.go @@ -16,8 +16,8 @@ var ErrTimeout = errors.New("timeout") // await polls for a job to complete, then for the resulting commit to // be applied -func await(ctx context.Context, stdout, stderr io.Writer, client api.Server, jobID job.ID, apply bool, verbosity int) error { - result, err := awaitJob(ctx, client, jobID) +func await(ctx context.Context, stdout, stderr io.Writer, client api.Server, jobID job.ID, apply bool, verbosity int, timeout int) error { + result, err := awaitJob(ctx, client, jobID, timeout) if err != nil { if err == ErrTimeout { fmt.Fprintln(stderr, ` @@ -41,7 +41,7 @@ is safe to retry operations.`) } if apply && result.Revision != "" { - if err := awaitSync(ctx, client, result.Revision); err != nil { + if err := awaitSync(ctx, client, result.Revision, timeout); err != nil { if err == ErrTimeout { fmt.Fprintln(stderr, ` The operation succeeded, but we timed out waiting for the commit to be @@ -61,9 +61,9 @@ to run a sync interactively.`) } // await polls for a job to have been completed, with exponential backoff. -func awaitJob(ctx context.Context, client api.Server, jobID job.ID) (job.Result, error) { +func awaitJob(ctx context.Context, client api.Server, jobID job.ID, timeout int) (job.Result, error) { var result job.Result - err := backoff(100*time.Millisecond, 2, 50, 1*time.Minute, func() (bool, error) { + err := backoff(100*time.Millisecond, 2, 50, time.Duration(timeout)*time.Second, func() (bool, error) { j, err := client.JobStatus(ctx, jobID) if err != nil { return false, err @@ -86,8 +86,8 @@ func awaitJob(ctx context.Context, client api.Server, jobID job.ID) (job.Result, } // await polls for a commit to have been applied, with exponential backoff. -func awaitSync(ctx context.Context, client api.Server, revision string) error { - return backoff(1*time.Second, 2, 10, 1*time.Minute, func() (bool, error) { +func awaitSync(ctx context.Context, client api.Server, revision string, timeout int) error { + return backoff(1*time.Second, 2, 10, time.Duration(timeout)*time.Second, func() (bool, error) { refs, err := client.SyncStatus(ctx, revision) return err == nil && len(refs) == 0, err }) diff --git a/cmd/fluxctl/policy_cmd.go b/cmd/fluxctl/policy_cmd.go index 7d15a5292..f8d53e2b3 100644 --- a/cmd/fluxctl/policy_cmd.go +++ b/cmd/fluxctl/policy_cmd.go @@ -120,7 +120,7 @@ func (opts *workloadPolicyOpts) RunE(cmd *cobra.Command, args []string) error { if err != nil { return err } - return await(ctx, cmd.OutOrStdout(), cmd.OutOrStderr(), opts.API, jobID, false, opts.verbosity) + return await(ctx, cmd.OutOrStdout(), cmd.OutOrStderr(), opts.API, jobID, false, opts.verbosity, opts.Timeout) } func calculatePolicyChanges(opts *workloadPolicyOpts) (resource.PolicyUpdate, error) { diff --git a/cmd/fluxctl/release_cmd.go b/cmd/fluxctl/release_cmd.go index 80e0576a1..77c023ea4 100644 --- a/cmd/fluxctl/release_cmd.go +++ b/cmd/fluxctl/release_cmd.go @@ -162,7 +162,7 @@ func (opts *workloadReleaseOpts) RunE(cmd *cobra.Command, args []string) error { return err } - result, err := awaitJob(ctx, opts.API, jobID) + result, err := awaitJob(ctx, opts.API, jobID, opts.Timeout) if err != nil { return err } @@ -188,7 +188,7 @@ func (opts *workloadReleaseOpts) RunE(cmd *cobra.Command, args []string) error { opts.dryRun = false } - err = await(ctx, cmd.OutOrStdout(), cmd.OutOrStderr(), opts.API, jobID, !opts.dryRun, opts.verbosity) + err = await(ctx, cmd.OutOrStdout(), cmd.OutOrStderr(), opts.API, jobID, !opts.dryRun, opts.verbosity, opts.Timeout) if !opts.watch || err != nil { return err } diff --git a/cmd/fluxctl/root_cmd.go b/cmd/fluxctl/root_cmd.go index 1f4eea652..6678a9848 100644 --- a/cmd/fluxctl/root_cmd.go +++ b/cmd/fluxctl/root_cmd.go @@ -23,6 +23,7 @@ type rootOpts struct { Namespace string Labels map[string]string API api.Server + Timeout int } func newRoot() *rootOpts { @@ -50,12 +51,13 @@ Workflow: `) const ( + defaultURLGivenToken = "https://cloud.weave.works/api/flux" envVariableURL = "FLUX_URL" envVariableNamespace = "FLUX_FORWARD_NAMESPACE" envVariableLabels = "FLUX_FORWARD_LABELS" envVariableToken = "FLUX_SERVICE_TOKEN" envVariableCloudToken = "WEAVE_CLOUD_TOKEN" - defaultURLGivenToken = "https://cloud.weave.works/api/flux" + envVariableTimeout = "FLUX_TIMEOUT" ) func (opts *rootOpts) Command() *cobra.Command { @@ -75,6 +77,8 @@ func (opts *rootOpts) Command() *cobra.Command { fmt.Sprintf("Base URL of the Flux API (defaults to %q if a token is provided); you can also set the environment variable %s", defaultURLGivenToken, envVariableURL)) cmd.PersistentFlags().StringVarP(&opts.Token, "token", "t", "", fmt.Sprintf("Weave Cloud authentication token; you can also set the environment variable %s or %s", envVariableCloudToken, envVariableToken)) + cmd.PersistentFlags().IntVar(&opts.Timeout, "timeout", 60, + fmt.Sprintf("Global command timeout, in seconds; you can also set the environment variable %s", envVariableTimeout)) cmd.AddCommand( newVersionCommand(), @@ -108,6 +112,7 @@ func (opts *rootOpts) PersistentPreRunE(cmd *cobra.Command, _ []string) error { setFromEnvIfNotSet(cmd.Flags(), "k8s-fwd-labels", envVariableLabels) setFromEnvIfNotSet(cmd.Flags(), "token", envVariableToken, envVariableCloudToken) setFromEnvIfNotSet(cmd.Flags(), "url", envVariableURL) + setFromEnvIfNotSet(cmd.Flags(), "timeout", envVariableTimeout) if opts.Token != "" && opts.URL == "" { opts.URL = defaultURLGivenToken diff --git a/cmd/fluxctl/sync_cmd.go b/cmd/fluxctl/sync_cmd.go index f13fccf9b..9eb1cd940 100644 --- a/cmd/fluxctl/sync_cmd.go +++ b/cmd/fluxctl/sync_cmd.go @@ -59,7 +59,7 @@ func (opts *syncOpts) RunE(cmd *cobra.Command, args []string) error { if err != nil { return err } - result, err := awaitJob(ctx, opts.API, jobID) + result, err := awaitJob(ctx, opts.API, jobID, opts.Timeout) if isUnverifiedHead(err) { fmt.Fprintf(cmd.OutOrStderr(), "Warning: %s\n", err) } else if err != nil { @@ -70,7 +70,7 @@ func (opts *syncOpts) RunE(cmd *cobra.Command, args []string) error { rev := result.Revision[:7] fmt.Fprintf(cmd.OutOrStderr(), "Revision of %s to apply is %s\n", gitConfig.Remote.Branch, rev) fmt.Fprintf(cmd.OutOrStderr(), "Waiting for %s to be applied ...\n", rev) - err = awaitSync(ctx, opts.API, rev) + err = awaitSync(ctx, opts.API, rev, opts.Timeout) if err != nil { return err }