Skip to content

Commit

Permalink
Merge pull request #188 from yohamta/feat/multiple-schedules
Browse files Browse the repository at this point in the history
feat: allow setting multiple schedules
  • Loading branch information
yottahmd authored Jul 5, 2022
2 parents eaed682 + 694a4b5 commit 5593bbd
Show file tree
Hide file tree
Showing 8 changed files with 150 additions and 32 deletions.
35 changes: 34 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ It runs [DAGs (Directed acyclic graph)](https://en.wikipedia.org/wiki/Directed_a
- [How can I retry a DAG from a specific task?](#how-can-i-retry-a-dag-from-a-specific-task)
- [Does it provide sucheduler daemon?](#does-it-provide-sucheduler-daemon)
- [How does it track running processes without DBMS?](#how-does-it-track-running-processes-without-dbms)
- [How to run scheduler process running](#how-to-run-scheduler-process-running)
- [License](#license)
- [Contributors](#contributors)

Expand Down Expand Up @@ -310,7 +311,7 @@ steps:

### Schedule

To run the DAG on a specific schedule, you can set a cron expression in the `schedule` field and start `dagu scheduler` process in the system.
To run the DAG on a specific schedule, you can specify the schedule it with cron expression in the `schedule` field. You need to keep `dagu scheduler` process running in the system.

```yaml
schedule: "5 4 * * *" # Run at 04:05.
Expand All @@ -319,13 +320,25 @@ steps:
command: job.sh
```

If you want to set multiple schedules, you can do it as follows:

```yaml
schedule:
- "30 7 * * *" # Run at 7:30
- "0 20 * * *" # Also run at 20:00
steps:
- name: scheduled job
command: job.sh
```

### All Available Fields

Combining these settings gives you granular control over how the DAG runs.

```yaml
name: all configuration # Name (optional, default is filename)
description: run a DAG # Description
schedule: "0 * * * *" # Execution schedule (cron expression)
group: DailyJobs # Group name to organize DAGs (optional)
tags: example # Free tags (separated by comma)
env: # Environment variables
Expand Down Expand Up @@ -468,6 +481,26 @@ Yes. Please use `scheduler` subcommand.

Dagu uses Unix sockets to communicate with running processes.

### How to run scheduler process running

Easiest way is to create the simple script and call it every minutes using cron. It does not need root account.

```bash
#!/bin/bash
process="dagu scheduler"
command="/usr/bin/dagu scheduler"
if ps ax | grep -v grep | grep "$process" > /dev/null
then
exit
else
$command &
fi
exit
```


## License

This project is licensed under the GNU GPLv3 - see the [LICENSE.md](LICENSE.md) file for details
Expand Down
40 changes: 33 additions & 7 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ type Config struct {
ConfigPath string
Group string
Name string
Schedule cron.Schedule
ScheduleExp string
Schedule []cron.Schedule
ScheduleExp []string
Description string
Env []string
LogDir string
Expand Down Expand Up @@ -180,12 +180,26 @@ func (b *builder) buildFromDefinition(def *configDefinition, globalConfig *Confi
c.Delay = time.Second * time.Duration(def.DelaySec)
c.Tags = parseTags(def.Tags)

c.ScheduleExp = def.Schedule
if def.Schedule != "" {
c.Schedule, err = cronParser.Parse(def.Schedule)
if err != nil {
return nil, fmt.Errorf("invalid schedule: %s", err)
switch (def.Schedule).(type) {
case string:
c.ScheduleExp = []string{def.Schedule.(string)}
case []interface{}:
items := []string{}
for _, s := range def.Schedule.([]interface{}) {
if a, ok := s.(string); ok {
items = append(items, a)
} else {
return nil, fmt.Errorf("schedule must be a string or an array of strings")
}
}
c.ScheduleExp = items
case nil:
default:
return nil, fmt.Errorf("invalid schedule type: %T", def.Schedule)
}
c.Schedule, err = parseSchedule(c.ScheduleExp)
if err != nil {
return nil, err
}

if b.headOnly {
Expand Down Expand Up @@ -494,6 +508,18 @@ func parseTags(value string) []string {
return ret
}

func parseSchedule(values []string) ([]cron.Schedule, error) {
ret := []cron.Schedule{}
for _, v := range values {
sc, err := cronParser.Parse(v)
if err != nil {
return nil, fmt.Errorf("invalid schedule: %s", err)
}
ret = append(ret, sc)
}
return ret, nil
}

func assertDef(def *configDefinition) error {
if len(def.Steps) == 0 {
return fmt.Errorf("at least one step must be specified")
Expand Down
34 changes: 25 additions & 9 deletions internal/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"os"
"path"
"testing"
"time"

"github.com/stretchr/testify/require"
"github.com/yohamta/dagu/internal/settings"
Expand Down Expand Up @@ -248,30 +247,47 @@ tags: %s
}

func TestSchedule(t *testing.T) {
tm := time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC)
for _, test := range []struct {
Schedule string
Want time.Time
Def string
Err bool
Want int
}{
{
Schedule: "*/5 * * * *",
Want: tm.Add(5 * time.Minute),
Def: "schedule: \"*/5 * * * *\"",
Want: 1,
},
{
Def: `schedule:
- "*/5 * * * *"
- "* * * * *"`,
Want: 2,
},
{
Def: `schedule:
- true
- "* * * * *"`,
Err: true,
},
} {
l := &Loader{
HomeDir: utils.MustGetUserHomeDir(),
}
d, err := l.unmarshalData([]byte(fmt.Sprintf(`schedule: "%s"`, test.Schedule)))
d, err := l.unmarshalData([]byte(test.Def))
require.NoError(t, err)

def, err := l.decode(d)
require.NoError(t, err)

b := &builder{}
cfg, err := b.buildFromDefinition(def, nil)
require.NoError(t, err)

require.Equal(t, test.Want, cfg.Schedule.Next(tm))
if test.Err {
require.Error(t, err)
} else {
require.NoError(t, err)
require.Equal(t, test.Want, len(cfg.Schedule))
}

}
}

Expand Down
2 changes: 1 addition & 1 deletion internal/config/definition.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ type configDefinition struct {
Name string
Group string
Description string
Schedule string
Schedule interface{}
LogDir string
Env interface{}
HandlerOn handerOnDef
Expand Down
19 changes: 12 additions & 7 deletions internal/runner/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,21 @@ func (j *job) Run() error {
if err != nil {
return err
}
if s.Status == scheduler.SchedulerStatus_Running {
switch s.Status {
case scheduler.SchedulerStatus_Running:
// already running
return ErrJobRunning
}
if !j.StartTime.IsZero() {
// check if it's already finished
t, _ := utils.ParseTime(s.StartedAt)
if j.StartTime.Before(t) || j.StartTime.Equal(t) {
return ErrJobFinished
case scheduler.SchedulerStatus_None:
default:
// check the last execution time
t, err := utils.ParseTime(s.StartedAt)
if err == nil {
t = t.Truncate(time.Second * 60)
if t.After(j.StartTime) || j.StartTime.Equal(t) {
return ErrJobFinished
}
}
// should not be here
}
return c.Start(j.Config.Command, j.Config.WorkDir, "")
}
Expand Down
6 changes: 3 additions & 3 deletions internal/runner/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,11 @@ func TestJob(t *testing.T) {
require.Equal(t, scheduler.SchedulerStatus_Success, status.Status)

// Fail to run the job because it's already finished
j = &job{
j2 := &job{
DAG: dag,
Config: testConfig(),
StartTime: time.Now().Add(-time.Minute),
StartTime: j.StartTime,
}
err = j.Run()
err = j2.Run()
require.Equal(t, ErrJobFinished, err)
}
10 changes: 6 additions & 4 deletions internal/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,12 +92,14 @@ func (r *Runner) readEntries(now time.Time) (entries []*Entry, err error) {
log.Printf("failed to read dag config: %s", err)
continue
}
if dag.Schedule != nil {
for _, sc := range dag.Schedule {
next := sc.Next(now)
entries = append(entries, &Entry{
Next: dag.Schedule.Next(now),
Next: sc.Next(now),
Job: &job{
DAG: dag,
Config: r.Config.Admin,
DAG: dag,
Config: r.Config.Admin,
StartTime: next,
},
})
}
Expand Down
36 changes: 36 additions & 0 deletions internal/runner/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,42 @@ func TestRun(t *testing.T) {
}
}

func TestRunOnlyOnce(t *testing.T) {
cfg := testDag(t, "job1", "* * * * *", "true")
cont := controller.New(cfg)
// now := time.Date(2020, 1, 1, 1, 0, 0, 0, time.UTC)
utils.FixedTime = time.Time{}

startRunner := func() *Runner {
r := New(&Config{
Admin: &admin.Config{
Command: testBin,
DAGs: testHomeDir,
},
})
go func() {
r.Start()
}()
return r
}

r := startRunner()
time.Sleep(time.Second + time.Millisecond*100)
r.Stop()

s, _ := cont.GetLastStatus()
require.Equal(t, scheduler.SchedulerStatus_Success, s.Status)
s.Status = scheduler.SchedulerStatus_Error
cont.UpdateStatus(s)

r = startRunner()
time.Sleep(time.Second + time.Millisecond*100)
r.Stop()

s, _ = cont.GetLastStatus()
require.Equal(t, scheduler.SchedulerStatus_Error, s.Status)
}

func TestNextTick(t *testing.T) {
n := time.Date(2020, 1, 1, 1, 0, 50, 0, time.UTC)
utils.FixedTime = n
Expand Down

0 comments on commit 5593bbd

Please sign in to comment.