Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix #382 #384

Merged
merged 3 commits into from
Jan 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions internal/dag/step.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,17 @@ package dag
import (
"fmt"
"strings"
"sync"
"time"

"github.com/yohamta/dagu/internal/utils"
)

// Step represents a step in a DAG.
type Step struct {
Name string
Description string
Variables []string
OutputVariables *sync.Map
OutputVariables *utils.SyncMap
Dir string
ExecutorConfig ExecutorConfig
CmdWithArgs string
Expand Down
14 changes: 14 additions & 0 deletions internal/models/status_test.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
package models

import (
"encoding/json"
"testing"
"time"

"github.com/yohamta/dagu/internal/dag"
"github.com/yohamta/dagu/internal/scheduler"

"github.com/stretchr/testify/require"
"github.com/yohamta/dagu/internal/utils"
)

func TestPid(t *testing.T) {
Expand Down Expand Up @@ -70,3 +72,15 @@ func TestCorrectRunningStatus(t *testing.T) {
status.CorrectRunningStatus()
require.Equal(t, scheduler.SchedulerStatus_Error, status.Status)
}

func TestJsonMarshal(t *testing.T) {
step := dag.Step{
OutputVariables: &utils.SyncMap{},
}
step.OutputVariables.Store("A", "B")
js, err := json.Marshal(step)
if err != nil {
t.Fatalf(err.Error())
}
t.Logf(string(js))
}
21 changes: 17 additions & 4 deletions internal/scheduler/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,18 @@ package scheduler
import (
"fmt"
"log"
"sync"
"os"
"time"

"github.com/yohamta/dagu/internal/dag"
"github.com/yohamta/dagu/internal/utils"
)

// ExecutionGraph represents a graph of steps.
type ExecutionGraph struct {
StartedAt time.Time
FinishedAt time.Time
outputVariables *sync.Map
outputVariables *utils.SyncMap
dict map[int]*Node
nodes []*Node
from map[int][]int
Expand All @@ -23,7 +24,7 @@ type ExecutionGraph struct {
// NewExecutionGraph creates a new execution graph with the given steps.
func NewExecutionGraph(steps ...*dag.Step) (*ExecutionGraph, error) {
graph := &ExecutionGraph{
outputVariables: &sync.Map{},
outputVariables: &utils.SyncMap{},
dict: make(map[int]*Node),
from: make(map[int][]int),
to: make(map[int][]int),
Expand All @@ -45,13 +46,25 @@ func NewExecutionGraph(steps ...*dag.Step) (*ExecutionGraph, error) {
// NewExecutionGraphForRetry creates a new execution graph for retry with given nodes.
func NewExecutionGraphForRetry(nodes ...*Node) (*ExecutionGraph, error) {
graph := &ExecutionGraph{
outputVariables: &sync.Map{},
outputVariables: &utils.SyncMap{},
dict: make(map[int]*Node),
from: make(map[int][]int),
to: make(map[int][]int),
nodes: []*Node{},
}
for _, node := range nodes {
if node.OutputVariables != nil {
node.OutputVariables.Range(func(key, value interface{}) bool {
k := key.(string)
v := value.(string)
graph.outputVariables.Store(key, value)
err := os.Setenv(k, v[len(key.(string))+1:])
if err != nil {
log.Printf("set env error : %s", err.Error())
}
return true
})
}
node.OutputVariables = graph.outputVariables
node.init()
graph.dict[node.id] = node
Expand Down
32 changes: 16 additions & 16 deletions internal/scheduler/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,20 @@ import (
"math/rand"
"os"
"path"
"sync"
"syscall"
"testing"
"time"

"github.com/stretchr/testify/require"
"github.com/yohamta/dagu/internal/dag"
"github.com/yohamta/dagu/internal/utils"
)

func TestExecute(t *testing.T) {
n := &Node{
Step: &dag.Step{
Command: "true",
OutputVariables: &sync.Map{},
OutputVariables: &utils.SyncMap{},
}}
require.NoError(t, n.Execute())
require.Nil(t, n.Error)
Expand All @@ -28,7 +28,7 @@ func TestError(t *testing.T) {
n := &Node{
Step: &dag.Step{
Command: "false",
OutputVariables: &sync.Map{},
OutputVariables: &utils.SyncMap{},
}}
err := n.Execute()
require.True(t, err != nil)
Expand All @@ -40,7 +40,7 @@ func TestSignal(t *testing.T) {
Step: &dag.Step{
Command: "sleep",
Args: []string{"100"},
OutputVariables: &sync.Map{},
OutputVariables: &utils.SyncMap{},
}}

go func() {
Expand All @@ -60,7 +60,7 @@ func TestSignalSpecified(t *testing.T) {
Step: &dag.Step{
Command: "sleep",
Args: []string{"100"},
OutputVariables: &sync.Map{},
OutputVariables: &utils.SyncMap{},
SignalOnStop: "SIGINT",
}}

Expand All @@ -82,7 +82,7 @@ func TestLog(t *testing.T) {
Command: "echo",
Args: []string{"done"},
Dir: os.Getenv("HOME"),
OutputVariables: &sync.Map{},
OutputVariables: &utils.SyncMap{},
},
}

Expand All @@ -99,7 +99,7 @@ func TestStdout(t *testing.T) {
Args: []string{"done"},
Dir: os.Getenv("HOME"),
Stdout: "stdout.log",
OutputVariables: &sync.Map{},
OutputVariables: &utils.SyncMap{},
},
}

Expand All @@ -121,7 +121,7 @@ echo Stderr message >&2
Dir: os.Getenv("HOME"),
Stdout: "test-stderr-stdout.log",
Stderr: "test-stderr-stderr.log",
OutputVariables: &sync.Map{},
OutputVariables: &utils.SyncMap{},
},
}

Expand All @@ -141,7 +141,7 @@ func TestNode(t *testing.T) {
Step: &dag.Step{
Command: "echo",
Args: []string{"hello"},
OutputVariables: &sync.Map{},
OutputVariables: &utils.SyncMap{},
},
}
n.incDoneCount()
Expand All @@ -164,7 +164,7 @@ func TestOutput(t *testing.T) {
Step: &dag.Step{
CmdWithArgs: "echo hello",
Output: "OUTPUT_TEST",
OutputVariables: &sync.Map{},
OutputVariables: &utils.SyncMap{},
},
}
err := n.setup(os.Getenv("HOME"), "test-request-id-output")
Expand All @@ -184,7 +184,7 @@ func TestOutput(t *testing.T) {
Step: &dag.Step{
CmdWithArgs: "echo $OUTPUT_TEST",
Output: "OUTPUT_TEST2",
OutputVariables: &sync.Map{},
OutputVariables: &utils.SyncMap{},
},
}

Expand All @@ -197,7 +197,7 @@ func TestOutput(t *testing.T) {
Command: "sh",
Script: "echo $OUTPUT_TEST2",
Output: "OUTPUT_TEST3",
OutputVariables: &sync.Map{},
OutputVariables: &utils.SyncMap{},
},
}

Expand Down Expand Up @@ -227,7 +227,7 @@ func TestOutputJson(t *testing.T) {
Step: &dag.Step{
CmdWithArgs: test.CmdWithArgs,
Output: "OUTPUT_JSON_TEST",
OutputVariables: &sync.Map{},
OutputVariables: &utils.SyncMap{},
},
}
err := n.setup(os.Getenv("HOME"), fmt.Sprintf("test-output-json-%d", i))
Expand Down Expand Up @@ -279,7 +279,7 @@ func TestOutputSpecialchar(t *testing.T) {
Step: &dag.Step{
CmdWithArgs: test.CmdWithArgs,
Output: "OUTPUT_SPECIALCHAR_TEST",
OutputVariables: &sync.Map{},
OutputVariables: &utils.SyncMap{},
},
}
err := n.setup(os.Getenv("HOME"), fmt.Sprintf("test-output-specialchar-%d", i))
Expand Down Expand Up @@ -308,7 +308,7 @@ func TestRunScript(t *testing.T) {
echo hello
`,
Output: "SCRIPT_TEST",
OutputVariables: &sync.Map{},
OutputVariables: &utils.SyncMap{},
},
}

Expand All @@ -335,7 +335,7 @@ func TestTeardown(t *testing.T) {
Step: &dag.Step{
Command: testCommand,
Args: []string{},
OutputVariables: &sync.Map{},
OutputVariables: &utils.SyncMap{},
},
}

Expand Down
26 changes: 26 additions & 0 deletions internal/utils/utils.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package utils

import (
"encoding/json"
"log"
"os"
"os/exec"
"path/filepath"
"regexp"
"strings"
"sync"
"time"

"github.com/mattn/go-shellwords"
Expand Down Expand Up @@ -237,3 +239,27 @@ func Now() time.Time {
}
return FixedTime
}

type SyncMap struct {
sync.Map
}

func (m *SyncMap) MarshalJSON() ([]byte, error) {
tmpMap := make(map[string]interface{})
m.Range(func(k, v interface{}) bool {
tmpMap[k.(string)] = v
return true
})
return json.Marshal(tmpMap)
}

func (m *SyncMap) UnmarshalJSON(data []byte) error {
var tmpMap map[string]interface{}
if err := json.Unmarshal(data, &tmpMap); err != nil {
return err
}
for key, value := range tmpMap {
m.Store(key, value)
}
return nil
}