Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add restart feature #318

Merged
merged 7 commits into from
Sep 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ It runs <a href="https://en.wikipedia.org/wiki/Directed_acyclic_graph">DAGs (Dir
- [Scheduler](#scheduler)
- [Execution Schedule](#execution-schedule)
- [Stop Schedule](#stop-schedule)
- [Restart Schedule](#restart-schedule)
- [Run Scheduler as a daemon](#run-scheduler-as-a-daemon)
- [Scheduler Configuration](#scheduler-configuration)
- [REST API Interface](#rest-api-interface)
Expand Down Expand Up @@ -153,6 +154,7 @@ You can execute the example by pressing the `Start` button
- `dagu status <file>` - Displays the current status of the DAG
- `dagu retry --req=<request-id> <file>` - Re-runs the specified DAG run
- `dagu stop <file>` - Stops the DAG execution by sending TERM signals
- `dagu restart <file>` - Restarts the currently running DAG
- `dagu dry [--params=<params>] <file>` - Dry-runs the DAG
- `dagu server [--host=<host>] [--port=<port>] [--dags=<path/to/the DAGs directory>]` - Starts the web server for web UI
- `dagu scheduler [--dags=<path/to/the DAGs directory>]` - Starts the scheduler process
Expand Down Expand Up @@ -549,6 +551,30 @@ steps:
command: main.sh
```

### Restart Schedule

If you want to restart a DAG process on a fixed schedule, the `restart` field is also available. At the restart time, the DAG execution will be stopped and restarted again.

```yaml
schedule:
start: "0 8 * * *" # starts at 8:00
restart: "0 12 * * *" # restarts at 12:00
stop: "0 13 * * *" # stops at 13:00
steps:
- name: scheduled job
command: job.sh
```

The time to wait between stopping the job and restarting it can be configured in the DAG definition as follows. The default value is `0` (zero).

```yaml
restartWaitSec: 60 # Wait 60s after the process is stopped, then restart the DAG.

steps:
- name: step1
command: python some_app.py
```

### Run Scheduler as a daemon

The easiest way to make sure the process is always running on your system is to create the script below and execute it every minute using cron (you don't need `root` account in this way):
Expand Down
1 change: 1 addition & 0 deletions cmd/dagu.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ func makeApp() *cli.App {
newStartCommand(),
newStatusCommand(),
newStopCommand(),
newRestartCommand(),
newRetryCommand(),
newDryCommand(),
newServerCommand(),
Expand Down
82 changes: 82 additions & 0 deletions cmd/restart.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package main

import (
"fmt"
"log"
"time"

"github.com/urfave/cli/v2"
"github.com/yohamta/dagu/internal/controller"
"github.com/yohamta/dagu/internal/dag"
"github.com/yohamta/dagu/internal/scheduler"
)

// newRestartCommand builds the `restart` CLI sub-command. The command loads
// the DAG file given as the first positional argument (with empty params) and
// hands it to restart.
func newRestartCommand() *cli.Command {
	action := func(c *cli.Context) error {
		target := c.Args().Get(0)
		d, err := loadDAG(c, target, "")
		if err != nil {
			return err
		}
		return restart(d, c)
	}

	cmd := &cli.Command{
		Name:   "restart",
		Usage:  "dagu restart <DAG file>",
		Flags:  globalFlags,
		Action: action,
	}
	return cmd
}

// resetartTimeout is the base amount of time restart waits for a running DAG
// to stop; the DAG's MaxCleanUpTime is added on top of it.
const resetartTimeout = time.Second * 180

// restart stops the DAG d if it is currently running and then starts it again
// with the parameters recorded in its last status.
//
// When the DAG is running, a stop request is sent every 500ms until the
// reported status becomes SchedulerStatus_None. If that does not happen within
// resetartTimeout plus d.MaxCleanUpTime, an error is returned. After the DAG
// has stopped, restart sleeps for d.RestartWait before starting it again.
//
// ctx is used to reload the DAG file (first positional argument) with the
// parameters of the previous run.
func restart(d *dag.DAG, ctx *cli.Context) error {
	c := controller.New(d)

	const wait = time.Millisecond * 500
	timeout := resetartTimeout + d.MaxCleanUpTime

	st, err := c.GetStatus()
	if err != nil {
		return fmt.Errorf("restart failed because failed to get status: %w", err)
	}

	if st.Status == scheduler.SchedulerStatus_Running {
		log.Printf("Stopping %s for restart...", d.Name)

		// elapsed is advanced by the loop's post statement, which also runs on
		// `continue`; every iteration therefore counts toward the timeout,
		// including the ones where the status could not be read.
		for elapsed := time.Duration(0); ; elapsed += wait {
			if elapsed > timeout {
				return fmt.Errorf("restart failed because timeout")
			}
			st, err := c.GetStatus()
			if err != nil {
				log.Printf("Failed to get status: %v", err)
				// Back off before retrying instead of spinning on the error.
				time.Sleep(wait)
				continue
			}
			if st.Status == scheduler.SchedulerStatus_None {
				break
			}
			if err := c.Stop(); err != nil {
				return err
			}
			time.Sleep(wait)
		}

		// wait for restartWaitTime
		log.Printf("wait for restart %s", d.RestartWait)
		time.Sleep(d.RestartWait)
	}

	// retrieve the parameter and start the DAG
	log.Printf("Restarting %s...", d.Name)
	st, err = c.GetLastStatus()
	if err != nil {
		return fmt.Errorf("failed to get the last status: %w", err)
	}
	d, err = loadDAG(ctx, ctx.Args().Get(0), st.Params)
	if err != nil {
		return err
	}
	return start(d)
}
84 changes: 84 additions & 0 deletions cmd/restart_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package main

import (
"testing"
"time"

"github.com/stretchr/testify/require"
"github.com/yohamta/dagu/internal/controller"
"github.com/yohamta/dagu/internal/dag"
"github.com/yohamta/dagu/internal/database"
"github.com/yohamta/dagu/internal/scheduler"
)

// Test_restartCommand runs the `start`, `restart` and `stop` commands against
// testdata/restart.yaml and checks that the restart cancels the first run,
// launches a second one, and that both runs recorded the parameters passed to
// the original `start` invocation.
func Test_restartCommand(t *testing.T) {
	cfg := testConfig("restart.yaml")
	cl := &dag.Loader{}
	d, err := cl.Load(cfg, "")
	// Fail here rather than with a nil dereference further down.
	require.NoError(t, err)
	c := controller.New(d)

	// start the DAG
	println("start the DAG")
	go func() {
		app := makeApp()
		test := appTest{
			args: []string{"", "start", "--params=restart_test", cfg}, errored: false,
		}
		runAppTest(app, test, t)
	}()

	require.Eventually(t, func() bool {
		s, _ := c.GetStatus()
		// s may be nil while the status cannot be fetched yet.
		return s != nil && s.Status == scheduler.SchedulerStatus_Running
	}, time.Second*5, time.Millisecond*50)

	time.Sleep(time.Millisecond * 50)

	// restart the DAG
	go func() {
		app2 := makeApp()
		runAppTestOutput(app2, appTest{
			args: []string{"", "restart", cfg}, errored: false,
			output: []string{"wait for restart 1s", "Restarting"},
		}, t)
	}()

	// check canceled
	require.Eventually(t, func() bool {
		s, _ := c.GetLastStatus()
		return s != nil && s.Status == scheduler.SchedulerStatus_Cancel
	}, time.Second*5, time.Millisecond*50)

	// check restarted
	require.Eventually(t, func() bool {
		s, _ := c.GetLastStatus()
		return s != nil && s.Status == scheduler.SchedulerStatus_Running
	}, time.Second*5, time.Millisecond*50)

	// cancel the DAG
	go func() {
		app3 := makeApp()
		runAppTestOutput(app3, appTest{
			args: []string{"", "stop", cfg}, errored: false,
			output: []string{"Stopping..."},
		}, t)
	}()

	// check canceled
	require.Eventually(t, func() bool {
		s, _ := c.GetLastStatus()
		return s != nil && s.Status == scheduler.SchedulerStatus_Cancel
	}, time.Second*5, time.Millisecond*50)

	// check history
	db := &database.Database{Config: database.DefaultConfig()}
	require.Eventually(t, func() bool {
		s := db.ReadStatusHist(cfg, 100)
		return len(s) == 2 && s[1].Status.Status == scheduler.SchedulerStatus_Cancel
	}, time.Second*5, time.Millisecond*50)

	// check result
	s := db.ReadStatusHist(cfg, 2)
	// Guard the index accesses below with an explicit length assertion.
	require.Len(t, s, 2)
	require.Equal(t, "restart_test", s[0].Status.Params)
	require.Equal(t, "restart_test", s[1].Status.Params)
}
9 changes: 9 additions & 0 deletions cmd/testdata/restart.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
params: "default_param"
steps:
- name: "1"
command: "echo $1"
- name: "2"
command: "sleep 10000"
depends:
- "1"
restartWaitSec: 1
13 changes: 13 additions & 0 deletions internal/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,19 @@ func (c *Controller) Retry(bin string, workDir string, reqId string) (err error)
return
}

// Restart runs `<bin> restart <c.Location>` as a child process and blocks
// until that process exits. The child is started with Setpgid set, uses
// workDir as its working directory and inherits the current environment.
// It returns the error from starting the process or, failing that, from
// waiting on it.
func (c *Controller) Restart(bin string, workDir string) error {
	cmd := exec.Command(bin, "restart", c.Location)
	cmd.Dir = workDir
	cmd.Env = os.Environ()
	cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true, Pgid: 0}

	if startErr := cmd.Start(); startErr != nil {
		return startErr
	}
	return cmd.Wait()
}

func (c *Controller) GetStatus() (*models.Status, error) {
client := sock.Client{Addr: c.SockAddr()}
ret, err := client.Request("GET", "/status")
Expand Down
15 changes: 15 additions & 0 deletions internal/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,21 @@ func TestStartStop(t *testing.T) {
}, time.Millisecond*1500, time.Millisecond*100)
}

func TestRestart(t *testing.T) {
file := testDAG("restart.yaml")
dr := controller.NewDAGReader()
dag, err := dr.ReadDAG(file, false)
require.NoError(t, err)

c := controller.New(dag.DAG)
err = c.Restart(path.Join(utils.MustGetwd(), "../../bin/dagu"), "")
require.NoError(t, err)

st, err := c.GetLastStatus()
require.NoError(t, err)
require.Equal(t, scheduler.SchedulerStatus_Success, st.Status)
}

func TestRetry(t *testing.T) {
file := testDAG("retry.yaml")
dr := controller.NewDAGReader()
Expand Down
3 changes: 3 additions & 0 deletions internal/controller/testdata/restart.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
steps:
- name: "1"
command: "true"
28 changes: 23 additions & 5 deletions internal/dag/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type DAG struct {
Name string
Schedule []*Schedule
StopSchedule []*Schedule
RestartSchedule []*Schedule
Description string
Env []string
LogDir string
Expand All @@ -34,6 +35,7 @@ type DAG struct {
InfoMail *MailConfig
Smtp *SmtpConfig
Delay time.Duration
RestartWait time.Duration
HistRetentionDays int
Preconditions []*Condition
MaxActiveRuns int
Expand Down Expand Up @@ -191,6 +193,7 @@ func (b *builder) buildFromDefinition(def *configDefinition, baseConfig *DAG) (d
}
}
d.Delay = time.Second * time.Duration(def.DelaySec)
d.RestartWait = time.Second * time.Duration(def.RestartWaitSec)
d.Tags = parseTags(def.Tags)

for _, bs := range []buildStep{
Expand Down Expand Up @@ -236,9 +239,16 @@ func (b *builder) buildFromDefinition(def *configDefinition, baseConfig *DAG) (d
return d, nil
}

// Keys that buildSchedule recognizes when the `schedule` field of a DAG
// definition is written as a map rather than as a plain string.
const (
// scheduleStart is the key whose cron expression(s) are collected as start schedules.
scheduleStart = "start"
// scheduleStop is the key whose cron expression(s) are collected as stop schedules.
scheduleStop = "stop"
// scheduleRestart is the key whose cron expression(s) are collected as restart schedules.
scheduleRestart = "restart"
)

func (b *builder) buildSchedule(def *configDefinition, d *DAG) error {
starts := []string{}
stops := []string{}
restarts := []string{}

switch (def.Schedule).(type) {
case string:
Expand All @@ -258,23 +268,27 @@ func (b *builder) buildSchedule(def *configDefinition, d *DAG) error {
}
kk := k.(string)
switch kk {
case "start", "stop":
case scheduleStart, scheduleStop, scheduleRestart:
switch (v).(type) {
case string:
switch kk {
case "start":
case scheduleStart:
starts = append(starts, v.(string))
case "stop":
case scheduleStop:
stops = append(stops, v.(string))
case scheduleRestart:
restarts = append(restarts, v.(string))
}
case []interface{}:
for _, vv := range v.([]interface{}) {
if vvv, ok := vv.(string); ok {
switch kk {
case "start":
case scheduleStart:
starts = append(starts, vvv)
case "stop":
case scheduleStop:
stops = append(stops, vvv)
case scheduleRestart:
restarts = append(restarts, vvv)
}
} else {
return fmt.Errorf("schedule must be a string or an array of strings")
Expand All @@ -297,6 +311,10 @@ func (b *builder) buildSchedule(def *configDefinition, d *DAG) error {
return err
}
d.StopSchedule, err = parseSchedule(stops)
if err != nil {
return err
}
d.RestartSchedule, err = parseSchedule(restarts)
return err
}

Expand Down
Loading