Skip to content

Commit

Permalink
Merge pull request #289 from yohamta/feat/signal-on-step
Browse files Browse the repository at this point in the history
internal/dag: add signalOnStop
  • Loading branch information
yottahmd authored Aug 22, 2022
2 parents 34e7d28 + 918ab13 commit 8b34bc0
Show file tree
Hide file tree
Showing 11 changed files with 96 additions and 29 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,7 @@ steps:
output: RESULT_VARIABLE
script: |
echo "any script"
signalOnStop: "SIGINT" # Specify signal name (e.g. SIGINT) to be sent when process is stopped
mailOn:
failure: true # Send a mail when the step failed
success: true # Send a mail when the step finished
Expand Down
32 changes: 13 additions & 19 deletions agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,20 @@ func (a *Agent) Status() *models.Status {
// Signal sends sig to the running child processes.
// It delegates to a.signal with allowOverride set to false.
// If processes do not terminate after MaxCleanUp time, it will send KILL signal.
func (a *Agent) Signal(sig os.Signal) {
a.signal(sig, false)
}

// Kill sends KILL signal to all child processes.
// It logs the action and then calls the scheduler's Signal once with
// syscall.SIGKILL, a nil done channel and allowOverride set to false.
func (a *Agent) Kill() {
log.Printf("Sending KILL signal to running child processes.")
a.scheduler.Signal(a.graph, syscall.SIGKILL, nil, false)
}

func (a *Agent) signal(sig os.Signal, allowOverride bool) {
log.Printf("Sending %s signal to running child processes.", sig)
done := make(chan bool)
go func() {
a.scheduler.Signal(a.graph, sig, done)
a.scheduler.Signal(a.graph, sig, done, false)
}()
timeout := time.After(a.DAG.MaxCleanUpTime)
tick := time.After(time.Second * 5)
Expand All @@ -130,7 +140,7 @@ func (a *Agent) Signal(sig os.Signal) {
return
case <-tick:
log.Printf("Sending signal again")
a.scheduler.Signal(a.graph, sig, nil)
a.scheduler.Signal(a.graph, sig, nil, false)
tick = time.After(time.Second * 5)
default:
log.Printf("Waiting for child processes to exit...")
Expand All @@ -139,22 +149,6 @@ func (a *Agent) Signal(sig os.Signal) {
}
}

// Kill sends KILL signal to all child processes.
// It logs the action and then calls the scheduler's Signal once with
// syscall.SIGKILL and a nil done channel.
func (a *Agent) Kill() {
log.Printf("Sending KILL signal to running child processes.")
a.scheduler.Signal(a.graph, syscall.SIGKILL, nil)
}

// Cancel asks the scheduler to cancel the graph and keeps re-issuing the
// request every five seconds for as long as the scheduler reports the graph
// as running.
func (a *Agent) Cancel() {
	log.Printf("Sending -1 signal to running child processes.")
	const retryInterval = time.Second * 5
	for {
		a.scheduler.Cancel(a.graph)
		// Stop as soon as the scheduler no longer reports a running graph.
		if a.scheduler.Status(a.graph) != scheduler.SchedulerStatus_Running {
			return
		}
		time.Sleep(retryInterval)
	}
}

func (a *Agent) init() {
logDir := path.Join(a.DAG.LogDir, utils.ValidFilename(a.DAG.Name, "_"))
a.scheduler = &scheduler.Scheduler{
Expand Down Expand Up @@ -387,7 +381,7 @@ func (a *Agent) handleHTTP(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
w.Write([]byte("OK"))
go func() {
a.Signal(syscall.SIGTERM)
a.signal(syscall.SIGTERM, true)
}()
default:
encodeError(w, errNotFound)
Expand Down
1 change: 0 additions & 1 deletion agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@ func TestDryRun(t *testing.T) {
func TestCancelDAG(t *testing.T) {
for _, abort := range []func(*Agent){
func(a *Agent) { a.Signal(syscall.SIGTERM) },
func(a *Agent) { a.Cancel() },
} {
a, d := testDAGAsync(t, "agent_sleep.yaml")
time.Sleep(time.Millisecond * 100)
Expand Down
9 changes: 9 additions & 0 deletions internal/dag/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/yohamta/dagu/internal/constants"
"github.com/yohamta/dagu/internal/settings"
"github.com/yohamta/dagu/internal/utils"
"golang.org/x/sys/unix"
)

// DAG represents a DAG configuration.
Expand Down Expand Up @@ -476,6 +477,14 @@ func (b *builder) buildStep(variables []string, def *stepDef) (*Step, error) {
step.RepeatPolicy.Repeat = def.RepeatPolicy.Repeat
step.RepeatPolicy.Interval = time.Second * time.Duration(def.RepeatPolicy.IntervalSec)
}
if def.SignalOnStop != nil {
sigDef := *def.SignalOnStop
sig := unix.SignalNum(sigDef)
if sig == 0 {
return nil, fmt.Errorf("invalid signal: %s", sigDef)
}
step.SignalOnStop = sigDef
}
step.MailOnError = def.MailOnError
step.Preconditions = loadPreCondition(def.Preconditions)
return step, nil
Expand Down
1 change: 1 addition & 0 deletions internal/dag/definition.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type stepDef struct {
RepeatPolicy *repeatPolicyDef
MailOnError bool
Preconditions []*conditionDef
SignalOnStop *string
}

type continueOnDef struct {
Expand Down
36 changes: 36 additions & 0 deletions internal/dag/loader_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package dag

import (
"fmt"
"path"
"testing"
"time"
Expand Down Expand Up @@ -81,6 +82,41 @@ steps:
require.Error(t, err)
}

// TestLoadSignalOnStop checks how the loader handles a step's signalOnStop
// field: a valid signal name ("SIGINT") must be loaded onto the step as-is,
// while a value that is not a signal name ("2000") must produce an error.
func TestLoadSignalOnStop(t *testing.T) {
	for _, tc := range []struct {
		sig  string
		want string
		err  bool
	}{
		{
			sig:  "SIGINT",
			want: "SIGINT",
			err:  false,
		},
		{
			sig: "2000",
			err: true,
		},
	} {
		// Run each case as its own subtest so a failure names the signal
		// under test and does not hide the remaining cases.
		t.Run(tc.sig, func(t *testing.T) {
			dat := fmt.Sprintf(`name: test DAG
steps:
- name: "1"
command: "true"
signalOnStop: "%s"
`, tc.sig)
			l := &Loader{}
			ret, err := l.LoadData([]byte(dat))
			if tc.err {
				require.Error(t, err)
				return
			}
			require.NoError(t, err)

			// Fail cleanly instead of panicking on the index below if the
			// loader returned no steps.
			require.NotEmpty(t, ret.Steps)

			// require.Equal takes (expected, actual); keeping that order
			// makes the failure message label the values correctly.
			require.Equal(t, tc.want, ret.Steps[0].SignalOnStop)
		})
	}
}

func TestLoadErrorFileNotExist(t *testing.T) {
l := &Loader{}
_, err := l.Load(path.Join(testDir, "not_existing_file.yaml"), "")
Expand Down
1 change: 1 addition & 0 deletions internal/dag/step.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type Step struct {
RepeatPolicy RepeatPolicy
MailOnError bool
Preconditions []*Condition
SignalOnStop string
}

type RetryPolicy struct {
Expand Down
11 changes: 8 additions & 3 deletions internal/scheduler/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/yohamta/dagu/internal/dag"
"github.com/yohamta/dagu/internal/executor"
"github.com/yohamta/dagu/internal/utils"
"golang.org/x/sys/unix"
)

type NodeStatus int
Expand Down Expand Up @@ -177,13 +178,17 @@ func (n *Node) updateStatus(status NodeStatus) {
n.Status = status
}

func (n *Node) signal(sig os.Signal) {
func (n *Node) signal(sig os.Signal, allowOverride bool) {
n.mu.Lock()
defer n.mu.Unlock()
status := n.Status
if status == NodeStatus_Running && n.cmd != nil {
log.Printf("Sending %s signal to %s", sig, n.Name)
utils.LogErr("sending signal", n.cmd.Kill(sig))
sigsig := sig
if allowOverride && n.Step.SignalOnStop != "" {
sigsig = unix.SignalNum(n.Step.SignalOnStop)
}
log.Printf("Sending %s signal to %s", sigsig, n.Name)
utils.LogErr("sending signal", n.cmd.Kill(sigsig))
}
if status == NodeStatus_Running {
n.Status = NodeStatus_Cancel
Expand Down
23 changes: 22 additions & 1 deletion internal/scheduler/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,28 @@ func TestSignal(t *testing.T) {

go func() {
time.Sleep(100 * time.Millisecond)
n.signal(syscall.SIGTERM)
n.signal(syscall.SIGTERM, false)
}()

n.updateStatus(NodeStatus_Running)
err := n.Execute()

require.Error(t, err)
require.Equal(t, n.Status, NodeStatus_Cancel)
}

func TestSignalSpecified(t *testing.T) {
n := &Node{
Step: &dag.Step{
Command: "sleep",
Args: []string{"100"},
OutputVariables: &sync.Map{},
SignalOnStop: "SIGINT",
}}

go func() {
time.Sleep(100 * time.Millisecond)
n.signal(syscall.SIGTERM, true)
}()

n.updateStatus(NodeStatus_Running)
Expand Down
4 changes: 2 additions & 2 deletions internal/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ func (sc *Scheduler) Schedule(g *ExecutionGraph, done chan *Node) error {
// Signal sends a signal to the scheduler.
// For a node with a repeat policy, it does not stop the node but
// waits for the current run to finish.
func (sc *Scheduler) Signal(g *ExecutionGraph, sig os.Signal, done chan bool) {
func (sc *Scheduler) Signal(g *ExecutionGraph, sig os.Signal, done chan bool, allowOverride bool) {
if !sc.IsCanceled() {
sc.setCanceled()
}
Expand All @@ -220,7 +220,7 @@ func (sc *Scheduler) Signal(g *ExecutionGraph, sig os.Signal, done chan bool) {
// for a repetitive task, we'll wait for the job to finish
// until time reaches max wait time
} else {
node.signal(sig)
node.signal(sig, allowOverride)
}
}
if done != nil {
Expand Down
6 changes: 3 additions & 3 deletions internal/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ func TestSchedulerOnSignal(t *testing.T) {

go func() {
<-time.After(time.Millisecond * 50)
sc.Signal(g, syscall.SIGTERM, nil)
sc.Signal(g, syscall.SIGTERM, nil, false)
}()

err := sc.Schedule(g, nil)
Expand All @@ -386,7 +386,7 @@ func TestSchedulerOnCancel(t *testing.T) {
done := make(chan bool)
go func() {
<-time.After(time.Millisecond * 500)
sc.Signal(g, syscall.SIGTERM, done)
sc.Signal(g, syscall.SIGTERM, done, false)
}()

err := sc.Schedule(g, nil)
Expand Down Expand Up @@ -511,7 +511,7 @@ func TestStopRepetitiveTaskGracefully(t *testing.T) {
done := make(chan bool)
go func() {
<-time.After(time.Millisecond * 100)
sc.Signal(g, syscall.SIGTERM, done)
sc.Signal(g, syscall.SIGTERM, done, false)
}()

err := sc.Schedule(g, nil)
Expand Down

0 comments on commit 8b34bc0

Please sign in to comment.