Skip to content

Commit

Permalink
fix(kv): Install feature-flag for switching between normal and simpli…
Browse files Browse the repository at this point in the history
…fied options parsing. (#18662)
  • Loading branch information
brettbuddin authored Jun 23, 2020
1 parent a96f21f commit 81e4b02
Show file tree
Hide file tree
Showing 4 changed files with 174 additions and 12 deletions.
7 changes: 7 additions & 0 deletions flags.yml
Original file line number Diff line number Diff line change
Expand Up @@ -153,3 +153,10 @@
default: false
contact: Lyon Hill
lifetime: temporary

- name: Simple Task Options Extraction
description: Simplified task options extraction to avoid undefined functions when saving tasks
key: simpleTaskOptionsExtraction
default: false
contact: Brett Buddin
lifetime: temporary
16 changes: 16 additions & 0 deletions kit/feature/list.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

58 changes: 46 additions & 12 deletions kv/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package kv
import (
"context"
"encoding/json"
"regexp"
"strings"
"time"

Expand Down Expand Up @@ -674,7 +675,7 @@ func (s *Service) createTask(ctx context.Context, tx Tx, tc influxdb.TaskCreate)
// return nil, influxdb.ErrInvalidOwnerID
// }

opt, err := options.FromScript(s.FluxLanguageService, tc.Flux)
opts, err := ExtractTaskOptions(ctx, s.FluxLanguageService, tc.Flux)
if err != nil {
return nil, influxdb.ErrTaskOptionParse(err)
}
Expand All @@ -691,19 +692,19 @@ func (s *Service) createTask(ctx context.Context, tx Tx, tc influxdb.TaskCreate)
Organization: org.Name,
OwnerID: tc.OwnerID,
Metadata: tc.Metadata,
Name: opt.Name,
Name: opts.Name,
Description: tc.Description,
Status: tc.Status,
Flux: tc.Flux,
Every: opt.Every.String(),
Cron: opt.Cron,
Every: opts.Every.String(),
Cron: opts.Cron,
CreatedAt: createdAt,
LatestCompleted: createdAt,
LatestScheduled: createdAt,
}

if opt.Offset != nil {
off, err := time.ParseDuration(opt.Offset.String())
if opts.Offset != nil {
off, err := time.ParseDuration(opts.Offset.String())
if err != nil {
return nil, influxdb.ErrTaskTimeParse(err)
}
Expand Down Expand Up @@ -830,17 +831,17 @@ func (s *Service) updateTask(ctx context.Context, tx Tx, id influxdb.ID, upd inf
}
task.Flux = *upd.Flux

options, err := options.FromScript(s.FluxLanguageService, *upd.Flux)
opts, err := ExtractTaskOptions(ctx, s.FluxLanguageService, *upd.Flux)
if err != nil {
return nil, influxdb.ErrTaskOptionParse(err)
}
task.Name = options.Name
task.Every = options.Every.String()
task.Cron = options.Cron
task.Name = opts.Name
task.Every = opts.Every.String()
task.Cron = opts.Cron

var off time.Duration
if options.Offset != nil {
off, err = time.ParseDuration(options.Offset.String())
if opts.Offset != nil {
off, err = time.ParseDuration(opts.Offset.String())
if err != nil {
return nil, influxdb.ErrTaskTimeParse(err)
}
Expand Down Expand Up @@ -1919,3 +1920,36 @@ func (s *Service) TaskOwnerIDUpMigration(ctx context.Context, store Store) error
}
return nil
}

var taskOptionsPattern = regexp.MustCompile(`option\s+task\s*=\s*{.*}`)

// ExtractTaskOptions is a feature-flag driven switch between normal options
// parsing and a more simplified variant.
//
// The simplified variant extracts the options assignment and passes only that
// content through the parser. This allows us to allow scenarios like [1] to
// pass through options validation. One clear drawback of this is that it
// requires constant values for the parameter assignments. However, most people
// are doing that anyway.
//
// [1]: https://github.com/influxdata/influxdb/issues/17666
func ExtractTaskOptions(ctx context.Context, lang influxdb.FluxLanguageService, flux string) (options.Options, error) {
if !feature.SimpleTaskOptionsExtraction().Enabled(ctx) {
return options.FromScript(lang, flux)
}

matches := taskOptionsPattern.FindAllString(flux, -1)
if len(matches) == 0 {
return options.Options{}, &influxdb.Error{
Code: influxdb.EInvalid,
Msg: "no task options defined",
}
}
if len(matches) > 1 {
return options.Options{}, &influxdb.Error{
Code: influxdb.EInvalid,
Msg: "multiple task options defined",
}
}
return options.FromScript(lang, matches[0])
}
105 changes: 105 additions & 0 deletions kv/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,15 @@ import (
"github.com/google/go-cmp/cmp"
"github.com/influxdata/influxdb/v2"
icontext "github.com/influxdata/influxdb/v2/context"
"github.com/influxdata/influxdb/v2/kit/feature"
"github.com/influxdata/influxdb/v2/kv"
"github.com/influxdata/influxdb/v2/mock"
_ "github.com/influxdata/influxdb/v2/query/builtin"
"github.com/influxdata/influxdb/v2/query/fluxlang"
"github.com/influxdata/influxdb/v2/task/options"
"github.com/influxdata/influxdb/v2/task/servicetest"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
)

Expand Down Expand Up @@ -387,3 +392,103 @@ func TestTaskMigrate(t *testing.T) {
t.Fatal("failed to fill in ownerID")
}
}

type taskOptions struct {
name string
every string
cron string
offset string
concurrency int64
retry int64
}

func TestExtractTaskOptions(t *testing.T) {
tcs := []struct {
name string
flux string
expected taskOptions
errMsg string
}{
{
name: "all parameters",
flux: `option task = {name: "whatever", every: 1s, offset: 0s, concurrency: 2, retry: 2}`,
expected: taskOptions{
name: "whatever",
every: "1s",
offset: "0s",
concurrency: 2,
retry: 2,
},
},
{
name: "some extra whitespace and bad content around it",
flux: `howdy()
option task = { name:"whatever", cron: "* * * * *" }
hello()
`,
expected: taskOptions{
name: "whatever",
cron: "* * * * *",
concurrency: 1,
retry: 1,
},
},
{
name: "bad options",
flux: `option task = {name: "whatever", every: 1s, cron: "* * * * *"}`,
errMsg: "cannot use both cron and every in task options",
},
{
name: "no options",
flux: `doesntexist()`,
errMsg: "no task options defined",
},
{
name: "multiple assignments",
flux: `
option task = {name: "whatever", every: 1s, offset: 0s, concurrency: 2, retry: 2}
option task = {name: "whatever", every: 1s, offset: 0s, concurrency: 2, retry: 2}
`,
errMsg: "multiple task options defined",
},
}

for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
flagger := mock.NewFlagger(map[feature.Flag]interface{}{
feature.SimpleTaskOptionsExtraction(): true,
})
ctx, _ := feature.Annotate(context.Background(), flagger)
opts, err := kv.ExtractTaskOptions(ctx, fluxlang.DefaultService, tc.flux)
if tc.errMsg != "" {
require.Error(t, err)
assert.Equal(t, tc.errMsg, err.Error())
return
}

require.NoError(t, err)

var offset options.Duration
if opts.Offset != nil {
offset = *opts.Offset
}

var concur int64
if opts.Concurrency != nil {
concur = *opts.Concurrency
}

var retry int64
if opts.Retry != nil {
retry = *opts.Retry
}

assert.Equal(t, tc.expected.name, opts.Name)
assert.Equal(t, tc.expected.cron, opts.Cron)
assert.Equal(t, tc.expected.every, opts.Every.String())
assert.Equal(t, tc.expected.offset, offset.String())
assert.Equal(t, tc.expected.concurrency, concur)
assert.Equal(t, tc.expected.retry, retry)
})
}
}

0 comments on commit 81e4b02

Please sign in to comment.