Skip to content

Commit

Permalink
[prototype] storage: Make rebalance decisions at store-level
Browse files Browse the repository at this point in the history
As outlined in recent comments on cockroachdb#26059, we need to bring back some
form of stats-based rebalancing in order to perform well on TPC-C
without manual partitioning and replica placement.

This commit contains a prototype that demonstrates the effectiveness of
changing our approach to making rebalancing decisions from making them
in the replicate queue, which operates on arbitrarily ordered replicas
of the ranges on a store, to making them at a higher-level. This
prototype makes them at a cluster level by running the logic on only one
node, but my real proposal is to make them at the store level.

This change in abstraction reflects what a human would do if asked to
even out the load on a cluster given perfect information about
everything happening in the cluster:

1. First, determine which stores have the most load on them (or overfull
   -- but for the prototype I only considered the one dimension that
   affects TPC-C the most)
2. Decide whether the most loaded stores are so overloaded that action
   needs to be taken.
3. Examine the hottest replicas on the store (maybe not the absolute
   hottest in practice, since moving that one could disrupt user traffic,
   but in the prototype this seems to work fine) and attempt to move them
   to under-utilized stores.  If this can be done simply by transferring
   leases to under-utilized stores, then do so. If moving leases isn't
   enough, then also rebalance replicas from the hottest store to
   under-utilized stores.
4. Repeat periodically to handle changes in load or cluster membership.

In a real versino of this code, the plan is roughly:
1. Each store will independently run their own control loop like this
   that is only responsible for moving leases/replicas off itself, not off
   other stores. This avoids needing a centralized coordinator, and will
   avoid the need to use the raft debug endpoint as long as we start
   gossiping QPS per store info, since the store already has details about
   the replicas on itself.
2. The existing replicate queue will stop making decisions motivated by
   balance. It will switch to only making decisions based on
   constraints/diversity/lease preferences, which is still needed since
   the new store-level logic will only check for store-level balance,
   not that all replicas' constraints are properly met.
3. The new code will have to avoid violating constraints/diversity/lease
   preferences.
4. The new code should consider range count, disk fullness, and maybe
   writes per second as well.
5. In order to avoid making decisions based on bad data, I'd like to
   extend lease transfers to pass along QPS data to the new leaseholder
   and preemptive snapshots to pass along WPS data to the new replica.
   This may not be strictly necessary, as shown by the success of this
   prototype, but should make for more reliable decision making.

I tested this out on TPC-C 5k on 15 nodes and am able to consistently
get 94% efficiency, which is the max I've seen using a build of the
workload generator that erroneously includes the ramp-up period in its
final stats. The first run with this code only got 85% because it took a
couple minutes to make all the lease transfers it wanted, but then all
subsequent runs got the peak efficiency while making negligibly few
lease transfers.

Note that I didn't even have to implement replica rebalancing to get
these results, which oddly contradicts my previous claims. However, I
believe that's because I did the initial split/scatter using a binary
containing cockroachdb#26438, so the replicas were already better scattered than by
default. I ran TPC-C on that build without these changes a couple times,
though, and didn't get better than 65% efficiency, so the scatter wasn't
the cause of the good results here.

Touches cockroachdb#26059, cockroachdb#17979

Release note: None

[prototype] storage: Extend new allocator to also move range replicas

With this update, TPC-C 10k on 30 went from overloaded to running at
peak efficiency over the course of about 4 hours (the manual
partitioning approach takes many hours to move all the replicas as well,
for a point of comparison). This is without having to run the replica
scatter from cockroachdb#26438.

Doing a 5 minute run to get a result that doesn't include all the
rebalancing time shows:

_elapsed_______tpmC____efc__avg(ms)__p50(ms)__p90(ms)__p95(ms)__p99(ms)_pMax(ms)
  290.9s   124799.1  97.0%    548.6    486.5    872.4   1140.9   2281.7  10200.5

I think it may have a small bug in it still, since at one point early on
one of the replicas from the warehouse table on the node doing the
relocating thought that it had 16-17k QPS, which wasn't true by any
other metric in the system. Restarting the node fixed it though.
I'm not too concerned about the bug, since I assume I just made a code
mistake, not that anything about the approach fundamentally leads to a
random SQL table replica gets 10s of thousands of QPS.

Range 1 is also back to getting a ton of QPS (~3k) even though I raised
the range cache size from 1M to 50M. Looking at slow query traces shows
a lot of range lookups, way more than I'd expect given that ranges
weren't moving around at the time of the traces.

Release note: None

Release note: None
  • Loading branch information
