Skip to content
This repository has been archived by the owner on Nov 1, 2022. It is now read-only.

Commit

Permalink
Merge pull request #2056 from suvl/feature-timeout
Browse files Browse the repository at this point in the history
Add `--timeout` flag to `fluxctl`
  • Loading branch information
stefanprodan authored Aug 20, 2019
2 parents 79f13f8 + a7b123e commit 9a0977c
Show file tree
Hide file tree
Showing 6 changed files with 23 additions and 16 deletions.
14 changes: 7 additions & 7 deletions cmd/fluxctl/await.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ var ErrTimeout = errors.New("timeout")

// await polls for a job to complete, then for the resulting commit to
// be applied
func await(ctx context.Context, stdout, stderr io.Writer, client api.Server, jobID job.ID, apply bool, verbosity int) error {
result, err := awaitJob(ctx, client, jobID)
func await(ctx context.Context, stdout, stderr io.Writer, client api.Server, jobID job.ID, apply bool, verbosity int, timeout time.Duration) error {
result, err := awaitJob(ctx, client, jobID, timeout)
if err != nil {
if err == ErrTimeout {
fmt.Fprintln(stderr, `
Expand All @@ -41,7 +41,7 @@ is safe to retry operations.`)
}

if apply && result.Revision != "" {
if err := awaitSync(ctx, client, result.Revision); err != nil {
if err := awaitSync(ctx, client, result.Revision, timeout); err != nil {
if err == ErrTimeout {
fmt.Fprintln(stderr, `
The operation succeeded, but we timed out waiting for the commit to be
Expand All @@ -61,9 +61,9 @@ to run a sync interactively.`)
}

// await polls for a job to have been completed, with exponential backoff.
func awaitJob(ctx context.Context, client api.Server, jobID job.ID) (job.Result, error) {
func awaitJob(ctx context.Context, client api.Server, jobID job.ID, timeout time.Duration) (job.Result, error) {
var result job.Result
err := backoff(100*time.Millisecond, 2, 50, 1*time.Minute, func() (bool, error) {
err := backoff(100*time.Millisecond, 2, 50, timeout, func() (bool, error) {
j, err := client.JobStatus(ctx, jobID)
if err != nil {
return false, err
Expand All @@ -86,8 +86,8 @@ func awaitJob(ctx context.Context, client api.Server, jobID job.ID) (job.Result,
}

// await polls for a commit to have been applied, with exponential backoff.
func awaitSync(ctx context.Context, client api.Server, revision string) error {
return backoff(1*time.Second, 2, 10, 1*time.Minute, func() (bool, error) {
func awaitSync(ctx context.Context, client api.Server, revision string, timeout time.Duration) error {
return backoff(1*time.Second, 2, 10, timeout, func() (bool, error) {
refs, err := client.SyncStatus(ctx, revision)
return err == nil && len(refs) == 0, err
})
Expand Down
6 changes: 4 additions & 2 deletions cmd/fluxctl/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"net/url"
"strings"
"testing"
"time"

"github.com/gorilla/mux"
"github.com/weaveworks/flux/http/client"
Expand All @@ -24,6 +25,7 @@ func mockServiceOpts(trip *genericMockRoundTripper) *rootOpts {
mockAPI := client.New(&c, transport.NewAPIRouter(), "", "")
return &rootOpts{
API: mockAPI,
Timeout: 10*time.Second,
}
}

Expand Down Expand Up @@ -73,9 +75,9 @@ func testArgs(t *testing.T, args []string, shouldErr bool, errMsg string) *gener
cmd.SetArgs(args)
if err := cmd.Execute(); (err == nil) == shouldErr {
if errMsg != "" {
t.Fatal(errMsg)
t.Fatalf("%s: %s", args, errMsg)
} else {
t.Fatal(err)
t.Fatalf("%s: %v", args, err)
}
}
return svc
Expand Down
2 changes: 1 addition & 1 deletion cmd/fluxctl/policy_cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func (opts *workloadPolicyOpts) RunE(cmd *cobra.Command, args []string) error {
if err != nil {
return err
}
return await(ctx, cmd.OutOrStdout(), cmd.OutOrStderr(), opts.API, jobID, false, opts.verbosity)
return await(ctx, cmd.OutOrStdout(), cmd.OutOrStderr(), opts.API, jobID, false, opts.verbosity, opts.Timeout)
}

func calculatePolicyChanges(opts *workloadPolicyOpts) (resource.PolicyUpdate, error) {
Expand Down
4 changes: 2 additions & 2 deletions cmd/fluxctl/release_cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func (opts *workloadReleaseOpts) RunE(cmd *cobra.Command, args []string) error {
return err
}

result, err := awaitJob(ctx, opts.API, jobID)
result, err := awaitJob(ctx, opts.API, jobID, opts.Timeout)
if err != nil {
return err
}
Expand All @@ -188,7 +188,7 @@ func (opts *workloadReleaseOpts) RunE(cmd *cobra.Command, args []string) error {
opts.dryRun = false
}

err = await(ctx, cmd.OutOrStdout(), cmd.OutOrStderr(), opts.API, jobID, !opts.dryRun, opts.verbosity)
err = await(ctx, cmd.OutOrStdout(), cmd.OutOrStderr(), opts.API, jobID, !opts.dryRun, opts.verbosity, opts.Timeout)
if !opts.watch || err != nil {
return err
}
Expand Down
9 changes: 7 additions & 2 deletions cmd/fluxctl/root_cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"net/url"
"os"
"strings"
"time"

"github.com/pkg/errors"
"github.com/spf13/cobra"
Expand All @@ -23,6 +24,7 @@ type rootOpts struct {
Namespace string
Labels map[string]string
API api.Server
Timeout time.Duration
}

func newRoot() *rootOpts {
Expand Down Expand Up @@ -50,12 +52,13 @@ Workflow:
`)

const (
defaultURLGivenToken = "https://cloud.weave.works/api/flux"
envVariableURL = "FLUX_URL"
envVariableNamespace = "FLUX_FORWARD_NAMESPACE"
envVariableLabels = "FLUX_FORWARD_LABELS"
envVariableToken = "FLUX_SERVICE_TOKEN"
envVariableCloudToken = "WEAVE_CLOUD_TOKEN"
defaultURLGivenToken = "https://cloud.weave.works/api/flux"
envVariableTimeout = "FLUX_TIMEOUT"
)

func (opts *rootOpts) Command() *cobra.Command {
Expand All @@ -75,7 +78,8 @@ func (opts *rootOpts) Command() *cobra.Command {
fmt.Sprintf("Base URL of the Flux API (defaults to %q if a token is provided); you can also set the environment variable %s", defaultURLGivenToken, envVariableURL))
cmd.PersistentFlags().StringVarP(&opts.Token, "token", "t", "",
fmt.Sprintf("Weave Cloud authentication token; you can also set the environment variable %s or %s", envVariableCloudToken, envVariableToken))

cmd.PersistentFlags().DurationVar(&opts.Timeout, "timeout", 60*time.Second,
fmt.Sprintf("Global command timeout; you can also set the environment variable %s", envVariableTimeout))
cmd.AddCommand(
newVersionCommand(),
newImageList(opts).Command(),
Expand Down Expand Up @@ -108,6 +112,7 @@ func (opts *rootOpts) PersistentPreRunE(cmd *cobra.Command, _ []string) error {
setFromEnvIfNotSet(cmd.Flags(), "k8s-fwd-labels", envVariableLabels)
setFromEnvIfNotSet(cmd.Flags(), "token", envVariableToken, envVariableCloudToken)
setFromEnvIfNotSet(cmd.Flags(), "url", envVariableURL)
setFromEnvIfNotSet(cmd.Flags(), "timeout", envVariableTimeout)

if opts.Token != "" && opts.URL == "" {
opts.URL = defaultURLGivenToken
Expand Down
4 changes: 2 additions & 2 deletions cmd/fluxctl/sync_cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (opts *syncOpts) RunE(cmd *cobra.Command, args []string) error {
if err != nil {
return err
}
result, err := awaitJob(ctx, opts.API, jobID)
result, err := awaitJob(ctx, opts.API, jobID, opts.Timeout)
if isUnverifiedHead(err) {
fmt.Fprintf(cmd.OutOrStderr(), "Warning: %s\n", err)
} else if err != nil {
Expand All @@ -70,7 +70,7 @@ func (opts *syncOpts) RunE(cmd *cobra.Command, args []string) error {
rev := result.Revision[:7]
fmt.Fprintf(cmd.OutOrStderr(), "Revision of %s to apply is %s\n", gitConfig.Remote.Branch, rev)
fmt.Fprintf(cmd.OutOrStderr(), "Waiting for %s to be applied ...\n", rev)
err = awaitSync(ctx, opts.API, rev)
err = awaitSync(ctx, opts.API, rev, opts.Timeout)
if err != nil {
return err
}
Expand Down

0 comments on commit 9a0977c

Please sign in to comment.