Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sql,kv,followerreadsccl: enable follower reads for historical queries #33478

Merged
merged 1 commit into from
Feb 5, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions docs/generated/sql/functions.md
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,15 @@ significant than <code>element</code> to zero (or one, for day and month)</p>
<p>Compatible elements: year, quarter, month, week, hour, minute, second,
millisecond, microsecond.</p>
</span></td></tr>
<tr><td><code>experimental_follower_read_timestamp() &rarr; <a href="timestamp.html">timestamptz</a></code></td><td><span class="funcdesc"><p>Returns a timestamp which is very likely to be safe to perform
against a follower replica.</p>
<p>This function is intended to be used with an AS OF SYSTEM TIME clause to perform
historical reads against a time which is recent but sufficiently old for reads
to be performed against the closest replica as opposed to the currently
leaseholder for a given range.</p>
<p>Note that this function requires an enterprise license on a CCL distribution to
return without an error.</p>
</span></td></tr>
<tr><td><code>experimental_strftime(input: <a href="date.html">date</a>, extract_format: <a href="string.html">string</a>) &rarr; <a href="string.html">string</a></code></td><td><span class="funcdesc"><p>From <code>input</code>, extracts and formats the time as identified in <code>extract_format</code> using standard <code>strftime</code> notation (though not all formatting is supported).</p>
</span></td></tr>
<tr><td><code>experimental_strftime(input: <a href="timestamp.html">timestamp</a>, extract_format: <a href="string.html">string</a>) &rarr; <a href="string.html">string</a></code></td><td><span class="funcdesc"><p>From <code>input</code>, extracts and formats the time as identified in <code>extract_format</code> using standard <code>strftime</code> notation (though not all formatting is supported).</p>
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/ccl_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
_ "github.com/cockroachdb/cockroach/pkg/ccl/backupccl"
_ "github.com/cockroachdb/cockroach/pkg/ccl/buildccl"
_ "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl"
_ "github.com/cockroachdb/cockroach/pkg/ccl/followerreadsccl"
_ "github.com/cockroachdb/cockroach/pkg/ccl/importccl"
_ "github.com/cockroachdb/cockroach/pkg/ccl/partitionccl"
_ "github.com/cockroachdb/cockroach/pkg/ccl/roleccl"
Expand Down
130 changes: 130 additions & 0 deletions pkg/ccl/followerreadsccl/followerreads.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
// Copyright 2019 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

// Package followerreadsccl implements and injects the functionality needed to
// expose follower reads to clients.
package followerreadsccl

import (
"fmt"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/ccl/utilccl"
"github.com/cockroachdb/cockroach/pkg/internal/client"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/distsqlplan/replicaoracle"
"github.com/cockroachdb/cockroach/pkg/sql/sem/builtins"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/closedts"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
)

// followerReadMultiple is the multiple of kv.closed_timestmap.target_duration
// which the implementation of the follower read capable replica policy ought
// to use to determine if a request can be used for reading.
// FollowerReadMultiple is a hidden setting.
var followerReadMultiple = settings.RegisterValidatedFloatSetting(
"kv.follower_read.target_multiple",
"if above 1, encourages the distsender to perform a read against the "+
"closest replica if a request is older than kv.closed_timestamp.target_duration"+
" * (1 + kv.closed_timestamp.close_fraction * this) less a clock uncertainty "+
"interval. This value also is used to create follower_timestamp().",
3,
func(v float64) error {
if v < 1 {
return fmt.Errorf("%v is not >= 1", v)
}
return nil
},
)

func init() { followerReadMultiple.Hide() }

// getFollowerReadOffset returns the offset duration which should be used to as
// the offset from now to request a follower read. The same value less the clock
// uncertainty, then is used to determine at the kv layer if a query can use a
// follower read.
func getFollowerReadDuration(st *cluster.Settings) time.Duration {
targetMultiple := followerReadMultiple.Get(&st.SV)
targetDuration := closedts.TargetDuration.Get(&st.SV)
closeFraction := closedts.CloseFraction.Get(&st.SV)
return -1 * time.Duration(float64(targetDuration)*
(1+closeFraction*targetMultiple))
}