a-robinson committed Aug 3, 2018
1 parent 624aacd commit 71999d1
Show file tree
Hide file tree
Showing 5 changed files with 343 additions and 1 deletion.
1 change: 1 addition & 0 deletions docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
<tr><td><code>server.remote_debugging.mode</code></td><td>string</td><td><code>local</code></td><td>set to enable remote debugging, localhost-only or disable (any, local, off)</td></tr>
<tr><td><code>server.shutdown.drain_wait</code></td><td>duration</td><td><code>0s</code></td><td>the amount of time a server waits in an unready state before proceeding with the rest of the shutdown process</td></tr>
<tr><td><code>server.shutdown.query_wait</code></td><td>duration</td><td><code>10s</code></td><td>the server will wait for at least this amount of time for active queries to finish</td></tr>
<tr><td><code>server.test_qps_threshold</code></td><td>float</td><td><code>2.5E-01</code></td><td>the maximum fraction a store's qps can differ from the average before store-level rebalancing kicks in</td></tr>
<tr><td><code>server.time_until_store_dead</code></td><td>duration</td><td><code>5m0s</code></td><td>the time after which if there is no new gossiped information about a store, it is considered dead</td></tr>
<tr><td><code>server.web_session_timeout</code></td><td>duration</td><td><code>168h0m0s</code></td><td>the duration that a newly created web session will be valid</td></tr>
<tr><td><code>sql.defaults.distsql</code></td><td>enumeration</td><td><code>1</code></td><td>default distributed SQL execution mode [off = 0, auto = 1, on = 2]</td></tr>
Expand Down
332 changes: 332 additions & 0 deletions pkg/server/prototype_allocator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,332 @@
// Copyright 2018 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.

package server

import (
"container/heap"
"context"
"math"
"time"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util/log"
)

var testQPSThreshold = settings.RegisterNonNegativeFloatSetting(
"server.test_qps_threshold",
"the maximum fraction a store's qps can differ from the average before store-level rebalancing kicks in",
0.15,
)

