storage: foundational work for follower reads using replica closed timestamps

This change lays the groundwork for follower reads by adding the
prototype implementation to our main codebase.

Most of the logic is disabled by default. It is only exercised during
specific unit tests or when running with a nonzero
COCKROACH_CLOSED_TIMESTAMP_INTERVAL.

The code contains several potential correctness anomalies and makes
no attempt at handling lagging followers gracefully. It should not
be used outside of testing and in fact should not be used at all
(though we may want to write roachtests early).

The [follower reads RFC] and many TODOs in this code hint at the
upcoming changes. Most prominently, known correctness gotchas will be
addressed, and the role of quiescence and coalesced heartbeats will be
untangled from the main proposal, which should clarify the code
somewhat as well. In the meantime, the commit message below documents
what is implemented here, even though it is subject to change:

Nodes send a closed timestamp with coalesced heartbeats. Receipt of
a heartbeat from a node which is the leaseholder for the range means
a closed timestamp can be trusted to apply to each follower replica
which has committed at or over a min lease applied index, a new value
supplied with coalesced heartbeats.
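
As a rough illustration of the rule above (a sketch under assumptions, not the code in this commit): a follower may serve a read from its own replica once it has caught up to the min lease applied index and the read timestamp is covered by the closed timestamp. The names `Timestamp`, `closedInfo`, and `replica` are hypothetical, and treating the closed timestamp as an inclusive bound is an assumption.

```go
package main

import "fmt"

// Timestamp is a simplified stand-in for an HLC timestamp (wall time only).
type Timestamp int64

// closedInfo is a hypothetical record of what a follower learned from the
// leaseholder's most recent coalesced heartbeat.
type closedInfo struct {
	closedTS             Timestamp // assumed: no new writes at or below this timestamp
	minLeaseAppliedIndex uint64    // the follower must have reached this index
}

// replica is a hypothetical follower replica.
type replica struct {
	leaseAppliedIndex uint64
	closed            closedInfo
}

// canServeFollowerRead reports whether a read at readTS can be served
// locally: the replica must have caught up to the min lease applied index,
// and the read must fall at or below the closed timestamp.
func (r *replica) canServeFollowerRead(readTS Timestamp) bool {
	if r.leaseAppliedIndex < r.closed.minLeaseAppliedIndex {
		return false // lagging follower: the closed timestamp does not apply yet
	}
	return readTS <= r.closed.closedTS
}

func main() {
	r := &replica{
		leaseAppliedIndex: 10,
		closed:            closedInfo{closedTS: 100, minLeaseAppliedIndex: 12},
	}
	fmt.Println(r.canServeFollowerRead(90)) // false: replica is behind index 12
	r.leaseAppliedIndex = 12
	fmt.Println(r.canServeFollowerRead(90))  // true
	fmt.Println(r.canServeFollowerRead(101)) // false: above the closed timestamp
}
```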

Nodes keep track of their "min proposal timestamp" (MinPropTS), which
is an HLC timestamp. On every heartbeat, the MinPropTS is persisted
locally to ensure monotonicity on node restart. At startup, a node
reads the last persisted MinPropTS, and forwards the HLC clock to
MinPropTS + max safe interval if necessary. Nodes check MinPropTS
on command evaluation; a command's timestamp is forwarded if less than
MinPropTS.
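
The two MinPropTS rules in this paragraph can be sketched as follows. This is only an illustration: `minPropTracker`, `forwardCommand`, and `clockAfterRestart` are hypothetical names, timestamps are reduced to plain integers, and persistence is left out.

```go
package main

import "fmt"

// Timestamp is a simplified stand-in for an HLC timestamp (wall time only).
type Timestamp int64

// minPropTracker is a hypothetical holder for a node's MinPropTS.
type minPropTracker struct {
	minPropTS Timestamp
}

// forwardCommand returns the timestamp at which a command is evaluated:
// its own timestamp, or MinPropTS if the command's timestamp is lower.
func (m *minPropTracker) forwardCommand(cmdTS Timestamp) Timestamp {
	if cmdTS < m.minPropTS {
		return m.minPropTS
	}
	return cmdTS
}

// clockAfterRestart returns the clock reading to use after a restart. The
// clock is forwarded to persistedMinPropTS + maxSafeInterval if it is behind
// that value and is left alone otherwise.
func clockAfterRestart(now, persistedMinPropTS, maxSafeInterval Timestamp) Timestamp {
	floor := persistedMinPropTS + maxSafeInterval
	if now < floor {
		return floor
	}
	return now
}

func main() {
	m := &minPropTracker{minPropTS: 100}
	fmt.Println(m.forwardCommand(90))  // 100: forwarded to MinPropTS
	fmt.Println(m.forwardCommand(120)) // 120: already above MinPropTS

	fmt.Println(clockAfterRestart(95, 100, 10))  // 110: clock was behind
	fmt.Println(clockAfterRestart(200, 100, 10)) // 200: no forwarding needed
}
```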

Things get more interesting when a range quiesces. Replicas of quiesced
ranges no longer receive info on coalesced heartbeats. However, if a
replica is quiesced, we can continue to rely on the most recent
store-wide closed timestamp supplied with coalesced heartbeats, so long
as the liveness epoch (reported with heartbeats) remains stable and no
heartbeats are skipped. This can continue for as long as a range is
quiesced, but requires that the leaseholder notifies all followers on
the first heartbeat after a range is unquiesced.
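
One possible way to track the "stable epoch, no skipped heartbeats" condition is sketched below. The commit message does not say how skipped heartbeats are detected, so the per-heartbeat sequence number is an assumption, as are all names (`heartbeat`, `storeClosedState`, `closedForQuiesced`).

```go
package main

import "fmt"

// Timestamp is a simplified stand-in for an HLC timestamp (wall time only).
type Timestamp int64

// heartbeat is a hypothetical coalesced heartbeat from one node. The
// sequence number is an assumption used here to detect skipped heartbeats.
type heartbeat struct {
	seq      uint64
	epoch    int64 // liveness epoch of the sending node
	closedTS Timestamp
}

// storeClosedState tracks the store-wide closed timestamp received from one
// remote node, plus the start of the current unbroken heartbeat chain.
type storeClosedState struct {
	started    bool
	chainStart uint64 // first seq of the current unbroken, same-epoch chain
	lastSeq    uint64
	epoch      int64
	closedTS   Timestamp
}

// onHeartbeat records a heartbeat. A changed epoch or a gap in sequence
// numbers starts a new chain.
func (s *storeClosedState) onHeartbeat(hb heartbeat) {
	if !s.started || hb.epoch != s.epoch || hb.seq != s.lastSeq+1 {
		s.chainStart = hb.seq
	}
	s.started = true
	s.lastSeq, s.epoch, s.closedTS = hb.seq, hb.epoch, hb.closedTS
}

// closedForQuiesced returns the store-wide closed timestamp for a replica
// that quiesced at heartbeat quiescedAtSeq. It is usable only if the chain
// has been unbroken since then.
func (s *storeClosedState) closedForQuiesced(quiescedAtSeq uint64) (Timestamp, bool) {
	if !s.started || s.chainStart > quiescedAtSeq {
		return 0, false
	}
	return s.closedTS, true
}

func main() {
	var s storeClosedState
	s.onHeartbeat(heartbeat{seq: 1, epoch: 1, closedTS: 10})
	s.onHeartbeat(heartbeat{seq: 2, epoch: 1, closedTS: 20})
	s.onHeartbeat(heartbeat{seq: 3, epoch: 1, closedTS: 30})
	fmt.Println(s.closedForQuiesced(2)) // 30 true

	// Heartbeat 4 is skipped, so the chain is broken for the quiesced replica.
	s.onHeartbeat(heartbeat{seq: 5, epoch: 1, closedTS: 50})
	fmt.Println(s.closedForQuiesced(2)) // 0 false
}
```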

Note there is no concern that on leaseholder change, the new leaseholder
allows a write at an earlier timestamp than a previously reported closed
timestamp. This is due to the low water timestamp in the timestamp cache
being reset on leaseholder transfer to prevent rewriting history in
general.
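
The argument about the timestamp cache can be illustrated with a toy model. It assumes the low water mark is reset to the start timestamp of the new lease, which the message above does not spell out. `tsCache`, `newCacheForLease`, and `writeTimestamp` are hypothetical names.

```go
package main

import "fmt"

// Timestamp is a simplified stand-in for an HLC timestamp (wall time only).
type Timestamp int64

// tsCache is a toy timestamp cache: per-key read timestamps plus a low
// water mark that applies to every key without a more recent entry.
type tsCache struct {
	lowWater Timestamp
	reads    map[string]Timestamp
}

// newCacheForLease models the reset on leaseholder transfer. Using the
// lease start timestamp as the new low water mark is an assumption.
func newCacheForLease(leaseStart Timestamp) *tsCache {
	return &tsCache{lowWater: leaseStart, reads: map[string]Timestamp{}}
}

// lastRead returns the latest timestamp at which key may have been read.
func (c *tsCache) lastRead(key string) Timestamp {
	if ts, ok := c.reads[key]; ok && ts > c.lowWater {
		return ts
	}
	return c.lowWater
}

// writeTimestamp returns the timestamp at which a write to key may proceed.
// A write proposed at or below the last read is pushed just above it.
func (c *tsCache) writeTimestamp(key string, proposed Timestamp) Timestamp {
	if floor := c.lastRead(key); proposed <= floor {
		return floor + 1
	}
	return proposed
}

func main() {
	// New leaseholder takes over at timestamp 100 with an empty cache.
	c := newCacheForLease(100)
	fmt.Println(c.writeTimestamp("a", 90))  // 101: pushed above the low water mark
	fmt.Println(c.writeTimestamp("a", 150)) // 150: already above it
}
```

In this model, any closed timestamp reported before the transfer lies below the new low water mark, so no write can land beneath it.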

Release note: None

[follower reads RFC]: cockroachdb#26362
tbg committed Jun 20, 2018
1 parent 7aff340 commit 2b87391
Showing 22 changed files with 1,839 additions and 309 deletions.
116 changes: 87 additions & 29 deletions pkg/kv/dist_sender.go
@@ -18,6 +18,7 @@ import (
"context"
"fmt"
"sync/atomic"
"time"
"unsafe"

"github.com/cockroachdb/cockroach/pkg/base"
@@ -62,6 +63,11 @@ var (
metaDistSenderNotLeaseHolderErrCount = metric.Metadata{
Name: "distsender.errors.notleaseholder",
Help: "Number of NotLeaseHolderErrors encountered"}
metaDistSenderFollowerReadEligibleCount = metric.Metadata{
Name: "distsender.followerread.eligible",
Help: "Number of requests sent to nearest replica due to follower read eligibility"}
// TODO(tschottdorf): should track follower-read-mismatches, where we sent to a
// follower but that follower wasn't able to serve it.
)

var rangeDescriptorCacheSize = settings.RegisterIntSetting(
@@ -72,22 +78,24 @@ var rangeDescriptorCacheSize = settings.RegisterIntSetting(

// DistSenderMetrics is the set of metrics for a given distributed sender.
type DistSenderMetrics struct {
BatchCount *metric.Counter
PartialBatchCount *metric.Counter
SentCount *metric.Counter
LocalSentCount *metric.Counter
NextReplicaErrCount *metric.Counter
NotLeaseHolderErrCount *metric.Counter
BatchCount *metric.Counter
PartialBatchCount *metric.Counter
SentCount *metric.Counter
LocalSentCount *metric.Counter
NextReplicaErrCount *metric.Counter
NotLeaseHolderErrCount *metric.Counter
FollowerReadEligibleCount *metric.Counter
}

func makeDistSenderMetrics() DistSenderMetrics {
return DistSenderMetrics{
BatchCount: metric.NewCounter(metaDistSenderBatchCount),
PartialBatchCount: metric.NewCounter(metaDistSenderPartialBatchCount),
SentCount: metric.NewCounter(metaTransportSentCount),
LocalSentCount: metric.NewCounter(metaTransportLocalSentCount),
NextReplicaErrCount: metric.NewCounter(metaDistSenderNextReplicaErrCount),
NotLeaseHolderErrCount: metric.NewCounter(metaDistSenderNotLeaseHolderErrCount),
BatchCount: metric.NewCounter(metaDistSenderBatchCount),
PartialBatchCount: metric.NewCounter(metaDistSenderPartialBatchCount),
SentCount: metric.NewCounter(metaTransportSentCount),
LocalSentCount: metric.NewCounter(metaTransportLocalSentCount),
NextReplicaErrCount: metric.NewCounter(metaDistSenderNextReplicaErrCount),
NotLeaseHolderErrCount: metric.NewCounter(metaDistSenderNotLeaseHolderErrCount),
FollowerReadEligibleCount: metric.NewCounter(metaDistSenderFollowerReadEligibleCount),
}
}

@@ -126,12 +134,13 @@ type DistSender struct {
// rangeCache caches replica metadata for key ranges.
rangeCache *RangeDescriptorCache
// leaseHolderCache caches range lease holders by range ID.
leaseHolderCache *LeaseHolderCache
transportFactory TransportFactory
rpcContext *rpc.Context
rpcRetryOptions retry.Options
asyncSenderSem chan struct{}
asyncSenderCount int32
leaseHolderCache *LeaseHolderCache
transportFactory TransportFactory
rpcContext *rpc.Context
rpcRetryOptions retry.Options
asyncSenderSem chan struct{}
asyncSenderCount int32
followerReadInterval time.Duration
}

var _ client.Sender = &DistSender{}
@@ -151,7 +160,8 @@ type DistSenderConfig struct {
RPCContext *rpc.Context
RangeDescriptorDB RangeDescriptorDB

TestingKnobs ClientTestingKnobs
TestingKnobs ClientTestingKnobs
FollowerReadInterval time.Duration
}

// NewDistSender returns a batch.Sender instance which connects to the
@@ -160,10 +170,11 @@ type DistSenderConfig struct {
// defaults will be used.
func NewDistSender(cfg DistSenderConfig, g *gossip.Gossip) *DistSender {
ds := &DistSender{
st: cfg.Settings,
clock: cfg.Clock,
gossip: g,
metrics: makeDistSenderMetrics(),
st: cfg.Settings,
clock: cfg.Clock,
gossip: g,
metrics: makeDistSenderMetrics(),
followerReadInterval: cfg.FollowerReadInterval,
}
if ds.st == nil {
ds.st = cluster.MakeTestingClusterSettings()
@@ -376,6 +387,38 @@ func (ds *DistSender) getDescriptor(
return desc, returnToken, nil
}

func (ds *DistSender) maybeCanReadFromFollower(ba roachpb.BatchRequest) bool {
if ds.followerReadInterval == 0 {
return false
}

// TODO(tschottdorf): this is simplistic. We need to gracefully handle the
// case of lagging followers, for example by reading from the leaseholder
// whenever a follower read fails. With that in, we may still have to
// track lagging replicas so we avoid them in the first place, though this
// may not matter in practice for a long long time (or ever). A similar
// problem exists when resolving an intent during a follower read; on
// retry the follower may still have the intent around. We should go to
// the leaseholder in that case as well.
//
// There's also a correctness problem here. We treat any read
// as a possible follower read, but follower reads never exhibit uncertainty
// restarts. The following can happen:
// 1. txn1 starts, gets timestamp 100, and does nothing for ~10s.
// 2. txn1 reads via follower read, so leader's timestamp cache does not see the update.
// 3. txn1 does something else, who knows, commits.
// 4. txn2 starts at timestamp 99 and writes a value invalidating the read on the lease
// holder.
// So basically we need to check whether the transaction has an empty uncertainty
// window (which is how we know it's a historical read).
estimatedClosedTimestamp := ds.clock.Now().Add(-ds.followerReadInterval.Nanoseconds(), 0)
if ba.GetActiveTimestamp(ds.clock.Now).Less(estimatedClosedTimestamp) {
ds.metrics.FollowerReadEligibleCount.Inc(1)
return true
}
return false
}

// sendSingleRange gathers and rearranges the replicas, and makes an RPC call.
func (ds *DistSender) sendSingleRange(
ctx context.Context, ba roachpb.BatchRequest, desc *roachpb.RangeDescriptor,
@@ -391,12 +434,27 @@ func (ds *DistSender) sendSingleRange(
}
replicas.OptimizeReplicaOrder(ds.getNodeDescriptor(), latencyFn)

// If this request needs to go to a lease holder and we know who that is, move
// it to the front.
if !ba.IsReadOnly() || ba.ReadConsistency.RequiresReadLease() {
if storeID, ok := ds.leaseHolderCache.Lookup(ctx, desc.RangeID); ok {
if i := replicas.FindReplica(storeID); i >= 0 {
replicas.MoveToFront(i)
// If this request needs to go to a lease holder and we know who
// that is, possibly move it to the front.
readOnly := ba.IsReadOnly()
if !readOnly || ba.ReadConsistency.RequiresReadLease() {
// We can avoid sending to the leaseholder if the batch qualifies
// for being served by follower reads.
//
// TODO(tschottdorf): important improvements are to be made here:
// 1. if a follower for whatever reason falls behind, the request will
// bounce back here and we'll choose the same follower again until
// it catches back up. This isn't great; we should limit follower
// read attempts to one per batch, or even blacklist replicas that
// fail repeatedly (though that is lower priority).
// 2. we may want to add some jitter here to spread out load among replicas
// with similar latencies (or even actively prefer followers over lease-
// holders).
if !readOnly || !ds.maybeCanReadFromFollower(ba) {
if storeID, ok := ds.leaseHolderCache.Lookup(ctx, desc.RangeID); ok {
if i := replicas.FindReplica(storeID); i >= 0 {
replicas.MoveToFront(i)
}
}
}
}
44 changes: 26 additions & 18 deletions pkg/roachpb/batch.go
@@ -27,33 +27,41 @@ import (

//go:generate go run -tags gen-batch gen_batch.go

// SetActiveTimestamp sets the correct timestamp at which the request
// is to be carried out. For transactional requests, ba.Timestamp must
// be zero initially and it will be set to txn.OrigTimestamp (and
// forwarded to txn.SafeTimestamp if non-zero). For non-transactional
// requests, if no timestamp is specified, nowFn is used to create and
// set one.
func (ba *BatchRequest) SetActiveTimestamp(nowFn func() hlc.Timestamp) error {
// GetActiveTimestamp returns the timestamp at which the batch will operate.
// For non-transactional requests, this is `ba.Timestamp` if set and nowFn() otherwise.
//
// For transactional requests, it is the OrigTimestamp forwarded by the RefreshTimestamp.
func (ba *BatchRequest) GetActiveTimestamp(nowFn func() hlc.Timestamp) hlc.Timestamp {
if txn := ba.Txn; txn != nil {
if ba.Timestamp != (hlc.Timestamp{}) {
return errors.New("transactional request must not set batch timestamp")
}

// Always use the original timestamp for reads and writes, even
// though some intents may be written at higher timestamps in the
// event of a WriteTooOldError.
ba.Timestamp = txn.OrigTimestamp
ts := txn.OrigTimestamp
// If a refreshed timestamp is set for the transaction, forward
// the batch timestamp to it. The refreshed timestamp indicates a
// future timestamp at which the transaction would like to commit
// to safely avoid a serializable transaction restart.
ba.Timestamp.Forward(txn.RefreshedTimestamp)
} else {
// When not transactional, allow empty timestamp and use nowFn instead
if ba.Timestamp == (hlc.Timestamp{}) {
ba.Timestamp = nowFn()
}
ts.Forward(txn.RefreshedTimestamp)
return ts
}
// When not transactional, use value from nowFn().
if ba.Timestamp == (hlc.Timestamp{}) {
return nowFn()
}
return ba.Timestamp
}

// SetActiveTimestamp sets ba.Timestamp to the active timestamp (as returned
// by GetActiveTimestamp).
//
// Note that for transactional batches, the method will verify that ba.Timestamp
// is initially unset. In particular, SetActiveTimestamp can be called only once
// in that case.
func (ba *BatchRequest) SetActiveTimestamp(nowFn func() hlc.Timestamp) error {
if ba.Txn != nil && ba.Timestamp != (hlc.Timestamp{}) {
return errors.New("transactional request must not set batch timestamp")
}
ba.Timestamp = ba.GetActiveTimestamp(nowFn)
return nil
}

58 changes: 32 additions & 26 deletions pkg/server/server.go
@@ -239,6 +239,13 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
s.registry,
)

// The store config is initialized to the default values here in order
// to pass the follower read interval to the distributed sender. It's
// augmented later in this method before sending to the node to create
// stores.
storeCfg := storage.StoreConfig{}
storeCfg.SetDefaults()

// A custom RetryOptions is created which uses stopper.ShouldQuiesce() as
// the Closer. This prevents infinite retry loops from occurring during
// graceful server shutdown
@@ -260,12 +267,13 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
}
retryOpts.Closer = s.stopper.ShouldQuiesce()
distSenderCfg := kv.DistSenderConfig{
AmbientCtx: s.cfg.AmbientCtx,
Settings: st,
Clock: s.clock,
RPCContext: s.rpcContext,
RPCRetryOptions: &retryOpts,
TestingKnobs: clientTestingKnobs,
AmbientCtx: s.cfg.AmbientCtx,
Settings: st,
Clock: s.clock,
RPCContext: s.rpcContext,
RPCRetryOptions: &retryOpts,
TestingKnobs: clientTestingKnobs,
FollowerReadInterval: storeCfg.FollowerReadInterval(),
}
s.distSender = kv.NewDistSender(distSenderCfg, s.gossip)
s.registry.AddMetricStruct(s.distSender.Metrics())
@@ -397,27 +405,25 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
var execCfg sql.ExecutorConfig

// TODO(bdarnell): make StoreConfig configurable.
storeCfg := storage.StoreConfig{
Settings: st,
AmbientCtx: s.cfg.AmbientCtx,
RaftConfig: s.cfg.RaftConfig,
Clock: s.clock,
DB: s.db,
Gossip: s.gossip,
NodeLiveness: s.nodeLiveness,
Transport: s.raftTransport,
RPCContext: s.rpcContext,
ScanInterval: s.cfg.ScanInterval,
ScanMaxIdleTime: s.cfg.ScanMaxIdleTime,
TimestampCachePageSize: s.cfg.TimestampCachePageSize,
HistogramWindowInterval: s.cfg.HistogramWindowInterval(),
StorePool: s.storePool,
SQLExecutor: internalExecutor,
LogRangeEvents: s.cfg.EventLogEnabled,
TimeSeriesDataStore: s.tsDB,
storeCfg.Settings = st
storeCfg.AmbientCtx = s.cfg.AmbientCtx
storeCfg.RaftConfig = s.cfg.RaftConfig
storeCfg.Clock = s.clock
storeCfg.DB = s.db
storeCfg.Gossip = s.gossip
storeCfg.NodeLiveness = s.nodeLiveness
storeCfg.Transport = s.raftTransport
storeCfg.RPCContext = s.rpcContext
storeCfg.ScanInterval = s.cfg.ScanInterval
storeCfg.ScanMaxIdleTime = s.cfg.ScanMaxIdleTime
storeCfg.TimestampCachePageSize = s.cfg.TimestampCachePageSize
storeCfg.HistogramWindowInterval = s.cfg.HistogramWindowInterval()
storeCfg.StorePool = s.storePool
storeCfg.SQLExecutor = internalExecutor
storeCfg.LogRangeEvents = s.cfg.EventLogEnabled
storeCfg.TimeSeriesDataStore = s.tsDB
storeCfg.EnableEpochRangeLeases = true

EnableEpochRangeLeases: true,
}
if storeTestingKnobs := s.cfg.TestingKnobs.Store; storeTestingKnobs != nil {
storeCfg.TestingKnobs = *storeTestingKnobs.(*storage.StoreTestingKnobs)
}
1 change: 1 addition & 0 deletions pkg/storage/api.pb.go

Some generated files are not rendered by default.

8 changes: 4 additions & 4 deletions pkg/storage/client_lease_test.go
@@ -74,7 +74,7 @@ func TestStoreRangeLease(t *testing.T) {

// Allow leases to expire and send commands to ensure we
// re-acquire, then check types again.
mtc.advanceClock(context.TODO())
mtc.expireLeases(context.TODO())
for _, key := range splitKeys {
if _, err := mtc.dbs[0].Inc(context.TODO(), key, 1); err != nil {
t.Fatalf("%s failed to increment: %s", key, err)
@@ -117,7 +117,7 @@ func TestStoreRangeLeaseSwitcheroo(t *testing.T) {

// Allow leases to expire and send commands to ensure we
// re-acquire, then check types again.
mtc.advanceClock(context.TODO())
mtc.expireLeases(context.TODO())
if _, err := mtc.dbs[0].Inc(context.TODO(), splitKey, 1); err != nil {
t.Fatalf("failed to increment: %s", err)
}
@@ -134,7 +134,7 @@ func TestStoreRangeLeaseSwitcheroo(t *testing.T) {
sc.EnableEpochRangeLeases = false
mtc.restartStore(0)

mtc.advanceClock(context.TODO())
mtc.expireLeases(context.TODO())
if _, err := mtc.dbs[0].Inc(context.TODO(), splitKey, 1); err != nil {
t.Fatalf("failed to increment: %s", err)
}
@@ -151,7 +151,7 @@ func TestStoreRangeLeaseSwitcheroo(t *testing.T) {
sc.EnableEpochRangeLeases = true
mtc.restartStore(0)

mtc.advanceClock(context.TODO())
mtc.expireLeases(context.TODO())
if _, err := mtc.dbs[0].Inc(context.TODO(), splitKey, 1); err != nil {
t.Fatalf("failed to increment: %s", err)
}