Skip to content

Commit

Permalink
Merge pull request #17384 from influxdata/bb/reuse-slices
Browse files Browse the repository at this point in the history
fix(task/scheduler): Reuse slices built by iterator to reduce allocations
  • Loading branch information
brettbuddin authored Mar 24, 2020
2 parents 4f65cda + d977758 commit 5482cc3
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 6 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
1. [17240](https://github.com/influxdata/influxdb/pull/17240): NodeJS logo displays properly in Firefox
1. [17363](https://github.com/influxdata/influxdb/pull/17363): Fixed telegraf configuration bugs where system buckets were appearing in the buckets dropdown
1. [17391](https://github.com/influxdata/influxdb/pull/17391): Fixed threshold check bug where checks could not be created when a field had a space in the name
1. [17384](https://github.com/influxdata/influxdb/pull/17384): Reuse slices built by iterator to reduce allocations

### UI Improvements

Expand Down
18 changes: 12 additions & 6 deletions task/backend/scheduler/treescheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ type TreeScheduler struct {
workchans []chan Item
wg sync.WaitGroup
checkpointer SchedulableService
items *itemList

sm *SchedulerMetrics
}
Expand Down Expand Up @@ -118,6 +119,7 @@ func NewScheduler(executor Executor, checkpointer SchedulableService, opts ...tr
time: clock.New(),
done: make(chan struct{}, 1),
checkpointer: checkpointer,
items: &itemList{},
}

// apply options
Expand Down Expand Up @@ -215,7 +217,12 @@ type itemList struct {
}

func (s *TreeScheduler) process() {
iter, toReAdd := s.iterator(s.time.Now())
// Reset the length of the slice in preparation of the next iterator.
s.items.toDelete = s.items.toDelete[:0]
s.items.toInsert = s.items.toInsert[:0]

toReAdd := s.items
iter := s.iterator(s.time.Now())
s.priorityQueue.Ascend(iter)
for i := range toReAdd.toDelete {
delete(s.nextTime, toReAdd.toDelete[i].id)
Expand All @@ -232,8 +239,7 @@ func (s *TreeScheduler) resetTimer(whenFromNow time.Duration) {
s.timer.Reset(whenFromNow)
}

func (s *TreeScheduler) iterator(ts time.Time) (btree.ItemIterator, *itemList) {
itemsToPlace := &itemList{}
func (s *TreeScheduler) iterator(ts time.Time) btree.ItemIterator {
return func(i btree.Item) bool {
if i == nil {
return false
Expand All @@ -249,13 +255,13 @@ func (s *TreeScheduler) iterator(ts time.Time) (btree.ItemIterator, *itemList) {
wc := xxhash.Sum64(buf[:]) % uint64(len(s.workchans)) // we just hash so that the number is uniformly distributed
select {
case s.workchans[wc] <- it:
itemsToPlace.toDelete = append(itemsToPlace.toDelete, it)
s.items.toDelete = append(s.items.toDelete, it)
if err := it.updateNext(); err != nil {
// in this error case we can't schedule next, so we have to drop the task
s.onErr(context.Background(), it.id, it.Next(), &ErrUnrecoverable{err})
return true
}
itemsToPlace.toInsert = append(itemsToPlace.toInsert, it)
s.items.toInsert = append(s.items.toInsert, it)

case <-s.done:
return false
Expand All @@ -264,7 +270,7 @@ func (s *TreeScheduler) iterator(ts time.Time) (btree.ItemIterator, *itemList) {
}
}
return true
}, itemsToPlace
}
}

// When gives us the next time the scheduler will run a task.
Expand Down

0 comments on commit 5482cc3

Please sign in to comment.