From e1caf0f4e141f7211d23a582523c1fe5003d1e48 Mon Sep 17 00:00:00 2001 From: yohamta Date: Mon, 22 Aug 2022 15:11:08 +0900 Subject: [PATCH 1/4] internal/dag: add signalOnStop --- agent.go | 42 +++++++++++++++------------- internal/dag/dag.go | 9 ++++++ internal/dag/definition.go | 1 + internal/dag/step.go | 1 + internal/scheduler/node.go | 11 ++++++-- internal/scheduler/node_test.go | 23 ++++++++++++++- internal/scheduler/scheduler.go | 4 +-- internal/scheduler/scheduler_test.go | 6 ++-- 8 files changed, 69 insertions(+), 28 deletions(-) diff --git a/agent.go b/agent.go index f21ab9004..e9b2c568e 100644 --- a/agent.go +++ b/agent.go @@ -112,10 +112,30 @@ func (a *Agent) Status() *models.Status { // Signal sends the signal to the processes running // if processes do not terminate after MaxCleanUp time, it will send KILL signal. func (a *Agent) Signal(sig os.Signal) { + a.signal(sig, false) +} + +// Kill sends KILL signal to all child processes. +func (a *Agent) Kill() { + log.Printf("Sending KILL signal to running child processes.") + a.scheduler.Signal(a.graph, syscall.SIGKILL, nil, false) +} + +// Cancel sends signal -1 to all child processes. +func (a *Agent) Cancel() { + log.Printf("Sending -1 signal to running child processes.") + a.scheduler.Cancel(a.graph) + for a.scheduler.Status(a.graph) == scheduler.SchedulerStatus_Running { + time.Sleep(time.Second * 5) + a.scheduler.Cancel(a.graph) + } +} + +func (a *Agent) signal(sig os.Signal, allowOverride bool) { log.Printf("Sending %s signal to running child processes.", sig) done := make(chan bool) go func() { - a.scheduler.Signal(a.graph, sig, done) + a.scheduler.Signal(a.graph, sig, done, false) }() timeout := time.After(a.DAG.MaxCleanUpTime) tick := time.After(time.Second * 5) @@ -130,7 +150,7 @@ func (a *Agent) Signal(sig os.Signal) { return case <-tick: log.Printf("Sending signal again") - a.scheduler.Signal(a.graph, sig, nil) + a.scheduler.Signal(a.graph, sig, nil, false) tick = time.After(time.Second * 5) default: log.Printf("Waiting for child processes to exit...") @@ -139,22 +159,6 @@ func (a *Agent) Signal(sig os.Signal) { } } -// Kill sends KILL signal to all child processes. -func (a *Agent) Kill() { - log.Printf("Sending KILL signal to running child processes.") - a.scheduler.Signal(a.graph, syscall.SIGKILL, nil) -} - -// Cancel sends signal -1 to all child processes. -func (a *Agent) Cancel() { - log.Printf("Sending -1 signal to running child processes.") - a.scheduler.Cancel(a.graph) - for a.scheduler.Status(a.graph) == scheduler.SchedulerStatus_Running { - time.Sleep(time.Second * 5) - a.scheduler.Cancel(a.graph) - } -} - func (a *Agent) init() { logDir := path.Join(a.DAG.LogDir, utils.ValidFilename(a.DAG.Name, "_")) a.scheduler = &scheduler.Scheduler{ @@ -387,7 +391,7 @@ func (a *Agent) handleHTTP(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) w.Write([]byte("OK")) go func() { - a.Signal(syscall.SIGTERM) + a.signal(syscall.SIGTERM, true) }() default: encodeError(w, errNotFound) diff --git a/internal/dag/dag.go b/internal/dag/dag.go index 47eeae002..350c0c355 100644 --- a/internal/dag/dag.go +++ b/internal/dag/dag.go @@ -14,6 +14,7 @@ import ( "github.com/yohamta/dagu/internal/constants" "github.com/yohamta/dagu/internal/settings" "github.com/yohamta/dagu/internal/utils" + "golang.org/x/sys/unix" ) // DAG represents a DAG configuration. @@ -476,6 +477,14 @@ func (b *builder) buildStep(variables []string, def *stepDef) (*Step, error) { step.RepeatPolicy.Repeat = def.RepeatPolicy.Repeat step.RepeatPolicy.Interval = time.Second * time.Duration(def.RepeatPolicy.IntervalSec) } + if def.SignalOnStep != nil { + sigDef := *def.SignalOnStep + sig := unix.SignalNum(sigDef) + if sig == 0 { + return nil, fmt.Errorf("invalid signal: %s", sigDef) + } + step.SignalOnStep = sigDef + } step.MailOnError = def.MailOnError step.Preconditions = loadPreCondition(def.Preconditions) return step, nil diff --git a/internal/dag/definition.go b/internal/dag/definition.go index 3101d8023..e398f446d 100644 --- a/internal/dag/definition.go +++ b/internal/dag/definition.go @@ -49,6 +49,7 @@ type stepDef struct { RepeatPolicy *repeatPolicyDef MailOnError bool Preconditions []*conditionDef + SignalOnStep *string } type continueOnDef struct { diff --git a/internal/dag/step.go b/internal/dag/step.go index 474b00379..d5f444cd1 100644 --- a/internal/dag/step.go +++ b/internal/dag/step.go @@ -27,6 +27,7 @@ type Step struct { RepeatPolicy RepeatPolicy MailOnError bool Preconditions []*Condition + SignalOnStep string } type RetryPolicy struct { diff --git a/internal/scheduler/node.go b/internal/scheduler/node.go index 15e6b3fcc..703a37a57 100644 --- a/internal/scheduler/node.go +++ b/internal/scheduler/node.go @@ -16,6 +16,7 @@ import ( "github.com/yohamta/dagu/internal/dag" "github.com/yohamta/dagu/internal/executor" "github.com/yohamta/dagu/internal/utils" + "golang.org/x/sys/unix" ) type NodeStatus int @@ -177,13 +178,17 @@ func (n *Node) updateStatus(status NodeStatus) { n.Status = status } -func (n *Node) signal(sig os.Signal) { +func (n *Node) signal(sig os.Signal, allowOverride bool) { n.mu.Lock() defer n.mu.Unlock() status := n.Status if status == NodeStatus_Running && n.cmd != nil { - log.Printf("Sending %s signal to %s", sig, n.Name) - utils.LogErr("sending signal", n.cmd.Kill(sig)) + sigsig := sig + if allowOverride && n.Step.SignalOnStep != "" { + sigsig = unix.SignalNum(n.Step.SignalOnStep) + } + log.Printf("Sending %s signal to %s", sigsig, n.Name) + utils.LogErr("sending signal", n.cmd.Kill(sigsig)) } if status == NodeStatus_Running { n.Status = NodeStatus_Cancel diff --git a/internal/scheduler/node_test.go b/internal/scheduler/node_test.go index dd92385c0..9101dc3bb 100644 --- a/internal/scheduler/node_test.go +++ b/internal/scheduler/node_test.go @@ -45,7 +45,28 @@ func TestSignal(t *testing.T) { go func() { time.Sleep(100 * time.Millisecond) - n.signal(syscall.SIGTERM) + n.signal(syscall.SIGTERM, false) + }() + + n.updateStatus(NodeStatus_Running) + err := n.Execute() + + require.Error(t, err) + require.Equal(t, n.Status, NodeStatus_Cancel) +} + +func TestSignalSpecified(t *testing.T) { + n := &Node{ + Step: &dag.Step{ + Command: "sleep", + Args: []string{"100"}, + OutputVariables: &sync.Map{}, + SignalOnStep: "SIGINT", + }} + + go func() { + time.Sleep(100 * time.Millisecond) + n.signal(syscall.SIGTERM, true) }() n.updateStatus(NodeStatus_Running) diff --git a/internal/scheduler/scheduler.go b/internal/scheduler/scheduler.go index 8cc16a462..b7c86d4da 100644 --- a/internal/scheduler/scheduler.go +++ b/internal/scheduler/scheduler.go @@ -211,7 +211,7 @@ func (sc *Scheduler) Schedule(g *ExecutionGraph, done chan *Node) error { // Signal sends a signal to the scheduler. // for a node with repeat policy, it does not stop the node and // wait to finish current run. -func (sc *Scheduler) Signal(g *ExecutionGraph, sig os.Signal, done chan bool) { +func (sc *Scheduler) Signal(g *ExecutionGraph, sig os.Signal, done chan bool, allowOverride bool) { if !sc.IsCanceled() { sc.setCanceled() } @@ -220,7 +220,7 @@ func (sc *Scheduler) Signal(g *ExecutionGraph, sig os.Signal, done chan bool) { // for a repetitive task, we'll wait for the job to finish // until time reaches max wait time } else { - node.signal(sig) + node.signal(sig, allowOverride) } } if done != nil { diff --git a/internal/scheduler/scheduler_test.go b/internal/scheduler/scheduler_test.go index 5d8302b5b..8e151ff21 100644 --- a/internal/scheduler/scheduler_test.go +++ b/internal/scheduler/scheduler_test.go @@ -360,7 +360,7 @@ func TestSchedulerOnSignal(t *testing.T) { go func() { <-time.After(time.Millisecond * 50) - sc.Signal(g, syscall.SIGTERM, nil) + sc.Signal(g, syscall.SIGTERM, nil, false) }() err := sc.Schedule(g, nil) @@ -386,7 +386,7 @@ func TestSchedulerOnCancel(t *testing.T) { done := make(chan bool) go func() { <-time.After(time.Millisecond * 500) - sc.Signal(g, syscall.SIGTERM, done) + sc.Signal(g, syscall.SIGTERM, done, false) }() err := sc.Schedule(g, nil) @@ -511,7 +511,7 @@ func TestStopRepetitiveTaskGracefully(t *testing.T) { done := make(chan bool) go func() { <-time.After(time.Millisecond * 100) - sc.Signal(g, syscall.SIGTERM, done) + sc.Signal(g, syscall.SIGTERM, done, false) }() err := sc.Schedule(g, nil) From f672ab06a1125db5db5f2110b52063b4bc3c3761 Mon Sep 17 00:00:00 2001 From: yohamta Date: Mon, 22 Aug 2022 15:22:42 +0900 Subject: [PATCH 2/4] Update README --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index 3036f2e71..216a7b710 100644 --- a/README.md +++ b/README.md @@ -408,6 +408,7 @@ steps: ouptut: RESULT_VARIABLE script: | echo "any script" + signalOnStop: "SIGINT" # Specify signal name (e.g. SIGINT) to be sent when process is stopped mailOn: failure: true # Send a mail when the step failed success: true # Send a mail when the step finished From 34ee99564129ab7dfa4759a96f24d478455f0cd0 Mon Sep 17 00:00:00 2001 From: yohamta Date: Mon, 22 Aug 2022 15:34:02 +0900 Subject: [PATCH 3/4] agent: remove unused code --- agent.go | 10 ---------- agent_test.go | 1 - 2 files changed, 11 deletions(-) diff --git a/agent.go b/agent.go index e9b2c568e..663f8b0ad 100644 --- a/agent.go +++ b/agent.go @@ -121,16 +121,6 @@ func (a *Agent) Kill() { a.scheduler.Signal(a.graph, syscall.SIGKILL, nil, false) } -// Cancel sends signal -1 to all child processes. -func (a *Agent) Cancel() { - log.Printf("Sending -1 signal to running child processes.") - a.scheduler.Cancel(a.graph) - for a.scheduler.Status(a.graph) == scheduler.SchedulerStatus_Running { - time.Sleep(time.Second * 5) - a.scheduler.Cancel(a.graph) - } -} - func (a *Agent) signal(sig os.Signal, allowOverride bool) { log.Printf("Sending %s signal to running child processes.", sig) done := make(chan bool) diff --git a/agent_test.go b/agent_test.go index a552a16cf..d8f1b7944 100644 --- a/agent_test.go +++ b/agent_test.go @@ -94,7 +94,6 @@ func TestDryRun(t *testing.T) { func TestCancelDAG(t *testing.T) { for _, abort := range []func(*Agent){ func(a *Agent) { a.Signal(syscall.SIGTERM) }, - func(a *Agent) { a.Cancel() }, } { a, d := testDAGAsync(t, "agent_sleep.yaml") time.Sleep(time.Millisecond * 100) From 918ab138976bedaad3d04f64344065ab8e2320e9 Mon Sep 17 00:00:00 2001 From: yohamta Date: Mon, 22 Aug 2022 15:46:01 +0900 Subject: [PATCH 4/4] internal/dag: fix bug --- internal/dag/dag.go | 6 +++--- internal/dag/definition.go | 2 +- internal/dag/loader_test.go | 36 +++++++++++++++++++++++++++++++++ internal/dag/step.go | 2 +- internal/scheduler/node.go | 4 ++-- internal/scheduler/node_test.go | 2 +- 6 files changed, 44 insertions(+), 8 deletions(-) diff --git a/internal/dag/dag.go b/internal/dag/dag.go index 350c0c355..1ab2f4218 100644 --- a/internal/dag/dag.go +++ b/internal/dag/dag.go @@ -477,13 +477,13 @@ func (b *builder) buildStep(variables []string, def *stepDef) (*Step, error) { step.RepeatPolicy.Repeat = def.RepeatPolicy.Repeat step.RepeatPolicy.Interval = time.Second * time.Duration(def.RepeatPolicy.IntervalSec) } - if def.SignalOnStep != nil { - sigDef := *def.SignalOnStep + if def.SignalOnStop != nil { + sigDef := *def.SignalOnStop sig := unix.SignalNum(sigDef) if sig == 0 { return nil, fmt.Errorf("invalid signal: %s", sigDef) } - step.SignalOnStep = sigDef + step.SignalOnStop = sigDef } step.MailOnError = def.MailOnError step.Preconditions = loadPreCondition(def.Preconditions) diff --git a/internal/dag/definition.go b/internal/dag/definition.go index e398f446d..65346900b 100644 --- a/internal/dag/definition.go +++ b/internal/dag/definition.go @@ -49,7 +49,7 @@ type stepDef struct { RepeatPolicy *repeatPolicyDef MailOnError bool Preconditions []*conditionDef - SignalOnStep *string + SignalOnStop *string } type continueOnDef struct { diff --git a/internal/dag/loader_test.go b/internal/dag/loader_test.go index 8f452d950..07673f5a9 100644 --- a/internal/dag/loader_test.go +++ b/internal/dag/loader_test.go @@ -1,6 +1,7 @@ package dag import ( + "fmt" "path" "testing" "time" @@ -81,6 +82,41 @@ steps: require.Error(t, err) } +func TestLoadSignalOnStop(t *testing.T) { + for _, tc := range []struct { + sig string + want string + err bool + }{ + { + sig: "SIGINT", + want: "SIGINT", + err: false, + }, + { + sig: "2000", + err: true, + }, + } { + dat := fmt.Sprintf(`name: test DAG +steps: + - name: "1" + command: "true" + signalOnStop: "%s" +`, tc.sig) + l := &Loader{} + ret, err := l.LoadData([]byte(dat)) + if tc.err { + require.Error(t, err) + continue + } + require.NoError(t, err) + + step := ret.Steps[0] + require.Equal(t, step.SignalOnStop, tc.want) + } +} + func TestLoadErrorFileNotExist(t *testing.T) { l := &Loader{} _, err := l.Load(path.Join(testDir, "not_existing_file.yaml"), "") diff --git a/internal/dag/step.go b/internal/dag/step.go index d5f444cd1..bd78e4bab 100644 --- a/internal/dag/step.go +++ b/internal/dag/step.go @@ -27,7 +27,7 @@ type Step struct { RepeatPolicy RepeatPolicy MailOnError bool Preconditions []*Condition - SignalOnStep string + SignalOnStop string } type RetryPolicy struct { diff --git a/internal/scheduler/node.go b/internal/scheduler/node.go index 703a37a57..4623d2f0a 100644 --- a/internal/scheduler/node.go +++ b/internal/scheduler/node.go @@ -184,8 +184,8 @@ func (n *Node) signal(sig os.Signal, allowOverride bool) { status := n.Status if status == NodeStatus_Running && n.cmd != nil { sigsig := sig - if allowOverride && n.Step.SignalOnStep != "" { - sigsig = unix.SignalNum(n.Step.SignalOnStep) + if allowOverride && n.Step.SignalOnStop != "" { + sigsig = unix.SignalNum(n.Step.SignalOnStop) } log.Printf("Sending %s signal to %s", sigsig, n.Name) utils.LogErr("sending signal", n.cmd.Kill(sigsig)) diff --git a/internal/scheduler/node_test.go b/internal/scheduler/node_test.go index 9101dc3bb..5060623c0 100644 --- a/internal/scheduler/node_test.go +++ b/internal/scheduler/node_test.go @@ -61,7 +61,7 @@ func TestSignalSpecified(t *testing.T) { Command: "sleep", Args: []string{"100"}, OutputVariables: &sync.Map{}, - SignalOnStep: "SIGINT", + SignalOnStop: "SIGINT", }} go func() {