func (s *Server) RunStoreLevelAllocator(ctx context.Context) {
if s.NodeID() != 1 {
return
}

ticker := time.NewTicker(time.Minute)
defer ticker.Stop()
for {
// Wait out the first tick before doing anything since the store is still
// starting up and we might as well wait for some qps/wps stats to
// accumulate.
select {
case <-s.stopper.ShouldQuiesce():
return
case <-ticker.C:
}

log.Infof(ctx, "starting prototype allocator loop")

resp, err := s.status.RaftDebug(ctx, &serverpb.RaftDebugRequest{})
if err != nil {
log.Errorf(ctx, "failed to retrieve raft debug info: %s", err)
continue
}

qpsPerStore, hottestRangesByStore := processResponse(resp)
if len(qpsPerStore) == 0 {
log.Infof(ctx, "received no stores to process: %+v", resp)
continue
}

log.Infof(ctx, "qpsPerStore: %v", qpsPerStore)

var avgQPS float64
for _, qps := range qpsPerStore {
avgQPS += qps
}
avgQPS /= float64(len(qpsPerStore))
upperBound := math.Max(avgQPS*(1+testQPSThreshold.Get(&s.st.SV)), avgQPS+100)
log.Infof(ctx, "avgQPS: %f, upperBound: %f", avgQPS, upperBound)

consideringLeases := true

// TODO: Also consider trying to move work to under-utilized stores even
// if there aren't any outliers at the top end.
topLevelLoop:
for iter := 0; iter < 64; iter++ {
// Try to lessen the load on the hottest store.
hottestStore, hottestQPS := findHottestStore(qpsPerStore)
log.Infof(ctx, "hottestStore: s%d, hottestQPS: %f", hottestStore, hottestQPS)
if hottestQPS <= upperBound {
break topLevelLoop
}

hottestRanges := hottestRangesByStore[hottestStore]
if len(hottestRanges) == 0 {
log.Warningf(ctx, "no more hot ranges for s%d to move", hottestStore)
}

if consideringLeases {
var rangeIDs []roachpb.RangeID
for i := range hottestRanges {
rangeIDs = append(rangeIDs, hottestRanges[i].RangeID)
}
log.Infof(ctx, "hottest rangeIDs: %v", rangeIDs)

// First check if there are any leases we can reasonably move.
for i, r := range hottestRanges {
qps := qps(r)
log.Infof(ctx, "considering r%d, qps=%f", r.RangeID, qps)
for j := range r.Nodes {
storeID := r.Nodes[j].Range.SourceStoreID
// Transfer the lease if we can move it to a store that will still be
// under the average per-store QPS.
if qpsPerStore[storeID]+qps < avgQPS {
// Attempt to transfer the lease, and make sure we don't do
// anything else to the range this go-round.
hottestRangesByStore[hottestStore] = append(hottestRangesByStore[hottestStore][:i], hottestRangesByStore[hottestStore][i+1:]...)
log.Infof(ctx, "transferring lease for r%d (qps=%f) to s%d (qps=%f)", r.RangeID, qps, storeID, qpsPerStore[storeID])
if err := s.db.AdminTransferLease(ctx, r.Nodes[j].Range.State.ReplicaState.Desc.StartKey, storeID); err != nil {
log.Errorf(ctx, "error transferring lease for r%d to s%d: %s", r.RangeID, storeID, err)
continue topLevelLoop
}
qpsPerStore[storeID] += qps
qpsPerStore[hottestStore] -= qps
continue topLevelLoop
}
}
}
}

// If that didn't work out, then resort to rebalancing replicas.
if consideringLeases {
log.Infof(ctx, "failed to find a store to transfer a lease to; beginning to consider replica rebalances")
consideringLeases = false
}

hottestRanges = hottestRangesByStore[hottestStore]
var remainingRangeIDs []roachpb.RangeID
for i := range hottestRanges {
remainingRangeIDs = append(remainingRangeIDs, hottestRanges[i].RangeID)
}
log.Infof(ctx, "hottest remaining rangeIDs: %v", remainingRangeIDs)

for i, r := range hottestRanges {
qps := qps(r)
log.Infof(ctx, "considering r%d, qps=%f", r.RangeID, qps)

// Pick out the stores that we want the range on, keeping existing
// replicas around if they aren't on overfull stores.
const desiredReplicas = 3
targets := make([]roachpb.ReplicationTarget, 0, desiredReplicas)
for j := range r.Nodes {
storeID := r.Nodes[j].Range.SourceStoreID
if qpsPerStore[storeID] < upperBound {
targets = append(targets, roachpb.ReplicationTarget{
NodeID: r.Nodes[j].Range.SourceNodeID,
StoreID: storeID,
})
}
}

// Then pick out which new stores to add the remaining replicas to.
for storeID, candidateQPS := range qpsPerStore {
if len(targets) >= desiredReplicas {
break
}
if candidateQPS+qps < avgQPS && !existingTarget(targets, storeID) {
desc, found := s.storePool.GetStoreDescriptor(storeID)
if !found {
log.Errorf(ctx, "couldn't find store descriptor for s%d", storeID)
}
targets = append(targets, roachpb.ReplicationTarget{
NodeID: desc.Node.NodeID,
StoreID: storeID,
})
}
}

// If we still don't have enough targets, let them go up to the upper bound.
for storeID, candidateQPS := range qpsPerStore {
if len(targets) >= desiredReplicas {
break
}
if candidateQPS+qps < upperBound && !existingTarget(targets, storeID) {
desc, found := s.storePool.GetStoreDescriptor(storeID)
if !found {
log.Errorf(ctx, "couldn't find store descriptor for s%d", storeID)
}
targets = append(targets, roachpb.ReplicationTarget{
NodeID: desc.Node.NodeID,
StoreID: storeID,
})
}
}

if len(targets) < desiredReplicas {
continue
}

// Pick the replica with the least QPS to be leaseholder;
// RelocateRange transfers the lease to the first provided
// target.
newLeaseIdx := 0
for j := 1; j < len(targets); j++ {
if qpsPerStore[targets[j].StoreID] < qpsPerStore[targets[newLeaseIdx].StoreID] {
newLeaseIdx = j
}
}
targets[0], targets[newLeaseIdx] = targets[newLeaseIdx], targets[0]

// Attempt to relocate the range, and make sure we don't do
// anything else to the range this go-round.
hottestRangesByStore[hottestStore] = append(hottestRangesByStore[hottestStore][:i], hottestRangesByStore[hottestStore][i+1:]...)
log.Infof(ctx, "relocating range r%d from %v to %v; new leaseholder qps = %f", r.RangeID, r.Nodes[0].Range.State.ReplicaState.Desc, targets, qpsPerStore[targets[0].StoreID])
if err := storage.RelocateRange(
ctx, s.db, *r.Nodes[0].Range.State.ReplicaState.Desc, targets,
); err != nil {
log.Errorf(ctx, "error relocating range r%d to %v: %s", r.RangeID, targets, err)
continue topLevelLoop
}

qpsPerStore[hottestStore] -= qps
qpsPerStore[targets[0].StoreID] += qps
continue topLevelLoop
}
}
}
}

func findHottestStore(qpsPerStore map[roachpb.StoreID]float64) (roachpb.StoreID, float64) {
var storeID roachpb.StoreID
var qps float64
for s, q := range qpsPerStore {
if q > qps {
storeID = s
qps = q
}
}
return storeID, qps
}

func findColdestStore(qpsPerStore map[roachpb.StoreID]float64) (roachpb.StoreID, float64) {
var storeID roachpb.StoreID
qps := math.MaxFloat64
for s, q := range qpsPerStore {
if q < qps {
storeID = s
qps = q
}
}
return storeID, qps
}

