Skip to content

Commit

Permalink
Merge #73288
Browse files Browse the repository at this point in the history
73288: kv: apply limited timeout to snapshots waiting in reservation queue r=tbg,erikgrinaker a=nvanbenschoten

Alternative to #46655.

This commit introduces a new cluster setting called `kv.snapshot_receiver.queue_timeout_fraction` which dictates the fraction of a snapshot's total timeout that it is allowed to spend queued on the receiver waiting for a reservation. Enforcement of this snapshotApplySem-scoped timeout is intended to prevent starvation of snapshots in cases where a queue of snapshots waiting for reservations builds and no single snapshot acquires the semaphore with sufficient time to complete, but each holds the semaphore long enough to ensure that later snapshots in the queue encounter this same situation. This is a case of FIFO queuing + timeouts leading to starvation. By rejecting snapshot attempts earlier, we ensure that those that do acquire the semaphore have sufficient time to complete.

The commit adds a new test called `TestReserveSnapshotQueueTimeoutAvoidsStarvation` which reproduces this starvation without the fix. With the fix, the test passes and goodput never collapses to 0.

This is an alternative to strict LIFO queueing (#46655) and an alternative to Adaptive LIFO queueing (https://queue.acm.org/detail.cfm?id=2839461). The former avoids starvation but at the expense of fairness even under low but steady concurrency. The latter avoids compromising on fairness until it switches from FIFO to LIFO, but is fairly complex. The approach taken in this PR is a compromise that does not trade fairness under low concurrency and is still relatively simple, but does retain some risk of starvation in the case where `totalTimeout - queueTimeout < processingTime`. The default settings ensure that `processingTime` needs to be at least `30s` (assuming `kv.queue.process.guaranteed_time_budget` is used) before this will become a problem in practice.

Release notes (bug fix): Raft snapshots no longer risk starvation under very high concurrency. Before this fix, it was possible that a thundering herd of Raft snapshots could be starved and prevented from succeeding due to timeouts, which were accompanied by errors like `error rate limiting bulk io write: context deadline exceeded`.

Co-authored-by: Nathan VanBenschoten <[email protected]>
  • Loading branch information
craig[bot] and nvanbenschoten committed Dec 23, 2021
2 parents 964695b + e951206 commit 62c80fa
Show file tree
Hide file tree
Showing 10 changed files with 338 additions and 30 deletions.
1 change: 0 additions & 1 deletion docs/generated/settings/settings-for-tenants.txt
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ server.auth_log.sql_sessions.enabled boolean false if set, log SQL session login
server.authentication_cache.enabled boolean true enables a cache used during authentication to avoid lookups to system tables when retrieving per-user authentication-related information
server.clock.forward_jump_check_enabled boolean false if enabled, forward clock jumps > max_offset/2 will cause a panic
server.clock.persist_upper_bound_interval duration 0s the interval between persisting the wall time upper bound of the clock. The clock does not generate a wall time greater than the persisted timestamp and will panic if it sees a wall time greater than this value. When cockroach starts, it waits for the wall time to catch-up till this persisted timestamp. This guarantees monotonic wall time across server restarts. Not setting this or setting a value of 0 disables this feature.
server.consistency_check.max_rate byte size 8.0 MiB the rate limit (bytes/sec) to use for consistency checks; used in conjunction with server.consistency_check.interval to control the frequency of consistency checks. Note that setting this too high can negatively impact performance.
server.eventlog.enabled boolean true if set, logged notable events are also stored in the table system.eventlog
server.eventlog.ttl duration 2160h0m0s if nonzero, entries in system.eventlog older than this duration are deleted every 10m0s. Should not be lowered below 24 hours.
server.host_based_authentication.configuration string host-based authentication configuration to use during connection authentication
Expand Down
6 changes: 3 additions & 3 deletions pkg/kv/kvserver/consistency_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
)

