Skip to content

Commit

Permalink
Merge pull request #197 from yohamta/feat/suspend-dag
Browse files Browse the repository at this point in the history
feat: suspend DAG schedule by switches on web UI
  • Loading branch information
yottahmd authored Jul 12, 2022
2 parents 5180de0 + f77774a commit 3f9a3ef
Show file tree
Hide file tree
Showing 21 changed files with 353 additions and 53 deletions.
2 changes: 1 addition & 1 deletion admin/src/components/DAGCreationButton.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ function DAGCreationButton({ refresh }: Props) {
body: formData,
});
if (resp.ok) {
refresh();
window.location.href = `/dags/${name.replace(/.yaml$/, '')}?t=1`;
} else {
const e = await resp.text();
alert(e);
Expand Down
47 changes: 47 additions & 0 deletions admin/src/components/DAGSwitch.tsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import { Switch } from '@mui/material';
import React from 'react';
import { DAG } from '../models/DAGData';

type Props = {
DAG: DAG;
refresh?: () => void;
};

function DAGSwitch({ DAG, refresh }: Props) {
const [checked, setChecked] = React.useState(!DAG.Suspended);

const onSubmit = React.useCallback(
async (params: { name: string; action: string; value: string }) => {
const form = new FormData();
form.set('action', params.action);
form.set('value', params.value);
const url = `${API_URL}/dags/${params.name}`;
const ret = await fetch(url, {
method: 'POST',
mode: 'cors',
body: form,
});
if (ret.ok) {
if (refresh) {
refresh();
}
} else {
const e = await ret.text();
alert(e);
}
},
[refresh]
);

const onChange = React.useCallback(() => {
const enabled = !checked;
setChecked(enabled);
onSubmit({
name: DAG.Config.Name,
action: 'suspend',
value: enabled ? 'false' : 'true',
});
}, [DAG, checked]);
return <Switch checked={checked} onChange={onChange} />;
}
export default DAGSwitch;
22 changes: 22 additions & 0 deletions admin/src/components/DAGTable.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import {
Chip,
IconButton,
Stack,
Switch,
Table,
TableBody,
TableCell,
Expand All @@ -37,6 +38,7 @@ import {
} from '../models/DAGData';
import StyledTableRow from './StyledTableRow';
import { KeyboardArrowDown, KeyboardArrowUp } from '@mui/icons-material';
import DAGSwitch from './DAGSwitch';

type Props = {
DAGs: DAGItem[];
Expand Down Expand Up @@ -271,6 +273,22 @@ const defaultColumns = [
return getNextSchedule(dataA.DAG) - getNextSchedule(dataB.DAG);
},
}),
table.createDataColumn('Type', {
id: 'On/Off',
header: 'On/Off',
cell: (props) => {
const data = props.row.original!;
if (data.Type != DAGDataType.DAG) {
return false;
}
return (
<DAGSwitch
DAG={data.DAG}
refresh={props.instance.options.meta?.refreshFn}
/>
);
},
}),
table.createDisplayColumn({
id: 'Actions',
header: 'Actions',
Expand Down Expand Up @@ -382,6 +400,10 @@ function DAGTable({ DAGs = [], group = '', refreshFn }: Props) {
},
});

React.useEffect(() => {
instance.toggleAllRowsExpanded(true);
}, []);

return (
<Box>
<Stack
Expand Down
2 changes: 1 addition & 1 deletion admin/src/components/layouts/Layout.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ function Content({ title, navbarColor, children }: DashboardContentProps) {
<Drawer variant="permanent" open={open}>
<Box
sx={{
background: `linear-gradient(0deg, rgba(210,210,210,1) 0%, ${gradientColor} 70%, ${gradientColor} 100%);`,
background: `linear-gradient(0deg, #fff 0%, ${gradientColor} 70%, ${gradientColor} 100%);`,
height: '100%',
}}
>
Expand Down
1 change: 1 addition & 0 deletions admin/src/models/DAGData.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ export type DAG = {
Dir: string;
Config: Config;
Status?: Status;
Suspended: boolean;
ErrorT: string;
};

Expand Down
27 changes: 18 additions & 9 deletions agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ func TestRunDAG(t *testing.T) {

func TestCheckRunning(t *testing.T) {
config := testConfig("agent_is_running.yaml")
dag, err := controller.NewDAG(config, false)
dr := controller.NewDAGReader()
dag, err := dr.ReadDAG(config, false)
require.NoError(t, err)

a := &Agent{AgentConfig: &AgentConfig{
Expand All @@ -69,7 +70,8 @@ func TestCheckRunning(t *testing.T) {
}

func TestDryRun(t *testing.T) {
dag, err := controller.NewDAG(testConfig("agent_dry.yaml"), false)
dr := controller.NewDAGReader()
dag, err := dr.ReadDAG(testConfig("agent_dry.yaml"), false)
require.NoError(t, err)

a := &Agent{AgentConfig: &AgentConfig{
Expand Down Expand Up @@ -101,7 +103,8 @@ func TestCancelDAG(t *testing.T) {
}

func TestPreConditionInvalid(t *testing.T) {
dag, err := controller.NewDAG(testConfig("agent_multiple_steps.yaml"), false)
dr := controller.NewDAGReader()
dag, err := dr.ReadDAG(testConfig("agent_multiple_steps.yaml"), false)
require.NoError(t, err)

dag.Config.Preconditions = []*config.Condition{
Expand All @@ -120,7 +123,8 @@ func TestPreConditionInvalid(t *testing.T) {
}

func TestPreConditionValid(t *testing.T) {
dag, err := controller.NewDAG(testConfig("agent_with_params.yaml"), false)
dr := controller.NewDAGReader()
dag, err := dr.ReadDAG(testConfig("agent_with_params.yaml"), false)
require.NoError(t, err)

dag.Config.Preconditions = []*config.Condition{
Expand All @@ -139,7 +143,8 @@ func TestPreConditionValid(t *testing.T) {
}

func TestStartError(t *testing.T) {
dag, err := controller.NewDAG(testConfig("agent_error.yaml"), false)
dr := controller.NewDAGReader()
dag, err := dr.ReadDAG(testConfig("agent_error.yaml"), false)
require.NoError(t, err)
status, err := testDAG(t, dag)
require.Error(t, err)
Expand All @@ -148,7 +153,8 @@ func TestStartError(t *testing.T) {
}

func TestOnExit(t *testing.T) {
dag, err := controller.NewDAG(testConfig("agent_on_exit.yaml"), false)
dr := controller.NewDAGReader()
dag, err := dr.ReadDAG(testConfig("agent_on_exit.yaml"), false)
require.NoError(t, err)
status, err := testDAG(t, dag)
require.NoError(t, err)
Expand All @@ -162,7 +168,8 @@ func TestOnExit(t *testing.T) {

func TestRetry(t *testing.T) {
cfg := testConfig("agent_retry.yaml")
dag, err := controller.NewDAG(cfg, false)
dr := controller.NewDAGReader()
dag, err := dr.ReadDAG(cfg, false)
require.NoError(t, err)

status, err := testDAG(t, dag)
Expand Down Expand Up @@ -194,7 +201,8 @@ func TestRetry(t *testing.T) {
}

func TestHandleHTTP(t *testing.T) {
dag, err := controller.NewDAG(testConfig("agent_handle_http.yaml"), false)
dr := controller.NewDAGReader()
dag, err := dr.ReadDAG(testConfig("agent_handle_http.yaml"), false)
require.NoError(t, err)

a := &Agent{AgentConfig: &AgentConfig{
Expand Down Expand Up @@ -292,7 +300,8 @@ func testConfig(name string) string {
func testDAGAsync(t *testing.T, file string) (*Agent, *controller.DAG) {
t.Helper()

dag, err := controller.NewDAG(file, false)
dr := controller.NewDAGReader()
dag, err := dr.ReadDAG(file, false)
require.NoError(t, err)

a := &Agent{AgentConfig: &AgentConfig{
Expand Down
3 changes: 2 additions & 1 deletion cmd/retry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ func Test_retryCommand(t *testing.T) {
output: []string{},
}, t)

dag, err := controller.NewDAG(configPath, false)
dr := controller.NewDAGReader()
dag, err := dr.ReadDAG(configPath, false)
require.NoError(t, err)
require.Equal(t, dag.Status.Status, scheduler.SchedulerStatus_Success)

Expand Down
19 changes: 17 additions & 2 deletions internal/admin/handlers/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ import (
"github.com/yohamta/dagu/internal/database"
"github.com/yohamta/dagu/internal/models"
"github.com/yohamta/dagu/internal/scheduler"
"github.com/yohamta/dagu/internal/settings"
"github.com/yohamta/dagu/internal/storage"
"github.com/yohamta/dagu/internal/suspend"
"golang.org/x/text/encoding"
"golang.org/x/text/encoding/japanese"
"golang.org/x/text/transform"
Expand Down Expand Up @@ -98,7 +101,8 @@ func HandleGetDAG(hc *DAGHandlerConfig) http.HandlerFunc {

params := getDAGParameter(r)
file := filepath.Join(hc.DAGsDir, fmt.Sprintf("%s.yaml", cfg))
dag, err := controller.NewDAG(file, false)
dr := controller.NewDAGReader()
dag, err := dr.ReadDAG(file, false)
if dag == nil {
encodeError(w, err)
return
Expand Down Expand Up @@ -172,7 +176,8 @@ func HandlePostDAG(hc *PostDAGHandlerConfig) http.HandlerFunc {
}

file := filepath.Join(hc.DAGsDir, fmt.Sprintf("%s.yaml", cfg))
dag, err := controller.NewDAG(file, false)
dr := controller.NewDAGReader()
dag, err := dr.ReadDAG(file, false)
if err != nil && action != "save" {
encodeError(w, err)
return
Expand All @@ -188,6 +193,16 @@ func HandlePostDAG(hc *PostDAGHandlerConfig) http.HandlerFunc {
}
c.StartAsync(hc.Bin, hc.WkDir, "")

case "suspend":
sc := suspend.NewSuspendChecker(
storage.NewStorage(
settings.MustGet(
settings.SETTING__SUSPEND_FLAGS_DIR,
),
),
)
sc.ToggleSuspend(dag.Config, value == "true")

case "stop":
if dag.Status.Status != scheduler.SchedulerStatus_Running {
w.WriteHeader(http.StatusBadRequest)
Expand Down
3 changes: 2 additions & 1 deletion internal/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,10 @@ func GetDAGs(dir string) (dags []*DAG, errs []string, err error) {
}
fis, err := os.ReadDir(dir)
utils.LogErr("read DAGs directory", err)
dr := NewDAGReader()
for _, fi := range fis {
if utils.MatchExtension(fi.Name(), config.EXTENSIONS) {
dag, err := NewDAG(filepath.Join(dir, fi.Name()), true)
dag, err := dr.ReadDAG(filepath.Join(dir, fi.Name()), true)
utils.LogErr("read DAG config", err)
if dag != nil {
dags = append(dags, dag)
Expand Down
24 changes: 16 additions & 8 deletions internal/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ func testConfig(name string) string {

func TestGetStatus(t *testing.T) {
file := testConfig("controller_success.yaml")
dag, err := controller.NewDAG(file, false)
dr := controller.NewDAGReader()
dag, err := dr.ReadDAG(file, false)
require.NoError(t, err)

st, err := controller.New(dag.Config).GetStatus()
Expand All @@ -50,7 +51,8 @@ func TestGetStatus(t *testing.T) {
func TestGetStatusRunningAndDone(t *testing.T) {
file := testConfig("controller_status.yaml")

dag, err := controller.NewDAG(file, false)
dr := controller.NewDAGReader()
dag, err := dr.ReadDAG(file, false)
require.NoError(t, err)

socketServer, _ := sock.NewServer(
Expand Down Expand Up @@ -82,7 +84,8 @@ func TestGetStatusRunningAndDone(t *testing.T) {

func TestGetDAG(t *testing.T) {
file := testConfig("controller_get_dag.yaml")
dag, err := controller.NewDAG(file, false)
dr := controller.NewDAGReader()
dag, err := dr.ReadDAG(file, false)
require.NoError(t, err)
assert.Equal(t, "controller_get_dag", dag.Config.Name)
}
Expand All @@ -99,7 +102,8 @@ func TestGetDAGList(t *testing.T) {
func TestUpdateStatus(t *testing.T) {
file := testConfig("controller_update_status.yaml")

dag, err := controller.NewDAG(file, false)
dr := controller.NewDAGReader()
dag, err := dr.ReadDAG(file, false)
require.NoError(t, err)
req := "test-update-status"
now := time.Now()
Expand Down Expand Up @@ -138,7 +142,8 @@ func TestUpdateStatus(t *testing.T) {
func TestUpdateStatusFailure(t *testing.T) {
file := testConfig("controller_update_status_failed.yaml")

dag, err := controller.NewDAG(file, false)
dr := controller.NewDAGReader()
dag, err := dr.ReadDAG(file, false)
require.NoError(t, err)
req := "test-update-status-failure"

Expand Down Expand Up @@ -170,7 +175,8 @@ func TestUpdateStatusFailure(t *testing.T) {

func TestStart(t *testing.T) {
file := testConfig("controller_start_err.yaml")
dag, err := controller.NewDAG(file, false)
dr := controller.NewDAGReader()
dag, err := dr.ReadDAG(file, false)
require.NoError(t, err)

c := controller.New(dag.Config)
Expand All @@ -184,7 +190,8 @@ func TestStart(t *testing.T) {

func TestStartStop(t *testing.T) {
file := testConfig("controller_start_stop.yaml")
dag, err := controller.NewDAG(file, false)
dr := controller.NewDAGReader()
dag, err := dr.ReadDAG(file, false)
require.NoError(t, err)

c := controller.New(dag.Config)
Expand All @@ -205,7 +212,8 @@ func TestStartStop(t *testing.T) {

func TestRetry(t *testing.T) {
file := testConfig("controller_retry.yaml")
dag, err := controller.NewDAG(file, false)
dr := controller.NewDAGReader()
dag, err := dr.ReadDAG(file, false)
require.NoError(t, err)

c := controller.New(dag.Config)
Expand Down
Loading

0 comments on commit 3f9a3ef

Please sign in to comment.