Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(task): Record last success and failure run times in the Task #19390

Merged
merged 5 commits into from
Aug 25, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
1. [19246](https://github.com/influxdata/influxdb/pull/19246): Redesign load data page to increase discovery and ease of use
1. [19334](https://github.com/influxdata/influxdb/pull/19334): Add --active-config flag to influx to set config for single command
1. [19219](https://github.com/influxdata/influxdb/pull/19219): List buckets via the API now supports after (ID) parameter as an alternative to offset.
1. [19390](https://github.com/influxdata/influxdb/pull/19390): Record last success and failure run times in the Task

### Bug Fixes

Expand Down
35 changes: 35 additions & 0 deletions kv/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ type kvTask struct {
Offset influxdb.Duration `json:"offset,omitempty"`
LatestCompleted time.Time `json:"latestCompleted,omitempty"`
LatestScheduled time.Time `json:"latestScheduled,omitempty"`
LatestSuccess time.Time `json:"latestSuccess,omitempty"`
LatestFailure time.Time `json:"latestFailure,omitempty"`
CreatedAt time.Time `json:"createdAt,omitempty"`
UpdatedAt time.Time `json:"updatedAt,omitempty"`
Metadata map[string]interface{} `json:"metadata,omitempty"`
Expand All @@ -74,6 +76,8 @@ func kvToInfluxTask(k *kvTask) *influxdb.Task {
Offset: k.Offset.Duration,
LatestCompleted: k.LatestCompleted,
LatestScheduled: k.LatestScheduled,
LatestSuccess: k.LatestSuccess,
LatestFailure: k.LatestFailure,
CreatedAt: k.CreatedAt,
UpdatedAt: k.UpdatedAt,
Metadata: k.Metadata,
Expand Down Expand Up @@ -770,6 +774,26 @@ func (s *Service) updateTask(ctx context.Context, tx Tx, id influxdb.ID, upd inf
}
}

if upd.LatestSuccess != nil {
// make sure we only update latest success one way
tlc := task.LatestSuccess
ulc := *upd.LatestSuccess

if !ulc.IsZero() && ulc.After(tlc) {
task.LatestSuccess = *upd.LatestSuccess
}
}

if upd.LatestFailure != nil {
// make sure we only update latest failure one way
tlc := task.LatestFailure
ulc := *upd.LatestFailure

if !ulc.IsZero() && ulc.After(tlc) {
task.LatestFailure = *upd.LatestFailure
}
}

if upd.LastRunStatus != nil {
task.LastRunStatus = *upd.LastRunStatus
if *upd.LastRunStatus == "failed" && upd.LastRunError != nil {
Expand Down Expand Up @@ -1480,8 +1504,19 @@ func (s *Service) finishRun(ctx context.Context, tx Tx, taskID, runID influxdb.I

// tell task to update latest completed
scheduled := r.ScheduledFor

var latestSuccess, latestFailure *time.Time

if r.Status == "failed" {
latestFailure = &scheduled
} else {
latestSuccess = &scheduled
}

_, err = s.updateTask(ctx, tx, taskID, influxdb.TaskUpdate{
LatestCompleted: &scheduled,
LatestSuccess: latestSuccess,
LatestFailure: latestFailure,
LastRunStatus: &r.Status,
LastRunError: func() *string {
if r.Status == "failed" {
Expand Down
79 changes: 79 additions & 0 deletions kv/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,85 @@ func TestTaskRunCancellation(t *testing.T) {
}
}

func TestService_UpdateTask_RecordLatestSuccessAndFailure(t *testing.T) {
ctx, cancelFunc := context.WithCancel(context.Background())
defer cancelFunc()

c := clock.NewMock()
c.Set(time.Unix(1000, 0))

ts := newService(t, ctx, c)
defer ts.Close()

ctx = icontext.SetAuthorizer(ctx, &ts.Auth)

originalTask, err := ts.Service.CreateTask(ctx, influxdb.TaskCreate{
Flux: `option task = {name: "a task",every: 1h} from(bucket:"test") |> range(start:-1h)`,
OrganizationID: ts.Org.ID,
OwnerID: ts.User.ID,
Status: string(influxdb.TaskActive),
})
if err != nil {
t.Fatal("CreateTask", err)
}

c.Add(1 * time.Second)
exp := c.Now()
updatedTask, err := ts.Service.UpdateTask(ctx, originalTask.ID, influxdb.TaskUpdate{
LatestCompleted: &exp,
LatestScheduled: &exp,

// These would be updated in a mutually exclusive manner, but we'll set
// them both to demonstrate that they do change.
LatestSuccess: &exp,
LatestFailure: &exp,
})
if err != nil {
t.Fatal("UpdateTask", err)
}

if got := updatedTask.LatestScheduled; !got.Equal(exp) {
t.Fatalf("unexpected -got/+exp\n%s", cmp.Diff(got.String(), exp.String()))
}
if got := updatedTask.LatestCompleted; !got.Equal(exp) {
t.Fatalf("unexpected -got/+exp\n%s", cmp.Diff(got.String(), exp.String()))
}
if got := updatedTask.LatestSuccess; !got.Equal(exp) {
t.Fatalf("unexpected -got/+exp\n%s", cmp.Diff(got.String(), exp.String()))
}
if got := updatedTask.LatestFailure; !got.Equal(exp) {
t.Fatalf("unexpected -got/+exp\n%s", cmp.Diff(got.String(), exp.String()))
}

c.Add(5 * time.Second)
exp = c.Now()
updatedTask, err = ts.Service.UpdateTask(ctx, originalTask.ID, influxdb.TaskUpdate{
LatestCompleted: &exp,
LatestScheduled: &exp,

// These would be updated in a mutually exclusive manner, but we'll set
// them both to demonstrate that they do change.
LatestSuccess: &exp,
LatestFailure: &exp,
})
if err != nil {
t.Fatal("UpdateTask", err)
}

if got := updatedTask.LatestScheduled; !got.Equal(exp) {
t.Fatalf("unexpected -got/+exp\n%s", cmp.Diff(got.String(), exp.String()))
}
if got := updatedTask.LatestCompleted; !got.Equal(exp) {
t.Fatalf("unexpected -got/+exp\n%s", cmp.Diff(got.String(), exp.String()))
}
if got := updatedTask.LatestSuccess; !got.Equal(exp) {
t.Fatalf("unexpected -got/+exp\n%s", cmp.Diff(got.String(), exp.String()))
}
if got := updatedTask.LatestFailure; !got.Equal(exp) {
t.Fatalf("unexpected -got/+exp\n%s", cmp.Diff(got.String(), exp.String()))
}
}

type taskOptions struct {
name string
every string
Expand Down
4 changes: 4 additions & 0 deletions task.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ type Task struct {
Offset time.Duration `json:"offset,omitempty"`
LatestCompleted time.Time `json:"latestCompleted,omitempty"`
LatestScheduled time.Time `json:"latestScheduled,omitempty"`
LatestSuccess time.Time `json:"latestSuccess,omitempty"`
LatestFailure time.Time `json:"latestFailure,omitempty"`
LastRunStatus string `json:"lastRunStatus,omitempty"`
LastRunError string `json:"lastRunError,omitempty"`
CreatedAt time.Time `json:"createdAt,omitempty"`
Expand Down Expand Up @@ -183,6 +185,8 @@ type TaskUpdate struct {
// LatestCompleted us to set latest completed on startup to skip task catchup
LatestCompleted *time.Time `json:"-"`
LatestScheduled *time.Time `json:"-"`
LatestSuccess *time.Time `json:"-"`
LatestFailure *time.Time `json:"-"`
LastRunStatus *string `json:"-"`
LastRunError *string `json:"-"`
Metadata map[string]interface{} `json:"-"` // not to be set through a web request but rather used by a http service using tasks backend.
Expand Down