diff --git a/README.md b/README.md index 34bc7b4a5..5f114c304 100644 --- a/README.md +++ b/README.md @@ -29,9 +29,10 @@ It executes [DAGs (Directed acyclic graph)](https://en.wikipedia.org/wiki/Direct - [Parameters](#parameters) - [Command Substitution](#command-substitution) - [Conditional Logic](#conditional-logic) + - [Run Code Snippet](#run-code-snippet) - [State Handlers](#state-handlers) - - [Redirection](#redirection) - [Output](#output) + - [Redirection](#redirection) - [Repeating Task](#repeating-task) - [All Available Fields](#all-available-fields) - [Admin Configuration](#admin-configuration) @@ -121,9 +122,11 @@ You can define workflows in a simple [YAML format](https://yohamta.github.io/doc name: minimal configuration # DAG's name steps: # Steps inside the DAG - name: step 1 # Step's name (should be unique within the file) - command: python main_1.py # Command and arguments to execute + command: echo hello # Command and arguments to execute - name: step 2 - command: python main_2.py + command: bash + script: | # [optional] arbitrary script in any language + echo "world" depends: - step 1 # [optional] Name of the step to depend on ``` @@ -198,21 +201,29 @@ steps: skipped: true ``` -### Redirection +### Run Code Snippet -`stdout` field can be used to write standard output to a file. +`script` field provides a way to run arbitrary snippets of code in any language. ```yaml name: example steps: - - name: create a file - command: "echo hello" - stdout: "/tmp/hello" # the content will be "hello\n" + - name: step 1 + command: "bash" + script: | + cd /tmp + echo "hello world" > hello + cat hello + output: RESULT + - name: step 2 + command: echo ${RESULT} # hello world + depends: + - step 1 ``` ### Output -`output` field can be used to write standard output to a environment variable. Leading and trailing space will be trimmed automatically. The environment variables can be used in subsequent steps. 
+`output` field can be used to set an environment variable with standard output. Leading and trailing space will be trimmed automatically. The environment variables can be used in subsequent steps. ```yaml name: example @@ -222,6 +233,18 @@ steps: output: FOO # will contain "foo" ``` +### Redirection + +`stdout` field can be used to write standard output to a file. + +```yaml +name: example +steps: + - name: create a file + command: "echo hello" + stdout: "/tmp/hello" # the content will be "hello\n" +``` + ### State Handlers It is often desirable to take action when a specific event happens, for example, when a workflow fails. To achieve this, you can use `handlerOn` fields. @@ -268,8 +291,8 @@ delaySec: 1 # Interval seconds between steps maxActiveRuns: 1 # Max parallel number of running step params: param1 param2 # Default parameters for the DAG that can be referred to by $1, $2, and so on preconditions: # Precondisions for whether the DAG is allowed to run - - condition: "`echo 1`" # Command or variables to evaluate - expected: "1" # Expected value for the condition + - condition: "`echo $2`" # Command or variables to evaluate + expected: "param2" # Expected value for the condition mailOn: failure: true # Send a mail when the DAG failed success: true # Send a mail when the DAG finished @@ -287,7 +310,11 @@ steps: - name: some task # Step's name description: some task # Step's description dir: ${HOME}/logs # Working directory - command: python main.py $1 # Command and parameters + command: bash # Command and parameters + stdout: /tmp/outfile + output: RESULT_VARIABLE + script: | + echo "any script" mailOn: failure: true # Send a mail when the step failed success: true # Send a mail when the step finished @@ -300,8 +327,8 @@ steps: repeat: true # Boolean whether to repeat this step intervalSec: 60 # Interval time to repeat the step in seconds preconditions: # Precondisions for whether the step is allowed to run - - condition: "`echo 1`" # Command or variables to 
evaluate - expected: "1" # Expected Value for the condition + - condition: "`echo $1`" # Command or variables to evaluate + expected: "param1" # Expected Value for the condition ``` The global configuration file `~/.dagu/config.yaml` is useful to gather common settings, such as `logDir` or `env`. diff --git a/internal/config/config.go b/internal/config/config.go index b93814f29..2044ec1e1 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -372,7 +372,8 @@ func buildStep(variables []string, def *stepDef) (*Step, error) { step.Name = def.Name step.Description = def.Description step.CmdWithArgs = def.Command - step.Command, step.Args = utils.SplitCommand(step.CmdWithArgs) // Will be eva + step.Command, step.Args = utils.SplitCommand(step.CmdWithArgs) + step.Script = def.Script step.Stdout = os.ExpandEnv(def.Stdout) step.Output = def.Output step.Dir = os.ExpandEnv(def.Dir) diff --git a/internal/config/definition.go b/internal/config/definition.go index f30e6e4cb..76ee1733c 100644 --- a/internal/config/definition.go +++ b/internal/config/definition.go @@ -36,6 +36,7 @@ type stepDef struct { Description string Dir string Command string + Script string Stdout string Output string Depends []string diff --git a/internal/config/step.go b/internal/config/step.go index 0aeb22ab3..e19261865 100644 --- a/internal/config/step.go +++ b/internal/config/step.go @@ -13,6 +13,7 @@ type Step struct { Dir string CmdWithArgs string Command string + Script string Stdout string Output string Args []string diff --git a/internal/scheduler/node.go b/internal/scheduler/node.go index 7d3db7dfe..807057131 100644 --- a/internal/scheduler/node.go +++ b/internal/scheduler/node.go @@ -62,6 +62,7 @@ type Node struct { stdoutWriter *bufio.Writer outputWriter *os.File outputReader *os.File + scriptFile *os.File } type NodeState struct { @@ -80,7 +81,13 @@ func (n *Node) Execute() error { if n.CmdWithArgs != "" { n.Command, n.Args = utils.SplitCommand(os.ExpandEnv(n.CmdWithArgs)) 
} - n.cmd = exec.CommandContext(ctx, n.Command, n.Args...) + args := n.Args + if n.scriptFile != nil { + args = []string{} + args = append(args, n.Args...) + args = append(args, n.scriptFile.Name()) + } + n.cmd = exec.CommandContext(ctx, n.Command, args...) cmd := n.cmd cmd.Dir = n.Dir cmd.Env = append(cmd.Env, n.Variables...) @@ -172,12 +179,14 @@ func (n *Node) setup(logDir string, requestId string) error { n.StartedAt.Format("20060102.15:04:05.000"), utils.TruncString(requestId, 8), )) - if err := n.openLog(); err != nil { - n.Error = err - return err + setup := []func() error{ + n.setupLog, + n.setupStdout, + n.setupScript, } - if n.Stdout != "" { - if err := n.openStdout(); err != nil { + for _, fn := range setup { + err := fn() + if err != nil { n.Error = err return err } @@ -185,22 +194,38 @@ func (n *Node) setup(logDir string, requestId string) error { return nil } -func (n *Node) openStdout() error { - f := n.Stdout - if !filepath.IsAbs(f) { - f = filepath.Join(n.Dir, f) +func (n *Node) setupScript() (err error) { + if n.Script != "" { + n.scriptFile, _ = os.CreateTemp(n.Dir, "dagu_script-") + if _, err = n.scriptFile.WriteString(n.Script); err != nil { + return + } + defer func() { + _ = n.scriptFile.Close() + }() + err = n.scriptFile.Sync() } - var err error - n.stdoutFile, err = utils.OpenOrCreateFile(f) - if err != nil { - n.Error = err - return err + return err +} + +func (n *Node) setupStdout() error { + if n.Stdout != "" { + f := n.Stdout + if !filepath.IsAbs(f) { + f = filepath.Join(n.Dir, f) + } + var err error + n.stdoutFile, err = utils.OpenOrCreateFile(f) + if err != nil { + n.Error = err + return err + } + n.stdoutWriter = bufio.NewWriter(n.stdoutFile) } - n.stdoutWriter = bufio.NewWriter(n.stdoutFile) return nil } -func (n *Node) openLog() error { +func (n *Node) setupLog() error { if n.Log == "" { return nil } @@ -232,6 +257,9 @@ func (n *Node) teardown() error { lastErr = err } } + if n.scriptFile != nil { + _ = 
os.Remove(n.scriptFile.Name()) + } return lastErr } diff --git a/internal/scheduler/node_test.go b/internal/scheduler/node_test.go index eb31e7709..463354b02 100644 --- a/internal/scheduler/node_test.go +++ b/internal/scheduler/node_test.go @@ -78,6 +78,29 @@ func TestLogAndStdout(t *testing.T) { require.Equal(t, "done\n", string(dat)) } +func TestNode(t *testing.T) { + n := &Node{ + Step: &config.Step{ + Command: "echo", + Args: []string{"hello"}, + Dir: os.Getenv("HOME"), + }, + } + n.incDoneCount() + require.Equal(t, 1, n.ReadDoneCount()) + + n.incRetryCount() + require.Equal(t, 1, n.ReadRetryCount()) + + n.id = 1 + n.init() + require.Nil(t, n.Variables) + + n.id = 0 + n.init() + require.Equal(t, n.Variables, []string{}) +} + func TestOutput(t *testing.T) { n := &Node{ Step: &config.Step{ @@ -104,3 +127,35 @@ func TestOutput(t *testing.T) { val := os.Getenv("OUTPUT_TEST") require.Equal(t, "hello", val) } + +func TestRunScript(t *testing.T) { + n := &Node{ + Step: &config.Step{ + Command: "sh", + Args: []string{}, + Dir: os.Getenv("HOME"), + Script: ` + echo hello + `, + Output: "OUTPUT_TEST", + }, + } + err := n.setup(os.Getenv("HOME"), "test-request-id") + require.FileExists(t, n.logFile.Name()) + + require.NoError(t, err) + defer func() { + _ = n.teardown() + }() + + b, _ := os.ReadFile(n.scriptFile.Name()) + require.Equal(t, n.Script, string(b)) + + err = n.Execute() + require.NoError(t, err) + err = n.teardown() + require.NoError(t, err) + + require.Equal(t, "hello", os.Getenv("OUTPUT_TEST")) + require.NoFileExists(t, n.scriptFile.Name()) +} diff --git a/internal/scheduler/scheduler.go b/internal/scheduler/scheduler.go index b5c81f122..0f068b405 100644 --- a/internal/scheduler/scheduler.go +++ b/internal/scheduler/scheduler.go @@ -117,12 +117,18 @@ func (sc *Scheduler) Schedule(g *ExecutionGraph, done chan *Node) error { wg.Done() }() + setup := true if !sc.Dry { - node.setup(sc.LogDir, sc.RequestId) + if err := node.setup(sc.LogDir, sc.RequestId); err != 
nil { + setup = false + node.Error = err + sc.lastError = err + node.updateStatus(NodeStatus_Error) + } defer node.teardown() } - for !sc.IsCanceled() { + for setup && !sc.IsCanceled() { var err error = nil if !sc.Dry { err = node.Execute() diff --git a/internal/scheduler/scheduler_test.go b/internal/scheduler/scheduler_test.go index 9cf73d091..3e691a947 100644 --- a/internal/scheduler/scheduler_test.go +++ b/internal/scheduler/scheduler_test.go @@ -546,6 +546,26 @@ func TestSchedulerStatusText(t *testing.T) { } } +func TestSetupNodeFailure(t *testing.T) { + g, _ := NewExecutionGraph( + &config.Step{ + Name: "1", + Command: "sh", + Dir: "~/", + Script: "echo 1", + }, + ) + sc := New(&Config{}) + err := sc.Schedule(g, nil) + require.Error(t, err) + require.Equal(t, sc.Status(g), SchedulerStatus_Error) + + nodes := g.Nodes() + assert.Equal(t, sc.Status(g), SchedulerStatus_Error) + assert.Equal(t, NodeStatus_Error, nodes[0].Status) + assert.Equal(t, nodes[0].DoneCount, 0) +} + func step(name, command string, depends ...string) *config.Step { cmd, args := utils.SplitCommand(command) return &config.Step{