From 3575dea3e6b084a63a1432672705f949572bc211 Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Fri, 7 May 2021 11:08:37 +0000 Subject: [PATCH 1/2] kvserver: fix race condition during synchronous txn record cleanup Transaction records and intents are normally cleaned up asynchronously via `IntentResolver.CleanupTxnIntentsAsync()`, separately from the client's context. When the async task limit is exceeded, cleanup instead becomes synchronous and attached to the client context. However, the final `gcTxnRecord()` call to remove the transaction record is asynchronous even when intent cleanup is synchronous, to avoid holding onto the intent resolver task slot. This call will typically return to the caller before `gcTxnRecord()` completes, which may cause the caller to cancel the context (either via `defer cancel()` or a client disconnection) and in turn cancel the `gcTxnRecord()` call. This patch gives the async `gcTxnRecord()` call a separate background context that's independent of the client's context even in the synchronous case, with a 20 second timeout to avoid goroutine leaks. Release note (bug fix): Fixed a race condition during transaction cleanup that could leave old transaction records behind until MVCC garbage collection. --- .../kvserver/intentresolver/intent_resolver.go | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/pkg/kv/kvserver/intentresolver/intent_resolver.go b/pkg/kv/kvserver/intentresolver/intent_resolver.go index 89325f9a46c8..e36b816cca0c 100644 --- a/pkg/kv/kvserver/intentresolver/intent_resolver.go +++ b/pkg/kv/kvserver/intentresolver/intent_resolver.go @@ -99,6 +99,10 @@ const ( // intentResolutionBatchIdle is similar to the above setting but is used when // when no additional traffic hits the batch. defaultIntentResolutionBatchIdle = 5 * time.Millisecond + + // gcTxnRecordTimeout is the timeout for asynchronous txn record removal + // during cleanupFinishedTxnIntents. + gcTxnRecordTimeout = 20 * time.Second ) // Config contains the dependencies to construct an IntentResolver. @@ -754,12 +758,20 @@ func (ir *IntentResolver) cleanupFinishedTxnIntents( if pErr := ir.ResolveIntents(ctx, txn.LocksAsLockUpdates(), opts); pErr != nil { return errors.Wrapf(pErr.GoError(), "failed to resolve intents") } - // Run transaction record GC outside of ir.sem. + // Run transaction record GC outside of ir.sem. We need a new context, in case + // we're still connected to the client's context (which can happen when + // allowSyncProcessing is true). Otherwise, we may return to the caller before + // gcTxnRecord completes, which may cancel the context and abort the cleanup + // either due to a defer cancel() or client disconnection. We give it a timeout + // as well, to avoid goroutine leakage. 
return ir.stopper.RunAsyncTask( - ctx, + ir.ambientCtx.AnnotateCtx(context.Background()), "storage.IntentResolver: cleanup txn records", func(ctx context.Context) { - err := ir.gcTxnRecord(ctx, rangeID, txn) + err := contextutil.RunWithTimeout(ctx, "cleanup txn record", + gcTxnRecordTimeout, func(ctx context.Context) error { + return ir.gcTxnRecord(ctx, rangeID, txn) + }) if onComplete != nil { onComplete(err) } From 7a24ada797cb06d66c1ae1ecbf04852139a22f9d Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Wed, 5 May 2021 16:39:04 +0000 Subject: [PATCH 2/2] kvserver: improve intent cleanup for disconnected clients Whenever a client disconnects during an open transaction or in-flight DML statement, the server-side session will asynchronously clean up the transaction record and intents by rolling back the transaction. However, this had a very short timeout of 3 seconds, and even though the actual `IntentResolver.CleanupTxnIntentsAsync()` cleanup task runs asynchronously and independently of the client's context in the typical case, the short timeout could cause transaction cleanup to be cancelled if the `EndTxn` request took more than 3 seconds to get all the way through Raft execution or if the async task limit was exceeded such that the cleanup task kept using the client's context. This in turn could lead to intents building up over time. This patch increases the timeout when rolling back transactions for disconnected clients to 1 minute, and also tries to perform transaction cleanup when a client disconnects while waiting for an `EndTxn` command to go through Raft. Release note (bug fix): improved transaction cleanup for disconnected clients, to reduce intent buildup. --- .../intent_resolver_integration_test.go | 224 ++++++++++++++++++ pkg/kv/kvserver/replica_write.go | 54 ++++- pkg/kv/txn.go | 63 +++-- pkg/testutils/subtest.go | 11 + 4 files changed, 316 insertions(+), 36 deletions(-) diff --git a/pkg/kv/kvserver/intent_resolver_integration_test.go b/pkg/kv/kvserver/intent_resolver_integration_test.go index 75229f884577..0d8b4c97b61c 100644 --- a/pkg/kv/kvserver/intent_resolver_integration_test.go +++ b/pkg/kv/kvserver/intent_resolver_integration_test.go @@ -12,20 +12,29 @@ package kvserver import ( "context" + "encoding/binary" "fmt" + "math/rand" "testing" "time" "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/storage" + "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/skip" + "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -204,3 +213,218 @@ func TestRollbackSyncRangedIntentResolution(t *testing.T) { require.NoError(t, txn.Rollback(ctx)) require.NoError(t, ctx.Err()) } + +// Tests that intents and transaction records are cleaned up within a reasonable +// timeframe in various scenarios. 
+func TestReliableIntentCleanup(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + skip.UnderRace(t) + skip.UnderStress(t) + + testutils.RunTrueAndFalse(t, "ForceSyncIntentResolution", func(t *testing.T, sync bool) { + ctx := context.Background() + settings := cluster.MakeTestingClusterSettings() + clusterArgs := base.TestClusterArgs{ + ServerArgs: base.TestServerArgs{ + Settings: settings, + Knobs: base.TestingKnobs{ + Store: &StoreTestingKnobs{ + IntentResolverKnobs: kvserverbase.IntentResolverTestingKnobs{ + ForceSyncIntentResolution: sync, + }, + }, + }, + }, + } + tc := serverutils.StartNewTestCluster(t, 3, clusterArgs) + defer tc.Stopper().Stop(ctx) + + srv := tc.Server(0) + db := srv.DB() + store, err := srv.GetStores().(*Stores).GetStore(srv.GetFirstStoreID()) + require.NoError(t, err) + engine := store.Engine() + clock := srv.Clock() + + // Set up a key prefix, and split off 16 ranges by the first hex digit (4 + // bits) following the prefix: key\x00\x00 key\x00\x10 key\x00\x20 ... + prefix := roachpb.Key([]byte("key\x00")) + for i := 0; i < 16; i++ { + require.NoError(t, db.AdminSplit(ctx, append(prefix, byte(i<<4)), hlc.MaxTimestamp)) + } + require.NoError(t, tc.WaitForFullReplication()) + + // Set up random key generator which only generates unique keys. + genKeySeen := map[string]bool{} + genKey := func(singleRange bool) roachpb.Key { + key := make([]byte, len(prefix)+4) + copy(key, prefix) + for { + r := rand.Uint32() + if singleRange { + r = r >> 4 // zero out four first bits, puts key in first range + } + binary.BigEndian.PutUint32(key[len(prefix):], r) + if !genKeySeen[string(key)] { + genKeySeen[string(key)] = true + return key + } + } + } + + // assertIntentCleanup checks that intents get cleaned up within a + // reasonable time. + assertIntentCleanup := func(t *testing.T) { + t.Helper() + var result storage.MVCCScanResult + if !assert.Eventually(t, func() bool { + result, err = storage.MVCCScan(ctx, engine, prefix, prefix.PrefixEnd(), + hlc.MaxTimestamp, storage.MVCCScanOptions{Inconsistent: true}) + require.NoError(t, err) + return len(result.Intents) == 0 + }, 10*time.Second, 100*time.Millisecond, "intent cleanup timed out") { + require.Fail(t, "found stale intents", "%v", len(result.Intents)) + } + } + + // assertTxnCleanup checks that the txn record is cleaned up within a + // reasonable time. + assertTxnCleanup := func(t *testing.T, txnKey roachpb.Key, txnID uuid.UUID) { + t.Helper() + var txnEntry roachpb.Transaction + if !assert.Eventually(t, func() bool { + key := keys.TransactionKey(txnKey, txnID) + ok, err := storage.MVCCGetProto(ctx, engine, key, hlc.MaxTimestamp, &txnEntry, + storage.MVCCGetOptions{}) + require.NoError(t, err) + return !ok + }, 5*time.Second, 100*time.Millisecond, "txn entry cleanup timed out") { + require.Fail(t, "found stale txn entry", "%v", txnEntry) + } + } + + // removeKeys cleans up all entries in the key range. + removeKeys := func(t *testing.T) { + t.Helper() + batch := &kv.Batch{} + batch.AddRawRequest(&roachpb.ClearRangeRequest{ + RequestHeader: roachpb.RequestHeader{ + Key: prefix, + EndKey: prefix.PrefixEnd(), + }, + }) + require.NoError(t, db.Run(ctx, batch)) + genKeySeen = map[string]bool{} // reset random key generator + } + + // testTxn runs an intent cleanup test using a transaction. 
+ type testTxnSpec struct { + numKeys int // number of keys per transaction + singleRange bool // if true, put intents in a single range at key\x00\x00 + finalize string // commit, rollback, cancel, abort (via push) + } + testTxn := func(t *testing.T, spec testTxnSpec) { + t.Helper() + t.Cleanup(func() { removeKeys(t) }) + const batchSize = 10000 + + // Write numKeys KV pairs in batches of batchSize as a single txn. + var txnKey roachpb.Key + txn := db.NewTxn(ctx, "test") + batch := txn.NewBatch() + for i := 0; i < spec.numKeys; i++ { + key := genKey(spec.singleRange) + batch.Put(key, []byte("value")) + if (i > 0 && i%batchSize == 0) || i == spec.numKeys-1 { + require.NoError(t, txn.Run(ctx, batch)) + batch = txn.NewBatch() + } + if i == 0 { + txnKey = make([]byte, len(key)) + copy(txnKey, key) + } + } + + // Finalize the txn according to the spec. + switch spec.finalize { + case "commit": + require.NoError(t, txn.Commit(ctx)) + + case "rollback": + require.NoError(t, txn.Rollback(ctx)) + + case "cancel": + rollbackCtx, cancel := context.WithCancel(ctx) + cancel() + if err := txn.Rollback(rollbackCtx); err != context.Canceled { + require.NoError(t, err) + } + + case "abort": + now := clock.NowAsClockTimestamp() + pusherProto := roachpb.MakeTransaction( + "pusher", + nil, // baseKey + roachpb.MaxUserPriority, + now.ToTimestamp(), + clock.MaxOffset().Nanoseconds(), + ) + pusher := kv.NewTxnFromProto(ctx, db, srv.NodeID(), now, kv.RootTxn, &pusherProto) + require.NoError(t, pusher.Put(ctx, txnKey, []byte("pushit"))) + + err := txn.Commit(ctx) + require.Error(t, err) + require.IsType(t, &roachpb.TransactionRetryWithProtoRefreshError{}, err) + require.True(t, err.(*roachpb.TransactionRetryWithProtoRefreshError).PrevTxnAborted()) + require.NoError(t, pusher.Rollback(ctx)) + + default: + require.Fail(t, "invalid finalize value %q", spec.finalize) + } + + assertIntentCleanup(t) + assertTxnCleanup(t, txnKey, txn.ID()) + } + + // testNonTxn runs an intent cleanup test without an explicit transaction. 
+ type testNonTxnSpec struct { + numKeys int // number of keys per transaction + singleRange bool // if true, put intents in a single range at key\x00\x00 + } + testNonTxn := func(t *testing.T, spec testNonTxnSpec) { + t.Helper() + t.Cleanup(func() { removeKeys(t) }) + + batch := &kv.Batch{} + for i := 0; i < spec.numKeys; i++ { + batch.Put(genKey(spec.singleRange), []byte("value")) + } + require.NoError(t, db.Run(ctx, batch)) + + assertIntentCleanup(t) + } + + testutils.RunValues(t, "numKeys", []interface{}{1, 100, 100000}, func(t *testing.T, numKeys interface{}) { + testutils.RunTrueAndFalse(t, "singleRange", func(t *testing.T, singleRange bool) { + testutils.RunTrueAndFalse(t, "txn", func(t *testing.T, txn bool) { + if txn { + finalize := []interface{}{"commit", "rollback", "cancel", "abort"} + testutils.RunValues(t, "finalize", finalize, func(t *testing.T, finalize interface{}) { + testTxn(t, testTxnSpec{ + numKeys: numKeys.(int), + singleRange: singleRange, + finalize: finalize.(string), + }) + }) + } else { + testNonTxn(t, testNonTxnSpec{ + numKeys: numKeys.(int), + singleRange: singleRange, + }) + } + }) + }) + }) + }) +} diff --git a/pkg/kv/kvserver/replica_write.go b/pkg/kv/kvserver/replica_write.go index 27bb683a185b..b12e1e6d578e 100644 --- a/pkg/kv/kvserver/replica_write.go +++ b/pkg/kv/kvserver/replica_write.go @@ -28,6 +28,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/util" + "github.com/cockroachdb/cockroach/pkg/util/contextutil" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/timeutil" @@ -208,22 +209,18 @@ func (r *Replica) executeWriteBatch( // resolution is semi-synchronous in that there is a limited number of // outstanding asynchronous resolution tasks allowed after which // further calls will block. - if len(propResult.EncounteredIntents) > 0 { - // TODO(peter): Re-proposed and canceled (but executed) commands can - // both leave intents to GC that don't hit this code path. No good - // solution presents itself at the moment and such intents will be - // resolved on reads. 
- if err := r.store.intentResolver.CleanupIntentsAsync( - ctx, propResult.EncounteredIntents, true, /* allowSync */ - ); err != nil { - log.Warningf(ctx, "%v", err) - } - } if len(propResult.EndTxns) > 0 { if err := r.store.intentResolver.CleanupTxnIntentsAsync( ctx, r.RangeID, propResult.EndTxns, true, /* allowSync */ ); err != nil { - log.Warningf(ctx, "%v", err) + log.Warningf(ctx, "transaction cleanup failed: %v", err) + } + } + if len(propResult.EncounteredIntents) > 0 { + if err := r.store.intentResolver.CleanupIntentsAsync( + ctx, propResult.EncounteredIntents, true, /* allowSync */ + ); err != nil { + log.Warningf(ctx, "intent cleanup failed: %v", err) } } if ba.Requests[0].GetMigrate() != nil && propResult.Err == nil { @@ -280,6 +277,7 @@ func (r *Replica) executeWriteBatch( propResult.Err = roachpb.NewError(applicationErr) } return propResult.Reply, nil, propResult.Err + case <-slowTimer.C: slowTimer.Read = true r.store.metrics.SlowRaftRequests.Inc(1) @@ -288,13 +286,45 @@ func (r *Replica) executeWriteBatch( rangeUnavailableMessage(&s, r.Desc(), r.store.cfg.NodeLiveness.GetIsLiveMap(), r.RaftStatus(), ba, timeutil.Since(startPropTime)) log.Errorf(ctx, "range unavailable: %v", s) + case <-ctxDone: // If our context was canceled, return an AmbiguousResultError, // which indicates to the caller that the command may have executed. + // + // If the batch contained an EndTxnRequest, asynchronously wait + // around for the result for a while and try to clean up after the + // txn. If the resolver's async task pool is full, just skip cleanup + // by setting allowSync=false, since we won't be able to + // backpressure clients. + if _, ok := ba.GetArg(roachpb.EndTxn); ok { + const taskName = "async txn cleanup" + _ = r.store.stopper.RunAsyncTask( + r.AnnotateCtx(context.Background()), + taskName, + func(ctx context.Context) { + err := contextutil.RunWithTimeout(ctx, taskName, 20*time.Second, + func(ctx context.Context) error { + select { + case propResult := <-ch: + if len(propResult.EndTxns) > 0 { + return r.store.intentResolver.CleanupTxnIntentsAsync(ctx, + r.RangeID, propResult.EndTxns, false /* allowSync */) + } + case <-shouldQuiesce: + case <-ctx.Done(): + } + return nil + }) + if err != nil { + log.Warningf(ctx, "transaction cleanup failed: %v", err) + } + }) + } abandon() log.VEventf(ctx, 2, "context cancellation after %0.1fs of attempting command %s", timeutil.Since(startTime).Seconds(), ba) return nil, nil, roachpb.NewError(roachpb.NewAmbiguousResultError(ctx.Err().Error())) + case <-shouldQuiesce: // If shutting down, return an AmbiguousResultError, which indicates // to the caller that the command may have executed. diff --git a/pkg/kv/txn.go b/pkg/kv/txn.go index 8d10abf7be8d..601667c8aca0 100644 --- a/pkg/kv/txn.go +++ b/pkg/kv/txn.go @@ -26,6 +26,19 @@ import ( "github.com/cockroachdb/errors" ) +// asyncRollbackTimeout is the context timeout during rollback() for a client +// who has already disconnected. This is needed to asynchronously clean up the +// client's intents and txn record. If the intent resolver has spare async task +// capacity, this timeout only needs to be long enough for the EndTxn request to +// make it through Raft, but if the cleanup task is synchronous (to backpressure +// clients) then cleanup will be abandoned when the timeout expires. +// +// We generally want to clean up if possible, so we set it high at 1 minute. If +// the transaction is very large or cleanup is very costly (e.g. 
hits a slow +// path for some reason), and the async pool is full (i.e. the system is +// under load), then it makes sense to abandon the cleanup before too long. +const asyncRollbackTimeout = time.Minute + // Txn is an in-progress distributed database transaction. A Txn is safe for // concurrent use by multiple goroutines. type Txn struct { @@ -727,48 +740,50 @@ func (txn *Txn) Rollback(ctx context.Context) error { func (txn *Txn) rollback(ctx context.Context) *roachpb.Error { log.VEventf(ctx, 2, "rolling back transaction") - sync := true - if ctx.Err() != nil { - sync = false - } - if sync { + // If the client has already disconnected, fall back to asynchronous cleanup + // below. Note that this is the common path when a client disconnects in the + // middle of an open transaction or during statement execution. + if ctx.Err() == nil { var ba roachpb.BatchRequest ba.Add(endTxnReq(false /* commit */, nil /* deadline */, false /* systemConfigTrigger */)) _, pErr := txn.Send(ctx, ba) if pErr == nil { return nil } - // If ctx has been canceled, assume that caused the error and try again - // async below. + // If rollback errored and the ctx was canceled during rollback, assume + // ctx cancellation caused the error and try again async below. if ctx.Err() == nil { return pErr } } - // We don't have a client whose context we can attach to, but we do want to limit how - // long this request is going to be around or it could leak a goroutine (in case of a - // long-lived network partition). + // We don't have a client whose context we can attach to, but we do want to + // limit how long this request is going to be around for to avoid leaking a + // goroutine (in case of a long-lived network partition). If it gets through + // Raft, and the intent resolver has free async task capacity, the actual + // cleanup will be independent of this context. stopper := txn.db.ctx.Stopper ctx, cancel := stopper.WithCancelOnQuiesce(txn.db.AnnotateCtx(context.Background())) if err := stopper.RunAsyncTask(ctx, "async-rollback", func(ctx context.Context) { defer cancel() var ba roachpb.BatchRequest ba.Add(endTxnReq(false /* commit */, nil /* deadline */, false /* systemConfigTrigger */)) - _ = contextutil.RunWithTimeout(ctx, "async txn rollback", 3*time.Second, func(ctx context.Context) error { - if _, pErr := txn.Send(ctx, ba); pErr != nil { - if statusErr, ok := pErr.GetDetail().(*roachpb.TransactionStatusError); ok && - statusErr.Reason == roachpb.TransactionStatusError_REASON_TXN_COMMITTED { - // A common cause of these async rollbacks failing is when they're - // triggered by a ctx canceled while a commit is in-flight (and it's too - // late for it to be canceled), and so the rollback finds the txn to be - // already committed. We don't spam the logs with those. - log.VEventf(ctx, 2, "async rollback failed: %s", pErr) - } else { - log.Infof(ctx, "async rollback failed: %s", pErr) + _ = contextutil.RunWithTimeout(ctx, "async txn rollback", asyncRollbackTimeout, + func(ctx context.Context) error { + if _, pErr := txn.Send(ctx, ba); pErr != nil { + if statusErr, ok := pErr.GetDetail().(*roachpb.TransactionStatusError); ok && + statusErr.Reason == roachpb.TransactionStatusError_REASON_TXN_COMMITTED { + // A common cause of these async rollbacks failing is when they're + // triggered by a ctx canceled while a commit is in-flight (and it's too + // late for it to be canceled), and so the rollback finds the txn to be + // already committed. We don't spam the logs with those. 
+ log.VEventf(ctx, 2, "async rollback failed: %s", pErr) + } else { + log.Infof(ctx, "async rollback failed: %s", pErr) + } } - } - return nil - }) + return nil + }) }); err != nil { cancel() return roachpb.NewError(err) diff --git a/pkg/testutils/subtest.go b/pkg/testutils/subtest.go index 6c0f0dcbe9c2..de647db8aa4b 100644 --- a/pkg/testutils/subtest.go +++ b/pkg/testutils/subtest.go @@ -24,3 +24,14 @@ func RunTrueAndFalse(t *testing.T, name string, fn func(t *testing.T, b bool)) { }) } } + +// RunValues calls the provided function in a subtest for each of the +// provided values. +func RunValues(t *testing.T, name string, values []interface{}, fn func(*testing.T, interface{})) { + t.Helper() + for _, v := range values { + t.Run(fmt.Sprintf("%s=%v", name, v), func(t *testing.T) { + fn(t, v) + }) + } +}
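
Note: both commits apply the same underlying pattern — when cleanup must outlive the client's
request, detach it from the caller's context and bound it with a dedicated timeout, so that a
client disconnect (or a `defer cancel()`) cannot abort the work while goroutine leaks are still
prevented. The following is a minimal stdlib-only sketch of that pattern, not CockroachDB code;
`cleanupTxnRecord` and `cleanupTimeout` are hypothetical stand-ins for `gcTxnRecord` and
`gcTxnRecordTimeout` / `asyncRollbackTimeout`.

    package main

    import (
        "context"
        "log"
        "time"
    )

    // cleanupTimeout bounds the detached cleanup, standing in for gcTxnRecordTimeout.
    const cleanupTimeout = 20 * time.Second

    // cleanupTxnRecord is a hypothetical stand-in for gcTxnRecord: it respects
    // context cancellation but otherwise just simulates some work.
    func cleanupTxnRecord(ctx context.Context) error {
        select {
        case <-time.After(100 * time.Millisecond): // pretend work
            return nil
        case <-ctx.Done():
            return ctx.Err()
        }
    }

    // finishCleanup mirrors the fix: even if clientCtx is canceled as soon as the
    // caller returns, the record cleanup proceeds on a context that is deliberately
    // NOT derived from clientCtx, bounded by its own timeout.
    func finishCleanup(clientCtx context.Context) {
        _ = clientCtx // intentionally unused for the cleanup below
        go func() {
            ctx, cancel := context.WithTimeout(context.Background(), cleanupTimeout)
            defer cancel()
            if err := cleanupTxnRecord(ctx); err != nil {
                log.Printf("txn record cleanup failed: %v", err)
            }
        }()
    }

    func main() {
        clientCtx, cancel := context.WithCancel(context.Background())
        finishCleanup(clientCtx)
        cancel() // client disconnects immediately; cleanup still completes
        time.Sleep(200 * time.Millisecond)
    }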
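
The replica_write.go change follows the same idea for in-flight proposals: if the client's context
is canceled while an EndTxn is still going through Raft, a detached task keeps waiting on the
proposal's result channel — bounded by a timeout and the stopper's quiesce signal — and triggers
txn cleanup if a result arrives. A rough stdlib-only sketch of that select structure, with
hypothetical types and names (`proposalResult`, `awaitAndCleanup`) rather than the real ones:

    package main

    import (
        "context"
        "fmt"
        "time"
    )

    // proposalResult is a hypothetical stand-in for the Raft proposal result.
    type proposalResult struct{ endTxns int }

    // awaitAndCleanup waits for a late proposal result after the client has gone
    // away, mirroring the detached task in executeWriteBatch. It gives up after
    // the timeout or when the server is shutting down (shouldQuiesce closes).
    func awaitAndCleanup(ch <-chan proposalResult, shouldQuiesce <-chan struct{}) {
        go func() {
            ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
            defer cancel()
            select {
            case res := <-ch:
                if res.endTxns > 0 {
                    // Stand-in for CleanupTxnIntentsAsync with allowSync=false.
                    fmt.Println("cleaning up after finished txn")
                }
            case <-shouldQuiesce:
            case <-ctx.Done():
            }
        }()
    }

    func main() {
        ch := make(chan proposalResult, 1)
        quiesce := make(chan struct{})
        awaitAndCleanup(ch, quiesce)
        ch <- proposalResult{endTxns: 1} // the proposal eventually applies
        time.Sleep(100 * time.Millisecond)
    }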
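
For reference, a usage sketch of the new testutils.RunValues helper alongside the existing
RunTrueAndFalse, in the style of the TestReliableIntentCleanup matrix above. The test name and
body are hypothetical; only the helper signatures come from this patch. This would live in a
_test.go file:

    package example

    import (
        "testing"

        "github.com/cockroachdb/cockroach/pkg/testutils"
    )

    // TestBatchSizes is a hypothetical test that fans out over a value matrix,
    // producing subtests named e.g. "numKeys=100/singleRange=true".
    func TestBatchSizes(t *testing.T) {
        testutils.RunValues(t, "numKeys", []interface{}{1, 100, 100000}, func(t *testing.T, numKeys interface{}) {
            testutils.RunTrueAndFalse(t, "singleRange", func(t *testing.T, singleRange bool) {
                n := numKeys.(int)
                _ = n
                _ = singleRange // exercise the scenario here
            })
        })
    }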