Skip to content

Commit

Permalink
Merge pull request #202 from yohamta/fix/remove-expired-history-data
Browse files (browse the repository at this point in the history)
Automatic deletion of history files after `histRetentionDays` period has expired
  • Loading branch information
yottahmd authored Jul 22, 2022
2 parents ffd713c + f8decf2 commit 5123c38
Show file tree
Hide file tree
Showing 7 changed files with 49 additions and 62 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,7 @@ It will store log files in the `DAGU__LOGS` environment variable path. The defau

### How long will the history data be stored?

The default retention period for execution history is seven days. However, you can override this setting with the `histRetentionDays` field in a YAML file.
The default retention period for execution history is 30 days. However, you can override this setting with the `histRetentionDays` field in a YAML file.

### How can I retry a DAG from a specific task?

Expand Down
2 changes: 2 additions & 0 deletions agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,8 @@ func (a *Agent) setupDatabase() (err error) {
Config: database.DefaultConfig(),
}
a.dbWriter, a.dbFile, err = a.database.NewWriter(a.DAG.ConfigPath, time.Now(), a.requestId)
utils.LogErr("clean old history data",
a.database.RemoveOld(a.DAG.ConfigPath, a.DAG.HistRetentionDays))
return
}

Expand Down
96 changes: 40 additions & 56 deletions agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,29 +29,26 @@ func TestMain(m *testing.M) {
}

func TestRunDAG(t *testing.T) {
_, dag := testDAGAsync(t, testConfig("agent_run.yaml"))
_, cfg := testDAGAsync(t, "agent_run.yaml")

time.Sleep(100 * time.Millisecond)

status, _ := controller.New(dag.Config).GetLastStatus()
status, _ := controller.New(cfg).GetLastStatus()
require.Equal(t, status.Status, scheduler.SchedulerStatus_Running)
require.Equal(t, status.Nodes[0].Status, scheduler.NodeStatus_Running)

require.Eventually(t, func() bool {
status, err := controller.New(dag.Config).GetLastStatus()
status, err := controller.New(cfg).GetLastStatus()
require.NoError(t, err)
return status.Status == scheduler.SchedulerStatus_Success
}, time.Second*2, time.Millisecond*100)
}

