Skip to content

Commit

Permalink
kv: disallow follower reads for writing transactions
Browse files Browse the repository at this point in the history
Fixes cockroachdb#35812.

To avoid missing its own writes, a transaction must not evaluate a read
on a follower that has not caught up to at least its current provisional
commit timestamp. We were violating this both at the DistSender level and
at the Replica level.

Because the ability to perform follower reads in a writing transaction is
fairly unimportant and has these known issues, this commit disallows
follower reads for writing transactions.

Release note: None
  • Loading branch information
nvanbenschoten committed Mar 19, 2019
1 parent 4aba654 commit 733561f
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 2 deletions.
2 changes: 1 addition & 1 deletion pkg/ccl/followerreadsccl/followerreads.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func canUseFollowerRead(clusterID uuid.UUID, st *cluster.Settings, ts hlc.Timest
// canSendToFollower implements the logic for checking whether a batch request
// may be sent to a follower.
func canSendToFollower(clusterID uuid.UUID, st *cluster.Settings, ba roachpb.BatchRequest) bool {
return ba.IsReadOnly() && ba.Txn != nil &&
return ba.IsReadOnly() && ba.Txn != nil && !ba.Txn.IsWriting() &&
canUseFollowerRead(clusterID, st, ba.Txn.OrigTimestamp)
}

Expand Down
11 changes: 11 additions & 0 deletions pkg/ccl/followerreadsccl/followerreads_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/distsqlplan/replicaoracle"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
Expand Down Expand Up @@ -79,6 +80,16 @@ func TestCanSendToFollower(t *testing.T) {
if !canSendToFollower(uuid.MakeV4(), st, roOld) {
t.Fatalf("should be able to send an old ro batch to a follower")
}
roRWTxnOld := roachpb.BatchRequest{Header: roachpb.Header{
Txn: &roachpb.Transaction{
TxnMeta: enginepb.TxnMeta{Key: []byte("key")},
OrigTimestamp: old,
},
}}
roRWTxnOld.Add(&roachpb.GetRequest{})
if canSendToFollower(uuid.MakeV4(), st, roRWTxnOld) {
t.Fatalf("should not be able to send a ro request from a rw txn to a follower")
}
storage.FollowerReadsEnabled.Override(&st.SV, false)
if canSendToFollower(uuid.MakeV4(), st, roOld) {
t.Fatalf("should not be able to send an old ro batch to a follower when follower reads are disabled")
Expand Down
57 changes: 57 additions & 0 deletions pkg/storage/closed_timestamp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
gosql "database/sql"
"fmt"
"math/rand"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -265,6 +266,62 @@ func getTableID(db *gosql.DB, dbName, tableName string) (tableID sqlbase.ID, err
return
}

// TestClosedTimestampCantServeForWritingTransaction verifies that a read-only
// batch attached to a read-write transaction is not served as a follower
// read. The batch is sent to every replica; exactly two of them are expected
// to reject it with a NotLeaseHolderError.
func TestClosedTimestampCantServeForWritingTransaction(t *testing.T) {
	defer leaktest.AfterTest(t)()

	if util.RaceEnabled {
		// Limiting how long transactions can run does not work
		// well with race unless we're extremely lenient, which
		// drives up the test duration.
		t.Skip("skipping under race")
	}

	ctx := context.Background()
	tc, db0, desc, repls := setupTestClusterForClosedTimestampTesting(ctx, t)
	defer tc.Stopper().Stop(ctx)

	_, err := db0.Exec(`INSERT INTO cttest.kv VALUES(1, $1)`, "foo")
	if err != nil {
		t.Fatal(err)
	}

	// First make sure a follower read at this timestamp can be served by
	// every replica, retrying until that is the case.
	ts := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()}
	testutils.SucceedsSoon(t, func() error {
		return verifyCanReadFromAllRepls(ctx, t, desc, ts, repls, expectRows(1))
	})

	// Build a read-only batch, then attach a read-write transaction to it.
	baRead := makeReadBatchRequestForDesc(desc, ts)
	rwTxn := roachpb.MakeTransaction("test", []byte("key"), roachpb.NormalUserPriority, ts, 0)
	baRead.Txn = &rwTxn

	// Fan the request out to all three replicas concurrently. One should
	// succeed; the other two should answer with NotLeaseHolderErrors, which
	// are counted rather than treated as failures.
	var nlheCount int64
	g, gCtx := errgroup.WithContext(ctx)
	for _, repl := range repls {
		repl := repl // per-iteration copy for the goroutine below
		g.Go(func() error {
			_, pErr := repl.Send(gCtx, baRead)
			if pErr == nil {
				return nil
			}
			if _, isNLHE := pErr.GetDetail().(*roachpb.NotLeaseHolderError); isNLHE {
				atomic.AddInt64(&nlheCount, 1)
				return nil
			}
			return pErr.GetDetail()
		})
	}
	if err := g.Wait(); err != nil {
		t.Fatal(err)
	}

	const want = int64(2)
	if got := nlheCount; got != want {
		t.Fatalf("expected %d NotLeaseHolderError; found %d", want, got)
	}
}

// Every 0.1s=100ms, try to close out a timestamp ~300ms in the past.
// We don't want to be more aggressive than that since it's also
// a limit on how long transactions can run.
Expand Down
3 changes: 2 additions & 1 deletion pkg/storage/replica_follower_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,9 @@ func (r *Replica) canServeFollowerRead(
) *roachpb.Error {
canServeFollowerRead := false
if lErr, ok := pErr.GetDetail().(*roachpb.NotLeaseHolderError); ok &&
lErr.LeaseHolder != nil && lErr.Lease.Type() == roachpb.LeaseEpoch &&
FollowerReadsEnabled.Get(&r.store.cfg.Settings.SV) &&
lErr.LeaseHolder != nil && lErr.Lease.Type() == roachpb.LeaseEpoch {
(ba.Txn == nil || !ba.Txn.IsWriting()) {

canServeFollowerRead = !r.maxClosed(ctx).Less(ba.Timestamp)
if !canServeFollowerRead {
Expand Down

0 comments on commit 733561f

Please sign in to comment.