Skip to content
This repository has been archived by the owner on Nov 1, 2022. It is now read-only.

Commit

Permalink
added --timeout command line parameter and FLUX_TIMEOUT env variable
Browse files Browse the repository at this point in the history
  • Loading branch information
suvl committed Jun 18, 2019
1 parent af9d3be commit d224c25
Show file tree
Hide file tree
Showing 5 changed files with 18 additions and 13 deletions.
14 changes: 7 additions & 7 deletions cmd/fluxctl/await.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ var ErrTimeout = errors.New("timeout")

// await polls for a job to complete, then for the resulting commit to
// be applied
func await(ctx context.Context, stdout, stderr io.Writer, client api.Server, jobID job.ID, apply bool, verbosity int) error {
result, err := awaitJob(ctx, client, jobID)
func await(ctx context.Context, stdout, stderr io.Writer, client api.Server, jobID job.ID, apply bool, verbosity int, timeout int) error {
result, err := awaitJob(ctx, client, jobID, timeout)
if err != nil {
if err == ErrTimeout {
fmt.Fprintln(stderr, `
Expand All @@ -41,7 +41,7 @@ is safe to retry operations.`)
}

if apply && result.Revision != "" {
if err := awaitSync(ctx, client, result.Revision); err != nil {
if err := awaitSync(ctx, client, result.Revision, timeout); err != nil {
if err == ErrTimeout {
fmt.Fprintln(stderr, `
The operation succeeded, but we timed out waiting for the commit to be
Expand All @@ -61,9 +61,9 @@ to run a sync interactively.`)
}

// await polls for a job to have been completed, with exponential backoff.
func awaitJob(ctx context.Context, client api.Server, jobID job.ID) (job.Result, error) {
func awaitJob(ctx context.Context, client api.Server, jobID job.ID, timeout int) (job.Result, error) {
var result job.Result
err := backoff(100*time.Millisecond, 2, 50, 1*time.Minute, func() (bool, error) {
err := backoff(100*time.Millisecond, 2, 50, time.Duration(timeout)*time.Second, func() (bool, error) {
j, err := client.JobStatus(ctx, jobID)
if err != nil {
return false, err
Expand All @@ -86,8 +86,8 @@ func awaitJob(ctx context.Context, client api.Server, jobID job.ID) (job.Result,
}

// await polls for a commit to have been applied, with exponential backoff.
func awaitSync(ctx context.Context, client api.Server, revision string) error {
return backoff(1*time.Second, 2, 10, 1*time.Minute, func() (bool, error) {
func awaitSync(ctx context.Context, client api.Server, revision string, timeout int) error {
return backoff(1*time.Second, 2, 10, time.Duration(timeout)*time.Second, func() (bool, error) {
refs, err := client.SyncStatus(ctx, revision)
return err == nil && len(refs) == 0, err
})
Expand Down
2 changes: 1 addition & 1 deletion cmd/fluxctl/policy_cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (opts *workloadPolicyOpts) RunE(cmd *cobra.Command, args []string) error {
if err != nil {
return err
}
return await(ctx, cmd.OutOrStdout(), cmd.OutOrStderr(), opts.API, jobID, false, opts.verbosity)
return await(ctx, cmd.OutOrStdout(), cmd.OutOrStderr(), opts.API, jobID, false, opts.verbosity, opts.Timeout)
}

func calculatePolicyChanges(opts *workloadPolicyOpts) (policy.Update, error) {
Expand Down
4 changes: 2 additions & 2 deletions cmd/fluxctl/release_cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func (opts *workloadReleaseOpts) RunE(cmd *cobra.Command, args []string) error {
return err
}

result, err := awaitJob(ctx, opts.API, jobID)
result, err := awaitJob(ctx, opts.API, jobID, opts.Timeout)
if err != nil {
return err
}
Expand All @@ -188,7 +188,7 @@ func (opts *workloadReleaseOpts) RunE(cmd *cobra.Command, args []string) error {
opts.dryRun = false
}

err = await(ctx, cmd.OutOrStdout(), cmd.OutOrStderr(), opts.API, jobID, !opts.dryRun, opts.verbosity)
err = await(ctx, cmd.OutOrStdout(), cmd.OutOrStderr(), opts.API, jobID, !opts.dryRun, opts.verbosity, opts.Timeout)
if !opts.watch || err != nil {
return err
}
Expand Down
7 changes: 6 additions & 1 deletion cmd/fluxctl/root_cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type rootOpts struct {
Namespace string
Labels map[string]string
API api.Server
Timeout int
}

func newRoot() *rootOpts {
Expand Down Expand Up @@ -50,12 +51,13 @@ Workflow:
`)

const (
defaultURLGivenToken = "https://cloud.weave.works/api/flux"
envVariableURL = "FLUX_URL"
envVariableNamespace = "FLUX_FORWARD_NAMESPACE"
envVariableLabels = "FLUX_FORWARD_LABELS"
envVariableToken = "FLUX_SERVICE_TOKEN"
envVariableCloudToken = "WEAVE_CLOUD_TOKEN"
defaultURLGivenToken = "https://cloud.weave.works/api/flux"
envVariableTimeout = "FLUX_TIMEOUT"
)

func (opts *rootOpts) Command() *cobra.Command {
Expand All @@ -75,6 +77,8 @@ func (opts *rootOpts) Command() *cobra.Command {
fmt.Sprintf("Base URL of the Flux API (defaults to %q if a token is provided); you can also set the environment variable %s", defaultURLGivenToken, envVariableURL))
cmd.PersistentFlags().StringVarP(&opts.Token, "token", "t", "",
fmt.Sprintf("Weave Cloud authentication token; you can also set the environment variable %s or %s", envVariableCloudToken, envVariableToken))
cmd.PersistentFlags().IntVar(&opts.Timeout, "timeout", 60,
fmt.Sprintf("Global command timeout, in seconds; you can also set the environment variable %s", envVariableTimeout))

cmd.AddCommand(
newVersionCommand(),
Expand Down Expand Up @@ -105,6 +109,7 @@ func (opts *rootOpts) PersistentPreRunE(cmd *cobra.Command, _ []string) error {
setFromEnvIfNotSet(cmd.Flags(), "k8s-fwd-labels", envVariableLabels)
setFromEnvIfNotSet(cmd.Flags(), "token", envVariableToken, envVariableCloudToken)
setFromEnvIfNotSet(cmd.Flags(), "url", envVariableURL)
setFromEnvIfNotSet(cmd.Flags(), "timeout", envVariableTimeout)

if opts.Token != "" && opts.URL == "" {
opts.URL = defaultURLGivenToken
Expand Down
4 changes: 2 additions & 2 deletions cmd/fluxctl/sync_cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (opts *syncOpts) RunE(cmd *cobra.Command, args []string) error {
if err != nil {
return err
}
result, err := awaitJob(ctx, opts.API, jobID)
result, err := awaitJob(ctx, opts.API, jobID, opts.Timeout)
if isUnverifiedHead(err) {
fmt.Fprintf(cmd.OutOrStderr(), "Warning: %s\n", err)
} else if err != nil {
Expand All @@ -70,7 +70,7 @@ func (opts *syncOpts) RunE(cmd *cobra.Command, args []string) error {
rev := result.Revision[:7]
fmt.Fprintf(cmd.OutOrStderr(), "Revision of %s to apply is %s\n", gitConfig.Remote.Branch, rev)
fmt.Fprintf(cmd.OutOrStderr(), "Waiting for %s to be applied ...\n", rev)
err = awaitSync(ctx, opts.API, rev)
err = awaitSync(ctx, opts.API, rev, opts.Timeout)
if err != nil {
return err
}
Expand Down

0 comments on commit d224c25

Please sign in to comment.