func TestCheckRunning(t *testing.T) {
config := testConfig("agent_is_running.yaml")
dr := controller.NewDAGReader()
dag, err := dr.ReadDAG(config, false)
require.NoError(t, err)
cfg := testLoadDAG(t, "agent_is_running.yaml")

a := &Agent{AgentConfig: &AgentConfig{
DAG: dag.Config,
DAG: cfg,
}}

go func() {
Expand All @@ -64,21 +61,19 @@ func TestCheckRunning(t *testing.T) {
require.NotNil(t, status)
require.Equal(t, status.Status, scheduler.SchedulerStatus_Running)

_, err = testDAG(t, dag)
_, err := testDAG(t, cfg)
require.Error(t, err)
require.Contains(t, err.Error(), "is already running")
}

func TestDryRun(t *testing.T) {
dr := controller.NewDAGReader()
dag, err := dr.ReadDAG(testConfig("agent_dry.yaml"), false)
require.NoError(t, err)
cfg := testLoadDAG(t, "agent_dry.yaml")

a := &Agent{AgentConfig: &AgentConfig{
DAG: dag.Config,
DAG: cfg,
Dry: true,
}}
err = a.Run()
err := a.Run()
require.NoError(t, err)

status := a.Status()
Expand All @@ -92,29 +87,26 @@ func TestCancelDAG(t *testing.T) {
func(a *Agent) { a.Signal(syscall.SIGTERM) },
func(a *Agent) { a.Cancel() },
} {
a, dag := testDAGAsync(t, testConfig("agent_sleep.yaml"))
a, cfg := testDAGAsync(t, "agent_sleep.yaml")
time.Sleep(time.Millisecond * 100)
abort(a)
time.Sleep(time.Millisecond * 500)
status, err := controller.New(dag.Config).GetLastStatus()
status, err := controller.New(cfg).GetLastStatus()
require.NoError(t, err)
require.Equal(t, scheduler.SchedulerStatus_Cancel, status.Status)
}
}

func TestPreConditionInvalid(t *testing.T) {
dr := controller.NewDAGReader()
dag, err := dr.ReadDAG(testConfig("agent_multiple_steps.yaml"), false)
require.NoError(t, err)

dag.Config.Preconditions = []*config.Condition{
cfg := testLoadDAG(t, "agent_multiple_steps.yaml")
cfg.Preconditions = []*config.Condition{
{
Condition: "`echo 1`",
Expected: "0",
},
}

status, err := testDAG(t, dag)
status, err := testDAG(t, cfg)
require.Error(t, err)

require.Equal(t, scheduler.SchedulerStatus_Cancel, status.Status)
Expand All @@ -123,17 +115,15 @@ func TestPreConditionInvalid(t *testing.T) {
}

func TestPreConditionValid(t *testing.T) {
dr := controller.NewDAGReader()
dag, err := dr.ReadDAG(testConfig("agent_with_params.yaml"), false)
require.NoError(t, err)
cfg := testLoadDAG(t, "agent_with_params.yaml")

dag.Config.Preconditions = []*config.Condition{
cfg.Preconditions = []*config.Condition{
{
Condition: "`echo 1`",
Expected: "1",
},
}
status, err := testDAG(t, dag)
status, err := testDAG(t, cfg)
require.NoError(t, err)

require.Equal(t, scheduler.SchedulerStatus_Success, status.Status)
Expand All @@ -143,20 +133,16 @@ func TestPreConditionValid(t *testing.T) {
}

func TestStartError(t *testing.T) {
dr := controller.NewDAGReader()
dag, err := dr.ReadDAG(testConfig("agent_error.yaml"), false)
require.NoError(t, err)
status, err := testDAG(t, dag)
cfg := testLoadDAG(t, "agent_error.yaml")
status, err := testDAG(t, cfg)
require.Error(t, err)

require.Equal(t, scheduler.SchedulerStatus_Error, status.Status)
}

func TestOnExit(t *testing.T) {
dr := controller.NewDAGReader()
dag, err := dr.ReadDAG(testConfig("agent_on_exit.yaml"), false)
require.NoError(t, err)
status, err := testDAG(t, dag)
cfg := testLoadDAG(t, "agent_on_exit.yaml")
status, err := testDAG(t, cfg)
require.NoError(t, err)

require.Equal(t, scheduler.SchedulerStatus_Success, status.Status)
Expand All @@ -167,12 +153,9 @@ func TestOnExit(t *testing.T) {
}

func TestRetry(t *testing.T) {
cfg := testConfig("agent_retry.yaml")
dr := controller.NewDAGReader()
dag, err := dr.ReadDAG(cfg, false)
require.NoError(t, err)
cfg := testLoadDAG(t, "agent_retry.yaml")

status, err := testDAG(t, dag)
status, err := testDAG(t, cfg)
require.Error(t, err)
require.Equal(t, scheduler.SchedulerStatus_Error, status.Status)

Expand All @@ -181,7 +164,7 @@ func TestRetry(t *testing.T) {
}
a := &Agent{
AgentConfig: &AgentConfig{
DAG: dag.Config,
DAG: cfg,
},
RetryConfig: &RetryConfig{
Status: status,
Expand All @@ -201,12 +184,10 @@ func TestRetry(t *testing.T) {
}

func TestHandleHTTP(t *testing.T) {
dr := controller.NewDAGReader()
dag, err := dr.ReadDAG(testConfig("agent_handle_http.yaml"), false)
require.NoError(t, err)
cfg := testLoadDAG(t, "agent_handle_http.yaml")

a := &Agent{AgentConfig: &AgentConfig{
DAG: dag.Config,
DAG: cfg,
}}

go func() {
Expand Down Expand Up @@ -284,33 +265,36 @@ func (h *mockResponseWriter) WriteHeader(statusCode int) {
h.status = statusCode
}

func testDAG(t *testing.T, dag *controller.DAG) (*models.Status, error) {
func testDAG(t *testing.T, cfg *config.Config) (*models.Status, error) {
t.Helper()
a := &Agent{AgentConfig: &AgentConfig{
DAG: dag.Config,
DAG: cfg,
}}
err := a.Run()
return a.Status(), err
}

func testConfig(name string) string {
return path.Join(testsDir, name)
func testLoadDAG(t *testing.T, name string) *config.Config {
file := path.Join(testsDir, name)
cl := &config.Loader{
HomeDir: utils.MustGetUserHomeDir(),
}
cfg, err := cl.Load(file, "")
require.NoError(t, err)
return cfg
}

func testDAGAsync(t *testing.T, file string) (*Agent, *controller.DAG) {
func testDAGAsync(t *testing.T, file string) (*Agent, *config.Config) {
t.Helper()

dr := controller.NewDAGReader()
dag, err := dr.ReadDAG(file, false)
require.NoError(t, err)

cfg := testLoadDAG(t, file)
a := &Agent{AgentConfig: &AgentConfig{
DAG: dag.Config,
DAG: cfg,
}}

go func() {
a.Run()
}()

return a, dag
return a, cfg
}
2 changes: 1 addition & 1 deletion internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func (c *Config) setup() {
utils.ValidFilename(c.Name, "_"))
}
if c.HistRetentionDays == 0 {
c.HistRetentionDays = 7
c.HistRetentionDays = 30
}
if c.MaxCleanUpTime == 0 {
c.MaxCleanUpTime = time.Second * 60
Expand Down
5 changes: 3 additions & 2 deletions internal/database/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,11 +140,12 @@ func (db *Database) FindByRequestId(configPath string, requestId string) (*model

// RemoveAll removes all files in a directory.
func (db *Database) RemoveAll(configPath string) {
db.RemoveOld(db.pattern(configPath)+"*.dat", 0)
db.RemoveOld(configPath, 0)
}

// RemoveOld removes old files.
func (db *Database) RemoveOld(pattern string, retentionDays int) error {
func (db *Database) RemoveOld(configPath string, retentionDays int) error {
pattern := db.pattern(configPath) + "*.dat"
var lastErr error = nil
if retentionDays >= 0 {
matches, _ := filepath.Glob(pattern)
Expand Down
2 changes: 1 addition & 1 deletion internal/database/database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ func testRemoveOldFiles(t *testing.T, db *Database) {
files := db.latest(db.pattern(cfg.ConfigPath)+"*.dat", 3)
require.Equal(t, 3, len(files))

db.RemoveOld(db.pattern(cfg.ConfigPath)+"*.dat", 0)
db.RemoveOld(cfg.ConfigPath, 0)

files = db.latest(db.pattern(cfg.ConfigPath)+"*.dat", 3)
require.Equal(t, 0, len(files))
Expand Down
2 changes: 1 addition & 1 deletion internal/database/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func testWriteStatusToFile(t *testing.T, db *Database) {
require.NoError(t, dw.Open())
defer func() {
dw.Close()
db.RemoveOld(db.pattern(cfg.ConfigPath)+"*.dat", 0)
db.RemoveOld(cfg.ConfigPath, 0)
}()

status := models.NewStatus(cfg, nil, scheduler.SchedulerStatus_Running, 10000, nil, nil)
Expand Down

0 comments on commit 5123c38

Please sign in to comment.