kvserver: replace read amp with io threshold
L0 sub-levels used to be checked as a gauge of store health in order to
exclude unhealthy stores from being allocation targets. In #83720,
`IOThreshold` was included in gossip. `IOThreshold` covers both the L0
sub-level count and the L0 file count, and can accommodate additional
measures of store health, making it more informative than looking at
L0 sub-levels alone.

This commit stops gossiping L0 sub-levels and replaces uses of
L0 sub-levels in the allocator and store pool with `IOThreshold`.

The prior cluster settings that controlled how store health was
considered in allocation are now deprecated:

`kv.allocator.l0_sublevels_threshold`
`kv.allocator.l0_sublevels_threshold_enforce`

The new cluster settings that perform the equivalent function are:

`kv.allocator.io_overload_threshold`
This serves the same purpose as `l0_sublevels_threshold`, but is adapted
to `IOThreshold`, where a value at or above 1 indicates overload. The
default is `0.8`, intentionally below 1, so that stores are excluded as
allocation targets before they become overloaded.

`kv.allocator.io_overload_threshold_enforce`
This is identical to `l0_sublevels_threshold_enforce`, apart from the
rename.
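
For illustration, here is a minimal sketch of the score that gets compared
against the new threshold, assuming (consistent with the store pool test
below) that the score is the larger of the two L0 ratios. The `ioThreshold`
type and `score` helper are illustrative stand-ins, not the `admissionpb`
implementation:

package main

import "fmt"

// ioThreshold mirrors the admissionpb.IOThreshold fields used in this
// commit; the scoring below is a simplified sketch of the real logic.
type ioThreshold struct {
	l0NumFiles, l0NumFilesThreshold         int64
	l0NumSubLevels, l0NumSubLevelsThreshold int64
}

// score returns the worse of the two L0 ratios; a value at or above 1
// means the store has reached its configured L0 limits.
func (t ioThreshold) score() float64 {
	files := float64(t.l0NumFiles) / float64(t.l0NumFilesThreshold)
	sublevels := float64(t.l0NumSubLevels) / float64(t.l0NumSubLevelsThreshold)
	if files > sublevels {
		return files
	}
	return sublevels
}

func main() {
	t := ioThreshold{l0NumFiles: 500, l0NumFilesThreshold: 1000, l0NumSubLevels: 5, l0NumSubLevelsThreshold: 20}
	// Score is 0.5: not overloaded, and still below the 0.8 default of
	// kv.allocator.io_overload_threshold, so the store remains a valid target.
	fmt.Println(t.score(), t.score() >= 0.8)
}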

Resolves: #85084

Release note (ops change): Deprecate cluster settings
`kv.allocator.l0_sublevels_threshold` and
`kv.allocator.l0_sublevels_threshold_enforce` and introduce
`kv.allocator.io_overload_threshold` and
`kv.allocator.io_overload_threshold_enforce`. The enforcement cluster
setting is unchanged apart from the rename. The threshold cluster
setting is adapted to `IOThreshold`, where a value greater than or
equal to 1 indicates IO overload. The default is set to 0.8 so that
stores are excluded as allocation targets before they become overloaded.
kvoli committed Feb 14, 2023
1 parent 3b9d8cb commit fa3ad10
Showing 15 changed files with 411 additions and 288 deletions.
1 change: 0 additions & 1 deletion pkg/kv/kvserver/BUILD.bazel
@@ -196,7 +196,6 @@ go_library(
"//pkg/util/quotapool",
"//pkg/util/retry",
"//pkg/util/shuffle",
"//pkg/util/slidingwindow",
"//pkg/util/stop",
"//pkg/util/syncutil",
"//pkg/util/syncutil/singleflight",
1 change: 1 addition & 0 deletions pkg/kv/kvserver/allocator/allocatorimpl/BUILD.bazel
@@ -22,6 +22,7 @@ go_library(
"//pkg/roachpb",
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/util/admission/admissionpb",
"//pkg/util/log",
"//pkg/util/metric",
"//pkg/util/stop",
8 changes: 4 additions & 4 deletions pkg/kv/kvserver/allocator/allocatorimpl/allocator.go
@@ -2036,16 +2036,16 @@ func (a *Allocator) leaseholderShouldMoveDueToPreferences(
}

// StoreHealthOptions returns the store health options, currently only
// considering the threshold for L0 sub-levels. This threshold is not
// considering the threshold for io overload. This threshold is not
// considered in allocation or rebalancing decisions (excluding candidate
// stores as targets) when enforcementLevel is set to storeHealthNoAction or
// storeHealthLogOnly. By default storeHealthBlockRebalanceTo is the action taken. When
// there is a mixed version cluster, storeHealthNoAction is set instead.
func (a *Allocator) StoreHealthOptions(_ context.Context) StoreHealthOptions {
enforcementLevel := StoreHealthEnforcement(l0SublevelsThresholdEnforce.Get(&a.st.SV))
enforcementLevel := StoreHealthEnforcement(IOOverloadThresholdEnforce.Get(&a.st.SV))
return StoreHealthOptions{
EnforcementLevel: enforcementLevel,
L0SublevelThreshold: l0SublevelsThreshold.Get(&a.st.SV),
EnforcementLevel: enforcementLevel,
IOThreshold: IOOverloadThreshold.Get(&a.st.SV),
}
}

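
For context, below is a rough sketch of how the enforcement level and the
threshold combine when deciding whether an IO-overloaded store may still be
a rebalance target. The constant ordering, struct, and helper names are
assumptions for illustration, not the allocator_scorer.go implementation:

package main

import "fmt"

// Enforcement levels referenced in the comment above; the numeric ordering
// is an assumption made for this sketch.
const (
	storeHealthNoAction = iota
	storeHealthLogOnly
	storeHealthBlockRebalanceTo
)

// storeHealthOptions stands in for StoreHealthOptions: an enforcement level
// plus the kv.allocator.io_overload_threshold value (default 0.8).
type storeHealthOptions struct {
	enforcementLevel int
	ioThreshold      float64
}

// rebalanceToStoreIsHealthy is an illustrative helper: a store is excluded
// as a rebalance target only when its IO overload score reaches the
// threshold and the enforcement level actually blocks rebalancing; lower
// levels merely log or take no action.
func (o storeHealthOptions) rebalanceToStoreIsHealthy(score float64) bool {
	if score < o.ioThreshold {
		return true
	}
	return o.enforcementLevel < storeHealthBlockRebalanceTo
}

func main() {
	opts := storeHealthOptions{enforcementLevel: storeHealthBlockRebalanceTo, ioThreshold: 0.8}
	fmt.Println(opts.rebalanceToStoreIsHealthy(0.5)) // true: below the threshold
	fmt.Println(opts.rebalanceToStoreIsHealthy(1.2)) // false: overloaded and enforcement blocks it
}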
225 changes: 110 additions & 115 deletions pkg/kv/kvserver/allocator/allocatorimpl/allocator_scorer.go

Large diffs are not rendered by default.

pkg/kv/kvserver/allocator/allocatorimpl/allocator_scorer_test.go
@@ -52,7 +52,7 @@ func TestOnlyValidAndHealthyDisk(t *testing.T) {
defer log.Scope(t).Close(t)

testCases := []struct {
valid, invalid, full, readAmpHigh int
valid, invalid, full, ioOverloaded int
}{
{0, 0, 0, 0},
{1, 0, 0, 0},
@@ -80,16 +80,16 @@ func TestOnlyValidAndHealthyDisk(t *testing.T) {
for i := 0; i < tc.full; i++ {
cl = append(cl, candidate{fullDisk: true})
}
for i := 0; i < tc.readAmpHigh; i++ {
cl = append(cl, candidate{highReadAmp: true})
for i := 0; i < tc.ioOverloaded; i++ {
cl = append(cl, candidate{ioOverloaded: true})
}
sort.Sort(sort.Reverse(byScore(cl)))

valid := cl.onlyValidAndHealthyDisk()
if a, e := len(valid), tc.valid; a != e {
t.Errorf("expected %d valid, actual %d", e, a)
}
if a, e := len(cl)-len(valid), tc.invalid+tc.full+tc.readAmpHigh; a != e {
if a, e := len(cl)-len(valid), tc.invalid+tc.full+tc.ioOverloaded; a != e {
t.Errorf("expected %d invalid, actual %d", e, a)
}
})
137 changes: 66 additions & 71 deletions pkg/kv/kvserver/allocator/allocatorimpl/allocator_test.go

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions pkg/kv/kvserver/allocator/storepool/BUILD.bazel
@@ -25,6 +25,7 @@ go_library(
"//pkg/util/log",
"//pkg/util/metric",
"//pkg/util/shuffle",
"//pkg/util/slidingwindow",
"//pkg/util/stop",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
@@ -45,6 +46,7 @@ go_test(
"//pkg/roachpb",
"//pkg/settings/cluster",
"//pkg/testutils/gossiputil",
"//pkg/util/admission/admissionpb",
"//pkg/util/hlc",
"//pkg/util/leaktest",
"//pkg/util/log",
79 changes: 63 additions & 16 deletions pkg/kv/kvserver/allocator/storepool/store_pool.go
@@ -29,6 +29,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/shuffle"
"github.com/cockroachdb/cockroach/pkg/util/slidingwindow"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/errors"
)
@@ -41,6 +42,10 @@ const (
// TestTimeUntilStoreDeadOff is the test value for TimeUntilStoreDead that
// prevents the store pool from marking stores as dead.
TestTimeUntilStoreDeadOff = 24 * time.Hour

// IOOverloadTrackedRetention is the period over which to accumulate
// statistics on IO overload within a store.
IOOverloadTrackedRetention = time.Minute * 10
)

// FailedReservationsTimeout specifies a duration during which the local
@@ -198,6 +203,14 @@ type StoreDetail struct {
// LastAvailable is set when it's detected that a store was available,
// i.e. we got a liveness heartbeat.
LastAvailable time.Time
// storeHealthTracker tracks the store health over the last
// IOOverloadTrackedRetention period, which is 10 minutes. This serves to
// exclude stores based on historical information and not just
// point-in-time information.
storeHealthTracker struct {
NumL0FilesTracker *slidingwindow.Swag
NumL0SublevelsTracker *slidingwindow.Swag
}
}

// isThrottled returns whether the store is currently throttled.
@@ -558,26 +571,50 @@ func (sp *StorePool) statusString(nl NodeLivenessFunc) string {
// storeGossipUpdate is the Gossip callback used to keep the StorePool up to date.
func (sp *StorePool) storeGossipUpdate(_ string, content roachpb.Value) {
var storeDesc roachpb.StoreDescriptor
// We keep copies of the capacity and storeID to pass into the
// capacityChanged callback.
var oldCapacity, curCapacity roachpb.StoreCapacity
var storeID roachpb.StoreID

if err := content.GetProto(&storeDesc); err != nil {
ctx := sp.AnnotateCtx(context.TODO())
log.Errorf(ctx, "%v", err)
return
}
storeID = storeDesc.StoreID
curCapacity = storeDesc.Capacity

sp.storeDescriptorUpdate(storeDesc)
}

// storeDescriptorUpdate takes a store descriptor and updates the corresponding
// details for the store in the storepool.
func (sp *StorePool) storeDescriptorUpdate(storeDesc roachpb.StoreDescriptor) {
// We keep copies of the capacity and storeID to pass into the
// capacityChanged callback.
var oldCapacity roachpb.StoreCapacity
storeID := storeDesc.StoreID
curCapacity := storeDesc.Capacity

now := sp.clock.PhysicalTime()

sp.DetailsMu.Lock()
detail := sp.GetStoreDetailLocked(storeID)

// We update the store descriptor to reflect the maximum IO Overload values
// that have been seen in the past tracking interval, updating the trackers
// with the new values prior to querying. This means that the most recent
// value for IOThreshold.*Threshold is always used, together with the
// tracked recent maximum IOThreshold.L0NumFiles/L0NumSubLevels to
// calculate the score.
detail.storeHealthTracker.NumL0FilesTracker.Record(
now, float64(curCapacity.IOThreshold.L0NumFiles))
detail.storeHealthTracker.NumL0SublevelsTracker.Record(
now, float64(curCapacity.IOThreshold.L0NumSubLevels))
maxL0NumFiles, _ := detail.storeHealthTracker.NumL0FilesTracker.Query(now)
maxL0NumSublevels, _ := detail.storeHealthTracker.NumL0SublevelsTracker.Query(now)
storeDesc.Capacity.IOThreshold.L0NumFiles = int64(maxL0NumFiles)
storeDesc.Capacity.IOThreshold.L0NumSubLevels = int64(maxL0NumSublevels)

if detail.Desc != nil {
oldCapacity = detail.Desc.Capacity
}
detail.Desc = &storeDesc
detail.LastUpdatedTime = sp.clock.PhysicalTime()
detail.LastUpdatedTime = now
sp.DetailsMu.Unlock()

sp.localitiesMu.Lock()
@@ -751,8 +788,16 @@ func (sp *StorePool) UpdateLocalStoresAfterLeaseTransfer(

// newStoreDetail makes a new StoreDetail struct. It sets index to be -1 to
// ensure that it will be processed by a queue immediately.
func newStoreDetail() *StoreDetail {
return &StoreDetail{}
func newStoreDetail(now time.Time) *StoreDetail {
sd := &StoreDetail{}
// Track the store health, specifically IO overload for the
// IOOverloadTrackedRetention period, using a sliding window with 5
// buckets.
sd.storeHealthTracker.NumL0FilesTracker = slidingwindow.NewMaxSwag(
now, IOOverloadTrackedRetention/5, 5)
sd.storeHealthTracker.NumL0SublevelsTracker = slidingwindow.NewMaxSwag(
now, IOOverloadTrackedRetention/5, 5)
return sd
}

// GetStores returns information on all the stores with descriptor in the pool.
@@ -781,7 +826,7 @@ func (sp *StorePool) GetStoreDetailLocked(storeID roachpb.StoreID) *StoreDetail
// network). The first time this occurs, presume the store is
// alive, but start the clock so it will become dead if enough
// time passes without updates from gossip.
detail = newStoreDetail()
detail = newStoreDetail(sp.clock.Now().GoTime())
detail.LastUpdatedTime = sp.startTime
sp.DetailsMu.StoreDetails[storeID] = detail
}
@@ -1063,9 +1108,9 @@ type StoreList struct {
// eligible to be rebalance targets.
candidateWritesPerSecond Stat

// candidateWritesPerSecond tracks L0 sub-level stats for Stores that are
// eligible to be rebalance targets.
CandidateL0Sublevels Stat
// CandidateIOOverload tracks the IO overload stats for Stores that are
// eligible to be rebalance candidates.
CandidateIOOverload Stat
}

// MakeStoreList constructs a new store list based on the passed in descriptors.
Expand All @@ -1080,8 +1125,9 @@ func MakeStoreList(descriptors []roachpb.StoreDescriptor) StoreList {
sl.candidateLogicalBytes.update(float64(desc.Capacity.LogicalBytes))
sl.CandidateQueriesPerSecond.update(desc.Capacity.QueriesPerSecond)
sl.candidateWritesPerSecond.update(desc.Capacity.WritesPerSecond)
sl.CandidateL0Sublevels.update(float64(desc.Capacity.L0Sublevels))
sl.CandidateCPU.update(desc.Capacity.CPUPerSecond)
score, _ := desc.Capacity.IOThreshold.Score()
sl.CandidateIOOverload.update(score)
}
return sl
}
Expand All @@ -1102,12 +1148,13 @@ func (sl StoreList) String() string {
fmt.Fprintf(&buf, " <no candidates>")
}
for _, desc := range sl.Stores {
fmt.Fprintf(&buf, " %d: ranges=%d leases=%d disk-usage=%s queries-per-second=%.2f store-cpu-per-second=%s l0-sublevels=%d\n",
ioScore, _ := desc.Capacity.IOThreshold.Score()
fmt.Fprintf(&buf, " %d: ranges=%d leases=%d disk-usage=%s queries-per-second=%.2f store-cpu-per-second=%s io-overload=%.2f\n",
desc.StoreID, desc.Capacity.RangeCount,
desc.Capacity.LeaseCount, humanizeutil.IBytes(desc.Capacity.LogicalBytes),
desc.Capacity.QueriesPerSecond,
humanizeutil.Duration(time.Duration(int64(desc.Capacity.CPUPerSecond))),
desc.Capacity.L0Sublevels,
ioScore,
)
}
return buf.String()
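
As a quick illustration of the tracking added above, here is a small usage
sketch of the sliding-window maximum, using only the slidingwindow calls
that appear in this diff (NewMaxSwag, Record, Query); the exact bucket
rotation semantics are assumed:

package main

import (
	"fmt"
	"time"

	"github.com/cockroachdb/cockroach/pkg/util/slidingwindow"
)

func main() {
	const retention = 10 * time.Minute
	now := time.Now()
	// Five buckets of two minutes each, matching newStoreDetail above.
	tracker := slidingwindow.NewMaxSwag(now, retention/5, 5)

	tracker.Record(now, 500) // e.g. an L0 file count seen in gossip
	tracker.Record(now.Add(time.Minute), 20)

	// The queried value is the maximum over the retention period, so the
	// earlier spike of 500 still dominates a minute later.
	maxSeen, _ := tracker.Query(now.Add(time.Minute))
	fmt.Println(maxSeen) // 500

	// Once the window has fully rotated past the spike, only more recent,
	// lower readings remain (assuming the older buckets have rotated out).
	later := now.Add(retention + time.Minute)
	tracker.Record(later, 20)
	maxSeen, _ = tracker.Query(later)
	fmt.Println(maxSeen) // 20
}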
82 changes: 82 additions & 0 deletions pkg/kv/kvserver/allocator/storepool/store_pool_test.go
@@ -23,6 +23,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/testutils/gossiputil"
"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -1029,3 +1030,84 @@ func TestNodeLivenessLivenessStatus(t *testing.T) {
})
}
}

func TestStorePoolStoreHealthTracker(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()
st := cluster.MakeTestingClusterSettings()
stopper, _, clock, sp, _ := CreateTestStorePool(ctx, st,
TestTimeUntilStoreDead, false, /* deterministic */
func() int { return 1 }, /* nodeCount */
livenesspb.NodeLivenessStatus_LIVE)
defer stopper.Stop(ctx)
desc := roachpb.StoreDescriptor{
StoreID: 1,
Node: roachpb.NodeDescriptor{NodeID: 1},
Capacity: roachpb.StoreCapacity{
IOThreshold: admissionpb.IOThreshold{
L0NumSubLevels: 5,
L0NumSubLevelsThreshold: 20,
L0NumFiles: 5,
L0NumFilesThreshold: 1000,
},
},
}

checkIOOverloadScore := func() float64 {
detail, ok := sp.GetStoreDescriptor(1)
require.True(t, ok)
score, _ := detail.Capacity.IOThreshold.Score()
return score
}

// Initially the sublevels score is 5/20 = 0.25; expect that score.
sp.storeDescriptorUpdate(desc)
require.Equal(t, 0.25, checkIOOverloadScore())
// Now increase the numfiles to be greater (500/1000) so the score should
// update to 0.5.
desc.Capacity.IOThreshold.L0NumFiles = 500
sp.storeDescriptorUpdate(desc)
require.Equal(t, 0.5, checkIOOverloadScore())
// Despite the numfiles now decreasing again to 0, the max tracked should
// still be 500 and therefore the score remains unchanged.
desc.Capacity.IOThreshold.L0NumFiles = 0
sp.storeDescriptorUpdate(desc)
require.Equal(t, 0.5, checkIOOverloadScore())
// Update the threshold, which will be used with the max tracked score
// (500/500).
desc.Capacity.IOThreshold.L0NumFilesThreshold = 500
sp.storeDescriptorUpdate(desc)
require.Equal(t, 1.0, checkIOOverloadScore())
// Update the sublevels to be greater than the num files score.
desc.Capacity.IOThreshold.L0NumSubLevels = 40
sp.storeDescriptorUpdate(desc)
require.Equal(t, 2.0, checkIOOverloadScore())
// Decreasing the sublevels doesn't affect the max tracked since they are
// recorded at the same time.
desc.Capacity.IOThreshold.L0NumSubLevels = 20
sp.storeDescriptorUpdate(desc)
require.Equal(t, 2.0, checkIOOverloadScore())
// Advance past the retention period, so that previous values are
// rotated out.
clock.Advance(IOOverloadTrackedRetention)
desc.Capacity.IOThreshold.L0NumSubLevels = 10
desc.Capacity.IOThreshold.L0NumSubLevelsThreshold = 20
desc.Capacity.IOThreshold.L0NumFiles = 0
desc.Capacity.IOThreshold.L0NumFilesThreshold = 1000
sp.storeDescriptorUpdate(desc)
require.Equal(t, 0.5, checkIOOverloadScore())
// Advance 1/2 the retention period and update sublevels with a lower value;
// this shouldn't be included in the score as it is lower than the maximum
// seen in the retention period.
clock.Advance(IOOverloadTrackedRetention / 2)
desc.Capacity.IOThreshold.L0NumSubLevels = 5
sp.storeDescriptorUpdate(desc)
require.Equal(t, 0.5, checkIOOverloadScore())
// Advance 1/2 the retention period again; this time the sublevels added
// above should now be the maximum (5/20).
clock.Advance(IOOverloadTrackedRetention / 2)
desc.Capacity.IOThreshold.L0NumSubLevels = 0
sp.storeDescriptorUpdate(desc)
require.Equal(t, 0.25, checkIOOverloadScore())
}
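
For reference, the expected scores above all follow if the score is taken to
be the larger of the two ratios L0NumFiles/L0NumFilesThreshold and
L0NumSubLevels/L0NumSubLevelsThreshold (an assumption consistent with these
assertions, not a statement of the `admissionpb` implementation): 5/20 = 0.25,
then max(500/1000, 5/20) = 0.5, then the tracked maximum of 500 files against
the lowered threshold of 500 gives 1.0, and 40 sub-levels against a threshold
of 20 gives 2.0. After the retention period rotates the old maxima out,
10/20 = 0.5; the lower reading of 5 recorded half a period later does not
change that until the 10 itself rotates out, leaving 5/20 = 0.25.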