Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kv: route present-time reads to global_read follower replicas #59571

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions pkg/ccl/kvccl/kvfollowerreadsccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ go_library(
"//pkg/sql/physicalplan/replicaoracle",
"//pkg/sql/sem/builtins",
"//pkg/util/hlc",
"//pkg/util/timeutil",
"//pkg/util/uuid",
],
)
Expand All @@ -39,6 +38,7 @@ go_test(
"//pkg/kv",
"//pkg/kv/kvclient/kvcoord",
"//pkg/kv/kvserver",
"//pkg/kv/kvserver/concurrency/lock",
"//pkg/roachpb",
"//pkg/rpc",
"//pkg/security",
Expand All @@ -47,21 +47,21 @@ go_test(
"//pkg/settings/cluster",
"//pkg/sql",
"//pkg/sql/physicalplan/replicaoracle",
"//pkg/storage/enginepb",
"//pkg/testutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/skip",
"//pkg/testutils/sqlutils",
"//pkg/testutils/testcluster",
"//pkg/util",
"//pkg/util/hlc",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/randutil",
"//pkg/util/stop",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"//pkg/util/tracing",
"//pkg/util/uuid",
"@com_github_cockroachdb_errors//:errors",
"@com_github_stretchr_testify//require",
],
)
156 changes: 104 additions & 52 deletions pkg/ccl/kvccl/kvfollowerreadsccl/followerreads.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
package kvfollowerreadsccl

import (
"context"
"fmt"
"time"

Expand All @@ -27,7 +28,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/physicalplan/replicaoracle"
"github.com/cockroachdb/cockroach/pkg/sql/sem/builtins"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
)

Expand All @@ -49,103 +49,155 @@ var followerReadMultiple = settings.RegisterFloatSetting(
},
)