var consistencyCheckInterval = settings.RegisterDurationSetting(
settings.TenantWritable,
settings.SystemOnly,
"server.consistency_check.interval",
"the time between range consistency checks; set to 0 to disable consistency checking."+
" Note that intervals that are too short can negatively impact performance.",
Expand All @@ -33,14 +33,14 @@ var consistencyCheckInterval = settings.RegisterDurationSetting(
)

var consistencyCheckRate = settings.RegisterByteSizeSetting(
settings.TenantWritable,
settings.SystemOnly,
"server.consistency_check.max_rate",
"the rate limit (bytes/sec) to use for consistency checks; used in "+
"conjunction with server.consistency_check.interval to control the "+
"frequency of consistency checks. Note that setting this too high can "+
"negatively impact performance.",
8<<20, // 8MB
validatePositive,
settings.PositiveInt,
).WithPublic()

// consistencyCheckRateBurstFactor we use this to set the burst parameter on the
Expand Down
147 changes: 130 additions & 17 deletions pkg/kv/kvserver/store_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,10 +407,27 @@ func (s *Store) reserveSnapshot(
// RESTORE or manual SPLIT AT, since it prevents these empty snapshots from
// getting stuck behind large snapshots managed by the replicate queue.
if header.RangeSize != 0 {
queueCtx := ctx
if deadline, ok := queueCtx.Deadline(); ok {
// Enforce a more strict timeout for acquiring the snapshot reservation to
// ensure that if the reservation is acquired, the snapshot has sufficient
// time to complete. See the comment on snapshotReservationQueueTimeoutFraction
// and TestReserveSnapshotQueueTimeout.
timeoutFrac := snapshotReservationQueueTimeoutFraction.Get(&s.ClusterSettings().SV)
timeout := time.Duration(timeoutFrac * float64(timeutil.Until(deadline)))
var cancel func()
queueCtx, cancel = context.WithTimeout(queueCtx, timeout) // nolint:context
defer cancel()
}
select {
case s.snapshotApplySem <- struct{}{}:
case <-ctx.Done():
return nil, ctx.Err()
case <-queueCtx.Done():
if err := ctx.Err(); err != nil {
return nil, errors.Wrap(err, "acquiring snapshot reservation")
}
return nil, errors.Wrapf(queueCtx.Err(),
"giving up during snapshot reservation due to %q",
snapshotReservationQueueTimeoutFraction.Key())
case <-s.stopper.ShouldQuiesce():
return nil, errors.Errorf("stopped")
}
Expand Down Expand Up @@ -685,14 +702,6 @@ type SnapshotStorePool interface {
throttle(reason throttleReason, why string, toStoreID roachpb.StoreID)
}

// validatePositive is a function to validate that a settings value is positive.
func validatePositive(v int64) error {
if v <= 0 {
return errors.Errorf("%d is not positive", v)
}
return nil
}

// rebalanceSnapshotRate is the rate at which snapshots can be sent in the
// context of up-replication or rebalancing (i.e. any snapshot that was not
// requested by raft itself, to which `kv.snapshot_recovery.max_rate` applies).
Expand All @@ -701,10 +710,10 @@ var rebalanceSnapshotRate = settings.RegisterByteSizeSetting(
"kv.snapshot_rebalance.max_rate",
"the rate limit (bytes/sec) to use for rebalance and upreplication snapshots",
32<<20, // 32mb/s
validatePositive,
settings.PositiveInt,
).WithPublic()

// recoverySnapshotRate is the rate at which Raft-initiated spanshots can be
// recoverySnapshotRate is the rate at which Raft-initiated snapshot can be
// sent. Ideally, one would never see a Raft-initiated snapshot; we'd like all
// replicas to start out as learners or via splits, and to never be cut off from
// the log. However, it has proved unfeasible to completely get rid of them.
Expand All @@ -718,30 +727,134 @@ var recoverySnapshotRate = settings.RegisterByteSizeSetting(
"kv.snapshot_recovery.max_rate",
"the rate limit (bytes/sec) to use for recovery snapshots",
32<<20, // 32mb/s
validatePositive,
settings.PositiveInt,
).WithPublic()

// snapshotSenderBatchSize is the size that key-value batches are allowed to
// grow to during Range snapshots before being sent to the receiver. This limit
// places an upper-bound on the memory footprint of the sender of a Range
// snapshot. It is also the granularity of rate limiting.
var snapshotSenderBatchSize = settings.RegisterByteSizeSetting(
settings.TenantWritable,
settings.SystemOnly,
"kv.snapshot_sender.batch_size",
"size of key-value batches sent over the network during snapshots",
256<<10, // 256 KB
validatePositive,
settings.PositiveInt,
)

// snapshotReservationQueueTimeoutFraction is the maximum fraction of a Range
// snapshot's total timeout that it is allowed to spend queued on the receiver
// waiting for a reservation.
//
// Enforcement of this snapshotApplySem-scoped timeout is intended to prevent
// starvation of snapshots in cases where a queue of snapshots waiting for
// reservations builds and no single snapshot acquires the semaphore with
// sufficient time to complete, but each holds the semaphore long enough to
// ensure that later snapshots in the queue encounter this same situation. This
// is a case of FIFO queuing + timeouts leading to starvation. By rejecting
// snapshot attempts earlier, we ensure that those that do acquire the semaphore
// have sufficient time to complete.
//
// Consider the following motivating example:
//
// With a 60s timeout set by the snapshotQueue/replicateQueue for each snapshot,
// 45s needed to actually stream the data, and a willingness to wait for as long
// as it takes to get the reservation (i.e. this fraction = 1.0) there can be
// starvation. Each snapshot spends so much time waiting for the reservation
// that it will itself fail during sending, while the next snapshot wastes
// enough time waiting for us that it will itself fail, ad infinitum:
//
// t | snap1 snap2 snap3 snap4 snap5 ...
// ----+------------------------------------
// 0 | send
// 15 | queue queue
// 30 | queue
// 45 | ok send
// 60 | queue
// 75 | fail fail send
// 90 | fail send
// 105 |
// 120 | fail
// 135 |
//
// If we limit the amount of time we are willing to wait for a reservation to
// something that is small enough to, on success, give us enough time to
// actually stream the data, no starvation can occur. For example, with a 60s
// timeout, 45s needed to stream the data, we can wait at most 15s for a
// reservation and still avoid starvation:
//
// t | snap1 snap2 snap3 snap4 snap5 ...
// ----+------------------------------------
// 0 | send
// 15 | queue queue
// 30 | fail fail send
// 45 |
// 60 | ok queue
// 75 | ok send
// 90 |
// 105 |
// 120 | ok
// 135 |
//
// In practice, the snapshot reservation logic (reserveSnapshot) doesn't know
// how long sending the snapshot will actually take. But it knows the timeout it
// has been given by the snapshotQueue/replicateQueue, which serves as an upper
// bound, under the assumption that snapshots can make progress in the absence
// of starvation.
//
// Without the reservation timeout fraction, if the product of the number of
// concurrent snapshots and the average streaming time exceeded this timeout,
// the starvation scenario could occur, since the average queuing time would
// exceed the timeout. With the reservation limit, progress will be made as long
// as the average streaming time is less than the guaranteed processing time for
// any snapshot that succeeds in acquiring a reservation:
//
// guaranteed_processing_time = (1 - reservation_queue_timeout_fraction) x timeout
//
// The timeout for the snapshot and replicate queues bottoms out at 60s (by
// default, see kv.queue.process.guaranteed_time_budget). Given a default
// reservation queue timeout fraction of 0.4, this translates to a guaranteed
// processing time of 36s for any snapshot attempt that manages to acquire a
// reservation. This means that a 512MiB snapshot will succeed if sent at a rate
// of 14MiB/s or above.
//
// Lower configured snapshot rate limits quickly lead to a much higher timeout
// since we apply a liberal multiplier (permittedRangeScanSlowdown). Concretely,
// we move past the 1-minute timeout once the rate limit is set to anything less
// than 10*range_size/guaranteed_budget(in MiB/s), which comes out to ~85MiB/s
// for a 512MiB range and the default 1m budget. In other words, the queue uses
// sumptuous timeouts, and so we'll also be excessively lenient with how long
// we're willing to wait for a reservation (but not to the point of allowing the
// starvation scenario). As long as the nodes between the cluster can transfer
// at around ~14MiB/s, even a misconfiguration of the rate limit won't cause
// issues and where it does, the setting can be set to 1.0, effectively
// reverting to the old behavior.
var snapshotReservationQueueTimeoutFraction = settings.RegisterFloatSetting(
settings.SystemOnly,
"kv.snapshot_receiver.reservation_queue_timeout_fraction",
"the fraction of a snapshot's total timeout that it is allowed to spend "+
"queued on the receiver waiting for a reservation",
0.4,
func(v float64) error {
const min, max = 0.25, 1.0
if v < min {
return errors.Errorf("cannot set to a value less than %f: %f", min, v)
} else if v > max {
return errors.Errorf("cannot set to a value greater than %f: %f", max, v)
}
return nil
},
)

// snapshotSSTWriteSyncRate is the size of chunks to write before fsync-ing.
// The default of 2 MiB was chosen to be in line with the behavior in bulk-io.
// See sstWriteSyncRate.
var snapshotSSTWriteSyncRate = settings.RegisterByteSizeSetting(
settings.TenantWritable,
settings.SystemOnly,
"kv.snapshot_sst.sync_size",
"threshold after which snapshot SST writes must fsync",
bulkIOWriteBurst,
validatePositive,
settings.PositiveInt,
)

func snapshotRateLimit(
Expand Down
90 changes: 90 additions & 0 deletions pkg/kv/kvserver/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand All @@ -61,6 +62,7 @@ import (
"github.com/stretchr/testify/require"
"go.etcd.io/etcd/raft/v3"
"go.etcd.io/etcd/raft/v3/raftpb"
"golang.org/x/sync/errgroup"
"golang.org/x/time/rate"
)

Expand Down Expand Up @@ -2957,6 +2959,94 @@ func TestReserveSnapshotFullnessLimit(t *testing.T) {
}
}

// TestSnapshotReservationQueueTimeoutAvoidsStarvation verifies that the
// snapshot reservation queue applies a tighter queueing timeout than overall
// operation timeout to incoming snapshot requests, which enables it to avoid
// starvation even with its FIFO queueing policy under high concurrency.
func TestReserveSnapshotQueueTimeoutAvoidsStarvation(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
skip.UnderStress(t)
skip.UnderRace(t)
skip.UnderShort(t)

// Run each snapshot with a 100-millisecond timeout, a 50-millisecond queue
// timeout, and a 15-millisecond process time.
const timeout = 100 * time.Millisecond
const maxQueueTimeout = 50 * time.Millisecond
const timeoutFrac = float64(maxQueueTimeout) / float64(timeout)
const processTime = 15 * time.Millisecond
// Run 8 workers that are each trying to perform snapshots for 3 seconds.
const workers = 8
const duration = 3 * time.Second
// We expect that roughly duration / processTime snapshots "succeed". To avoid
// flakiness, we assert half of this.
const expSuccesses = int(duration / processTime)
const assertSuccesses = expSuccesses / 2
// Sanity check config. If workers*processTime < timeout, then the queue time
// will never be large enough to create starvation. If timeout-maxQueueTimeout
// < processTime then most snapshots won't have enough time to complete.
require.Greater(t, workers*processTime, timeout)
require.Greater(t, timeout-maxQueueTimeout, processTime)

ctx := context.Background()
stopper := stop.NewStopper()
defer stopper.Stop(ctx)
tsc := TestStoreConfig(nil)
// Set the concurrency to 1 explicitly, in case the default ever changes.
tsc.concurrentSnapshotApplyLimit = 1
tc := testContext{}
tc.StartWithStoreConfig(t, stopper, tsc)
s := tc.store
snapshotReservationQueueTimeoutFraction.Override(ctx, &s.ClusterSettings().SV, timeoutFrac)

var done int64
var successes int64
var g errgroup.Group
for i := 0; i < workers; i++ {
g.Go(func() error {
for atomic.LoadInt64(&done) == 0 {
if err := func() error {
snapCtx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
cleanup, err := s.reserveSnapshot(snapCtx, &SnapshotRequest_Header{RangeSize: 1})
if err != nil {
if errors.Is(err, context.DeadlineExceeded) {
return nil
}
return err
}
defer cleanup()
if atomic.LoadInt64(&done) != 0 {
// If the test has ended, don't process.
return nil
}
// Process...
time.Sleep(processTime)
// Check for sufficient processing time. If we hit a timeout, don't
// count the process attempt as a success. We could make this more
// reactive and terminate the sleep as soon as the ctx is canceled,
// but let's assume the worst case.
if err := snapCtx.Err(); err != nil {
t.Logf("hit %v while processing", err)
} else {
atomic.AddInt64(&successes, 1)
}
return nil
}(); err != nil {
return err
}
}
return nil
})
}

time.Sleep(duration)
atomic.StoreInt64(&done, 1)
require.NoError(t, g.Wait())
require.GreaterOrEqual(t, int(atomic.LoadInt64(&successes)), assertSuccesses)
}

func TestSnapshotRateLimit(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down
Loading

0 comments on commit 62c80fa

Please sign in to comment.