Skip to content
This repository has been archived by the owner on Nov 1, 2022. It is now read-only.

Add --timeout flag to fluxctl #2056

Merged
merged 3 commits into from
Aug 20, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions cmd/fluxctl/await.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ var ErrTimeout = errors.New("timeout")

// await polls for a job to complete, then for the resulting commit to
// be applied
func await(ctx context.Context, stdout, stderr io.Writer, client api.Server, jobID job.ID, apply bool, verbosity int) error {
result, err := awaitJob(ctx, client, jobID)
func await(ctx context.Context, stdout, stderr io.Writer, client api.Server, jobID job.ID, apply bool, verbosity int, timeout time.Duration) error {
result, err := awaitJob(ctx, client, jobID, timeout)
if err != nil {
if err == ErrTimeout {
fmt.Fprintln(stderr, `
Expand All @@ -41,7 +41,7 @@ is safe to retry operations.`)
}

if apply && result.Revision != "" {
if err := awaitSync(ctx, client, result.Revision); err != nil {
if err := awaitSync(ctx, client, result.Revision, timeout); err != nil {
if err == ErrTimeout {
fmt.Fprintln(stderr, `
The operation succeeded, but we timed out waiting for the commit to be
Expand All @@ -61,9 +61,9 @@ to run a sync interactively.`)
}

// await polls for a job to have been completed, with exponential backoff.
func awaitJob(ctx context.Context, client api.Server, jobID job.ID) (job.Result, error) {
func awaitJob(ctx context.Context, client api.Server, jobID job.ID, timeout time.Duration) (job.Result, error) {
var result job.Result
err := backoff(100*time.Millisecond, 2, 50, 1*time.Minute, func() (bool, error) {
err := backoff(100*time.Millisecond, 2, 50, timeout, func() (bool, error) {
j, err := client.JobStatus(ctx, jobID)
if err != nil {
return false, err
Expand All @@ -86,8 +86,8 @@ func awaitJob(ctx context.Context, client api.Server, jobID job.ID) (job.Result,
}

// await polls for a commit to have been applied, with exponential backoff.
func awaitSync(ctx context.Context, client api.Server, revision string) error {
return backoff(1*time.Second, 2, 10, 1*time.Minute, func() (bool, error) {
func awaitSync(ctx context.Context, client api.Server, revision string, timeout time.Duration) error {
return backoff(1*time.Second, 2, 10, timeout, func() (bool, error) {
refs, err := client.SyncStatus(ctx, revision)
return err == nil && len(refs) == 0, err
})
Expand Down
6 changes: 4 additions & 2 deletions cmd/fluxctl/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"net/url"
"strings"
"testing"
"time"

"github.com/gorilla/mux"
"github.com/weaveworks/flux/http/client"
Expand All @@ -24,6 +25,7 @@ func mockServiceOpts(trip *genericMockRoundTripper) *rootOpts {
mockAPI := client.New(&c, transport.NewAPIRouter(), "", "")
return &rootOpts{
API: mockAPI,
Timeout: 10*time.Second,
}
}

Expand Down Expand Up @@ -73,9 +75,9 @@ func testArgs(t *testing.T, args []string, shouldErr bool, errMsg string) *gener
cmd.SetArgs(args)
if err := cmd.Execute(); (err == nil) == shouldErr {
if errMsg != "" {
t.Fatal(errMsg)
t.Fatalf("%s: %s", args, errMsg)
} else {
t.Fatal(err)
t.Fatalf("%s: %v", args, err)
}
}
return svc
Expand Down
2 changes: 1 addition & 1 deletion cmd/fluxctl/policy_cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func (opts *workloadPolicyOpts) RunE(cmd *cobra.Command, args []string) error {
if err != nil {
return err
}
return await(ctx, cmd.OutOrStdout(), cmd.OutOrStderr(), opts.API, jobID, false, opts.verbosity)
return await(ctx, cmd.OutOrStdout(), cmd.OutOrStderr(), opts.API, jobID, false, opts.verbosity, opts.Timeout)
}

func calculatePolicyChanges(opts *workloadPolicyOpts) (resource.PolicyUpdate, error) {
Expand Down
4 changes: 2 additions & 2 deletions cmd/fluxctl/release_cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func (opts *workloadReleaseOpts) RunE(cmd *cobra.Command, args []string) error {
return err
}

result, err := awaitJob(ctx, opts.API, jobID)
result, err := awaitJob(ctx, opts.API, jobID, opts.Timeout)
if err != nil {
return err
}
Expand All @@ -188,7 +188,7 @@ func (opts *workloadReleaseOpts) RunE(cmd *cobra.Command, args []string) error {
opts.dryRun = false
}

err = await(ctx, cmd.OutOrStdout(), cmd.OutOrStderr(), opts.API, jobID, !opts.dryRun, opts.verbosity)
err = await(ctx, cmd.OutOrStdout(), cmd.OutOrStderr(), opts.API, jobID, !opts.dryRun, opts.verbosity, opts.Timeout)
if !opts.watch || err != nil {
return err
}
Expand Down
9 changes: 7 additions & 2 deletions cmd/fluxctl/root_cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"net/url"
"os"
"strings"
"time"

"github.com/pkg/errors"
"github.com/spf13/cobra"
Expand All @@ -23,6 +24,7 @@ type rootOpts struct {
Namespace string
Labels map[string]string
API api.Server
Timeout time.Duration
}

func newRoot() *rootOpts {
Expand Down Expand Up @@ -50,12 +52,13 @@ Workflow:
`)

const (
defaultURLGivenToken = "https://cloud.weave.works/api/flux"
envVariableURL = "FLUX_URL"
envVariableNamespace = "FLUX_FORWARD_NAMESPACE"
envVariableLabels = "FLUX_FORWARD_LABELS"
envVariableToken = "FLUX_SERVICE_TOKEN"
envVariableCloudToken = "WEAVE_CLOUD_TOKEN"
defaultURLGivenToken = "https://cloud.weave.works/api/flux"
envVariableTimeout = "FLUX_TIMEOUT"
suvl marked this conversation as resolved.
Show resolved Hide resolved
)

func (opts *rootOpts) Command() *cobra.Command {
Expand All @@ -75,7 +78,8 @@ func (opts *rootOpts) Command() *cobra.Command {
fmt.Sprintf("Base URL of the Flux API (defaults to %q if a token is provided); you can also set the environment variable %s", defaultURLGivenToken, envVariableURL))
cmd.PersistentFlags().StringVarP(&opts.Token, "token", "t", "",
fmt.Sprintf("Weave Cloud authentication token; you can also set the environment variable %s or %s", envVariableCloudToken, envVariableToken))

cmd.PersistentFlags().DurationVar(&opts.Timeout, "timeout", 60*time.Second,
fmt.Sprintf("Global command timeout; you can also set the environment variable %s", envVariableTimeout))
cmd.AddCommand(
newVersionCommand(),
newImageList(opts).Command(),
Expand Down Expand Up @@ -108,6 +112,7 @@ func (opts *rootOpts) PersistentPreRunE(cmd *cobra.Command, _ []string) error {
setFromEnvIfNotSet(cmd.Flags(), "k8s-fwd-labels", envVariableLabels)
setFromEnvIfNotSet(cmd.Flags(), "token", envVariableToken, envVariableCloudToken)
setFromEnvIfNotSet(cmd.Flags(), "url", envVariableURL)
setFromEnvIfNotSet(cmd.Flags(), "timeout", envVariableTimeout)

if opts.Token != "" && opts.URL == "" {
opts.URL = defaultURLGivenToken
Expand Down
4 changes: 2 additions & 2 deletions cmd/fluxctl/sync_cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (opts *syncOpts) RunE(cmd *cobra.Command, args []string) error {
if err != nil {
return err
}
result, err := awaitJob(ctx, opts.API, jobID)
result, err := awaitJob(ctx, opts.API, jobID, opts.Timeout)
if isUnverifiedHead(err) {
fmt.Fprintf(cmd.OutOrStderr(), "Warning: %s\n", err)
} else if err != nil {
Expand All @@ -70,7 +70,7 @@ func (opts *syncOpts) RunE(cmd *cobra.Command, args []string) error {
rev := result.Revision[:7]
fmt.Fprintf(cmd.OutOrStderr(), "Revision of %s to apply is %s\n", gitConfig.Remote.Branch, rev)
fmt.Fprintf(cmd.OutOrStderr(), "Waiting for %s to be applied ...\n", rev)
err = awaitSync(ctx, opts.API, rev)
err = awaitSync(ctx, opts.API, rev, opts.Timeout)
if err != nil {
return err
}
Expand Down