Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#846] Fix: issue in calling sub-DAG #848

Merged
merged 2 commits into from
Feb 18, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ dagu start <file or DAG name> [-- value1 value2 ...]
dagu status <file or DAG name>

# Re-runs the specified DAG run
dagu retry --req=<request-id> <file or DAG name>
dagu retry --request-id=<request-id> <file or DAG name>

# Stops the DAG execution
dagu stop <file or DAG name>
Expand Down
2 changes: 1 addition & 1 deletion cmd/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ var (
usage: "parameters to pass to the DAG",
}
requestIDFlag = commandLineFlag{
name: "req",
name: "request-id",
shorthand: "r",
usage: "request ID",
}
Expand Down
6 changes: 3 additions & 3 deletions cmd/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ import (

func retryCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "retry --req=<request-id> /path/to/spec.yaml",
Use: "retry --request-id=<request-id> /path/to/spec.yaml",
Short: "Retry the DAG execution",
Long: `dagu retry --req=<request-id> /path/to/spec.yaml`,
Long: `dagu retry --request-id=<request-id> /path/to/spec.yaml`,
Args: cobra.ExactArgs(1),
PreRunE: func(cmd *cobra.Command, _ []string) error {
return bindCommonFlags(cmd, nil)
Expand Down Expand Up @@ -47,7 +47,7 @@ func runRetry(cmd *cobra.Command, args []string) error {
return fmt.Errorf("failed to get quiet flag: %w", err)
}

requestID, err := cmd.Flags().GetString("req")
requestID, err := cmd.Flags().GetString("request-id")
if err != nil {
return fmt.Errorf("failed to get request ID: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/retry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func TestRetryCommand(t *testing.T) {
requestID := status.Status.RequestID

// Retry with the request ID.
args = []string{"retry", fmt.Sprintf("--req=%s", requestID), dagFile.Location}
args = []string{"retry", fmt.Sprintf("--request-id=%s", requestID), dagFile.Location}
th.RunCommand(t, retryCmd(), cmdTest{
args: args,
expectedOut: []string{`[1=foo]`},
Expand Down
2 changes: 1 addition & 1 deletion cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func runStart(cmd *cobra.Command, args []string) error {
return fmt.Errorf("failed to get quiet flag: %w", err)
}

requestID, err := cmd.Flags().GetString("req")
requestID, err := cmd.Flags().GetString("request-id")
if err != nil {
return fmt.Errorf("failed to get request ID: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion docs/source/cli.rst
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ The following commands are available for interacting with Dagu:
dagu status <file>

# Re-runs the specified DAG run
dagu retry --req=<request-id> <file>
dagu retry --request-id=<request-id> <file>

# Stops the DAG execution
dagu stop <file>
Expand Down
2 changes: 1 addition & 1 deletion internal/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func (e *client) Restart(_ context.Context, dag *digraph.DAG, opts RestartOption

func (e *client) Retry(_ context.Context, dag *digraph.DAG, requestID string) error {
args := []string{"retry"}
args = append(args, fmt.Sprintf("--req=%s", requestID))
args = append(args, fmt.Sprintf("--request-id=%s", requestID))
args = append(args, dag.Location)
// nolint:gosec
cmd := exec.Command(e.executable, args...)
Expand Down
19 changes: 15 additions & 4 deletions internal/digraph/executor/sub.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@ type subWorkflow struct {
writer io.Writer
}

var errWorkingDirNotExist = fmt.Errorf("working directory does not exist")
var ErrWorkingDirNotExist = fmt.Errorf("working directory does not exist")

func newSubWorkflow(
ctx context.Context, step digraph.Step,
) (Executor, error) {
executable, err := os.Executable()
executable, err := executablePath()
if err != nil {
return nil, fmt.Errorf("failed to get executable path: %w", err)
}
Expand Down Expand Up @@ -62,7 +62,7 @@ func newSubWorkflow(

args := []string{
"start",
fmt.Sprintf("--requestID=%s", requestID),
fmt.Sprintf("--request-id=%s", requestID),
"--quiet",
subDAG.Location,
}
Expand All @@ -74,7 +74,7 @@ func newSubWorkflow(

cmd := exec.CommandContext(ctx, executable, args...)
if len(step.Dir) > 0 && !fileutil.FileExists(step.Dir) {
return nil, errWorkingDirNotExist
return nil, ErrWorkingDirNotExist
}
cmd.Dir = step.Dir
cmd.Env = append(cmd.Env, stepContext.AllEnvs()...)
Expand Down Expand Up @@ -152,3 +152,14 @@ func generateRequestID() (string, error) {
}
return id.String(), nil
}

func executablePath() (string, error) {
if os.Getenv("DAGU_EXECUTABLE") != "" {
return os.Getenv("DAGU_EXECUTABLE"), nil
}
executable, err := os.Executable()
if err != nil {
return "", fmt.Errorf("failed to get executable path: %w", err)
}
return executable, nil
}
9 changes: 8 additions & 1 deletion internal/integration/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,13 @@ func TestIntegration(t *testing.T) {
"OUT1": "Starting server at localhost:8080",
},
},
{
name: "CallSubWorkflow",
dag: "call-sub.yaml",
expectedOutputs: map[string]any{
"OUT2": "foo",
},
},
{
name: "EnvVar",
dag: "environment-var.yaml",
Expand Down Expand Up @@ -141,7 +148,7 @@ func TestIntegration(t *testing.T) {
},
}

th := test.Setup(t)
th := test.Setup(t, test.WithDAGsDir(test.TestdataPath(t, "integration")))
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
dag := th.DAG(t, filepath.Join("integration", tc.dag))
Expand Down
20 changes: 15 additions & 5 deletions internal/test/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"os"
"os/exec"
"path"
"path/filepath"
"runtime"
"strings"
Expand All @@ -31,17 +32,13 @@ import (
)

var setupLock sync.Mutex
var executablePath string

func init() {
executablePath = filepath.Join(fileutil.MustGetwd(), "../../.local/bin/dagu")
}

// TestHelperOption defines functional options for Helper
type TestHelperOption func(*TestOptions)

type TestOptions struct {
CaptureLoggingOutput bool // CaptureLoggingOutput enables capturing of logging output
DAGsDir string
}

// WithCaptureLoggingOutput creates a logging capture option
Expand All @@ -51,6 +48,12 @@ func WithCaptureLoggingOutput() TestHelperOption {
}
}

func WithDAGsDir(dir string) TestHelperOption {
return func(opts *TestOptions) {
opts.DAGsDir = dir
}
}

// Setup creates a new Helper instance for testing
func Setup(t *testing.T, opts ...TestHelperOption) Helper {
setupLock.Lock()
Expand All @@ -65,11 +68,18 @@ func Setup(t *testing.T, opts ...TestHelperOption) Helper {
tmpDir := fileutil.MustTempDir(fmt.Sprintf("dagu-test-%s", random))
require.NoError(t, os.Setenv("DAGU_HOME", tmpDir))

root := getProjectRoot(t)
executablePath := path.Join(root, ".local", "bin", "dagu")
os.Setenv("DAGU_EXECUTABLE", executablePath)

cfg, err := config.Load()
require.NoError(t, err)

cfg.Paths.Executable = executablePath
cfg.Paths.LogDir = filepath.Join(tmpDir, "logs")
if options.DAGsDir != "" {
cfg.Paths.DAGsDir = options.DAGsDir
}

dagStore := local.NewDAGStore(cfg.Paths.DAGsDir)
historyStore := jsondb.New(cfg.Paths.DataDir)
Expand Down
9 changes: 9 additions & 0 deletions internal/testdata/integration/call-sub.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
steps:
- name: step1
run: sub
params: "P1=foo"
output: OUT1
- name: step2
command: echo "${OUT1.outputs.OUT}"
output: OUT2
depends: [step1]
6 changes: 6 additions & 0 deletions internal/testdata/integration/sub.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
params:
P1: xyz
steps:
- name: step1
command: echo $P1
output: OUT
Loading