func existingTarget(targets []roachpb.ReplicationTarget, newStore roachpb.StoreID) bool {
for _, target := range targets {
if newStore == target.StoreID {
return true
}
}
return false
}

func processResponse(
resp *serverpb.RaftDebugResponse,
) (map[roachpb.StoreID]float64, map[roachpb.StoreID][]*serverpb.RaftRangeStatus) {
qpsPerStore := make(map[roachpb.StoreID]float64)
hottestRangeQueues := make(map[roachpb.StoreID]*PriorityQueue)
for _, r := range resp.Ranges {
r := r
lease, qps := leaseAndQPS(&r)
qpsPerStore[lease] += qps
pq := hottestRangeQueues[lease]
if pq == nil {
pq = &PriorityQueue{}
heap.Init(pq)
hottestRangeQueues[lease] = pq
}
heap.Push(pq, &r)
if pq.Len() > 64 {
heap.Pop(pq)
}
}

hottestRanges := make(map[roachpb.StoreID][]*serverpb.RaftRangeStatus)
for storeID, pq := range hottestRangeQueues {
length := pq.Len()
hottestRanges[storeID] = make([]*serverpb.RaftRangeStatus, length)
rangeQPS := make([]float64, length)
for i := 1; i <= length; i++ {
hottestRanges[storeID][length-i] = heap.Pop(pq).(*serverpb.RaftRangeStatus)
rangeQPS[length-i] = qps(hottestRanges[storeID][length-i])
}
log.Infof(context.TODO(), "hottest ranges for s%d: %v", storeID, rangeQPS)
}

return qpsPerStore, hottestRanges
}

func qps(r *serverpb.RaftRangeStatus) float64 {
_, qps := leaseAndQPS(r)
return qps
}

func leaseAndQPS(r *serverpb.RaftRangeStatus) (roachpb.StoreID, float64) {
for i := range r.Nodes {
if r.Nodes[i].Range.State.ReplicaState.Lease.Replica.StoreID == r.Nodes[i].Range.SourceStoreID {
return r.Nodes[i].Range.SourceStoreID, r.Nodes[i].Range.Stats.QueriesPerSecond
}
}
return 0, 0
}

type PriorityQueue []*serverpb.RaftRangeStatus

func (pq PriorityQueue) Len() int { return len(pq) }

func (pq PriorityQueue) Less(i, j int) bool {
return qps(pq[i]) < qps(pq[j])
}

func (pq PriorityQueue) Swap(i, j int) {
pq[i], pq[j] = pq[j], pq[i]
}

func (pq *PriorityQueue) Push(x interface{}) {
item := x.(*serverpb.RaftRangeStatus)
*pq = append(*pq, item)
}

func (pq *PriorityQueue) Pop() interface{} {
old := *pq
n := len(old)
item := old[n-1]
*pq = old[0 : n-1]
return item
}
3 changes: 3 additions & 0 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1684,6 +1684,9 @@ If problems persist, please see ` + base.DocsURL("cluster-setup-troubleshooting.
}
}

// TODO: REMOVE
//s.stopper.RunWorker(ctx, s.RunStoreLevelAllocator)

log.Event(ctx, "server ready")

return nil
Expand Down
6 changes: 6 additions & 0 deletions pkg/storage/store_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,12 @@ func (sp *StorePool) getStoreDetailLocked(storeID roachpb.StoreID) *storeDetail
return detail
}

// GetStoreDescriptor returns the latest store descriptor for the given
// storeID.
func (sp *StorePool) GetStoreDescriptor(storeID roachpb.StoreID) (roachpb.StoreDescriptor, bool) {
return sp.getStoreDescriptor(storeID)
}

// getStoreDescriptor returns the latest store descriptor for the given
// storeID.
func (sp *StorePool) getStoreDescriptor(storeID roachpb.StoreID) (roachpb.StoreDescriptor, bool) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/util/mon/bytes_usage.go
Original file line number Diff line number Diff line change
Expand Up @@ -589,7 +589,7 @@ func (mm *BytesMonitor) reserveBytes(ctx context.Context, x int64) error {
// limit the amount of log messages when a size blowup is caused by
// many small allocations.
if bits.Len64(uint64(mm.mu.curAllocated)) != bits.Len64(uint64(mm.mu.curAllocated-x)) {
log.Infof(ctx, "%s: bytes usage increases to %s (+%d)",
log.VEventf(ctx, 3, "%s: bytes usage increases to %s (+%d)",
mm.name,
humanizeutil.IBytes(mm.mu.curAllocated), x)
}
Expand Down

0 comments on commit 71999d1

Please sign in to comment.