Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(influxd): flag to disable scheduling of tasks #17462

Merged
merged 2 commits into from
Mar 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
1. [17400](https://github.com/influxdata/influxdb/pull/17400): Be able to delete bucket by name via cli
1. [17396](https://github.com/influxdata/influxdb/pull/17396): Add module to write line data to specified url, org, and bucket
1. [17448](https://github.com/influxdata/influxdb/pull/17448): Add foundation for pkger stacks, stateful package management
1. [17462](https://github.com/influxdata/influxdb/pull/17462): Flag to disable scheduling of tasks

### Bug Fixes

Expand Down
51 changes: 36 additions & 15 deletions cmd/influxd/launcher/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,12 @@ func buildLauncherCommand(l *Launcher, cmd *cobra.Command) {
Default: true,
Desc: "toggles read-only mode for the new meta store, if so, the reads are duplicated between the old and new store (has meaning only if the new meta store is enabled)",
},
{
DestP: &l.noTasks,
Flag: "no-tasks",
Default: false,
Desc: "disables the task scheduler",
},
}

cli.BindOptions(cmd, opts)
Expand Down Expand Up @@ -319,7 +325,8 @@ type Launcher struct {
natsServer *nats.Server
natsPort int

scheduler *scheduler.TreeScheduler
noTasks bool
scheduler stoppingScheduler
executor *executor.Executor
taskControlService taskbackend.TaskControlService

Expand All @@ -333,6 +340,11 @@ type Launcher struct {
apibackend *http.APIBackend
}

type stoppingScheduler interface {
scheduler.Scheduler
Stop()
}

// NewLauncher returns a new instance of Launcher connected to standard in/out/err.
func NewLauncher() *Launcher {
return &Launcher{
Expand Down Expand Up @@ -682,22 +694,31 @@ func (m *Launcher) run(ctx context.Context) (err error) {
m.reg.MustRegister(executorMetrics.PrometheusCollectors()...)
schLogger := m.log.With(zap.String("service", "task-scheduler"))

sch, sm, err := scheduler.NewScheduler(
executor,
taskbackend.NewSchedulableTaskService(m.kvService),
scheduler.WithOnErrorFn(func(ctx context.Context, taskID scheduler.ID, scheduledAt time.Time, err error) {
schLogger.Info(
"error in scheduler run",
zap.String("taskID", platform.ID(taskID).String()),
zap.Time("scheduledAt", scheduledAt),
zap.Error(err))
}),
)
if err != nil {
m.log.Fatal("could not start task scheduler", zap.Error(err))
var sch stoppingScheduler = &scheduler.NoopScheduler{}
if !m.noTasks {
var (
sm *scheduler.SchedulerMetrics
err error
)
sch, sm, err = scheduler.NewScheduler(
executor,
taskbackend.NewSchedulableTaskService(m.kvService),
scheduler.WithOnErrorFn(func(ctx context.Context, taskID scheduler.ID, scheduledAt time.Time, err error) {
schLogger.Info(
"error in scheduler run",
zap.String("taskID", platform.ID(taskID).String()),
zap.Time("scheduledAt", scheduledAt),
zap.Error(err))
}),
)
if err != nil {
m.log.Fatal("could not start task scheduler", zap.Error(err))
}
m.reg.MustRegister(sm.PrometheusCollectors()...)
}

m.scheduler = sch
m.reg.MustRegister(sm.PrometheusCollectors()...)

coordLogger := m.log.With(zap.String("service", "task-coordinator"))
taskCoord := coordinator.NewCoordinator(
coordLogger,
Expand Down
18 changes: 18 additions & 0 deletions task/backend/scheduler/noop_scheduler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package scheduler

// NoopScheduler is a no-op scheduler. It is used when we don't want the
// standard scheduler to run (e.g. when "--no-tasks" flag is present).
type NoopScheduler struct{}

// Schedule is a mocked Scheduler.Schedule method.
func (n *NoopScheduler) Schedule(task Schedulable) error {
return nil
}

// Release is a mocked Scheduler.Release method.
func (n *NoopScheduler) Release(taskID ID) error {
return nil
}

// Stop is a mocked stop method.
func (n *NoopScheduler) Stop() {}