Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: run code snippet #108

Merged
merged 1 commit into from
May 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 41 additions & 14 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,10 @@ It executes [DAGs (Directed acyclic graph)](https://en.wikipedia.org/wiki/Direct
- [Parameters](#parameters)
- [Command Substitution](#command-substitution)
- [Conditional Logic](#conditional-logic)
- [Run Code Snippet](#run-code-snippet)
- [State Handlers](#state-handlers)
- [Redirection](#redirection)
- [Output](#output)
- [Redirection](#redirection)
- [Repeating Task](#repeating-task)
- [All Available Fields](#all-available-fields)
- [Admin Configuration](#admin-configuration)
Expand Down Expand Up @@ -121,9 +122,11 @@ You can define workflows in a simple [YAML format](https://yohamta.github.io/doc
name: minimal configuration # DAG's name
steps: # Steps inside the DAG
- name: step 1 # Step's name (should be unique within the file)
command: python main_1.py # Command and arguments to execute
command: ehho hello # Command and arguments to execute
- name: step 2
command: python main_2.py
command: bash
script: | # [optional] arbitrary script in any language
echo "world"
depends:
- step 1 # [optional] Name of the step to depend on
```
Expand Down Expand Up @@ -198,21 +201,29 @@ steps:
skipped: true
```

### Redirection
### Run Code Snippet

`stdout` field can be used to write standard output to a file.
`script` field provides a way to run arbitrary snippets of code in any language.

```yaml
name: example
steps:
- name: create a file
command: "echo hello"
stdout: "/tmp/hello" # the content will be "hello\n"
- name: step 1
command: "bash"
script: |
cd /tmp
echo "hello world" > hello
cat hello
output: RESULT
- name: step 2
command: echo ${RESULT} # hello world
depends:
- step 1
```

### Output

`output` field can be used to write standard output to a environment variable. Leading and trailing space will be trimmed automatically. The environment variables can be used in subsequent steps.
`output` field can be used to set a environment variable with standard output. Leading and trailing space will be trimmed automatically. The environment variables can be used in subsequent steps.

```yaml
name: example
Expand All @@ -222,6 +233,18 @@ steps:
output: FOO # will contain "foo"
```

### Redirection

`stdout` field can be used to write standard output to a file.

```yaml
name: example
steps:
- name: create a file
command: "echo hello"
stdout: "/tmp/hello" # the content will be "hello\n"
```

### State Handlers

It is often desirable to take action when a specific event happens, for example, when a workflow fails. To achieve this, you can use `handlerOn` fields.
Expand Down Expand Up @@ -268,8 +291,8 @@ delaySec: 1 # Interval seconds between steps
maxActiveRuns: 1 # Max parallel number of running step
params: param1 param2 # Default parameters for the DAG that can be referred to by $1, $2, and so on
preconditions: # Precondisions for whether the DAG is allowed to run
- condition: "`echo 1`" # Command or variables to evaluate
expected: "1" # Expected value for the condition
- condition: "`echo $2`" # Command or variables to evaluate
expected: "param2" # Expected value for the condition
mailOn:
failure: true # Send a mail when the DAG failed
success: true # Send a mail when the DAG finished
Expand All @@ -287,7 +310,11 @@ steps:
- name: some task # Step's name
description: some task # Step's description
dir: ${HOME}/logs # Working directory
command: python main.py $1 # Command and parameters
command: bash # Command and parameters
stdout: /tmp/outfile
ouptut: RESULT_VARIABLE
script: |
echo "any script"
mailOn:
failure: true # Send a mail when the step failed
success: true # Send a mail when the step finished
Expand All @@ -300,8 +327,8 @@ steps:
repeat: true # Boolean whether to repeat this step
intervalSec: 60 # Interval time to repeat the step in seconds
preconditions: # Precondisions for whether the step is allowed to run
- condition: "`echo 1`" # Command or variables to evaluate
expected: "1" # Expected Value for the condition
- condition: "`echo $1`" # Command or variables to evaluate
expected: "param1" # Expected Value for the condition
```

The global configuration file `~/.dagu/config.yaml` is useful to gather common settings, such as `logDir` or `env`.
Expand Down
3 changes: 2 additions & 1 deletion internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,8 @@ func buildStep(variables []string, def *stepDef) (*Step, error) {
step.Name = def.Name
step.Description = def.Description
step.CmdWithArgs = def.Command
step.Command, step.Args = utils.SplitCommand(step.CmdWithArgs) // Will be eva
step.Command, step.Args = utils.SplitCommand(step.CmdWithArgs)
step.Script = def.Script
step.Stdout = os.ExpandEnv(def.Stdout)
step.Output = def.Output
step.Dir = os.ExpandEnv(def.Dir)
Expand Down
1 change: 1 addition & 0 deletions internal/config/definition.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type stepDef struct {
Description string
Dir string
Command string
Script string
Stdout string
Output string
Depends []string
Expand Down
1 change: 1 addition & 0 deletions internal/config/step.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ type Step struct {
Dir string
CmdWithArgs string
Command string
Script string
Stdout string
Output string
Args []string
Expand Down
62 changes: 45 additions & 17 deletions internal/scheduler/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ type Node struct {
stdoutWriter *bufio.Writer
outputWriter *os.File
outputReader *os.File
scriptFile *os.File
}

type NodeState struct {
Expand All @@ -80,7 +81,13 @@ func (n *Node) Execute() error {
if n.CmdWithArgs != "" {
n.Command, n.Args = utils.SplitCommand(os.ExpandEnv(n.CmdWithArgs))
}
n.cmd = exec.CommandContext(ctx, n.Command, n.Args...)
args := n.Args
if n.scriptFile != nil {
args = []string{}
args = append(args, n.Args...)
args = append(args, n.scriptFile.Name())
}
n.cmd = exec.CommandContext(ctx, n.Command, args...)
cmd := n.cmd
cmd.Dir = n.Dir
cmd.Env = append(cmd.Env, n.Variables...)
Expand Down Expand Up @@ -172,35 +179,53 @@ func (n *Node) setup(logDir string, requestId string) error {
n.StartedAt.Format("20060102.15:04:05.000"),
utils.TruncString(requestId, 8),
))
if err := n.openLog(); err != nil {
n.Error = err
return err
setup := []func() error{
n.setupLog,
n.setupStdout,
n.setupScript,
}
if n.Stdout != "" {
if err := n.openStdout(); err != nil {
for _, fn := range setup {
err := fn()
if err != nil {
n.Error = err
return err
}
}
return nil
}

func (n *Node) openStdout() error {
f := n.Stdout
if !filepath.IsAbs(f) {
f = filepath.Join(n.Dir, f)
func (n *Node) setupScript() (err error) {
if n.Script != "" {
n.scriptFile, _ = os.CreateTemp(n.Dir, "dagu_script-")
if _, err = n.scriptFile.WriteString(n.Script); err != nil {
return
}
defer func() {
_ = n.scriptFile.Close()
}()
err = n.scriptFile.Sync()
}
var err error
n.stdoutFile, err = utils.OpenOrCreateFile(f)
if err != nil {
n.Error = err
return err
return err
}

func (n *Node) setupStdout() error {
if n.Stdout != "" {
f := n.Stdout
if !filepath.IsAbs(f) {
f = filepath.Join(n.Dir, f)
}
var err error
n.stdoutFile, err = utils.OpenOrCreateFile(f)
if err != nil {
n.Error = err
return err
}
n.stdoutWriter = bufio.NewWriter(n.stdoutFile)
}
n.stdoutWriter = bufio.NewWriter(n.stdoutFile)
return nil
}

func (n *Node) openLog() error {
func (n *Node) setupLog() error {
if n.Log == "" {
return nil
}
Expand Down Expand Up @@ -232,6 +257,9 @@ func (n *Node) teardown() error {
lastErr = err
}
}
if n.scriptFile != nil {
_ = os.Remove(n.scriptFile.Name())
}
return lastErr
}

Expand Down
55 changes: 55 additions & 0 deletions internal/scheduler/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,29 @@ func TestLogAndStdout(t *testing.T) {
require.Equal(t, "done\n", string(dat))
}

func TestNode(t *testing.T) {
n := &Node{
Step: &config.Step{
Command: "echo",
Args: []string{"hello"},
Dir: os.Getenv("HOME"),
},
}
n.incDoneCount()
require.Equal(t, 1, n.ReadDoneCount())

n.incRetryCount()
require.Equal(t, 1, n.ReadRetryCount())

n.id = 1
n.init()
require.Nil(t, n.Variables)

n.id = 0
n.init()
require.Equal(t, n.Variables, []string{})
}

func TestOutput(t *testing.T) {
n := &Node{
Step: &config.Step{
Expand All @@ -104,3 +127,35 @@ func TestOutput(t *testing.T) {
val := os.Getenv("OUTPUT_TEST")
require.Equal(t, "hello", val)
}

func TestRunScript(t *testing.T) {
n := &Node{
Step: &config.Step{
Command: "sh",
Args: []string{},
Dir: os.Getenv("HOME"),
Script: `
echo hello
`,
Output: "OUTPUT_TEST",
},
}
err := n.setup(os.Getenv("HOME"), "test-request-id")
require.FileExists(t, n.logFile.Name())

require.NoError(t, err)
defer func() {
_ = n.teardown()
}()

b, _ := os.ReadFile(n.scriptFile.Name())
require.Equal(t, n.Script, string(b))

err = n.Execute()
require.NoError(t, err)
err = n.teardown()
require.NoError(t, err)

require.Equal(t, "hello", os.Getenv("OUTPUT_TEST"))
require.NoFileExists(t, n.scriptFile.Name())
}
10 changes: 8 additions & 2 deletions internal/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,12 +117,18 @@ func (sc *Scheduler) Schedule(g *ExecutionGraph, done chan *Node) error {
wg.Done()
}()

setup := true
if !sc.Dry {
node.setup(sc.LogDir, sc.RequestId)
if err := node.setup(sc.LogDir, sc.RequestId); err != nil {
setup = false
node.Error = err
sc.lastError = err
node.updateStatus(NodeStatus_Error)
}
defer node.teardown()
}

for !sc.IsCanceled() {
for setup && !sc.IsCanceled() {
var err error = nil
if !sc.Dry {
err = node.Execute()
Expand Down
20 changes: 20 additions & 0 deletions internal/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -546,6 +546,26 @@ func TestSchedulerStatusText(t *testing.T) {
}
}

func TestSetupNodeFailure(t *testing.T) {
g, _ := NewExecutionGraph(
&config.Step{
Name: "1",
Command: "sh",
Dir: "~/",
Script: "echo 1",
},
)
sc := New(&Config{})
err := sc.Schedule(g, nil)
require.Error(t, err)
require.Equal(t, sc.Status(g), SchedulerStatus_Error)

nodes := g.Nodes()
assert.Equal(t, sc.Status(g), SchedulerStatus_Error)
assert.Equal(t, NodeStatus_Error, nodes[0].Status)
assert.Equal(t, nodes[0].DoneCount, 0)
}

func step(name, command string, depends ...string) *config.Step {
cmd, args := utils.SplitCommand(command)
return &config.Step{
Expand Down