Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(kv): Install feature-flag for switching between normal and simplified options parsing #18662

Merged
merged 1 commit into from
Jun 23, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions flags.yml
Original file line number Diff line number Diff line change
Expand Up @@ -153,3 +153,10 @@
default: false
contact: Lyon Hill
lifetime: temporary

- name: Simple Task Options Extraction
description: Simplified task options extraction to avoid undefined functions when saving tasks
key: simpleTaskOptionsExtraction
default: false
contact: Brett Buddin
lifetime: temporary
16 changes: 16 additions & 0 deletions kit/feature/list.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

58 changes: 46 additions & 12 deletions kv/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package kv
import (
"context"
"encoding/json"
"regexp"
"strings"
"time"

Expand Down Expand Up @@ -674,7 +675,7 @@ func (s *Service) createTask(ctx context.Context, tx Tx, tc influxdb.TaskCreate)
// return nil, influxdb.ErrInvalidOwnerID
// }

opt, err := options.FromScript(s.FluxLanguageService, tc.Flux)
opts, err := ExtractTaskOptions(ctx, s.FluxLanguageService, tc.Flux)
if err != nil {
return nil, influxdb.ErrTaskOptionParse(err)
}
Expand All @@ -691,19 +692,19 @@ func (s *Service) createTask(ctx context.Context, tx Tx, tc influxdb.TaskCreate)
Organization: org.Name,
OwnerID: tc.OwnerID,
Metadata: tc.Metadata,
Name: opt.Name,
Name: opts.Name,
Description: tc.Description,
Status: tc.Status,
Flux: tc.Flux,
Every: opt.Every.String(),
Cron: opt.Cron,
Every: opts.Every.String(),
Cron: opts.Cron,
CreatedAt: createdAt,
LatestCompleted: createdAt,
LatestScheduled: createdAt,
}

if opt.Offset != nil {
off, err := time.ParseDuration(opt.Offset.String())
if opts.Offset != nil {
off, err := time.ParseDuration(opts.Offset.String())
if err != nil {
return nil, influxdb.ErrTaskTimeParse(err)
}
Expand Down Expand Up @@ -830,17 +831,17 @@ func (s *Service) updateTask(ctx context.Context, tx Tx, id influxdb.ID, upd inf
}
task.Flux = *upd.Flux

options, err := options.FromScript(s.FluxLanguageService, *upd.Flux)
opts, err := ExtractTaskOptions(ctx, s.FluxLanguageService, *upd.Flux)
if err != nil {
return nil, influxdb.ErrTaskOptionParse(err)
}
task.Name = options.Name
task.Every = options.Every.String()
task.Cron = options.Cron
task.Name = opts.Name
task.Every = opts.Every.String()
task.Cron = opts.Cron

var off time.Duration
if options.Offset != nil {
off, err = time.ParseDuration(options.Offset.String())
if opts.Offset != nil {
off, err = time.ParseDuration(opts.Offset.String())
if err != nil {
return nil, influxdb.ErrTaskTimeParse(err)
}
Expand Down Expand Up @@ -1919,3 +1920,36 @@ func (s *Service) TaskOwnerIDUpMigration(ctx context.Context, store Store) error
}
return nil
}

var taskOptionsPattern = regexp.MustCompile(`option\s+task\s*=\s*{.*}`)

// ExtractTaskOptions is a feature-flag driven switch between normal options
// parsing and a more simplified variant.
//
// The simplified variant extracts the options assignment and passes only that
// content through the parser. This allows us to allow scenarios like [1] to
// pass through options validation. One clear drawback of this is that it
// requires constant values for the parameter assignments. However, most people
// are doing that anyway.
//
// [1]: https://github.com/influxdata/influxdb/issues/17666
func ExtractTaskOptions(ctx context.Context, lang influxdb.FluxLanguageService, flux string) (options.Options, error) {
if !feature.SimpleTaskOptionsExtraction().Enabled(ctx) {
return options.FromScript(lang, flux)
}

matches := taskOptionsPattern.FindAllString(flux, -1)
if len(matches) == 0 {
return options.Options{}, &influxdb.Error{
Code: influxdb.EInvalid,
Msg: "no task options defined",
}
}
if len(matches) > 1 {
return options.Options{}, &influxdb.Error{
Code: influxdb.EInvalid,
Msg: "multiple task options defined",
}
}
return options.FromScript(lang, matches[0])
}
105 changes: 105 additions & 0 deletions kv/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,15 @@ import (
"github.com/google/go-cmp/cmp"
"github.com/influxdata/influxdb/v2"
icontext "github.com/influxdata/influxdb/v2/context"
"github.com/influxdata/influxdb/v2/kit/feature"
"github.com/influxdata/influxdb/v2/kv"
"github.com/influxdata/influxdb/v2/mock"
_ "github.com/influxdata/influxdb/v2/query/builtin"
"github.com/influxdata/influxdb/v2/query/fluxlang"
"github.com/influxdata/influxdb/v2/task/options"
"github.com/influxdata/influxdb/v2/task/servicetest"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
)

Expand Down Expand Up @@ -387,3 +392,103 @@ func TestTaskMigrate(t *testing.T) {
t.Fatal("failed to fill in ownerID")
}
}

type taskOptions struct {
name string
every string
cron string
offset string
concurrency int64
retry int64
}

func TestExtractTaskOptions(t *testing.T) {
tcs := []struct {
name string
flux string
expected taskOptions
errMsg string
}{
{
name: "all parameters",
flux: `option task = {name: "whatever", every: 1s, offset: 0s, concurrency: 2, retry: 2}`,
expected: taskOptions{
name: "whatever",
every: "1s",
offset: "0s",
concurrency: 2,
retry: 2,
},
},
{
name: "some extra whitespace and bad content around it",
flux: `howdy()
option task = { name:"whatever", cron: "* * * * *" }
hello()
`,
expected: taskOptions{
name: "whatever",
cron: "* * * * *",
concurrency: 1,
retry: 1,
},
},
{
name: "bad options",
flux: `option task = {name: "whatever", every: 1s, cron: "* * * * *"}`,
errMsg: "cannot use both cron and every in task options",
},
{
name: "no options",
flux: `doesntexist()`,
errMsg: "no task options defined",
},
{
name: "multiple assignments",
flux: `
option task = {name: "whatever", every: 1s, offset: 0s, concurrency: 2, retry: 2}
option task = {name: "whatever", every: 1s, offset: 0s, concurrency: 2, retry: 2}
`,
errMsg: "multiple task options defined",
},
}

for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
flagger := mock.NewFlagger(map[feature.Flag]interface{}{
feature.SimpleTaskOptionsExtraction(): true,
})
ctx, _ := feature.Annotate(context.Background(), flagger)
opts, err := kv.ExtractTaskOptions(ctx, fluxlang.DefaultService, tc.flux)
if tc.errMsg != "" {
require.Error(t, err)
assert.Equal(t, tc.errMsg, err.Error())
return
}

require.NoError(t, err)

var offset options.Duration
if opts.Offset != nil {
offset = *opts.Offset
}

var concur int64
if opts.Concurrency != nil {
concur = *opts.Concurrency
}

var retry int64
if opts.Retry != nil {
retry = *opts.Retry
}

assert.Equal(t, tc.expected.name, opts.Name)
assert.Equal(t, tc.expected.cron, opts.Cron)
assert.Equal(t, tc.expected.every, opts.Every.String())
assert.Equal(t, tc.expected.offset, offset.String())
assert.Equal(t, tc.expected.concurrency, concur)
assert.Equal(t, tc.expected.retry, retry)
})
}
}