func checkEnterpriseEnabled(clusterID uuid.UUID, st *cluster.Settings) error {
org := sql.ClusterOrganization.Get(&st.SV)
return utilccl.CheckEnterpriseEnabled(st, clusterID, org, "follower reads")
}

func evalFollowerReadOffset(clusterID uuid.UUID, st *cluster.Settings) (time.Duration, error) {
if err := checkEnterpriseEnabled(clusterID, st); err != nil {
return 0, err
}
return getFollowerReadDuration(st), nil
}

// canUseFollowerRead determines if a query can be sent to a follower
func canUseFollowerRead(clusterID uuid.UUID, st *cluster.Settings, ts hlc.Timestamp) bool {
if !storage.FollowerReadsEnabled.Get(&st.SV) {
return false
}
threshold := (-1 * getFollowerReadDuration(st)) - base.DefaultMaxClockOffset
if timeutil.Since(ts.GoTime()) < threshold {
return false
}
return checkEnterpriseEnabled(clusterID, st) == nil
}

// canSendToFollower implements the logic for checking whether a batch request
// may be sent to a follower.
func canSendToFollower(clusterID uuid.UUID, st *cluster.Settings, ba roachpb.BatchRequest) bool {
return ba.IsReadOnly() && ba.Txn != nil &&
canUseFollowerRead(clusterID, st, ba.Txn.OrigTimestamp)
}

type oracleFactory struct {
clusterID *base.ClusterIDContainer
st *cluster.Settings

binPacking replicaoracle.OracleFactory
closest replicaoracle.OracleFactory
}

func newOracleFactory(cfg replicaoracle.Config) replicaoracle.OracleFactory {
return &oracleFactory{
clusterID: &cfg.RPCContext.ClusterID,
st: cfg.Settings,
binPacking: replicaoracle.NewOracleFactory(replicaoracle.BinPackingChoice, cfg),
closest: replicaoracle.NewOracleFactory(replicaoracle.ClosestChoice, cfg),
}
}

func (f oracleFactory) Oracle(txn *client.Txn) replicaoracle.Oracle {
if txn != nil && canUseFollowerRead(f.clusterID.Get(), f.st, txn.OrigTimestamp()) {
return f.closest.Oracle(txn)
}
return f.binPacking.Oracle(txn)
}

// followerReadAwareChoice is a leaseholder choosing policy that detects
// whether a query can be used with a follower read.
var followerReadAwareChoice = replicaoracle.RegisterPolicy(newOracleFactory)

func init() {
sql.ReplicaOraclePolicy = followerReadAwareChoice
builtins.EvalFollowerReadOffset = evalFollowerReadOffset
kv.CanSendToFollower = canSendToFollower
}
158 changes: 158 additions & 0 deletions pkg/ccl/followerreadsccl/followerreads_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
// Copyright 2019 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package followerreadsccl

import (
"context"
"reflect"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/ccl/utilccl"
"github.com/cockroachdb/cockroach/pkg/internal/client"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/distsqlplan/replicaoracle"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
)

const expectedFollowerReadOffset = -1 * (30 * (1 + .2*3)) * time.Second

func TestEvalFollowerReadOffset(t *testing.T) {
defer leaktest.AfterTest(t)()
disableEnterprise := utilccl.TestingEnableEnterprise()
defer disableEnterprise()
st := cluster.MakeTestingClusterSettings()
if offset, err := evalFollowerReadOffset(uuid.MakeV4(), st); err != nil {
t.Fatal(err)
} else if offset != expectedFollowerReadOffset {
t.Fatalf("expected %v, got %v", expectedFollowerReadOffset, offset)
}
disableEnterprise()
_, err := evalFollowerReadOffset(uuid.MakeV4(), st)
if !testutils.IsError(err, "requires an enterprise license") {
t.Fatalf("failed to get error when evaluating follower read offset without " +
"an enterprise license")
}
}

