Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix retry policy and process terminate behavior #24

Merged
merged 3 commits into from
Apr 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/stop.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func stop(cfg *config.Config) error {
log.Printf("DAG is not running.")
return nil
}
syscall.Kill(int(status.Pid), syscall.SIGINT)
syscall.Kill(int(status.Pid), syscall.SIGTERM)
for {
time.Sleep(time.Second * 3)
s, err := controller.New(cfg).GetStatus()
Expand Down
2 changes: 1 addition & 1 deletion internal/admin/handlers/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,7 @@ func buildLog(logs []*models.StatusFile) *Log {
}

func getPathParameter(r *http.Request) (string, error) {
re := regexp.MustCompile("/dags/([^/\\?]+)/?$")
re := regexp.MustCompile(`/dags/([^/\?]+)/?$`)
m := re.FindStringSubmatch(r.URL.Path)
if len(m) < 2 {
return "", fmt.Errorf("invalid URL")
Expand Down
10 changes: 5 additions & 5 deletions internal/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func (a *Agent) Status() *models.Status {
}

// Signal sends the signal to the processes running
// if processes do not terminate for 60 seconds,
// if processes do not terminate for 120 seconds,
// cancel all processes which will send signal -1 to the processes.
func (a *Agent) Signal(sig os.Signal) {
log.Printf("Sending %s signal to running child processes.", sig)
Expand All @@ -112,7 +112,7 @@ func (a *Agent) Signal(sig os.Signal) {
select {
case <-done:
log.Printf("All child processes have been terminated.")
case <-time.After(time.Second * 60):
case <-time.After(time.Second * 120):
a.Cancel(sig)
default:
log.Printf("Waiting for child processes to exit...")
Expand All @@ -121,7 +121,7 @@ func (a *Agent) Signal(sig os.Signal) {
}

// Cancel sends signal -1 to all child processes.
// then it waits another 20 seconds before therminating the
// then it waits another 60 seconds before therminating the
// parent process.
func (a *Agent) Cancel(sig os.Signal) {
log.Printf("Sending -1 signal to running child processes.")
Expand All @@ -132,7 +132,7 @@ func (a *Agent) Cancel(sig os.Signal) {
select {
case <-done:
log.Printf("All child processes have been terminated.")
case <-time.After(time.Second * 20):
case <-time.After(time.Second * 60):
log.Printf("Terminating the controller process.")
a.Kill(done)
default:
Expand All @@ -154,7 +154,7 @@ func (a *Agent) init() {
&scheduler.Config{
LogDir: path.Join(a.DAG.LogDir, utils.ValidFilename(a.DAG.Name, "_")),
MaxActiveRuns: a.DAG.MaxActiveRuns,
DelaySec: a.DAG.DelaySec,
Delay: a.DAG.Delay,
Dry: a.Dry,
OnExit: a.DAG.HandlerOn.Exit,
OnSuccess: a.DAG.HandlerOn.Success,
Expand Down
10 changes: 6 additions & 4 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type Config struct {
ErrorMail *MailConfig
InfoMail *MailConfig
Smtp *SmtpConfig
DelaySec time.Duration
Delay time.Duration
HistRetentionDays int
Preconditions []*Condition
MaxActiveRuns int
Expand Down Expand Up @@ -135,7 +135,7 @@ func buildFromDefinition(def *configDefinition, file string, globalConfig *Confi
c.Description = def.Description
c.MailOn.Failure = def.MailOn.Failure
c.MailOn.Success = def.MailOn.Success
c.DelaySec = time.Second * time.Duration(def.DelaySec)
c.Delay = time.Second * time.Duration(def.DelaySec)

if opts != nil && opts.headOnly {
return c, nil
Expand Down Expand Up @@ -308,9 +308,11 @@ func buildStep(variables []string, def *stepDef) (*Step, error) {
Limit: def.RetryPolicy.Limit,
}
}
if def.RepeatPolicy != nil {
step.RepeatPolicy.Repeat = def.RepeatPolicy.Repeat
step.RepeatPolicy.Interval = time.Second * time.Duration(def.RepeatPolicy.IntervalSec)
}
step.MailOnError = def.MailOnError
step.Repeat = def.Repeat
step.RepeatInterval = time.Second * time.Duration(def.RepeatIntervalSec)
step.Preconditions = loadPreCondition(def.Preconditions)
return step, nil
}
Expand Down
26 changes: 15 additions & 11 deletions internal/config/definition.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,24 +31,28 @@ type handerOnDef struct {
}

type stepDef struct {
Name string
Description string
Dir string
Command string
Depends []string
ContinueOn *continueOnDef
RetryPolicy *retryPolicyDef
MailOnError bool
Repeat bool
RepeatIntervalSec int
Preconditions []*conditionDef
Name string
Description string
Dir string
Command string
Depends []string
ContinueOn *continueOnDef
RetryPolicy *retryPolicyDef
RepeatPolicy *repeatPolicyDef
MailOnError bool
Preconditions []*conditionDef
}

type continueOnDef struct {
Failure bool
Skipped bool
}

type repeatPolicyDef struct {
Repeat bool
IntervalSec int
}

type retryPolicyDef struct {
Limit int
}
Expand Down
6 changes: 5 additions & 1 deletion internal/config/loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ func TestLoadConfig(t *testing.T) {
RetryPolicy: &config.RetryPolicy{
Limit: 2,
},
RepeatPolicy: config.RepeatPolicy{
Repeat: true,
Interval: time.Second * 10,
},
},
{
Name: "2",
Expand Down Expand Up @@ -90,7 +94,7 @@ func TestLoadConfig(t *testing.T) {
Failure: true,
Success: true,
},
DelaySec: time.Second * 1,
Delay: time.Second * 1,
MaxActiveRuns: 1,
Params: []string{"param1", "param2"},
DefaultParams: "param1 param2",
Expand Down
30 changes: 17 additions & 13 deletions internal/config/step.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,25 +7,29 @@ import (
)

type Step struct {
Name string
Description string
Variables []string
Dir string
Command string
Args []string
Depends []string
ContinueOn ContinueOn
RetryPolicy *RetryPolicy
MailOnError bool
Repeat bool
RepeatInterval time.Duration
Preconditions []*Condition
Name string
Description string
Variables []string
Dir string
Command string
Args []string
Depends []string
ContinueOn ContinueOn
RetryPolicy *RetryPolicy
RepeatPolicy RepeatPolicy
MailOnError bool
Preconditions []*Condition
}

type RetryPolicy struct {
Limit int
}

type RepeatPolicy struct {
Repeat bool
Interval time.Duration
}

type ContinueOn struct {
Failure bool
Skipped bool
Expand Down
4 changes: 2 additions & 2 deletions internal/models/status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,14 @@ func TestStatusSerialization(t *testing.T) {
Dir: "dir", Command: "echo 1", Args: []string{},
Depends: []string{}, ContinueOn: config.ContinueOn{},
RetryPolicy: &config.RetryPolicy{}, MailOnError: false,
Repeat: false, RepeatInterval: 0, Preconditions: []*config.Condition{},
RepeatPolicy: config.RepeatPolicy{}, Preconditions: []*config.Condition{},
},
},
MailOn: config.MailOn{},
ErrorMail: &config.MailConfig{},
InfoMail: &config.MailConfig{},
Smtp: &config.SmtpConfig{},
DelaySec: 0,
Delay: 0,
HistRetentionDays: 0,
Preconditions: []*config.Condition{},
MaxActiveRuns: 0,
Expand Down
4 changes: 1 addition & 3 deletions internal/scheduler/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,7 @@ func (n *Node) Execute() error {
cmd := exec.CommandContext(ctx, n.Command, n.Args...)
n.cmd = cmd
cmd.Dir = n.Dir
for _, v := range n.Variables {
cmd.Env = append(cmd.Env, v)
}
cmd.Env = append(cmd.Env, n.Variables...)

if n.logWriter != nil {
cmd.Stdout = n.logWriter
Expand Down
26 changes: 15 additions & 11 deletions internal/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ type Scheduler struct {
type Config struct {
LogDir string
MaxActiveRuns int
DelaySec time.Duration
Delay time.Duration
Dry bool
OnExit *config.Step
OnSuccess *config.Step
Expand Down Expand Up @@ -134,18 +134,22 @@ func (sc *Scheduler) Schedule(g *ExecutionGraph, done chan *Node) error {
// nothing to do
case NodeStatus_Error:
sc.lastError = err
fallthrough
default:
if done != nil {
done <- node
}
}
return
}
if node.Repeat {
if node.ReadStatus() != NodeStatus_Cancel {
node.incDoneCount()
time.Sleep(node.RepeatInterval)
continue
}
if node.RepeatPolicy.Repeat {
if err == nil || node.ContinueOn.Failure {
time.Sleep(node.RepeatPolicy.Interval)
continue
}
}
if err != nil {
if done != nil {
done <- node
}
return
}
break
}
Expand All @@ -155,7 +159,7 @@ func (sc *Scheduler) Schedule(g *ExecutionGraph, done chan *Node) error {
}
}(node)

time.Sleep(sc.DelaySec)
time.Sleep(sc.Delay)
}

time.Sleep(sc.pause)
Expand Down
62 changes: 56 additions & 6 deletions internal/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,12 +216,10 @@ func TestSchedulerRetrySuccess(t *testing.T) {
defer os.Remove(tmpDir)

go func() {
select {
case <-time.After(time.Millisecond * 300):
f, err := os.Create(tmpFile)
require.NoError(t, err)
f.Close()
}
<-time.After(time.Millisecond * 300)
f, err := os.Create(tmpFile)
require.NoError(t, err)
f.Close()
}()

g, sc, err := testSchedule(t,
Expand Down Expand Up @@ -402,6 +400,58 @@ func TestSchedulerOnFailure(t *testing.T) {
assert.Equal(t, sc.HanderNode(constants.OnCancel).ReadStatus(), scheduler.NodeStatus_None)
}

func TestRepeat(t *testing.T) {
g, _ := scheduler.NewExecutionGraph(
&config.Step{
Name: "1",
Command: "sleep",
Args: []string{"1"},
RepeatPolicy: config.RepeatPolicy{
Repeat: true,
Interval: time.Millisecond * 300,
},
},
)
sc := scheduler.New(&scheduler.Config{})

done := make(chan bool)
go func() {
<-time.After(time.Millisecond * 3000)
sc.Cancel(g, done)
}()

err := sc.Schedule(g, nil)
<-done // Wait for canceling finished
require.NoError(t, err)

nodes := g.Nodes()

assert.Equal(t, sc.Status(g), scheduler.SchedulerStatus_Cancel)
assert.Equal(t, nodes[0].Status, scheduler.NodeStatus_Cancel)
assert.Equal(t, nodes[0].DoneCount, 2)
}

func TestRepeatFail(t *testing.T) {
g, _ := scheduler.NewExecutionGraph(
&config.Step{
Name: "1",
Command: testCommandFail,
RepeatPolicy: config.RepeatPolicy{
Repeat: true,
Interval: time.Millisecond * 300,
},
},
)
sc := scheduler.New(&scheduler.Config{})
err := sc.Schedule(g, nil)
require.Error(t, err)

nodes := g.Nodes()
assert.Equal(t, sc.Status(g), scheduler.SchedulerStatus_Error)
assert.Equal(t, nodes[0].Status, scheduler.NodeStatus_Error)
assert.Equal(t, nodes[0].DoneCount, 1)
}

func testSchedule(t *testing.T, steps ...*config.Step) (
*scheduler.ExecutionGraph, *scheduler.Scheduler, error,
) {
Expand Down
3 changes: 3 additions & 0 deletions tests/testdata/config_load.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ steps:
skipped: true
retryPolicy:
limit: 2
repeatPolicy:
repeat: true
intervalSec: 10
preconditions:
- condition: "`echo test`"
expected: test
Expand Down