Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kvserver: improve intent cleanup for disconnected clients #64869

Merged
merged 2 commits into from
May 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
224 changes: 224 additions & 0 deletions pkg/kv/kvserver/intent_resolver_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,29 @@ package kvserver

import (
"context"
"encoding/binary"
"fmt"
"math/rand"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -204,3 +213,218 @@ func TestRollbackSyncRangedIntentResolution(t *testing.T) {
require.NoError(t, txn.Rollback(ctx))
require.NoError(t, ctx.Err())
}

// Tests that intents and transaction records are cleaned up within a reasonable
// timeframe in various scenarios.
func TestReliableIntentCleanup(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
skip.UnderRace(t)
skip.UnderStress(t)

testutils.RunTrueAndFalse(t, "ForceSyncIntentResolution", func(t *testing.T, sync bool) {
ctx := context.Background()
settings := cluster.MakeTestingClusterSettings()
clusterArgs := base.TestClusterArgs{
ServerArgs: base.TestServerArgs{
Settings: settings,
Knobs: base.TestingKnobs{
Store: &StoreTestingKnobs{
IntentResolverKnobs: kvserverbase.IntentResolverTestingKnobs{
ForceSyncIntentResolution: sync,
},
},
},
},
}
tc := serverutils.StartNewTestCluster(t, 3, clusterArgs)
defer tc.Stopper().Stop(ctx)

srv := tc.Server(0)
db := srv.DB()
store, err := srv.GetStores().(*Stores).GetStore(srv.GetFirstStoreID())
require.NoError(t, err)
engine := store.Engine()
clock := srv.Clock()

// Set up a key prefix, and split off 16 ranges by the first hex digit (4
// bits) following the prefix: key\x00\x00 key\x00\x10 key\x00\x20 ...
prefix := roachpb.Key([]byte("key\x00"))
for i := 0; i < 16; i++ {
require.NoError(t, db.AdminSplit(ctx, append(prefix, byte(i<<4)), hlc.MaxTimestamp))
}
require.NoError(t, tc.WaitForFullReplication())

// Set up random key generator which only generates unique keys.
genKeySeen := map[string]bool{}
genKey := func(singleRange bool) roachpb.Key {
key := make([]byte, len(prefix)+4)
copy(key, prefix)
for {
r := rand.Uint32()
if singleRange {
r = r >> 4 // zero out four first bits, puts key in first range
}
binary.BigEndian.PutUint32(key[len(prefix):], r)
if !genKeySeen[string(key)] {
genKeySeen[string(key)] = true
return key
}
}
}

// assertIntentCleanup checks that intents get cleaned up within a
// reasonable time.
assertIntentCleanup := func(t *testing.T) {
t.Helper()
var result storage.MVCCScanResult
if !assert.Eventually(t, func() bool {
result, err = storage.MVCCScan(ctx, engine, prefix, prefix.PrefixEnd(),
hlc.MaxTimestamp, storage.MVCCScanOptions{Inconsistent: true})
require.NoError(t, err)
return len(result.Intents) == 0
}, 10*time.Second, 100*time.Millisecond, "intent cleanup timed out") {
require.Fail(t, "found stale intents", "%v", len(result.Intents))
}
}

// assertTxnCleanup checks that the txn record is cleaned up within a
// reasonable time.
assertTxnCleanup := func(t *testing.T, txnKey roachpb.Key, txnID uuid.UUID) {
t.Helper()
var txnEntry roachpb.Transaction
if !assert.Eventually(t, func() bool {
key := keys.TransactionKey(txnKey, txnID)
ok, err := storage.MVCCGetProto(ctx, engine, key, hlc.MaxTimestamp, &txnEntry,
storage.MVCCGetOptions{})
require.NoError(t, err)
return !ok
}, 5*time.Second, 100*time.Millisecond, "txn entry cleanup timed out") {
require.Fail(t, "found stale txn entry", "%v", txnEntry)
}
}

// removeKeys cleans up all entries in the key range.
removeKeys := func(t *testing.T) {
t.Helper()
batch := &kv.Batch{}
batch.AddRawRequest(&roachpb.ClearRangeRequest{
RequestHeader: roachpb.RequestHeader{
Key: prefix,
EndKey: prefix.PrefixEnd(),
},
})
require.NoError(t, db.Run(ctx, batch))
genKeySeen = map[string]bool{} // reset random key generator
}

// testTxn runs an intent cleanup test using a transaction.
type testTxnSpec struct {
numKeys int // number of keys per transaction
singleRange bool // if true, put intents in a single range at key\x00\x00
finalize string // commit, rollback, cancel, abort (via push)
}
testTxn := func(t *testing.T, spec testTxnSpec) {
t.Helper()
t.Cleanup(func() { removeKeys(t) })
const batchSize = 10000

// Write numKeys KV pairs in batches of batchSize as a single txn.
var txnKey roachpb.Key
txn := db.NewTxn(ctx, "test")
batch := txn.NewBatch()
for i := 0; i < spec.numKeys; i++ {
key := genKey(spec.singleRange)
batch.Put(key, []byte("value"))
if (i > 0 && i%batchSize == 0) || i == spec.numKeys-1 {
require.NoError(t, txn.Run(ctx, batch))
batch = txn.NewBatch()
}
if i == 0 {
txnKey = make([]byte, len(key))
copy(txnKey, key)
}
}

// Finalize the txn according to the spec.
switch spec.finalize {
case "commit":
require.NoError(t, txn.Commit(ctx))

case "rollback":
require.NoError(t, txn.Rollback(ctx))

case "cancel":
rollbackCtx, cancel := context.WithCancel(ctx)
cancel()
if err := txn.Rollback(rollbackCtx); err != context.Canceled {
require.NoError(t, err)
}

case "abort":
now := clock.NowAsClockTimestamp()
pusherProto := roachpb.MakeTransaction(
"pusher",
nil, // baseKey
roachpb.MaxUserPriority,
now.ToTimestamp(),
clock.MaxOffset().Nanoseconds(),
)
pusher := kv.NewTxnFromProto(ctx, db, srv.NodeID(), now, kv.RootTxn, &pusherProto)
require.NoError(t, pusher.Put(ctx, txnKey, []byte("pushit")))

err := txn.Commit(ctx)
require.Error(t, err)
require.IsType(t, &roachpb.TransactionRetryWithProtoRefreshError{}, err)
require.True(t, err.(*roachpb.TransactionRetryWithProtoRefreshError).PrevTxnAborted())
require.NoError(t, pusher.Rollback(ctx))

default:
require.Fail(t, "invalid finalize value %q", spec.finalize)
}

assertIntentCleanup(t)
assertTxnCleanup(t, txnKey, txn.ID())
}

// testNonTxn runs an intent cleanup test without an explicit transaction.
type testNonTxnSpec struct {
numKeys int // number of keys per transaction
singleRange bool // if true, put intents in a single range at key\x00\x00
}
testNonTxn := func(t *testing.T, spec testNonTxnSpec) {
t.Helper()
t.Cleanup(func() { removeKeys(t) })

batch := &kv.Batch{}
for i := 0; i < spec.numKeys; i++ {
batch.Put(genKey(spec.singleRange), []byte("value"))
}
require.NoError(t, db.Run(ctx, batch))

assertIntentCleanup(t)
}

testutils.RunValues(t, "numKeys", []interface{}{1, 100, 100000}, func(t *testing.T, numKeys interface{}) {
testutils.RunTrueAndFalse(t, "singleRange", func(t *testing.T, singleRange bool) {
testutils.RunTrueAndFalse(t, "txn", func(t *testing.T, txn bool) {
if txn {
finalize := []interface{}{"commit", "rollback", "cancel", "abort"}
testutils.RunValues(t, "finalize", finalize, func(t *testing.T, finalize interface{}) {
testTxn(t, testTxnSpec{
numKeys: numKeys.(int),
singleRange: singleRange,
finalize: finalize.(string),
})
})
} else {
testNonTxn(t, testNonTxnSpec{
numKeys: numKeys.(int),
singleRange: singleRange,
})
}
})
})
})
})
}
18 changes: 15 additions & 3 deletions pkg/kv/kvserver/intentresolver/intent_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,10 @@ const (
// intentResolutionBatchIdle is similar to the above setting but is used when
// when no additional traffic hits the batch.
defaultIntentResolutionBatchIdle = 5 * time.Millisecond

// gcTxnRecordTimeout is the timeout for asynchronous txn record removal
// during cleanupFinishedTxnIntents.
gcTxnRecordTimeout = 20 * time.Second
)

// Config contains the dependencies to construct an IntentResolver.
Expand Down Expand Up @@ -754,12 +758,20 @@ func (ir *IntentResolver) cleanupFinishedTxnIntents(
if pErr := ir.ResolveIntents(ctx, txn.LocksAsLockUpdates(), opts); pErr != nil {
return errors.Wrapf(pErr.GoError(), "failed to resolve intents")
}
// Run transaction record GC outside of ir.sem.
// Run transaction record GC outside of ir.sem. We need a new context, in case
// we're still connected to the client's context (which can happen when
// allowSyncProcessing is true). Otherwise, we may return to the caller before
// gcTxnRecord completes, which may cancel the context and abort the cleanup
// either due to a defer cancel() or client disconnection. We give it a timeout
// as well, to avoid goroutine leakage.
return ir.stopper.RunAsyncTask(
ctx,
ir.ambientCtx.AnnotateCtx(context.Background()),
"storage.IntentResolver: cleanup txn records",
func(ctx context.Context) {
err := ir.gcTxnRecord(ctx, rangeID, txn)
err := contextutil.RunWithTimeout(ctx, "cleanup txn record",
gcTxnRecordTimeout, func(ctx context.Context) error {
return ir.gcTxnRecord(ctx, rangeID, txn)
})
if onComplete != nil {
onComplete(err)
}
Expand Down
54 changes: 42 additions & 12 deletions pkg/kv/kvserver/replica_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/contextutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
Expand Down Expand Up @@ -208,22 +209,18 @@ func (r *Replica) executeWriteBatch(
// resolution is semi-synchronous in that there is a limited number of
// outstanding asynchronous resolution tasks allowed after which
// further calls will block.
if len(propResult.EncounteredIntents) > 0 {
// TODO(peter): Re-proposed and canceled (but executed) commands can
// both leave intents to GC that don't hit this code path. No good
// solution presents itself at the moment and such intents will be
// resolved on reads.
if err := r.store.intentResolver.CleanupIntentsAsync(
ctx, propResult.EncounteredIntents, true, /* allowSync */
); err != nil {
log.Warningf(ctx, "%v", err)
}
}
if len(propResult.EndTxns) > 0 {
if err := r.store.intentResolver.CleanupTxnIntentsAsync(
ctx, r.RangeID, propResult.EndTxns, true, /* allowSync */
); err != nil {
log.Warningf(ctx, "%v", err)
log.Warningf(ctx, "transaction cleanup failed: %v", err)
}
}
if len(propResult.EncounteredIntents) > 0 {
if err := r.store.intentResolver.CleanupIntentsAsync(
ctx, propResult.EncounteredIntents, true, /* allowSync */
); err != nil {
log.Warningf(ctx, "intent cleanup failed: %v", err)
}
}
if ba.Requests[0].GetMigrate() != nil && propResult.Err == nil {
Expand Down Expand Up @@ -280,6 +277,7 @@ func (r *Replica) executeWriteBatch(
propResult.Err = roachpb.NewError(applicationErr)
}
return propResult.Reply, nil, propResult.Err

case <-slowTimer.C:
slowTimer.Read = true
r.store.metrics.SlowRaftRequests.Inc(1)
Expand All @@ -288,13 +286,45 @@ func (r *Replica) executeWriteBatch(
rangeUnavailableMessage(&s, r.Desc(), r.store.cfg.NodeLiveness.GetIsLiveMap(),
r.RaftStatus(), ba, timeutil.Since(startPropTime))
log.Errorf(ctx, "range unavailable: %v", s)

case <-ctxDone:
// If our context was canceled, return an AmbiguousResultError,
// which indicates to the caller that the command may have executed.
//
// If the batch contained an EndTxnRequest, asynchronously wait
// around for the result for a while and try to clean up after the
// txn. If the resolver's async task pool is full, just skip cleanup
// by setting allowSync=false, since we won't be able to
// backpressure clients.
if _, ok := ba.GetArg(roachpb.EndTxn); ok {
const taskName = "async txn cleanup"
_ = r.store.stopper.RunAsyncTask(
r.AnnotateCtx(context.Background()),
taskName,
func(ctx context.Context) {
err := contextutil.RunWithTimeout(ctx, taskName, 20*time.Second,
func(ctx context.Context) error {
select {
case propResult := <-ch:
if len(propResult.EndTxns) > 0 {
return r.store.intentResolver.CleanupTxnIntentsAsync(ctx,
r.RangeID, propResult.EndTxns, false /* allowSync */)
}
case <-shouldQuiesce:
case <-ctx.Done():
}
return nil
})
if err != nil {
log.Warningf(ctx, "transaction cleanup failed: %v", err)
}
})
}
abandon()
log.VEventf(ctx, 2, "context cancellation after %0.1fs of attempting command %s",
timeutil.Since(startTime).Seconds(), ba)
return nil, nil, roachpb.NewError(roachpb.NewAmbiguousResultError(ctx.Err().Error()))

case <-shouldQuiesce:
// If shutting down, return an AmbiguousResultError, which indicates
// to the caller that the command may have executed.
Expand Down
Loading