colflow: prevent deadlocks when many queries spill to disk at same time
This commit fixes a long-standing issue that could cause
memory-intensive queries to deadlock on acquiring the file descriptors
quota when vectorized execution spills to disk. The bug has been
present since the introduction of disk spilling (over two and a half
years ago, introduced in cockroachdb#45318 and partially mitigated in cockroachdb#45892), but
we haven't seen it in any user reports, only in `tpch_concurrency`
roachtest runs, so the severity seems pretty minor.

Consider the following query plan:
```
   Node 1                   Node 2

TableReader              TableReader
    |                         |
HashRouter                HashRouter
    |     \  ___________ /    |
    |      \/__________       |
    |      /           \      |
HashAggregator         HashAggregator
```
and let's imagine that each hash aggregator has to spill to disk, which
requires acquiring the file descriptors quota. Now imagine that, because
the hash aggregators are spilling, each of the hash routers has slow
outputs, causing them to spill too. As a result, this query plan can
require `A + 2 * R` FDs on a single node to succeed, where `A` is the
quota for a single hash aggregator (equal to 16 with the default value
of 256 for the `COCKROACH_VEC_MAX_OPEN_FDS` environment variable) and
`R` is the quota for a single router output (2). In other words, we can
estimate that 20 FDs are needed on each node for the query to finish
execution, with the aggregator's 16 FDs being acquired first.

Now imagine that this query is run with a concurrency of 16. We can end
up in a situation where all hash aggregators have spilled, fully
exhausting the global limit on each node (16 queries each holding 16
FDs uses up the entire default quota of 256), so whenever the hash
router outputs need to spill, they block forever since no FDs will ever
be released until a query is canceled or a node is shut down. In other
words, we have a deadlock.
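
To make the arithmetic concrete, here is a minimal runnable sketch. The
constants mirror the defaults described above and the variable names are
illustrative only; nothing here is taken from the actual code.
```go
package main

import "fmt"

func main() {
	const (
		nodeLimit       = 256 // default COCKROACH_VEC_MAX_OPEN_FDS
		aggregatorQuota = 16  // A: FDs acquired by a single spilled hash aggregator
		routerQuota     = 2   // R: FDs acquired by a single spilled router output
		concurrency     = 16  // number of identical queries running at once
	)
	perQuery := aggregatorQuota + 2*routerQuota
	heldByAggregators := concurrency * aggregatorQuota
	fmt.Println("FDs needed per node per query:", perQuery)                  // 20
	fmt.Println("FDs held by spilled aggregators:", heldByAggregators)       // 256
	fmt.Println("FDs left for router outputs:", nodeLimit-heldByAggregators) // 0, i.e. deadlock
}
```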

This commit fixes this situation by introducing a retry mechanism that
backs off exponentially when trying to acquire the FD quota, until it
times out. The randomization provided by the `retry` package should be
sufficient for some of the queries to succeed while others result in
an error.
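
As a rough sanity check of that timeout, here is a sketch that sums the
backoff schedule using the `InitialBackoff`, `Multiplier`, and
`MaxRetries` values chosen in the diff below, ignoring randomization and
assuming the loop waits before each of the 8 retries:
```go
package main

import (
	"fmt"
	"time"
)

func main() {
	backoff := 100 * time.Millisecond // InitialBackoff
	var total time.Duration
	for i := 0; i < 8; i++ { // MaxRetries = 8, Multiplier = 2.0
		total += backoff
		backoff *= 2
	}
	fmt.Println(total) // 25.5s, i.e. "on the order of 25s"
}
```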

Unfortunately, I don't see a way to prevent this deadlock from occurring
in the first place without a possible increase in latency in some cases.
The difficulty is that we currently acquire FDs only once we need them,
i.e. once a particular component spills to disk. We could acquire the
maximum number of FDs that a query might need up front, before query
execution starts, but that could lead to starvation of the queries that
ultimately won't spill to disk. That seems like a much worse impact than
receiving timeout errors on some analytical queries run with high
concurrency. We're not an OLAP database, so this behavior seems ok.

Release note (bug fix): Previously, CockroachDB could deadlock when
evaluating analytical queries if multiple queries had to spill to disk
at the same time. This is now fixed by making some of the queries error
out instead.
yuzefovich committed Jul 19, 2022
1 parent 1d90e9b commit aa14edd
Showing 7 changed files with 175 additions and 24 deletions.
1 change: 1 addition & 0 deletions pkg/sql/colexec/colbuilder/execplan.go
@@ -444,6 +444,7 @@ func (r opResult) createDiskBackedSort(
args.DiskQueueCfg,
args.FDSemaphore,
diskAccount,
flowCtx.TestingKnobs().VecFDsToAcquire,
)
r.ToClose = append(r.ToClose, es.(colexecop.Closer))
return es
4 changes: 4 additions & 0 deletions pkg/sql/colexec/external_sort.go
@@ -231,6 +231,7 @@ func NewExternalSorter(
diskQueueCfg colcontainer.DiskQueueCfg,
fdSemaphore semaphore.Semaphore,
diskAcc *mon.BoundAccount,
testingVecFDsToAcquire int,
) colexecop.Operator {
if diskQueueCfg.BufferSizeBytes > 0 && maxNumberPartitions == 0 {
// With the default limit of 256 file descriptors, this results in 16
@@ -242,6 +243,9 @@
if maxNumberPartitions < colexecop.ExternalSorterMinPartitions {
maxNumberPartitions = colexecop.ExternalSorterMinPartitions
}
if testingVecFDsToAcquire > 0 {
maxNumberPartitions = testingVecFDsToAcquire
}
if memoryLimit == 1 {
// If memory limit is 1, we're likely in a "force disk spill"
// scenario, but we don't want to artificially limit batches when we
3 changes: 3 additions & 0 deletions pkg/sql/colexec/hash_based_partitioner.go
@@ -307,6 +307,9 @@ func calculateMaxNumberActivePartitions(
if maxNumberActivePartitions < numRequiredActivePartitions {
maxNumberActivePartitions = numRequiredActivePartitions
}
if toAcquire := flowCtx.TestingKnobs().VecFDsToAcquire; toAcquire > 0 {
maxNumberActivePartitions = toAcquire
}
return maxNumberActivePartitions
}

6 changes: 6 additions & 0 deletions pkg/sql/colflow/BUILD.bazel
@@ -32,6 +32,8 @@ go_library(
"//pkg/sql/execinfra",
"//pkg/sql/execinfrapb",
"//pkg/sql/flowinfra",
"//pkg/sql/pgwire/pgcode",
"//pkg/sql/pgwire/pgerror",
"//pkg/sql/rowenc",
"//pkg/sql/rowexec",
"//pkg/sql/sessiondatapb",
@@ -44,6 +46,7 @@ go_library(
"//pkg/util/mon",
"//pkg/util/optional",
"//pkg/util/randutil",
"//pkg/util/retry",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"//pkg/util/tracing",
@@ -65,6 +68,7 @@ go_test(
"main_test.go",
"routers_test.go",
"stats_test.go",
"vectorized_flow_deadlock_test.go",
"vectorized_flow_planning_test.go",
"vectorized_flow_shutdown_test.go",
"vectorized_flow_space_test.go",
@@ -118,6 +122,8 @@ go_test(
"//pkg/testutils/testcluster",
"//pkg/util/admission",
"//pkg/util/buildutil",
"//pkg/util/cancelchecker",
"//pkg/util/envutil",
"//pkg/util/hlc",
"//pkg/util/humanizeutil",
"//pkg/util/leaktest",
99 changes: 75 additions & 24 deletions pkg/sql/colflow/vectorized_flow.go
@@ -34,6 +34,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/flowinfra"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/rowexec"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb"
"github.com/cockroachdb/cockroach/pkg/sql/types"
@@ -44,6 +46,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/optional"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
@@ -52,39 +55,84 @@ import (
"github.com/marusama/semaphore"
)

// countingSemaphore is a semaphore that keeps track of the semaphore count from
// its perspective.
// Note that it effectively implements the execinfra.Releasable interface but
// due to the method name conflict doesn't.
type countingSemaphore struct {
// fdCountingSemaphore is a semaphore that keeps track of the number of file
// descriptors currently used by the vectorized engine.
//
// Note that it effectively implements the execreleasable.Releasable interface
// but due to the method name conflict doesn't.
type fdCountingSemaphore struct {
semaphore.Semaphore
globalCount *metric.Gauge
count int64
globalCount *metric.Gauge
count int64
testingAcquireMaxRetries int
}

var countingSemaphorePool = sync.Pool{
var fdCountingSemaphorePool = sync.Pool{
New: func() interface{} {
return &countingSemaphore{}
return &fdCountingSemaphore{}
},
}

func newCountingSemaphore(sem semaphore.Semaphore, globalCount *metric.Gauge) *countingSemaphore {
s := countingSemaphorePool.Get().(*countingSemaphore)
func newFDCountingSemaphore(
sem semaphore.Semaphore, globalCount *metric.Gauge, testingAcquireMaxRetries int,
) *fdCountingSemaphore {
s := fdCountingSemaphorePool.Get().(*fdCountingSemaphore)
s.Semaphore = sem
s.globalCount = globalCount
s.testingAcquireMaxRetries = testingAcquireMaxRetries
return s
}

func (s *countingSemaphore) Acquire(ctx context.Context, n int) error {
if err := s.Semaphore.Acquire(ctx, n); err != nil {
return err
var errAcquireTimeout = pgerror.New(
pgcode.ConfigurationLimitExceeded,
"acquiring of file descriptors timed out, consider increasing "+
"COCKROACH_VEC_MAX_OPEN_FDS environment variable",
)

func (s *fdCountingSemaphore) Acquire(ctx context.Context, n int) error {
if s.TryAcquire(n) {
return nil
}
atomic.AddInt64(&s.count, int64(n))
s.globalCount.Inc(int64(n))
return nil
// Currently there is not enough capacity in the semaphore to acquire the
// desired number, so we set up a retry loop that exponentially backs off,
// until either the semaphore opens up or we time out (most likely due to a
// deadlock).
//
// The latter situation is possible when multiple queries already hold some
// of the quota and each of them needs more to proceed resulting in a
// deadlock. We get out of such a deadlock by randomly erroring out one of
// the queries (which would release some quota back to the semaphore) making
// it possible for other queries to proceed.
//
// Note that we've already tried to acquire the quota above (which failed),
// so the initial backoff time of 100ms seems ok (we are spilling to disk
// after all, so the query is likely to experience significant latency). The
// current choice of options is such that we'll spend on the order of 25s
// in the retry loop before timing out.
maxRetries := s.testingAcquireMaxRetries
if maxRetries <= 0 {
// Make sure that the retry loop is finite.
maxRetries = 8
}
opts := retry.Options{
InitialBackoff: 100 * time.Millisecond,
Multiplier: 2.0,
RandomizationFactor: 0.25,
MaxRetries: maxRetries,
}
for r := retry.StartWithCtx(ctx, opts); r.Next(); {
if s.TryAcquire(n) {
return nil
}
}
if ctx.Err() != nil {
return ctx.Err()
}
log.Warning(ctx, "acquiring of file descriptors for disk-spilling timed out")
return errAcquireTimeout
}

func (s *countingSemaphore) TryAcquire(n int) bool {
func (s *fdCountingSemaphore) TryAcquire(n int) bool {
success := s.Semaphore.TryAcquire(n)
if !success {
return false
@@ -94,7 +142,7 @@ func (s *countingSemaphore) TryAcquire(n int) bool {
return success
}

func (s *countingSemaphore) Release(n int) int {
func (s *fdCountingSemaphore) Release(n int) int {
atomic.AddInt64(&s.count, int64(-n))
s.globalCount.Dec(int64(n))
return s.Semaphore.Release(n)
@@ -103,12 +151,12 @@ func (s *countingSemaphore) Release(n int) int {
// ReleaseToPool should be named Release and should implement the
// execinfra.Releasable interface, but that would lead to a conflict with
// semaphore.Semaphore.Release method.
func (s *countingSemaphore) ReleaseToPool() {
func (s *fdCountingSemaphore) ReleaseToPool() {
if unreleased := atomic.LoadInt64(&s.count); unreleased != 0 {
colexecerror.InternalError(errors.Newf("unexpectedly %d count on the semaphore when releasing it to the pool", unreleased))
}
*s = countingSemaphore{}
countingSemaphorePool.Put(s)
*s = fdCountingSemaphore{}
fdCountingSemaphorePool.Put(s)
}

type vectorizedFlow struct {
@@ -127,7 +175,7 @@ type vectorizedFlow struct {
// of the number of resources held in a semaphore.Semaphore requested from the
// context of this flow so that these can be released unconditionally upon
// Cleanup.
countingSemaphore *countingSemaphore
countingSemaphore *fdCountingSemaphore

tempStorage struct {
syncutil.Mutex
@@ -195,8 +243,11 @@ func (f *vectorizedFlow) Setup(
if err := diskQueueCfg.EnsureDefaults(); err != nil {
return ctx, nil, err
}
f.countingSemaphore = newCountingSemaphore(f.Cfg.VecFDSemaphore, f.Cfg.Metrics.VecOpenFDs)
flowCtx := f.GetFlowCtx()
f.countingSemaphore = newFDCountingSemaphore(
f.Cfg.VecFDSemaphore, f.Cfg.Metrics.VecOpenFDs,
flowCtx.TestingKnobs().VecFDsAcquireMaxRetriesCount,
)
f.creator = newVectorizedFlowCreator(
helper,
vectorizedRemoteComponentCreator{},
77 changes: 77 additions & 0 deletions pkg/sql/colflow/vectorized_flow_deadlock_test.go
@@ -0,0 +1,77 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package colflow_test

import (
"context"
"strconv"
"strings"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/cancelchecker"
"github.com/cockroachdb/cockroach/pkg/util/envutil"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/stretchr/testify/require"
)

// TestVectorizedFlowDeadlocksWhenSpilling is a regression test for the
// vectorized flow being deadlocked when multiple operators have to spill to
// disk exhausting the file descriptor limit.
func TestVectorizedFlowDeadlocksWhenSpilling(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

skip.UnderStressRace(t, "the test is too slow under stressrace")

vecFDsLimit := 8
envutil.TestSetEnv(t, "COCKROACH_VEC_MAX_OPEN_FDS", strconv.Itoa(vecFDsLimit))
serverArgs := base.TestServerArgs{
Knobs: base.TestingKnobs{DistSQL: &execinfra.TestingKnobs{
// Set the testing knob so that the first operator to spill would
// use up the whole FD limit.
VecFDsToAcquire: vecFDsLimit,
// Allow just one retry to speed up the test.
VecFDsAcquireMaxRetriesCount: 1,
}},
}
tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ServerArgs: serverArgs})
ctx := context.Background()
defer tc.Stopper().Stop(ctx)
conn := tc.Conns[0]

_, err := conn.ExecContext(ctx, "CREATE TABLE t (a, b) AS SELECT i, i FROM generate_series(1, 10000) AS g(i)")
require.NoError(t, err)
// Lower the workmem budget so that all buffering operators have to spill to
// disk.
_, err = conn.ExecContext(ctx, "SET distsql_workmem = '1KiB'")
require.NoError(t, err)

queryCtx, queryCtxCancel := context.WithDeadline(ctx, timeutil.Now().Add(10*time.Second))
defer queryCtxCancel()
// Run a query with a hash joiner feeding into a hash aggregator, with both
// operators spilling to disk. We expect that the hash aggregator won't be
// able to spill though since the FD limit has been used up, and we'd like
// to see the query timing out (when acquiring the file descriptor quota)
// rather than being canceled due to the context deadline.
query := "SELECT max(a) FROM (SELECT t1.a, t1.b FROM t AS t1 INNER HASH JOIN t AS t2 ON t1.a = t2.b) GROUP BY b"
_, err = conn.ExecContext(queryCtx, query)
// We expect an error that is different from the query cancellation (which
// is what SQL layer returns on a context cancellation).
require.NotNil(t, err)
require.False(t, strings.Contains(err.Error(), cancelchecker.QueryCanceledError.Error()))
}
9 changes: 9 additions & 0 deletions pkg/sql/execinfra/server_config.go
@@ -244,6 +244,15 @@ type TestingKnobs struct {
// Cannot be set together with ForceDiskSpill.
MemoryLimitBytes int64

// VecFDsToAcquire, if positive, indicates the number of file descriptors
// that should be acquired by a single disk-spilling operator in the
// vectorized engine.
VecFDsToAcquire int
// VecFDsAcquireMaxRetriesCount, if positive, determines the maximum number
// of retries done when acquiring the file descriptors for a disk-spilling
// operator in the vectorized engine.
VecFDsAcquireMaxRetriesCount int

// TableReaderBatchBytesLimit, if not 0, overrides the limit that the
// TableReader will set on the size of results it wants to get for individual
// requests.
