Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kv: stop using BeginTransaction requests #33566

Merged
merged 1 commit into from
Jan 10, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/RFCS/20181209_lazy_txn_record_creation.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
- Feature Name: Lazy Transaction Record Creation (a.k.a Deprecate BeginTransaction)
- Status: in-progress
- Status: completed
- Start Date: 2018-12-09
- Authors: Nathan VanBenschoten
- RFC PR: #32971
Expand Down
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,6 @@
<tr><td><code>trace.debug.enable</code></td><td>boolean</td><td><code>false</code></td><td>if set, traces for recent requests can be seen in the /debug page</td></tr>
<tr><td><code>trace.lightstep.token</code></td><td>string</td><td><code></code></td><td>if set, traces go to Lightstep using this token</td></tr>
<tr><td><code>trace.zipkin.collector</code></td><td>string</td><td><code></code></td><td>if set, traces go to the given Zipkin instance (example: '127.0.0.1:9411'); ignored if trace.lightstep.token is set.</td></tr>
<tr><td><code>version</code></td><td>custom validation</td><td><code>2.1-3</code></td><td>set the active cluster version in the format '<major>.<minor>'.</td></tr>
<tr><td><code>version</code></td><td>custom validation</td><td><code>2.1-4</code></td><td>set the active cluster version in the format '<major>.<minor>'.</td></tr>
</tbody>
</table>
3 changes: 2 additions & 1 deletion pkg/kv/txn_coord_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,7 @@ func (tcf *TxnCoordSenderFactory) TransactionalSender(
tcs.interceptorAlloc.txnHeartbeat.init(
&tcs.mu.Mutex,
&tcs.mu.txn,
tcf.st,
tcs.clock,
tcs.heartbeatInterval,
&tcs.interceptorAlloc.txnLockGatekeeper,
Expand Down Expand Up @@ -581,7 +582,7 @@ func (tc *TxnCoordSender) Send(
return nil, pErr
}

if ba.IsSingleEndTransactionRequest() && !tc.interceptorAlloc.txnHeartbeat.mu.everSentBeginTxn {
if ba.IsSingleEndTransactionRequest() && !tc.interceptorAlloc.txnHeartbeat.mu.everWroteIntents {
return nil, tc.commitReadOnlyTxnLocked(ctx, ba.Requests[0].GetEndTransaction().Deadline)
}

Expand Down
49 changes: 23 additions & 26 deletions pkg/kv/txn_coord_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/internal/client"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/engine"
"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
Expand Down Expand Up @@ -1263,13 +1264,11 @@ func TestAbortTransactionOnCommitErrors(t *testing.T) {
txn := ba.Txn.Clone()
br.Txn = &txn

if _, hasBT := ba.GetArg(roachpb.BeginTransaction); hasBT {
if _, ok := ba.Requests[1].GetInner().(*roachpb.PutRequest); !ok {
if _, hasPut := ba.GetArg(roachpb.Put); hasPut {
if _, ok := ba.Requests[0].GetInner().(*roachpb.PutRequest); !ok {
t.Fatalf("expected Put")
}
union := &br.Responses[0] // avoid operating on copy
union.MustSetInner(&roachpb.BeginTransactionResponse{})
union = &br.Responses[1] // avoid operating on copy
union.MustSetInner(&roachpb.PutResponse{})
if ba.Txn != nil && br.Txn == nil {
txnClone := ba.Txn.Clone()
Expand Down Expand Up @@ -1629,8 +1628,8 @@ func TestCommitMutatingTransaction(t *testing.T) {
var calls []roachpb.Method
sender.match(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
calls = append(calls, ba.Methods()...)
if bt, ok := ba.GetArg(roachpb.BeginTransaction); ok && !bt.Header().Key.Equal(roachpb.Key("a")) {
t.Errorf("expected begin transaction key to be \"a\"; got %s", bt.Header().Key)
if !bytes.Equal(ba.Txn.Key, roachpb.Key("a")) {
t.Errorf("expected transaction key to be \"a\"; got %s", ba.Txn.Key)
}
if et, ok := ba.GetArg(roachpb.EndTransaction); ok && !et.(*roachpb.EndTransactionRequest).Commit {
t.Errorf("expected commit to be true")
Expand Down Expand Up @@ -1692,7 +1691,7 @@ func TestCommitMutatingTransaction(t *testing.T) {
if err := db.Txn(ctx, test.f); err != nil {
t.Fatalf("%d: unexpected error on commit: %s", i, err)
}
expectedCalls := []roachpb.Method{roachpb.BeginTransaction, test.expMethod}
expectedCalls := []roachpb.Method{test.expMethod}
if test.pointWrite {
expectedCalls = append(expectedCalls, roachpb.QueryIntent)
}
Expand All @@ -1706,6 +1705,7 @@ func TestCommitMutatingTransaction(t *testing.T) {

// TestTxnInsertBeginTransaction verifies that a begin transaction
// request is inserted just before the first mutating command.
// TODO(nvanbenschoten): Remove in 2.3.
func TestTxnInsertBeginTransaction(t *testing.T) {
defer leaktest.AfterTest(t)()
ctx := context.Background()
Expand All @@ -1727,9 +1727,12 @@ func TestTxnInsertBeginTransaction(t *testing.T) {
return nil, nil
})

v := cluster.VersionByKey(cluster.Version2_1)
st := cluster.MakeTestingClusterSettingsWithVersion(v, v)
factory := NewTxnCoordSenderFactory(
TxnCoordSenderFactoryConfig{
AmbientCtx: ambient,
Settings: st,
Clock: clock,
Stopper: stopper,
},
Expand Down Expand Up @@ -1758,6 +1761,7 @@ func TestTxnInsertBeginTransaction(t *testing.T) {

// TestBeginTransactionErrorIndex verifies that the error index is cleared
// when a BeginTransaction command causes an error.
// TODO(nvanbenschoten): Remove in 2.3.
func TestBeginTransactionErrorIndex(t *testing.T) {
defer leaktest.AfterTest(t)()
ctx := context.Background()
Expand All @@ -1773,9 +1777,12 @@ func TestBeginTransactionErrorIndex(t *testing.T) {
return nil, pErr
})

v := cluster.VersionByKey(cluster.Version2_1)
st := cluster.MakeTestingClusterSettingsWithVersion(v, v)
factory := NewTxnCoordSenderFactory(
TxnCoordSenderFactoryConfig{
AmbientCtx: ambient,
Settings: st,
Clock: clock,
Stopper: stopper,
},
Expand Down Expand Up @@ -1885,7 +1892,7 @@ func TestEndWriteRestartReadOnlyTransaction(t *testing.T) {
sender,
)
db := client.NewDB(testutils.MakeAmbientCtx(), factory, clock)
expCalls := []roachpb.Method{roachpb.BeginTransaction, roachpb.Put, roachpb.EndTransaction}
expCalls := []roachpb.Method{roachpb.Put, roachpb.EndTransaction}

testutils.RunTrueAndFalse(t, "success", func(t *testing.T, success bool) {
calls = nil
Expand All @@ -1911,8 +1918,8 @@ func TestEndWriteRestartReadOnlyTransaction(t *testing.T) {
}

// TestTransactionKeyNotChangedInRestart verifies that if the transaction
// already has a key (we're in a restart), the key in the begin transaction
// request is not changed.
// already has a key (we're in a restart), the key in the transaction request is
// not changed.
func TestTransactionKeyNotChangedInRestart(t *testing.T) {
defer leaktest.AfterTest(t)()
ctx := context.Background()
Expand All @@ -1933,13 +1940,7 @@ func TestTransactionKeyNotChangedInRestart(t *testing.T) {
return nil, nil
}

// Attempt 0 should have a BeginTxnRequest, and a PutRequest.
// Attempt 1 should have a PutRequest.
if attempt == 0 {
if _, ok := ba.GetArg(roachpb.BeginTransaction); !ok {
t.Fatalf("failed to find a begin transaction request: %v", ba)
}
}
// Both attempts should have a PutRequest.
if _, ok := ba.GetArg(roachpb.Put); !ok {
t.Fatalf("failed to find a put request: %v", ba)
}
Expand Down Expand Up @@ -2004,9 +2005,6 @@ func TestSequenceNumbers(t *testing.T) {
expSequence, seq, args)
}
}
if expSequence != ba.Txn.Sequence {
t.Errorf("expected header sequence %d; got %d", expSequence, ba.Txn.Sequence)
}
br := ba.CreateReply()
br.Txn = ba.Txn
return br, nil
Expand Down Expand Up @@ -2081,10 +2079,9 @@ func TestConcurrentTxnRequests(t *testing.T) {
}

expectedCallCounts := map[roachpb.Method]int{
roachpb.BeginTransaction: 1,
roachpb.Put: 26,
roachpb.QueryIntent: 26,
roachpb.EndTransaction: 1,
roachpb.Put: 26,
roachpb.QueryIntent: 26,
roachpb.EndTransaction: 1,
}
if !reflect.DeepEqual(expectedCallCounts, callCounts) {
t.Errorf("expected %v, got %v", expectedCallCounts, callCounts)
Expand Down Expand Up @@ -2265,8 +2262,8 @@ func TestTxnCoordSenderPipelining(t *testing.T) {
}

require.Equal(t, []roachpb.Method{
roachpb.BeginTransaction, roachpb.Put, roachpb.QueryIntent, roachpb.EndTransaction,
roachpb.BeginTransaction, roachpb.Put, roachpb.EndTransaction,
roachpb.Put, roachpb.QueryIntent, roachpb.EndTransaction,
roachpb.Put, roachpb.EndTransaction,
}, calls)

for _, action := range []func(ctx context.Context, txn *client.Txn) error{
Expand Down
73 changes: 49 additions & 24 deletions pkg/kv/txn_interceptor_heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/stop"
Expand Down Expand Up @@ -50,6 +51,7 @@ type txnHeartbeat struct {
// sends go through `wrapped`, not directly through `gatekeeper`.
gatekeeper lockedSender

st *cluster.Settings
clock *hlc.Clock
heartbeatInterval time.Duration
metrics *TxnMetrics
Expand Down Expand Up @@ -87,16 +89,21 @@ type txnHeartbeat struct {
// successful BeginTxn (in which case we know that there is a txn record)
// but as of May 2018 we don't do that. Note that the server accepts a
// BeginTxn with a higher epoch if a transaction record already exists.
// TODO(nvanbenschoten): Once we stop sending BeginTxn entirely (v2.3)
// we can get rid of this. For now, we keep it to ensure compatibility.
// It can't be collapsed into everWroteIntents because 2.1 nodes expect
// a new BeginTxn request on each epoch (e.g. to detect 1PC txns).
needBeginTxn bool

// everSentBeginTxn is set once a BeginTransactionRequest (out of possibly
// many) was sent to the server. If a BeginTxn was ever sent, then an
// EndTransaction needs to eventually be sent and cannot be elided.
// Note that simply looking at txnEnd == nil to see if a heartbeat loop is
// currently running is not always sufficient for deciding whether an
// EndTransaction can be elided - we want to allow multiple rollback attempts
// to be sent and the first one stops the heartbeat loop.
everSentBeginTxn bool
// everWroteIntents is set once the transaction's first write is sent to
// the server. If a write was ever sent, then an EndTransaction needs to
// eventually be sent and cannot be elided. Note that simply looking at
// txnEnd == nil to see if a heartbeat loop is currently running is not
// always sufficient for deciding whether an EndTransaction can be
// elided - we want to allow multiple rollback attempts to be sent and
// the first one stops the heartbeat loop.
// TODO(nvanbenschoten): Can this be replaced with h.mu.txn.Writing?
everWroteIntents bool
}
}

Expand All @@ -105,6 +112,7 @@ type txnHeartbeat struct {
func (h *txnHeartbeat) init(
mu sync.Locker,
txn *roachpb.Transaction,
st *cluster.Settings,
clock *hlc.Clock,
heartbeatInterval time.Duration,
gatekeeper lockedSender,
Expand All @@ -113,6 +121,7 @@ func (h *txnHeartbeat) init(
asyncAbortCallbackLocked func(context.Context),
) {
h.stopper = stopper
h.st = st
h.clock = clock
h.heartbeatInterval = heartbeatInterval
h.metrics = metrics
Expand Down Expand Up @@ -147,10 +156,11 @@ func (h *txnHeartbeat) SendLocked(
etReq = et.(*roachpb.EndTransactionRequest)
}

addedBeginTxn := false
needBeginTxn := haveTxnWrite && h.mu.needBeginTxn
if needBeginTxn {
h.mu.needBeginTxn = false
h.mu.everSentBeginTxn = true
h.mu.everWroteIntents = true
// From now on, all requests need to be checked against the AbortCache on
// the server side. We also conservatively update the current request,
// although I'm not sure if that's necessary.
Expand All @@ -167,20 +177,25 @@ func (h *txnHeartbeat) SendLocked(
// prepared before we had an anchor.
ba.Txn.Key = anchor
}
// Set the key in the begin transaction request to the txn's anchor key.
bt := &roachpb.BeginTransactionRequest{
RequestHeader: roachpb.RequestHeader{
Key: h.mu.txn.Key,
},
}

// Inject the new request before the first write position, taking care to
// avoid unnecessary allocations.
oldRequests := ba.Requests
ba.Requests = make([]roachpb.RequestUnion, len(ba.Requests)+1)
copy(ba.Requests, oldRequests[:firstWriteIdx])
ba.Requests[firstWriteIdx].MustSetInner(bt)
copy(ba.Requests[firstWriteIdx+1:], oldRequests[firstWriteIdx:])
if !h.st.Version.IsActive(cluster.VersionLazyTxnRecord) {
addedBeginTxn = true

// Set the key in the begin transaction request to the txn's anchor key.
bt := &roachpb.BeginTransactionRequest{
RequestHeader: roachpb.RequestHeader{
Key: h.mu.txn.Key,
},
}

// Inject the new request before the first write position, taking care to
// avoid unnecessary allocations.
oldRequests := ba.Requests
ba.Requests = make([]roachpb.RequestUnion, len(ba.Requests)+1)
copy(ba.Requests, oldRequests[:firstWriteIdx])
ba.Requests[firstWriteIdx].MustSetInner(bt)
copy(ba.Requests[firstWriteIdx+1:], oldRequests[firstWriteIdx:])
}

// Start the heartbeat loop.
// Note that we don't do it for 1PC txns: they only leave intents around on
Expand All @@ -204,7 +219,7 @@ func (h *txnHeartbeat) SendLocked(
var commitTurnedToRollback bool
if haveEndTxn {
// Are we writing now or have we written in the past?
elideEndTxn = !h.mu.everSentBeginTxn
elideEndTxn = !h.mu.everWroteIntents
if elideEndTxn {
ba.Requests = ba.Requests[:lastIndex]
} else if etReq.Commit {
Expand Down Expand Up @@ -234,7 +249,7 @@ func (h *txnHeartbeat) SendLocked(
}

// If we inserted a begin transaction request, remove its response here.
if needBeginTxn {
if addedBeginTxn {
if br != nil && br.Responses != nil {
br.Responses = append(br.Responses[:firstWriteIdx], br.Responses[firstWriteIdx+1:]...)
}
Expand Down Expand Up @@ -466,6 +481,16 @@ func (h *txnHeartbeat) heartbeat(ctx context.Context) bool {
return true
}

// TODO(nvanbenschoten): Figure out what to do here. The case we're
// handling is TransactionAbortedErrors without corresponding
// transaction protos attached. @andreimatei any suggestions?
if _, ok := pErr.GetDetail().(*roachpb.TransactionAbortedError); ok {
h.mu.txn.Status = roachpb.ABORTED
log.VEventf(ctx, 1, "Heartbeat detected aborted txn. Cleaning up.")
h.abortTxnAsyncLocked(ctx)
return false
}

respTxn = pErr.GetTxn()
} else {
respTxn = br.Responses[0].GetInner().(*roachpb.HeartbeatTxnResponse).Txn
Expand Down
4 changes: 1 addition & 3 deletions pkg/kv/txn_interceptor_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,7 @@ func (m *txnMetrics) init(txn *roachpb.Transaction, clock *hlc.Clock, metrics *T
func (m *txnMetrics) SendLocked(
ctx context.Context, ba roachpb.BatchRequest,
) (*roachpb.BatchResponse, *roachpb.Error) {
_, hasBegin := ba.GetArg(roachpb.BeginTransaction)
et, hasEnd := ba.GetArg(roachpb.EndTransaction)
m.onePCCommit = hasBegin && hasEnd && et.(*roachpb.EndTransactionRequest).Commit
m.onePCCommit = ba.IsCompleteTransaction()

if m.txnStartNanos == 0 {
m.txnStartNanos = timeutil.Now().UnixNano()
Expand Down
4 changes: 1 addition & 3 deletions pkg/kv/txn_interceptor_pipeliner.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,9 +154,7 @@ func (tp *txnPipeliner) SendLocked(
ctx context.Context, ba roachpb.BatchRequest,
) (*roachpb.BatchResponse, *roachpb.Error) {
// Fast-path for 1PC transactions.
_, hasBT := ba.GetArg(roachpb.BeginTransaction)
_, hasET := ba.GetArg(roachpb.EndTransaction)
if hasBT && hasET {
if ba.IsCompleteTransaction() {
return tp.wrapped.SendLocked(ctx, ba)
}

Expand Down
Loading