Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kv,storage: add support for child transactions #56588

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions pkg/kv/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ go_test(
"//pkg/security/securitytest",
"//pkg/security/username",
"//pkg/server",
"//pkg/sql",
"//pkg/sql/catalog/descpb",
"//pkg/sql/rowenc/keyside",
"//pkg/sql/sem/tree",
"//pkg/sql/sessiondatapb",
"//pkg/storage/enginepb",
"//pkg/testutils",
Expand All @@ -96,6 +100,7 @@ go_test(
"//pkg/util/uuid",
"@com_github_cockroachdb_errors//:errors",
"@com_github_gogo_protobuf//proto",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//status",
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvclient/kvcoord/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ go_library(
"//pkg/util/future",
"//pkg/util/grpcutil",
"//pkg/util/hlc",
"//pkg/util/interval",
"//pkg/util/iterutil",
"//pkg/util/limit",
"//pkg/util/log",
Expand Down
139 changes: 139 additions & 0 deletions pkg/kv/kvclient/kvcoord/txn_coord_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/envutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/interval"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
Expand Down Expand Up @@ -171,6 +172,79 @@ type TxnCoordSender struct {
// typ specifies whether this transaction is the top level,
// or one of potentially many distributed transactions.
typ kv.TxnType

// parent, when non-nil, is the ancestor handle used by
// pushParentUponCommitLocked and forwardToCommittedDescendant.
parent ancestorTxn
}

// ancestorTxn is a handle to an ancestor held by a child. The child must call
// forwardToCommittedDescendant on its immediate parent after committing (see
// pushParentUponCommitLocked). This logic is important for two reasons:
//
// 1. Parent transactions should never logically precede child transactions.
// 2. Child transactions should not invalidate the reads of any of their
// ancestors. If they do, it may lead to infinite retries, which would be
// bad. The interface here accepts the write set of the child in order to
// check whether the child performed a write which overlaps an ancestor's
// reads and, if so, to surface an error.
//
// Note that if the childCommitTimestamp is less than or equal to the parent's
// current read timestamp then the write set does not need to be consulted as
// any invalid write would have been pushed by the timestamp cache.
type ancestorTxn interface {
// forwardToCommittedDescendant informs the ancestor that a descendant has
// committed at childCommitTimestamp having written to childWriteSet. A
// non-nil error indicates that the ancestor could not be forwarded to the
// descendant's commit timestamp.
forwardToCommittedDescendant(ctx context.Context, childCommitTimestamp hlc.Timestamp, childWriteSet interval.RangeGroup) error
}

// pushParentUponCommitLocked is called when a transaction moves to committed.
// It hands the transaction's write timestamp and write set to its parent so
// that no ancestor transaction commits at a timestamp which precedes the
// commit timestamp of any of its descendants. It is a no-op for a transaction
// without a parent.
func (tc *TxnCoordSender) pushParentUponCommitLocked(ctx context.Context) error {
	if tc.parent == nil {
		return nil
	}

	lockSpans, inFlight := tc.mu.txn.LockSpans, tc.mu.txn.InFlightWrites
	writeSet := interval.NewRangeTree()
	for _, sp := range lockSpans {
		writeSet.Add(sp.AsRange())
	}

	// The InFlightWrites are part of the write set too: a transaction which is
	// asynchronously being moved to the explicitly committed state still
	// carries them. Each one is a single key, so it is added as a point span.
	for _, w := range inFlight {
		point := roachpb.Span{Key: w.Key}
		writeSet.Add(point.AsRange())
	}

	return tc.parent.forwardToCommittedDescendant(ctx, tc.mu.txn.WriteTimestamp, writeSet)
}

// forwardToCommittedDescendant makes a TxnCoordSender implement ancestorTxn.
//
// The call is first propagated to this transaction's own parent, if any, and
// only then applied locally under tc.mu. Locally, the write timestamp is
// forwarded to descendantCommitTimestamp unless that timestamp is already at
// or below the read timestamp. An error is returned if the refresh footprint
// is marked invalid or if childWriteSet overlaps any span in it.
func (tc *TxnCoordSender) forwardToCommittedDescendant(
	ctx context.Context, descendantCommitTimestamp hlc.Timestamp, childWriteSet interval.RangeGroup,
) error {
	// Ancestors further up the chain are handled before this transaction; any
	// failure there is returned without touching local state.
	if ancestor := tc.parent; ancestor != nil {
		err := ancestor.forwardToCommittedDescendant(ctx, descendantCommitTimestamp, childWriteSet)
		if err != nil {
			return err
		}
	}

	tc.mu.Lock()
	defer tc.mu.Unlock()

	switch {
	case tc.interceptorAlloc.txnSpanRefresher.refreshInvalid:
		// NOTE(review): this check runs before the timestamp comparison
		// below, so the error is returned even when no forwarding would be
		// needed -- confirm that this is intended.
		return errors.New("cannot forward provisional commit timestamp " +
			"due to overlapping write")
	case descendantCommitTimestamp.LessEq(tc.mu.txn.ReadTimestamp):
		// Nothing to forward; the write set need not be consulted.
		return nil
	}

	readSpans := tc.interceptorAlloc.txnSpanRefresher.refreshFootprint.asSlice()
	for i := range readSpans {
		if childWriteSet.Overlaps(readSpans[i].AsRange()) {
			return errors.New("cannot forward provisional commit timestamp " +
				"due to overlapping write")
		}
	}

	tc.mu.txn.WriteTimestamp.Forward(descendantCommitTimestamp)
	return nil
}

var _ kv.TxnSender = &TxnCoordSender{}
Expand Down Expand Up @@ -465,6 +539,9 @@ func (tc *TxnCoordSender) finalizeNonLockingTxnLocked(
if err := tc.maybeCommitWait(ctx, false /* deferred */); err != nil {
return kvpb.NewError(err)
}
if err := tc.pushParentUponCommitLocked(ctx); err != nil {
return kvpb.NewError(err)
}
}
return nil
}
Expand Down Expand Up @@ -540,6 +617,10 @@ func (tc *TxnCoordSender) Send(
if err := tc.maybeCommitWait(ctx, false /* deferred */); err != nil {
return nil, kvpb.NewError(err)
}

if err := tc.pushParentUponCommitLocked(ctx); err != nil {
return nil, kvpb.NewError(err)
}
}
}
}
Expand Down Expand Up @@ -828,6 +909,10 @@ func (tc *TxnCoordSender) handleRetryableErrLocked(
case *kvpb.TransactionPushError:
tc.metrics.RestartsTxnPush.Inc()

case *kvpb.AncestorAbortedError:
// TODO(ajwerner): Add a metric
tc.metrics.RestartsUnknown.Inc()

default:
tc.metrics.RestartsUnknown.Inc()
}
Expand Down Expand Up @@ -1427,3 +1512,57 @@ func (tc *TxnCoordSender) hasPerformedReadsLocked() bool {
func (tc *TxnCoordSender) hasPerformedWritesLocked() bool {
	// A non-zero sequence number is what this method treats as evidence that
	// the transaction has performed a write.
	sequence := tc.mu.txn.Sequence
	return sequence != 0
}

// NewChildTransaction is part of the TxnSender interface.
//
// It creates a new transaction whose Parent is a clone of the receiver's
// current transaction proto, and returns the child's ID together with a
// sender for it. The child's sender keeps a handle to the receiver so that it
// can forward the receiver's timestamp when the child commits.
//
// An error is returned if the receiver is not PENDING, is not a root
// transaction, or has a fixed commit timestamp.
func (tc *TxnCoordSender) NewChildTransaction() (id uuid.UUID, child kv.TxnSender, _ error) {
	// All validation and the clone of the parent proto happen under a single
	// acquisition of tc.mu so that the transaction cannot leave the PENDING
	// state between the status check and the clone.
	tc.mu.Lock()
	defer tc.mu.Unlock()

	if status := tc.mu.txn.Status; status != roachpb.PENDING {
		return uuid.UUID{}, nil, errors.Errorf(
			"illegal call to NewChildTransaction on non-%s (%s) transaction",
			roachpb.PENDING, status)
	}
	if tc.typ != kv.RootTxn {
		return uuid.UUID{}, nil, errors.New(
			"illegal call to NewChildTransaction on non-root transaction")
	}
	if tc.mu.txn.CommitTimestampFixed {
		return uuid.UUID{}, nil, errors.New(
			"illegal call to NewChildTransaction on fixed commit timestamp transaction")
	}

	// TODO(ajwerner): Consider stripping information from the parent proto like
	// LockSpans or perhaps introducing a new proto altogether for parents. The
	// latter is likely to be onerous.

	parent := tc.mu.txn.Clone()
	childTxn := roachpb.MakeTransaction(
		parent.Name+" child",
		nil,
		tc.mu.userPriority,
		parent.WriteTimestamp,
		tc.clock.MaxOffset().Nanoseconds(),
		tc.mu.txn.CoordinatorNodeID,
	)
	childTxn.Parent = parent

	// Note that it is critical to set the priority value of the child to at
	// least that of the parent. If it were allowed to be below that of the
	// parent, live-lock may occur.
	//
	// Imagine a case where two child transactions which have encountered
	// intents of parents A1 (2) -> B (4) and B1 (3) -> A (5). In this case,
	// neither child has greater priority than any parent. Given the parents are
	// not actually pushing anything (they are interpreted to be implicitly
	// pushing their children), we'd be in trouble; no pusher would break the
	// deadlock. Priority levels should only increase and thus these scenarios
	// will not occur so long as the child's priority starts at least at that
	// of the parent.
	childTxn.Priority = parent.Priority
	child = newRootTxnCoordSender(tc.TxnCoordSenderFactory, &childTxn, tc.mu.userPriority)
	childTC := child.(*TxnCoordSender)
	childTC.parent = tc
	return childTxn.ID, child, nil
}
24 changes: 24 additions & 0 deletions pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3057,3 +3057,27 @@ func TestTxnTypeCompatibleWithBatchRequest(t *testing.T) {
err = rootTxn.Put(ctx, roachpb.Key("a"), []byte("b"))
require.NoError(t, err)
}

// TestNewChildTransaction is a very simplistic sanity check of the child
// transaction code: it creates a child from a fresh parent and verifies the
// child's ID, priority, name, key and Parent link.
func TestNewChildTransaction(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	ctx := context.Background()
	testDB := createTestDBWithKnobs(t, nil /* knobs */)
	defer testDB.Stop()

	root := testDB.DB.NewTxn(ctx, "foo")
	childID, childSender, err := root.Sender().NewChildTransaction()
	require.NoError(t, err)

	// The returned ID must be the ID of the child's transaction proto.
	childProto := childSender.TestingCloneTxn()
	require.Equal(t, childID, childProto.ID)

	// The child inherits the parent's priority, points back at the parent and
	// is otherwise a distinct transaction with no key.
	parentProto := root.Sender().TestingCloneTxn()
	require.Equal(t, parentProto.Priority, childProto.Priority)
	require.NotNil(t, childProto.Parent)
	require.Equal(t, parentProto.ID, childProto.Parent.ID)
	require.Equal(t, parentProto.Name+" child", childProto.Name)
	require.NotEqual(t, parentProto.ID, childProto.ID)
	require.Nil(t, childProto.Key)
}
4 changes: 4 additions & 0 deletions pkg/kv/kvpb/batch_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions pkg/kv/kvpb/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ func PrepareTransactionForRetry(
now := clock.Now()
txn.WriteTimestamp.Forward(now)
}
case *AncestorAbortedError:

case *WriteTooOldError:
// Increase the timestamp to the ts at which we've actually written.
txn.WriteTimestamp.Forward(tErr.RetryTimestamp())
Expand Down
7 changes: 4 additions & 3 deletions pkg/kv/kvpb/errordetailtype_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 16 additions & 1 deletion pkg/kv/kvpb/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,7 @@ const (
MinTimestampBoundUnsatisfiableErrType ErrorDetailType = 42
RefreshFailedErrType ErrorDetailType = 43
MVCCHistoryMutationErrType ErrorDetailType = 44
AncestorAbortedErrType ErrorDetailType = 45
// When adding new error types, don't forget to update NumErrors below.

// CommunicationErrType indicates a gRPC error; this is not an ErrorDetail.
Expand All @@ -304,7 +305,7 @@ const (
// detail. The value 25 is chosen because it's reserved in the errors proto.
InternalErrType ErrorDetailType = 25

NumErrors int = 45
NumErrors int = 46
)

// Register the migration of all errors that used to be in the roachpb package
Expand Down Expand Up @@ -1536,6 +1537,19 @@ func NewNotLeaseHolderErrorWithSpeculativeLease(
return NewNotLeaseHolderError(speculativeLease, proposerStoreID, rangeDesc, msg)
}

// Type returns the ErrorDetailType which identifies this error,
// AncestorAbortedErrType.
func (e *AncestorAbortedError) Type() ErrorDetailType {
return AncestorAbortedErrType
}

// Error returns the error message as a plain string: the error is rendered
// with redact.Sprint and the redaction markers are then stripped.
func (e *AncestorAbortedError) Error() string {
return redact.Sprint(e).StripMarkers()
}

// SafeFormatError implements errors.SafeFormatter. It prints a message naming
// the aborted ancestor transaction to p and always returns nil.
func (e *AncestorAbortedError) SafeFormatError(p errors.Printer) (next error) {
p.Printf("ancestor transaction %v aborted", e.AncestorTxn)
return nil
}

var _ errors.SafeFormatter = &NotLeaseHolderError{}
var _ errors.SafeFormatter = &RangeNotFoundError{}
var _ errors.SafeFormatter = &RangeKeyMismatchError{}
Expand Down Expand Up @@ -1570,3 +1584,4 @@ var _ errors.SafeFormatter = &MinTimestampBoundUnsatisfiableError{}
var _ errors.SafeFormatter = &RefreshFailedError{}
var _ errors.SafeFormatter = &MVCCHistoryMutationError{}
var _ errors.SafeFormatter = &UnhandledRetryableError{}
var _ errors.SafeFormatter = &AncestorAbortedError{}
11 changes: 11 additions & 0 deletions pkg/kv/kvpb/errors.proto
Original file line number Diff line number Diff line change
Expand Up @@ -618,6 +618,16 @@ message RefreshFailedError {
optional util.hlc.Timestamp timestamp = 3 [(gogoproto.nullable) = false];
}

// AncestorAbortedError indicates that the parent transaction of a child has been
// aborted while the child is waiting to push another transaction. There is no
// reason for the child to proceed if the parent is already aborted. We cannot
// just use a TransactionAbortedError as the TxnCoordSender would translate
// that into a TransactionRetryWithProtoRefreshError which is not the desired
// behavior.
message AncestorAbortedError {
// ancestor_txn is the ancestor transaction which was found to be aborted.
optional roachpb.Transaction ancestor_txn = 1 [(gogoproto.nullable) = false];
}

// ErrorDetail is a union type containing all available errors.
message ErrorDetail {
reserved 15, 19, 20, 21, 22, 23, 24, 25, 29, 30, 33;
Expand Down Expand Up @@ -661,6 +671,7 @@ message ErrorDetail {
RefreshFailedError refresh_failed_error = 43;
MVCCHistoryMutationError mvcc_history_mutation = 44
[(gogoproto.customname) = "MVCCHistoryMutation"];
AncestorAbortedError ancestor_aborted = 45;
}
}

Expand Down
12 changes: 9 additions & 3 deletions pkg/kv/kvserver/concurrency/concurrency_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -595,7 +595,11 @@ type lockTable interface {
// lockTableGuard and the subsequent calls reuse the previously returned
// one. The latches needed by the request must be held when calling this
// function.
ScanAndEnqueue(Request, lockTableGuard) lockTableGuard
//
// The returned error indicates illegal operations by descendant transactions
// interacting with their ancestors' locks. It is illegal for a descendant to
// try to acquire a lock that overlaps with any of its ancestors' locks.
ScanAndEnqueue(Request, lockTableGuard) (lockTableGuard, error)

// ScanOptimistic takes a snapshot of the lock table for later checking for
// conflicts, and returns a guard. It is for optimistic evaluation of
Expand Down Expand Up @@ -749,8 +753,10 @@ type lockTableGuard interface {
// state transition.
NewStateChan() chan struct{}

// CurState returns the latest waiting state.
CurState() waitingState
// CurState returns the latest waiting state. It returns an error if the
// lock which was encountered is not allowed as it is due to an ancestor
// transaction.
CurState() (waitingState, *Error)

// ResolveBeforeScanning lists the locks to resolve before scanning again.
// This must be called after:
Expand Down
Loading