From 3d826d96cf03f0a82c5fead55860f7e480d46649 Mon Sep 17 00:00:00 2001 From: Brett Buddin Date: Thu, 26 Mar 2020 13:37:25 -0400 Subject: [PATCH 1/2] feat(influxd): flag to disable scheduling of tasks --- cmd/influxd/launcher/launcher.go | 51 +++++++++++++++++------- task/backend/scheduler/noop_scheduler.go | 18 +++++++++ 2 files changed, 54 insertions(+), 15 deletions(-) create mode 100644 task/backend/scheduler/noop_scheduler.go diff --git a/cmd/influxd/launcher/launcher.go b/cmd/influxd/launcher/launcher.go index a446d7dd8ad..45c26e81f83 100644 --- a/cmd/influxd/launcher/launcher.go +++ b/cmd/influxd/launcher/launcher.go @@ -273,6 +273,12 @@ func buildLauncherCommand(l *Launcher, cmd *cobra.Command) { Default: true, Desc: "toggles read-only mode for the new meta store, if so, the reads are duplicated between the old and new store (has meaning only if the new meta store is enabled)", }, + { + DestP: &l.noTasks, + Flag: "no-tasks", + Default: false, + Desc: "disables the task scheduler", + }, } cli.BindOptions(cmd, opts) @@ -319,7 +325,8 @@ type Launcher struct { natsServer *nats.Server natsPort int - scheduler *scheduler.TreeScheduler + noTasks bool + scheduler stoppingScheduler executor *executor.Executor taskControlService taskbackend.TaskControlService @@ -333,6 +340,11 @@ type Launcher struct { apibackend *http.APIBackend } +type stoppingScheduler interface { + scheduler.Scheduler + Stop() +} + // NewLauncher returns a new instance of Launcher connected to standard in/out/err. func NewLauncher() *Launcher { return &Launcher{ @@ -682,22 +694,31 @@ func (m *Launcher) run(ctx context.Context) (err error) { m.reg.MustRegister(executorMetrics.PrometheusCollectors()...) schLogger := m.log.With(zap.String("service", "task-scheduler")) - sch, sm, err := scheduler.NewScheduler( - executor, - taskbackend.NewSchedulableTaskService(m.kvService), - scheduler.WithOnErrorFn(func(ctx context.Context, taskID scheduler.ID, scheduledAt time.Time, err error) { - schLogger.Info( - "error in scheduler run", - zap.String("taskID", platform.ID(taskID).String()), - zap.Time("scheduledAt", scheduledAt), - zap.Error(err)) - }), - ) - if err != nil { - m.log.Fatal("could not start task scheduler", zap.Error(err)) + var sch stoppingScheduler = &scheduler.NoopScheduler{} + if !m.noTasks { + var ( + sm *scheduler.SchedulerMetrics + err error + ) + sch, sm, err = scheduler.NewScheduler( + executor, + taskbackend.NewSchedulableTaskService(m.kvService), + scheduler.WithOnErrorFn(func(ctx context.Context, taskID scheduler.ID, scheduledAt time.Time, err error) { + schLogger.Info( + "error in scheduler run", + zap.String("taskID", platform.ID(taskID).String()), + zap.Time("scheduledAt", scheduledAt), + zap.Error(err)) + }), + ) + if err != nil { + m.log.Fatal("could not start task scheduler", zap.Error(err)) + } + m.reg.MustRegister(sm.PrometheusCollectors()...) } + m.scheduler = sch - m.reg.MustRegister(sm.PrometheusCollectors()...) + coordLogger := m.log.With(zap.String("service", "task-coordinator")) taskCoord := coordinator.NewCoordinator( coordLogger, diff --git a/task/backend/scheduler/noop_scheduler.go b/task/backend/scheduler/noop_scheduler.go new file mode 100644 index 00000000000..b83b1bb2f5d --- /dev/null +++ b/task/backend/scheduler/noop_scheduler.go @@ -0,0 +1,18 @@ +package scheduler + +// NoopScheduler is a no-op scheduler. It is used when we don't want the +// standard scheduler to run (e.g. when "--no-tasks" flag is present). +type NoopScheduler struct{} + +// Schedule is a mocked Scheduler.Schedule method. +func (n *NoopScheduler) Schedule(task Schedulable) error { + return nil +} + +// Release is a mocked Scheduler.Release method. +func (n *NoopScheduler) Release(taskID ID) error { + return nil +} + +// Stop is a mocked stop method. +func (n *NoopScheduler) Stop() {} From 19abbebef68a258f6ce9c9bbc00a28760039c6b6 Mon Sep 17 00:00:00 2001 From: Brett Buddin Date: Fri, 27 Mar 2020 12:41:25 -0400 Subject: [PATCH 2/2] chore(influxd): update changelog for no-tasks flag. --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5681c708e5f..486d9be48a7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ 1. [17400](https://github.com/influxdata/influxdb/pull/17400): Be able to delete bucket by name via cli 1. [17396](https://github.com/influxdata/influxdb/pull/17396): Add module to write line data to specified url, org, and bucket 1. [17448](https://github.com/influxdata/influxdb/pull/17448): Add foundation for pkger stacks, stateful package management +1. [17462](https://github.com/influxdata/influxdb/pull/17462): Flag to disable scheduling of tasks ### Bug Fixes