Skip to content

Commit

Permalink
client: Return TransactionAbortedError on a deadline-exceeded error
Browse files Browse the repository at this point in the history
This is consistent with what Replica.EndTransaction does.
  • Loading branch information
kkaneda committed Jun 22, 2016
1 parent 6d60fb6 commit 1123717
Show file tree
Hide file tree
Showing 7 changed files with 207 additions and 12 deletions.
6 changes: 4 additions & 2 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -832,12 +832,14 @@ func TestReadOnlyTxnObeysDeadline(t *testing.T) {
t.Fatal(err)
}

if err := db.Txn(func(txn *client.Txn) error {
// Use txn.Exec instead of db.Txn to disable auto retry.
txn := client.NewTxn(context.TODO(), *db)
if err := txn.Exec(client.TxnExecOptions{AutoRetry: false, AutoCommit: true}, func(txn *client.Txn, _ *client.TxnExecOptions) error {
// Set deadline to sometime in the past.
txn.UpdateDeadlineMaybe(hlc.Timestamp{WallTime: timeutil.Now().Add(-time.Second).UnixNano()})
_, err := txn.Get("k")
return err
}); !testutils.IsError(err, "read-only txn timestamp violates deadline") {
}); !testutils.IsError(err, "txn aborted") {
t.Fatal(err)
}
}
Expand Down
2 changes: 2 additions & 0 deletions client/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,8 @@ func TestCommonMethods(t *testing.T) {
key{txnType, "IsFinalized"}: {},
key{txnType, "NewBatch"}: {},
key{txnType, "Exec"}: {},
key{txnType, "GetDeadline"}: {},
key{txnType, "ResetDeadline"}: {},
key{txnType, "Run"}: {},
key{txnType, "SetDebugName"}: {},
key{txnType, "SetIsolation"}: {},
Expand Down
20 changes: 14 additions & 6 deletions client/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,16 @@ func (txn *Txn) UpdateDeadlineMaybe(deadline hlc.Timestamp) bool {
return false
}

// ResetDeadline resets the deadline.
func (txn *Txn) ResetDeadline() {
txn.deadline = nil
}

// GetDeadline returns the deadline. For testing.
func (txn *Txn) GetDeadline() *hlc.Timestamp {
return txn.deadline
}

// Rollback sends an EndTransactionRequest with Commit=false.
// The txn's status is set to ABORTED in case of error. txn is
// considered finalized and cannot be used to send any more commands.
Expand Down Expand Up @@ -666,14 +676,12 @@ func (txn *Txn) send(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.

br, pErr := txn.db.send(ba)
if elideEndTxn && pErr == nil {
// Check that read only transactions do not violate their deadline.
// Check that read only transactions do not violate their deadline. This can happen
// when the transaction timestamp is updated by ReadWithinUncertaintyIntervalError
// (see TestTxnDeadline).
if endTxnRequest.Deadline != nil {
if endTxnRequest.Deadline.Less(txn.Proto.Timestamp) {
return nil, roachpb.NewErrorf(
"read-only txn timestamp violates deadline: %s < %s",
endTxnRequest.Deadline,
txn.Proto.Timestamp,
)
return nil, roachpb.NewErrorWithTxn(roachpb.NewTransactionAbortedError(), &txn.Proto)
}
}
// This normally happens on the server and sent back in response
Expand Down
43 changes: 39 additions & 4 deletions sql/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,8 +204,8 @@ func (p *planner) getTableLease(qname *parser.QualifiedName) (sqlbase.TableDescr
}
}

// If we didn't find a lease, acquire one.
if lease == nil {
// If we didn't find a lease or the lease is about to expire, acquire one.
if lease == nil || p.removeLeaseIfExpiring(lease) {
var err error
lease, err = p.leaseMgr.AcquireByName(p.txn, dbID, qname.Table())
if err != nil {
Expand Down Expand Up @@ -243,8 +243,8 @@ func (p *planner) getTableLeaseByID(tableID sqlbase.ID) (*sqlbase.TableDescripto
}
}

// If we didn't find a lease, acquire one.
if lease == nil {
// If we didn't find a lease or the lease is about to expire, acquire one.
if lease == nil || p.removeLeaseIfExpiring(lease) {
var err error
lease, err = p.leaseMgr.Acquire(p.txn, tableID, 0)
if err != nil {
Expand All @@ -263,6 +263,41 @@ func (p *planner) getTableLeaseByID(tableID sqlbase.ID) (*sqlbase.TableDescripto
return &lease.TableDescriptor, nil
}

// removeLeaseIfExpiring removes a lease and returns true if it is about to expire.
// The method also resets the transaction deadline.
func (p *planner) removeLeaseIfExpiring(lease *LeaseState) bool {
if lease == nil || lease.hasSomeLifeLeft(p.leaseMgr.clock) {
return false
}

// Remove the lease from p.leases.
idx := -1
for i, l := range p.leases {
if l == lease {
idx = i
break
}
}
if idx == -1 {
log.Warningf("lease (%s) not found", lease)
return false
}
p.leases[idx] = p.leases[len(p.leases)-1]
p.leases[len(p.leases)-1] = nil
p.leases = p.leases[:len(p.leases)-1]

if err := p.leaseMgr.Release(lease); err != nil {
log.Warning(err)
}

// Reset the deadline so that a new deadline will be set after the lease is acquired.
p.txn.ResetDeadline()
for _, l := range p.leases {
p.txn.UpdateDeadlineMaybe(hlc.Timestamp{WallTime: l.Expiration().UnixNano()})
}
return true
}

// getTableNames implements the SchemaAccessor interface.
func (p *planner) getTableNames(dbDesc *sqlbase.DatabaseDescriptor) (parser.QualifiedNames, error) {
prefix := sqlbase.MakeNameMetadataKey(dbDesc.ID, "")
Expand Down
52 changes: 52 additions & 0 deletions sql/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@ package sql
import (
"reflect"
"testing"
"time"

"github.com/cockroachdb/cockroach/client"
"github.com/cockroachdb/cockroach/sql/parser"
"github.com/cockroachdb/cockroach/sql/sqlbase"
"github.com/cockroachdb/cockroach/testutils"
"github.com/cockroachdb/cockroach/util/hlc"
"github.com/cockroachdb/cockroach/util/leaktest"
)

Expand Down Expand Up @@ -245,3 +248,52 @@ func TestPrimaryKeyUnspecified(t *testing.T) {
t.Fatalf("unexpected error: %s", err)
}
}

func TestRemoveLeaseIfExpiring(t *testing.T) {
defer leaktest.AfterTest(t)()

p := planner{}
mc := hlc.NewManualClock(0)
p.leaseMgr = &LeaseManager{LeaseStore: LeaseStore{clock: hlc.NewClock(mc.UnixNano)}}
p.leases = make([]*LeaseState, 0)
txn := client.Txn{}
p.setTxn(&txn)

if p.removeLeaseIfExpiring(nil) {
t.Error("expected false with nil input")
}

// Add a lease to the planner.
d := int64(LeaseDuration)
l1 := &LeaseState{expiration: parser.DTimestamp{Time: time.Unix(0, mc.UnixNano()+d+1)}}
p.leases = append(p.leases, l1)
et := hlc.Timestamp{WallTime: l1.Expiration().UnixNano()}
txn.UpdateDeadlineMaybe(et)

if p.removeLeaseIfExpiring(l1) {
t.Error("expected false wih a non-expiring lease")
}
if !p.txn.GetDeadline().Equal(et) {
t.Errorf("expected deadline %s but got %s", et, p.txn.GetDeadline())
}

// Advance the clock so that l1 will be expired.
mc.Increment(d + 1)

// Add another lease.
l2 := &LeaseState{expiration: parser.DTimestamp{Time: time.Unix(0, mc.UnixNano()+d+1)}}
p.leases = append(p.leases, l2)
if !p.removeLeaseIfExpiring(l1) {
t.Error("expected true with an expiring lease")
}
et = hlc.Timestamp{WallTime: l2.Expiration().UnixNano()}
txn.UpdateDeadlineMaybe(et)

if !(len(p.leases) == 1 && p.leases[0] == l2) {
t.Errorf("expected leases to contain %s but has %s", l2, p.leases)
}

if !p.txn.GetDeadline().Equal(et) {
t.Errorf("expected deadline %s, but got %s", et, p.txn.GetDeadline())
}
}
88 changes: 88 additions & 0 deletions sql/txn_restart_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -858,3 +858,91 @@ func TestNonRetryableErrorFromCommit(t *testing.T) {
t.Errorf("expected to hit error, but it didn't happen")
}
}

// Verifies that a read-only transaction that triggers a deadline-exceeded error finishes
// without causing an Executor error. In particular, this test case creates a read-only txn
// that elides EndTransactionRequest and makes sure a deadline-exceeded error causes a
// retryable error.
//
// This test triggers the above scenario by making ReadWithinUncertaintyIntervalError advance
// the clock, so that the transaction timestamp exceeds the deadline of the EndTransactionRequest.
func TestTxnDeadline(t *testing.T) {
defer leaktest.AfterTest(t)()

var cmdFilters CommandFilters
cmdFilters.AppendFilter(checkEndTransactionTrigger, true)

restartDone := false
testKey := []byte("test_key")
testingKnobs := &storage.StoreTestingKnobs{
TestingCommandFilter: cmdFilters.runFilters,
ClockBeforeSend: func(c *hlc.Clock, ba roachpb.BatchRequest) {
if restartDone {
return
}

// Hack to advance the transaction timestamp on a transaction restart.
for _, union := range ba.Requests {
if req, ok := union.GetInner().(*roachpb.ScanRequest); ok {
if bytes.Contains(req.Key, testKey) {
now := c.Now()
now.WallTime += int64(5 * sql.LeaseDuration)
c.Update(now)
break
}
}
}
},
}

ctx := server.MakeTestContext()
ctx.TestingKnobs.Store = testingKnobs
server, sqlDB, _ := setupWithContext(t, &ctx)
defer cleanup(server, sqlDB)

cleanupFilter := cmdFilters.AppendFilter(
func(args storagebase.FilterArgs) *roachpb.Error {
if restartDone {
return nil
}

if req, ok := args.Req.(*roachpb.ScanRequest); ok {
if bytes.Contains(req.Key, testKey) {
restartDone = true
// Return ReadWithinUncertaintyIntervalError to update the transaction timestamp on rery.
txn := args.Hdr.Txn
txn.ResetObservedTimestamps()
now := server.Clock().Now()
txn.UpdateObservedTimestamp(server.Gossip().GetNodeID(), now)
return roachpb.NewErrorWithTxn(roachpb.NewReadWithinUncertaintyIntervalError(now, now), txn)
}
}
return nil
}, false)
defer cleanupFilter()

// Use a large max offset to avoid rejecting a transaction whose timestanp is in
// future (as we will advance the transaction timestamp with ReadWithinUncertaintyIntervalError).
server.Clock().SetMaxOffset(sql.LeaseDuration * 10)

sqlDB.SetMaxOpenConns(1)
if _, err := sqlDB.Exec(`
CREATE DATABASE t;
CREATE TABLE t.test (k TEXT PRIMARY KEY, v TEXT);
INSERT INTO t.test (k, v) VALUES ('test_key', 'test_val');
`); err != nil {
t.Fatal(err)
}
// Acquire the lease and enable the auto-retry. The first read attempt will trigger ReadWithinUncertaintyIntervalError
// and advance the transaction timestmap. The second read attempt will succeed, but the (elided) EndTransactionRequest
// hits a deadline-exceeded error.
if _, err := sqlDB.Exec(`
SELECT * from t.test WHERE k = 'test_key';
`); err != nil {
t.Fatal(err)
}

if !restartDone {
t.Errorf("expected restart, but it didn't happen")
}
}
8 changes: 8 additions & 0 deletions storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,10 @@ type StoreTestingKnobs struct {
BadChecksumPanic func([]ReplicaSnapshotDiff)
// Disables the use of one phase commits.
DisableOnePhaseCommits bool
// A hack to manipulate the clock before sending a batch request to a replica.
// TODO(kaneda): This hook is not encouraged to use. Get rid of it once
// we make TestServer take a ManualClock.
ClockBeforeSend func(*hlc.Clock, roachpb.BatchRequest)
}

var _ base.ModuleTestingKnobs = &StoreTestingKnobs{}
Expand Down Expand Up @@ -1724,6 +1728,10 @@ func (s *Store) Send(ctx context.Context, ba roachpb.BatchRequest) (br *roachpb.
return nil, roachpb.NewError(err)
}

if s.ctx.TestingKnobs.ClockBeforeSend != nil {
s.ctx.TestingKnobs.ClockBeforeSend(s.ctx.Clock, ba)
}

if s.Clock().MaxOffset() > 0 {
// Once a command is submitted to raft, all replicas' logical
// clocks will be ratcheted forward to match. If the command
Expand Down

0 comments on commit 1123717

Please sign in to comment.