Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use server's timestamp.ParseDuration for all durations #572

Merged
merged 2 commits into from
May 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 24 additions & 16 deletions temporalcli/commands.gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -624,7 +624,7 @@ type TemporalOperatorNamespaceCreateCommand struct {
Global bool
HistoryArchivalState StringEnum
HistoryUri string
Retention time.Duration
Retention Duration
VisibilityArchivalState StringEnum
VisibilityUri string
}
Expand All @@ -650,7 +650,8 @@ func NewTemporalOperatorNamespaceCreateCommand(cctx *CommandContext, parent *Tem
s.HistoryArchivalState = NewStringEnum([]string{"disabled", "enabled"}, "disabled")
s.Command.Flags().Var(&s.HistoryArchivalState, "history-archival-state", "History archival state. Accepted values: disabled, enabled.")
s.Command.Flags().StringVar(&s.HistoryUri, "history-uri", "", "Optionally specify history archival URI (cannot be changed after first time archival is enabled).")
s.Command.Flags().DurationVar(&s.Retention, "retention", 259200000*time.Millisecond, "Length of time a closed Workflow is preserved before deletion.")
s.Retention = Duration(259200000 * time.Millisecond)
s.Command.Flags().Var(&s.Retention, "retention", "Length of time a closed Workflow is preserved before deletion.")
s.VisibilityArchivalState = NewStringEnum([]string{"disabled", "enabled"}, "disabled")
s.Command.Flags().Var(&s.VisibilityArchivalState, "visibility-archival-state", "Visibility archival state. Accepted values: disabled, enabled.")
s.Command.Flags().StringVar(&s.VisibilityUri, "visibility-uri", "", "Optionally specify visibility archival URI (cannot be changed after first time archival is enabled).")
Expand Down Expand Up @@ -744,7 +745,7 @@ type TemporalOperatorNamespaceUpdateCommand struct {
PromoteGlobal bool
HistoryArchivalState StringEnum
HistoryUri string
Retention time.Duration
Retention Duration
VisibilityArchivalState StringEnum
VisibilityUri string
}
Expand All @@ -770,7 +771,8 @@ func NewTemporalOperatorNamespaceUpdateCommand(cctx *CommandContext, parent *Tem
s.HistoryArchivalState = NewStringEnum([]string{"disabled", "enabled"}, "")
s.Command.Flags().Var(&s.HistoryArchivalState, "history-archival-state", "History archival state. Accepted values: disabled, enabled.")
s.Command.Flags().StringVar(&s.HistoryUri, "history-uri", "", "Optionally specify history archival URI (cannot be changed after first time archival is enabled).")
s.Command.Flags().DurationVar(&s.Retention, "retention", 0, "Length of time a closed Workflow is preserved before deletion.")
s.Retention = 0
s.Command.Flags().Var(&s.Retention, "retention", "Length of time a closed Workflow is preserved before deletion.")
s.VisibilityArchivalState = NewStringEnum([]string{"disabled", "enabled"}, "")
s.Command.Flags().Var(&s.VisibilityArchivalState, "visibility-archival-state", "Visibility archival state. Accepted values: disabled, enabled.")
s.Command.Flags().StringVar(&s.VisibilityUri, "visibility-uri", "", "Optionally specify visibility archival URI (cannot be changed after first time archival is enabled).")
Expand Down Expand Up @@ -966,11 +968,11 @@ func NewTemporalScheduleBackfillCommand(cctx *CommandContext, parent *TemporalSc

type ScheduleConfigurationOptions struct {
Calendar []string
CatchupWindow time.Duration
CatchupWindow Duration
Cron []string
EndTime Timestamp
Interval []string
Jitter time.Duration
Jitter Duration
Notes string
Paused bool
PauseOnFailure bool
Expand All @@ -983,11 +985,13 @@ type ScheduleConfigurationOptions struct {

func (v *ScheduleConfigurationOptions) buildFlags(cctx *CommandContext, f *pflag.FlagSet) {
f.StringArrayVar(&v.Calendar, "calendar", nil, "Calendar specification in JSON, e.g. `{\"dayOfWeek\":\"Fri\",\"hour\":\"17\",\"minute\":\"5\"}`.")
f.DurationVar(&v.CatchupWindow, "catchup-window", 0, "Maximum allowed catch-up time if server is down.")
v.CatchupWindow = 0
f.Var(&v.CatchupWindow, "catchup-window", "Maximum allowed catch-up time if server is down.")
f.StringArrayVar(&v.Cron, "cron", nil, "Calendar spec in cron string format, e.g. `3 11 * * Fri`.")
f.Var(&v.EndTime, "end-time", "Overall schedule end time.")
f.StringArrayVar(&v.Interval, "interval", nil, "Interval duration, e.g. 90m, or 90m/13m to include phase offset.")
f.DurationVar(&v.Jitter, "jitter", 0, "Per-action jitter range.")
v.Jitter = 0
f.Var(&v.Jitter, "jitter", "Per-action jitter range.")
f.StringVar(&v.Notes, "notes", "", "Initial value of notes field.")
f.BoolVar(&v.Paused, "paused", false, "Initial value of paused state.")
f.BoolVar(&v.PauseOnFailure, "pause-on-failure", false, "Pause schedule after any workflow failure.")
Expand Down Expand Up @@ -2045,9 +2049,9 @@ type SharedWorkflowStartOptions struct {
WorkflowId string
Type string
TaskQueue string
RunTimeout time.Duration
ExecutionTimeout time.Duration
TaskTimeout time.Duration
RunTimeout Duration
ExecutionTimeout Duration
TaskTimeout Duration
SearchAttribute []string
Memo []string
}
Expand All @@ -2058,24 +2062,28 @@ func (v *SharedWorkflowStartOptions) buildFlags(cctx *CommandContext, f *pflag.F
_ = cobra.MarkFlagRequired(f, "type")
f.StringVarP(&v.TaskQueue, "task-queue", "t", "", "Workflow Task queue.")
_ = cobra.MarkFlagRequired(f, "task-queue")
f.DurationVar(&v.RunTimeout, "run-timeout", 0, "Timeout of a Workflow Run.")
f.DurationVar(&v.ExecutionTimeout, "execution-timeout", 0, "Timeout for a WorkflowExecution, including retries and ContinueAsNew tasks.")
f.DurationVar(&v.TaskTimeout, "task-timeout", 10000*time.Millisecond, "Start-to-close timeout for a Workflow Task.")
v.RunTimeout = 0
f.Var(&v.RunTimeout, "run-timeout", "Timeout of a Workflow Run.")
v.ExecutionTimeout = 0
f.Var(&v.ExecutionTimeout, "execution-timeout", "Timeout for a WorkflowExecution, including retries and ContinueAsNew tasks.")
v.TaskTimeout = Duration(10000 * time.Millisecond)
f.Var(&v.TaskTimeout, "task-timeout", "Start-to-close timeout for a Workflow Task.")
f.StringArrayVar(&v.SearchAttribute, "search-attribute", nil, "Passes Search Attribute in key=value format. Use valid JSON formats for value.")
f.StringArrayVar(&v.Memo, "memo", nil, "Passes Memo in key=value format. Use valid JSON formats for value.")
}

type WorkflowStartOptions struct {
Cron string
FailExisting bool
StartDelay time.Duration
StartDelay Duration
IdReusePolicy string
}

func (v *WorkflowStartOptions) buildFlags(cctx *CommandContext, f *pflag.FlagSet) {
f.StringVar(&v.Cron, "cron", "", "Cron schedule for the workflow. Deprecated - use schedules instead.")
f.BoolVar(&v.FailExisting, "fail-existing", false, "Fail if the workflow already exists.")
f.DurationVar(&v.StartDelay, "start-delay", 0, "Specify a delay before the workflow starts. Cannot be used with a cron schedule. If the workflow receives a signal or update before the delay has elapsed, it will begin immediately.")
v.StartDelay = 0
f.Var(&v.StartDelay, "start-delay", "Specify a delay before the workflow starts. Cannot be used with a cron schedule. If the workflow receives a signal or update before the delay has elapsed, it will begin immediately.")
f.StringVar(&v.IdReusePolicy, "id-reuse-policy", "", "Allows the same Workflow Id to be used in a new Workflow Execution. Accepted values: AllowDuplicate, AllowDuplicateFailedOnly, RejectDuplicate, TerminateIfRunning.")
}

Expand Down
4 changes: 2 additions & 2 deletions temporalcli/commands.operator_namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func (c *TemporalOperatorNamespaceCreateCommand) run(cctx *CommandContext, args
Namespace: nsName,
Description: c.Description,
OwnerEmail: c.Email,
WorkflowExecutionRetentionPeriod: durationpb.New(c.Retention),
WorkflowExecutionRetentionPeriod: durationpb.New(c.Retention.Duration()),
Clusters: clusters,
ActiveClusterName: c.ActiveCluster,
Data: data,
Expand Down Expand Up @@ -255,7 +255,7 @@ func (c *TemporalOperatorNamespaceUpdateCommand) run(cctx *CommandContext, args
}

if c.Retention > 0 {
retention = durationpb.New(c.Retention)
retention = durationpb.New(c.Retention.Duration())
}

var clusters []*replication.ClusterReplicationConfig
Expand Down
2 changes: 1 addition & 1 deletion temporalcli/commands.operator_namespace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (s *SharedServerSuite) TestNamespaceUpdate() {
"--address", s.Address(),
"--description", "description after",
"--email", "email@after",
"--retention", "48h",
"--retention", "2d",
"--data", "k1=v1",
"--data", "k2=v2",
"--output", "json",
Expand Down
6 changes: 3 additions & 3 deletions temporalcli/commands.schedule.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ func toIntervalSpec(str string) (client.ScheduleIntervalSpec, error) {
func (c *ScheduleConfigurationOptions) toScheduleSpec(spec *client.ScheduleSpec) error {
spec.CronExpressions = c.Cron
// Skip not supported
spec.Jitter = c.Jitter
spec.Jitter = c.Jitter.Duration()
spec.TimeZoneName = c.TimeZone
spec.StartAt = c.StartTime.Time()
spec.EndAt = c.EndTime.Time()
Expand Down Expand Up @@ -289,7 +289,7 @@ func (c *TemporalScheduleCreateCommand) run(cctx *CommandContext, args []string)
PauseOnFailure: c.PauseOnFailure,
Note: c.Notes,
Paused: c.Paused,
CatchupWindow: c.CatchupWindow,
CatchupWindow: c.CatchupWindow.Duration(),
RemainingActions: c.RemainingActions,
// TriggerImmediately not supported
// ScheduleBackfill not supported
Expand Down Expand Up @@ -517,7 +517,7 @@ func (c *TemporalScheduleUpdateCommand) run(cctx *CommandContext, args []string)
newSchedule := client.Schedule{
Spec: &client.ScheduleSpec{},
Policy: &client.SchedulePolicies{
CatchupWindow: c.CatchupWindow,
CatchupWindow: c.CatchupWindow.Duration(),
PauseOnFailure: c.PauseOnFailure,
},
State: &client.ScheduleState{
Expand Down
8 changes: 4 additions & 4 deletions temporalcli/commands.workflow_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,12 +248,12 @@ func buildStartOptions(sw *SharedWorkflowStartOptions, w *WorkflowStartOptions)
o := client.StartWorkflowOptions{
ID: sw.WorkflowId,
TaskQueue: sw.TaskQueue,
WorkflowRunTimeout: sw.RunTimeout,
WorkflowExecutionTimeout: sw.ExecutionTimeout,
WorkflowTaskTimeout: sw.TaskTimeout,
WorkflowRunTimeout: sw.RunTimeout.Duration(),
WorkflowExecutionTimeout: sw.ExecutionTimeout.Duration(),
WorkflowTaskTimeout: sw.TaskTimeout.Duration(),
CronSchedule: w.Cron,
WorkflowExecutionErrorWhenAlreadyStarted: w.FailExisting,
StartDelay: w.StartDelay,
StartDelay: w.StartDelay.Duration(),
}
if w.IdReusePolicy != "" {
var err error
Expand Down
17 changes: 11 additions & 6 deletions temporalcli/commandsmd/code.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ import (
"regexp"
"sort"
"strings"
"time"

"go.temporal.io/server/common/primitives/timestamp"
)

func GenerateCommandsCode(pkg string, commands []*Command) ([]byte, error) {
Expand Down Expand Up @@ -264,7 +265,7 @@ func (c *CommandOption) writeStructField(w *codeWriter) error {
case "bool", "int", "string":
goDataType = c.DataType
case "duration":
goDataType = w.importPkg("time") + ".Duration"
goDataType = "Duration"
case "timestamp":
goDataType = "Timestamp"
case "string[]":
Expand All @@ -279,22 +280,22 @@ func (c *CommandOption) writeStructField(w *codeWriter) error {
}

func (c *CommandOption) writeFlagBuilding(selfVar, flagVar string, w *codeWriter) error {
var flagMeth, defaultLit string
var flagMeth, defaultLit, setDefault string
switch c.DataType {
case "bool":
flagMeth, defaultLit = "BoolVar", ", false"
if c.DefaultValue != "" {
return fmt.Errorf("cannot have default for bool var")
}
case "duration":
flagMeth, defaultLit = "DurationVar", ", 0"
flagMeth, setDefault = "Var", "0"
if c.DefaultValue != "" {
dur, err := time.ParseDuration(c.DefaultValue)
dur, err := timestamp.ParseDuration(c.DefaultValue)
if err != nil {
return fmt.Errorf("invalid default: %w", err)
}
// We round to the nearest ms
defaultLit = fmt.Sprintf(", %v * %v.Millisecond", int64(dur/time.Millisecond), w.importPkg("time"))
setDefault = fmt.Sprintf("Duration(%v * %v.Millisecond)", dur.Milliseconds(), w.importPkg("time"))
}
case "timestamp":
if c.DefaultValue != "" {
Expand Down Expand Up @@ -335,6 +336,10 @@ func (c *CommandOption) writeFlagBuilding(selfVar, flagVar string, w *codeWriter
desc += fmt.Sprintf(" Accepted values: %s.", strings.Join(c.EnumValues, ", "))
}

if setDefault != "" {
// set default before calling Var so that it stores thedefault value into the flag
w.writeLinef("%v.%v = %v", selfVar, c.fieldName(), setDefault)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cretz If this isn't the right way to do defaults for Var flags, let me know

}
if c.Alias == "" {
w.writeLinef("%v.%v(&%v.%v, %q%v, %q)",
flagVar, flagMeth, selfVar, c.fieldName(), c.Name, defaultLit, desc)
Expand Down
30 changes: 30 additions & 0 deletions temporalcli/duration.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package temporalcli

import (
"time"

"go.temporal.io/server/common/primitives/timestamp"
)

type Duration time.Duration

func (d Duration) Duration() time.Duration {
return time.Duration(d)
}

func (d *Duration) String() string {
return d.Duration().String()
}

func (d *Duration) Set(s string) error {
p, err := timestamp.ParseDuration(s)
if err != nil {
return err
}
*d = Duration(p)
return nil
}

func (d *Duration) Type() string {
return "duration"
}
Loading