func TestCanSendToFollower(t *testing.T) {
defer leaktest.AfterTest(t)()
disableEnterprise := utilccl.TestingEnableEnterprise()
defer disableEnterprise()
st := cluster.MakeTestingClusterSettings()
storage.FollowerReadsEnabled.Override(&st.SV, true)
old := hlc.Timestamp{
WallTime: timeutil.Now().Add(2 * expectedFollowerReadOffset).UnixNano(),
}
oldHeader := roachpb.Header{Txn: &roachpb.Transaction{
OrigTimestamp: old,
}}
rw := roachpb.BatchRequest{Header: oldHeader}
rw.Add(&roachpb.PutRequest{})
if canSendToFollower(uuid.MakeV4(), st, rw) {
t.Fatalf("should not be able to send a rw request to a follower")
}
roNoTxn := roachpb.BatchRequest{}
roNoTxn.Add(&roachpb.GetRequest{})
if canSendToFollower(uuid.MakeV4(), st, roNoTxn) {
t.Fatalf("should not be able to send a batch with no txn to a follower")
}
roOld := roachpb.BatchRequest{Header: oldHeader}
roOld.Add(&roachpb.GetRequest{})
if !canSendToFollower(uuid.MakeV4(), st, roOld) {
t.Fatalf("should be able to send an old ro batch to a follower")
}
storage.FollowerReadsEnabled.Override(&st.SV, false)
if canSendToFollower(uuid.MakeV4(), st, roOld) {
t.Fatalf("should not be able to send an old ro batch to a follower when follower reads are disabled")
}
storage.FollowerReadsEnabled.Override(&st.SV, true)
roNew := roachpb.BatchRequest{Header: roachpb.Header{
Txn: &roachpb.Transaction{
OrigTimestamp: hlc.Timestamp{WallTime: timeutil.Now().UnixNano()},
},
}}
if canSendToFollower(uuid.MakeV4(), st, roNew) {
t.Fatalf("should not be able to send a new ro batch to a follower")
}
disableEnterprise()
if canSendToFollower(uuid.MakeV4(), st, roOld) {
t.Fatalf("should not be able to send an old ro batch to a follower without enterprise enabled")
}
}

func TestFollowerReadMultipleValidation(t *testing.T) {
defer leaktest.AfterTest(t)()
defer func() {
if r := recover(); r == nil {
t.Fatalf("expected panic from setting followerReadMultiple to .1")
}
}()
st := cluster.MakeTestingClusterSettings()
followerReadMultiple.Override(&st.SV, .1)
}

// TestOracle tests the OracleFactory exposed by this package.
// This test ends up being rather indirect but works by checking if the type
// of the oracle returned from the factory differs between requests we'd
// expect to support follower reads and that which we'd expect not to.
func TestOracleFactory(t *testing.T) {
defer leaktest.AfterTest(t)()
disableEnterprise := utilccl.TestingEnableEnterprise()
defer disableEnterprise()
st := cluster.MakeTestingClusterSettings()
storage.FollowerReadsEnabled.Override(&st.SV, true)
clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond)
stopper := stop.NewStopper()
defer stopper.Stop(context.TODO())
aCtx := log.AmbientContext{Tracer: tracing.NewTracer()}
rpcContext := rpc.NewContext(
aCtx,
&base.Config{Insecure: true},
clock,
stopper,
&st.Version,
)
c := client.NewDB(log.AmbientContext{
Tracer: tracing.NewTracer(),
}, client.MockTxnSenderFactory{},
hlc.NewClock(hlc.UnixNano, time.Nanosecond))
txn := client.NewTxn(context.TODO(), c, 0, client.RootTxn)
of := replicaoracle.NewOracleFactory(followerReadAwareChoice, replicaoracle.Config{
Settings: st,
RPCContext: rpcContext,
})
noFollowerReadOracle := of.Oracle(txn)
old := hlc.Timestamp{
WallTime: timeutil.Now().Add(2 * expectedFollowerReadOffset).UnixNano(),
}
txn.SetFixedTimestamp(context.TODO(), old)
followerReadOracle := of.Oracle(txn)
if reflect.TypeOf(followerReadOracle) == reflect.TypeOf(noFollowerReadOracle) {
t.Fatalf("expected types of %T and %T to differ", followerReadOracle,
noFollowerReadOracle)
}
disableEnterprise()
disabledFollowerReadOracle := of.Oracle(txn)
if reflect.TypeOf(disabledFollowerReadOracle) != reflect.TypeOf(noFollowerReadOracle) {
t.Fatalf("expected types of %T and %T not to differ", disabledFollowerReadOracle,
noFollowerReadOracle)
}
}
Loading