storage: serve reads based on closed timestamps
This "finishes" hooking up the storage side of closed timestamps. After
checking their lease and deciding that the lease does not allow serving
a read, replicas now check whether they can serve the batch as a
follower read. This requires that the range is epoch based, that
appropriate information is stored in the closed timestamp subsystem, and
finally that the cluster setting to enable this is set.
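
A minimal sketch of that check, using illustrative names
(canServeFollowerRead, followerReadsEnabled, and maxClosed are
placeholders, not necessarily the identifiers used in the actual change):

func canServeFollowerRead(
	ba roachpb.BatchRequest, lease roachpb.Lease,
	followerReadsEnabled bool, maxClosed hlc.Timestamp,
) bool {
	// Follower reads only apply to read-only batches, and only when the
	// cluster setting is on.
	if !followerReadsEnabled || !ba.IsReadOnly() {
		return false
	}
	// Only epoch-based leases are tracked by the closed timestamp
	// subsystem; an expiration-based lease disqualifies the range.
	if lease.Epoch == 0 {
		return false
	}
	// The batch timestamp must not exceed the highest timestamp closed
	// out for this range (which also implies the replica has caught up
	// to the associated lease applied index).
	return !maxClosed.Less(ba.Timestamp)
}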

Added a test that verifies that a test server will serve follower reads
(directly from the replicas, without routing through DistSender).

Introducing machinery at the distributed sender to actually consider
routing reads to follower replicas is the next step.

TODO: take perf numbers before/after this change to verify that there
isn't a noticeable regression.

Touches cockroachdb#16593.

Release note: None
tbg committed Aug 13, 2018
1 parent 470173b commit 8ccd156
Showing 14 changed files with 429 additions and 54 deletions.
6 changes: 1 addition & 5 deletions pkg/server/node_test.go
@@ -110,11 +110,7 @@ func createTestNode(
cfg.Settings,
cfg.HistogramWindowInterval,
)
cfg.ClosedTimestamp = container.NewContainer(container.Config{
Settings: st,
Stopper: stopper,
Clock: cfg.NodeLiveness.AsLiveClock(),
})
cfg.ClosedTimestamp = container.NoopContainer()

storage.TimeUntilStoreDead.Override(&cfg.Settings.SV, 10*time.Millisecond)
cfg.StorePool = storage.NewStorePool(
12 changes: 12 additions & 0 deletions pkg/server/server.go
@@ -430,6 +430,18 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
Settings: st,
Stopper: s.stopper,
Clock: s.nodeLiveness.AsLiveClock(),
// NB: s.node is not defined at this point, but it will be
// before this is ever called.
Refresh: func(rangeIDs ...roachpb.RangeID) {
for _, rangeID := range rangeIDs {
repl, err := s.node.stores.GetReplicaForRangeID(rangeID)
if err != nil || repl == nil {
continue
}
repl.EmitMLAI()
}
},
Dialer: s.nodeDialer.CTDialer(),
}),

EnableEpochRangeLeases: true,
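The Refresh hook above gives the closed timestamp subsystem a way to ask
specific local ranges to re-publish their min lease applied index (MLAI)
when a follower is missing the information needed to serve a read. A hedged
usage sketch (this call site is illustrative and not part of the diff):

// Ask the local replicas of ranges 12 and 34 to emit a fresh MLAI so that an
// updated closed timestamp entry eventually reaches their followers.
cfg.ClosedTimestamp.Refresh(roachpb.RangeID(12), roachpb.RangeID(34))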
197 changes: 197 additions & 0 deletions pkg/storage/closed_timestamp_test.go
@@ -0,0 +1,197 @@
// Copyright 2018 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.

package storage_test

import (
"context"
"testing"
"time"

"github.com/pkg/errors"
"github.com/stretchr/testify/require"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
)

func TestClosedTimestampCanServe(t *testing.T) {
defer leaktest.AfterTest(t)()

ctx := context.Background()
const numNodes = 3

tc := serverutils.StartTestCluster(t, numNodes, base.TestClusterArgs{})
defer tc.Stopper().Stop(ctx)

db0 := tc.ServerConn(0)
// Every 0.01s=10ms, try to close out a timestamp ~300ms in the past.
// We don't want to be more aggressive than that, since the target
// duration is also a limit on how long transactions can run.
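// That is, the closer is expected to fire roughly every
// close_fraction * target_duration = (0.01/0.3) * 300ms = 10ms.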
if _, err := db0.Exec(`
SET CLUSTER SETTING kv.closed_timestamp.target_duration = '300ms';
SET CLUSTER SETTING kv.closed_timestamp.close_fraction = 0.01/0.3;
SET CLUSTER SETTING kv.closed_timestamp.follower_reads_enabled = true;
CREATE DATABASE cttest;
CREATE TABLE cttest.kv (id INT PRIMARY KEY, value STRING);
`); err != nil {
t.Fatal(err)
}

var rangeID roachpb.RangeID
var startKey roachpb.Key
var numReplicas int
testutils.SucceedsSoon(t, func() error {
if err := db0.QueryRow(
`SELECT range_id, start_key, array_length(replicas, 1) FROM crdb_internal.ranges WHERE "table" = 'kv' AND "database" = 'cttest'`,
).Scan(&rangeID, &startKey, &numReplicas); err != nil {
return err
}
if numReplicas != 3 {
return errors.New("not fully replicated yet")
}
return nil
})

desc, err := tc.LookupRange(startKey)
if err != nil {
t.Fatal(err)
}

// First, we perform an arbitrary lease transfer because that will turn the
// lease into an epoch based one (the initial lease is likely expiration based
// since the range just split off from the very first range which is expiration
// based).
var lh roachpb.ReplicationTarget
testutils.SucceedsSoon(t, func() error {
var err error
lh, err = tc.FindRangeLeaseHolder(desc, nil)
return err
})

for i := 0; i < numNodes; i++ {
target := tc.Target(i)
if target != lh {
if err := tc.TransferRangeLease(desc, target); err != nil {
t.Fatal(err)
}
break
}
}

var repls []*storage.Replica
testutils.SucceedsSoon(t, func() error {
repls = nil
for i := 0; i < numNodes; i++ {
repl, err := tc.Server(i).GetStores().(*storage.Stores).GetReplicaForRangeID(desc.RangeID)
if err != nil {
return err
}
if repl != nil {
repls = append(repls, repl)
}
}
return nil
})

require.Equal(t, numReplicas, len(repls))

// Wait until we see an epoch based lease on our chosen range. This should
// happen fairly quickly since we just transferred a lease (as a means to make
// it epoch based). If the lease transfer fails, we'll be sitting out the lease
// expiration, which is on the order of seconds. Not great, but good enough since
// the transfer basically always works.
for ok := false; !ok; time.Sleep(10 * time.Millisecond) {
for _, repl := range repls {
lease, _ := repl.GetLease()
if lease.Epoch != 0 {
ok = true
break
}
}
}

if _, err := db0.Exec(`INSERT INTO cttest.kv VALUES(1, $1)`, "foo"); err != nil {
t.Fatal(err)
}

var baRead roachpb.BatchRequest
baRead.Header.RangeID = desc.RangeID
r := &roachpb.ScanRequest{}
r.Key = desc.StartKey.AsRawKey()
r.EndKey = desc.EndKey.AsRawKey()
baRead.Add(r)
baRead.Timestamp = hlc.Timestamp{WallTime: timeutil.Now().UnixNano()}

// The read should succeed once enough time (~300ms, but it's difficult to
// assert on that) has passed - on all replicas!
testutils.SucceedsSoon(t, func() error {
for _, repl := range repls {
resp, pErr := repl.Send(ctx, baRead)
if pErr != nil {
switch tErr := pErr.GetDetail().(type) {
case *roachpb.NotLeaseHolderError:
return tErr
case *roachpb.RangeNotFoundError:
// Can happen during upreplication.
return tErr
default:
t.Fatal(errors.Wrapf(pErr.GoError(), "on %s", repl))
}
}
rows := resp.Responses[0].GetInner().(*roachpb.ScanResponse).Rows
// Should see the write.
if len(rows) != 1 {
t.Fatalf("expected one row, but got %d", len(rows))
}
}
return nil
})

// We just served a follower read. As a sanity check, make sure that we can't write at
// that same timestamp.
{
var baWrite roachpb.BatchRequest
r := &roachpb.DeleteRequest{}
r.Key = desc.StartKey.AsRawKey()
txn := roachpb.MakeTransaction("testwrite", r.Key, roachpb.NormalUserPriority, enginepb.SERIALIZABLE, baRead.Timestamp, 100)
baWrite.Txn = &txn
baWrite.Add(r)

var found bool
for _, repl := range repls {
resp, pErr := repl.Send(ctx, baWrite)
if _, ok := pErr.GoError().(*roachpb.NotLeaseHolderError); ok {
continue
} else if pErr != nil {
t.Fatal(pErr)
}
found = true
if !baRead.Timestamp.Less(resp.Txn.Timestamp) || resp.Txn.OrigTimestamp == resp.Txn.Timestamp {
t.Fatal("timestamp did not get bumped")
}
break
}
if !found {
t.Fatal("unable to send to any replica")
}
}
}
23 changes: 15 additions & 8 deletions pkg/storage/closedts/container/container.go
@@ -47,13 +47,15 @@ type Config struct {
type Container struct {
Config
// Initialized on Start().
Tracker closedts.TrackerI
Storage closedts.Storage
Provider closedts.Provider
Server ctpb.Server
Clients closedts.ClientRegistry
Tracker closedts.TrackerI
Storage closedts.Storage
Provider closedts.Provider
Server ctpb.Server
Clients closedts.ClientRegistry

nodeID roachpb.NodeID
delayedServer *delayedServer
noop bool // if true, is NoopContainer
}

const (
@@ -85,7 +87,7 @@ func NewContainer(cfg Config) *Container {

type delayedServer struct {
active int32 // atomic
s ctpb.ClosedTimestampServer
s ctpb.Server
}

func (s *delayedServer) Start() {
@@ -96,13 +98,13 @@ func (s delayedServer) Get(client ctpb.ClosedTimestamp_GetServer) error {
if atomic.LoadInt32(&s.active) == 0 {
return errors.New("not available yet")
}
return s.Get(client)
return s.s.Get(client)
}

// RegisterClosedTimestampServer registers the Server contained in the container
// with gRPC.
func (c *Container) RegisterClosedTimestampServer(s *grpc.Server) {
c.delayedServer = &delayedServer{s: ctpb.ServerShim{Server: c.Server}}
c.delayedServer = &delayedServer{}
ctpb.RegisterClosedTimestampServer(s, c.delayedServer)
}

@@ -111,6 +113,10 @@ func (c *Container) RegisterClosedTimestampServer(s *grpc.Server) {
func (c *Container) Start(nodeID roachpb.NodeID) {
cfg := c.Config

if c.noop {
return
}

storage := storage.NewMultiStorage(func() storage.SingleStorage {
return storage.NewMemStorage(StorageBucketScale, storageBucketNum)
})
@@ -145,6 +151,7 @@ func (c *Container) Start(nodeID roachpb.NodeID) {
c.Provider = provider
c.Provider.Start()
if c.delayedServer != nil {
c.delayedServer.s = server
c.delayedServer.Start()
}
}
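
Taken together, the delayedServer now forwards Get to the wrapped server
(rather than recursing onto itself) and only receives that server once
Start() has built it. A standalone sketch of the pattern, with illustrative
names (delayedHandler is not the container's actual type):

// delayedHandler registers with gRPC immediately but only forwards requests
// once the real implementation has been installed and marked active.
type delayedHandler struct {
	active int32 // atomic; set to 1 only after impl has been assigned
	impl   ctpb.Server
}

func (h *delayedHandler) Get(client ctpb.ClosedTimestamp_GetServer) error {
	if atomic.LoadInt32(&h.active) == 0 {
		return errors.New("not available yet")
	}
	return h.impl.Get(client) // forward to the wrapped server, not to ourselves
}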
1 change: 1 addition & 0 deletions pkg/storage/closedts/container/noop.go
@@ -50,6 +50,7 @@ func NoopContainer() *Container {
Provider: noopEverything{},
Server: noopEverything{},
Clients: noopEverything{},
noop: true,
}
}

3 changes: 2 additions & 1 deletion pkg/storage/closedts/provider/provider.go
@@ -216,9 +216,10 @@ func (p *Provider) Subscribe(ctx context.Context, ch chan<- ctpb.Entry) {
var i int
sub := &subscriber{ch, nil}
p.mu.Lock()
for i = range p.mu.subscribers {
for i = 0; i < len(p.mu.subscribers); i++ {
if p.mu.subscribers[i] == nil {
p.mu.subscribers[i] = sub
break
}
}
if i == len(p.mu.subscribers) {
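The loop change above matters because the old range-based loop never broke
out after finding a free slot (so it installed the same subscriber into every
nil slot) and, with a fully occupied slice, left i at len-1, so the append
branch never fired and the new subscriber was silently dropped. A standalone
sketch of the intended find-or-append pattern, with illustrative names:

// registerSlot installs sub into the first free (nil) slot of subs, or appends
// it, and returns the index it now occupies.
func registerSlot(subs *[]*subscriber, sub *subscriber) int {
	for i := 0; i < len(*subs); i++ {
		if (*subs)[i] == nil {
			(*subs)[i] = sub
			return i
		}
	}
	*subs = append(*subs, sub)
	return len(*subs) - 1
}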
55 changes: 55 additions & 0 deletions pkg/storage/closedts/provider/provider_test.go
@@ -17,6 +17,7 @@ package provider_test
import (
"context"
"reflect"
"sync"
"sync/atomic"
"testing"
"time"
@@ -235,3 +236,57 @@ func TestProviderSubscribeNotify(t *testing.T) {
return nil
})
}

// TestProviderSubscribeConcurrent prevents regression of a bug that improperly
// handled concurrent subscriptions.
func TestProviderSubscribeConcurrent(t *testing.T) {
defer leaktest.AfterTest(t)()

st := cluster.MakeTestingClusterSettings()
closedts.TargetDuration.Override(&st.SV, time.Millisecond)
closedts.CloseFraction.Override(&st.SV, 1.0)

stopper := stop.NewStopper()
storage := &providertestutils.TestStorage{}

var ts int64 // atomic
cfg := &provider.Config{
NodeID: 1,
Settings: st,
Stopper: stopper,
Storage: storage,
Clock: func(roachpb.NodeID) (hlc.Timestamp, ctpb.Epoch, error) {
return hlc.Timestamp{}, 1, nil
},
Close: func(next hlc.Timestamp) (hlc.Timestamp, map[roachpb.RangeID]ctpb.LAI) {
return hlc.Timestamp{
WallTime: atomic.AddInt64(&ts, 1),
}, map[roachpb.RangeID]ctpb.LAI{
1: ctpb.LAI(atomic.LoadInt64(&ts)),
}
},
}

p := provider.NewProvider(cfg)
p.Start()

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Millisecond)
defer cancel()
cancel = func() {}
const n = 10
var wg sync.WaitGroup
wg.Add(n)
for i := 0; i < n; i++ {
go func() {
defer wg.Done()
ch := make(chan ctpb.Entry, 3)
p.Subscribe(ctx, ch)
// Read from channel until stopper stops Provider (and in turn Provider
// closes channel).
for range ch {
}
}()
}
stopper.Stop(context.Background())
wg.Wait()
}