// getFollowerReadOffset returns the offset duration which should be used to as
// the offset from now to request a follower read. The same value less the clock
// uncertainty, then is used to determine at the kv layer if a query can use a
// follower read.
func getFollowerReadDuration(st *cluster.Settings) time.Duration {
// getFollowerReadLag returns the (negative) offset duration from hlc.Now()
// which should be used to request a follower read. The same value is used to
// determine at the kv layer if a query can use a follower read for ranges with
// the default LAG_BY_CLUSTER_SETTING closed timestamp policy.
func getFollowerReadLag(st *cluster.Settings) time.Duration {
targetMultiple := followerReadMultiple.Get(&st.SV)
targetDuration := closedts.TargetDuration.Get(&st.SV)
closeFraction := closedts.CloseFraction.Get(&st.SV)
return -1 * time.Duration(float64(targetDuration)*
(1+closeFraction*targetMultiple))
}

// getGlobalReadsLead returns the (positive) offset duration from hlc.Now()
// which clients can expect followers of a range with the LEAD_FOR_GLOBAL_READS
// closed timestamp policy to have closed off. This offset is equal to the
// maximum clock offset, allowing present-time (i.e. those not pushed into the
// future) transactions to serve reads from followers.
func getGlobalReadsLead(clock *hlc.Clock) time.Duration {
return clock.MaxOffset()
}

func checkEnterpriseEnabled(clusterID uuid.UUID, st *cluster.Settings) error {
org := sql.ClusterOrganization.Get(&st.SV)
return utilccl.CheckEnterpriseEnabled(st, clusterID, org, "follower reads")
}

func checkFollowerReadsEnabled(clusterID uuid.UUID, st *cluster.Settings) bool {
if !kvserver.FollowerReadsEnabled.Get(&st.SV) {
return false
}
return checkEnterpriseEnabled(clusterID, st) == nil
}

func evalFollowerReadOffset(clusterID uuid.UUID, st *cluster.Settings) (time.Duration, error) {
if err := checkEnterpriseEnabled(clusterID, st); err != nil {
return 0, err
}
return getFollowerReadDuration(st), nil
// NOTE: we assume that at least some of the ranges being queried use a
// LAG_BY_CLUSTER_SETTING closed timestamp policy. Otherwise, there would
// be no reason to use AS OF SYSTEM TIME follower_read_timestamp().
return getFollowerReadLag(st), nil
}

// batchCanBeEvaluatedOnFollower determines if a batch consists exclusively of
// requests that can be evaluated on a follower replica.
func batchCanBeEvaluatedOnFollower(ba roachpb.BatchRequest) bool {
return !ba.IsLocking() && ba.IsAllTransactional()
}

// txnCanPerformFollowerRead determines if the provided transaction can perform
// follower reads.
func txnCanPerformFollowerRead(txn *roachpb.Transaction) bool {
// If the request is transactional and that transaction has acquired any
// locks then that request should not perform follower reads. Doing so could
// allow the request to miss its own writes or observe state that conflicts
// with its locks.
return txn != nil && !txn.IsLocking()
return ba.Txn != nil && !ba.IsLocking() && ba.IsAllTransactional()
}

// canUseFollowerRead determines if a query can be sent to a follower.
func canUseFollowerRead(clusterID uuid.UUID, st *cluster.Settings, ts hlc.Timestamp) bool {
if !kvserver.FollowerReadsEnabled.Get(&st.SV) {
return false
// closedTimestampLikelySufficient determines if a request with a given required
// frontier timestamp is likely to be below a follower's closed timestamp and
// serviceable as a follower read were the request to be sent to a follower
// replica.
func closedTimestampLikelySufficient(
st *cluster.Settings,
clock *hlc.Clock,
ctPolicy roachpb.RangeClosedTimestampPolicy,
requiredFrontierTS hlc.Timestamp,
) bool {
var offset time.Duration
switch ctPolicy {
case roachpb.LAG_BY_CLUSTER_SETTING:
offset = getFollowerReadLag(st)
case roachpb.LEAD_FOR_GLOBAL_READS:
offset = getGlobalReadsLead(clock)
default:
panic("unknown RangeClosedTimestampPolicy")
}
threshold := (-1 * getFollowerReadDuration(st)) - 1*base.DefaultMaxClockOffset
if timeutil.Since(ts.GoTime()) < threshold {
return false
}
return checkEnterpriseEnabled(clusterID, st) == nil
expectedClosedTS := clock.Now().Add(offset.Nanoseconds(), 0)
return requiredFrontierTS.LessEq(expectedClosedTS)
}

// canSendToFollower implements the logic for checking whether a batch request
// may be sent to a follower.
// TODO(aayush): We should try to bind clusterID to the function here, rather
// than having callers plumb it in every time.
func canSendToFollower(clusterID uuid.UUID, st *cluster.Settings, ba roachpb.BatchRequest) bool {
return batchCanBeEvaluatedOnFollower(ba) &&
txnCanPerformFollowerRead(ba.Txn) &&
canUseFollowerRead(clusterID, st, forward(ba.Txn.ReadTimestamp, ba.Txn.GlobalUncertaintyLimit))
func canSendToFollower(
clusterID uuid.UUID,
st *cluster.Settings,
clock *hlc.Clock,
ctPolicy roachpb.RangeClosedTimestampPolicy,
ba roachpb.BatchRequest,
) bool {
return checkFollowerReadsEnabled(clusterID, st) &&
batchCanBeEvaluatedOnFollower(ba) &&
closedTimestampLikelySufficient(st, clock, ctPolicy, ba.Txn.RequiredFrontier())
}

func forward(ts hlc.Timestamp, to hlc.Timestamp) hlc.Timestamp {
ts.Forward(to)
return ts
}

type oracleFactory struct {
type followerReadOracle struct {
clusterID *base.ClusterIDContainer
st *cluster.Settings
clock *hlc.Clock

binPacking replicaoracle.OracleFactory
closest replicaoracle.OracleFactory
closest replicaoracle.Oracle
binPacking replicaoracle.Oracle
}

func newOracleFactory(cfg replicaoracle.Config) replicaoracle.OracleFactory {
return &oracleFactory{
func newFollowerReadOracle(cfg replicaoracle.Config) replicaoracle.Oracle {
return &followerReadOracle{
clusterID: &cfg.RPCContext.ClusterID,
st: cfg.Settings,
binPacking: replicaoracle.NewOracleFactory(replicaoracle.BinPackingChoice, cfg),
closest: replicaoracle.NewOracleFactory(replicaoracle.ClosestChoice, cfg),
clock: cfg.RPCContext.Clock,
closest: replicaoracle.NewOracle(replicaoracle.ClosestChoice, cfg),
binPacking: replicaoracle.NewOracle(replicaoracle.BinPackingChoice, cfg),
}
}

func (f oracleFactory) Oracle(txn *kv.Txn) replicaoracle.Oracle {
if txn != nil && canUseFollowerRead(f.clusterID.Get(), f.st, txn.ReadTimestamp()) {
return f.closest.Oracle(txn)
func (o *followerReadOracle) ChoosePreferredReplica(
ctx context.Context,
txn *kv.Txn,
desc *roachpb.RangeDescriptor,
leaseholder *roachpb.ReplicaDescriptor,
ctPolicy roachpb.RangeClosedTimestampPolicy,
queryState replicaoracle.QueryState,
) (roachpb.ReplicaDescriptor, error) {
var oracle replicaoracle.Oracle
if o.useClosestOracle(txn, ctPolicy) {
oracle = o.closest
} else {
oracle = o.binPacking
}
return f.binPacking.Oracle(txn)
return oracle.ChoosePreferredReplica(ctx, txn, desc, leaseholder, ctPolicy, queryState)
}

func (o *followerReadOracle) useClosestOracle(
txn *kv.Txn, ctPolicy roachpb.RangeClosedTimestampPolicy,
) bool {
// NOTE: this logic is almost identical to canSendToFollower, except that it
// operates on a *kv.Txn instead of a roachpb.BatchRequest. As a result, the
// function does not check batchCanBeEvaluatedOnFollower. This is because we
// assume that if a request is going to be executed in a distributed DistSQL
// flow (which is why it is consulting a replicaoracle.Oracle), then all of
// the individual BatchRequests that it send will be eligible to be served
// on follower replicas as follower reads.
//
// If we were to get this assumption wrong, the flow might be planned on a
// node with a follower replica, but individual BatchRequests would still be
// sent to the correct replicas once canSendToFollower is checked for each
// BatchRequests in the DistSender. This would hurt performance, but would
// not violate correctness.
return checkFollowerReadsEnabled(o.clusterID.Get(), o.st) &&
txn != nil &&
closedTimestampLikelySufficient(o.st, o.clock, ctPolicy, txn.RequiredFrontier())
}

// followerReadAwareChoice is a leaseholder choosing policy that detects
// followerReadOraclePolicy is a leaseholder choosing policy that detects
// whether a query can be used with a follower read.
var followerReadAwareChoice = replicaoracle.RegisterPolicy(newOracleFactory)
var followerReadOraclePolicy = replicaoracle.RegisterPolicy(newFollowerReadOracle)

func init() {
sql.ReplicaOraclePolicy = followerReadAwareChoice
sql.ReplicaOraclePolicy = followerReadOraclePolicy
builtins.EvalFollowerReadOffset = evalFollowerReadOffset
kvcoord.CanSendToFollower = canSendToFollower
}
Loading