Skip to content

Commit

Permalink
Configurable low res tso update interval (tikv#1154) (tikv#1155)
Browse files Browse the repository at this point in the history
* Configurable lowres tso update interval

Signed-off-by: Ari Ekmekji <[email protected]>
Co-authored-by: Ari Ekmekji <[email protected]>
  • Loading branch information
ari-e and Ari Ekmekji committed May 4, 2024
1 parent 98ed21b commit 4d4dcd3
Show file tree
Hide file tree
Showing 8 changed files with 158 additions and 14 deletions.
1 change: 1 addition & 0 deletions oracle/oracle.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ type Oracle interface {
GetTimestampAsync(ctx context.Context, opt *Option) Future
GetLowResolutionTimestamp(ctx context.Context, opt *Option) (uint64, error)
GetLowResolutionTimestampAsync(ctx context.Context, opt *Option) Future
SetLowResolutionTimestampUpdateInterval(time.Duration) error
GetStaleTimestamp(ctx context.Context, txnScope string, prevSecond uint64) (uint64, error)
IsExpired(lockTimestamp, TTL uint64, opt *Option) bool
UntilExpired(lockTimeStamp, TTL uint64, opt *Option) int64
Expand Down
22 changes: 22 additions & 0 deletions oracle/oracles/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,13 @@
package oracles

import (
"context"
"sync"
"sync/atomic"
"time"

"github.com/tikv/client-go/v2/oracle"
pd "github.com/tikv/pd/client"
)

// SetOracleHookCurrentTime exports localOracle's time hook to test.
Expand All @@ -59,6 +62,25 @@ func NewEmptyPDOracle() oracle.Oracle {
return &pdOracle{}
}

// NewPdOracleWithClient exports a bare pdOracle backed by the given PD client
// to tests. Only the client field is populated: no quit channel is created, no
// update interval is stored and no background update loop is started.
func NewPdOracleWithClient(client pd.Client) oracle.Oracle {
	o := new(pdOracle)
	o.c = client
	return o
}

// StartTsUpdateLoop exports the PD oracle's background timestamp update loop
// to tests. It installs a fresh quit channel on the oracle, adds one unit to
// wg and runs updateTS(ctx) in a new goroutine; wg is released when that loop
// returns.
//
// It panics if o is not a *pdOracle. Callers should set the update interval
// before calling it: the loop builds its ticker from the stored interval, and
// time.NewTicker panics on a non-positive duration.
func StartTsUpdateLoop(o oracle.Oracle, ctx context.Context, wg *sync.WaitGroup) {
	// Named pdo rather than pd so the imported pd client package is not shadowed.
	pdo, ok := o.(*pdOracle)
	if !ok {
		panic("expected pdOracle")
	}
	pdo.quit = make(chan struct{})
	wg.Add(1)
	go func() {
		// Deferred so the WaitGroup is released even if the loop exits abnormally.
		defer wg.Done()
		pdo.updateTS(ctx)
	}()
}

// SetEmptyPDOracleLastTs exports PD oracle's global last ts to test.
func SetEmptyPDOracleLastTs(oc oracle.Oracle, ts uint64) {
switch o := oc.(type) {
Expand Down
4 changes: 4 additions & 0 deletions oracle/oracles/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,10 @@ func (l *localOracle) GetLowResolutionTimestampAsync(ctx context.Context, opt *o
return l.GetTimestampAsync(ctx, opt)
}

// SetLowResolutionTimestampUpdateInterval implements oracle.Oracle interface.
// The interval is ignored by the local oracle and nil is always returned.
func (l *localOracle) SetLowResolutionTimestampUpdateInterval(_ time.Duration) error {
	return nil
}

// GetStaleTimestamp return physical
func (l *localOracle) GetStaleTimestamp(ctx context.Context, txnScope string, prevSecond uint64) (ts uint64, err error) {
return oracle.GoTimeToTS(time.Now().Add(-time.Second * time.Duration(prevSecond))), nil
Expand Down
4 changes: 4 additions & 0 deletions oracle/oracles/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,10 @@ func (o *MockOracle) GetLowResolutionTimestampAsync(ctx context.Context, opt *or
return o.GetTimestampAsync(ctx, opt)
}

// SetLowResolutionTimestampUpdateInterval implements oracle.Oracle interface.
// The interval is ignored by the mock oracle and nil is always returned.
func (o *MockOracle) SetLowResolutionTimestampUpdateInterval(_ time.Duration) error {
	return nil
}

// IsExpired implements oracle.Oracle interface.
func (o *MockOracle) IsExpired(lockTimestamp, TTL uint64, _ *oracle.Option) bool {
o.RLock()
Expand Down
43 changes: 33 additions & 10 deletions oracle/oracles/pd.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ package oracles

import (
"context"
"fmt"
"strings"
"sync"
"sync/atomic"
Expand All @@ -59,24 +60,30 @@ type pdOracle struct {
// txn_scope (string) -> lastTSPointer (*uint64)
lastTSMap sync.Map
// txn_scope (string) -> lastArrivalTSPointer (*uint64)
lastArrivalTSMap sync.Map
quit chan struct{}
lastArrivalTSMap sync.Map
quit chan struct{}
lastTSUpdateInterval atomic.Int64
}

// NewPdOracle create an Oracle that uses a pd client source.
// Refer https://github.com/tikv/pd/blob/master/client/client.go for more details.
// PdOracle mantains `lastTS` to store the last timestamp got from PD server. If
// `GetTimestamp()` is not called after `updateInterval`, it will be called by
// PdOracle maintains `lastTS` to store the last timestamp got from PD server. If
// `GetTimestamp()` is not called after `lastTSUpdateInterval`, it will be called by
// itself to keep up with the timestamp on PD server.
func NewPdOracle(pdClient pd.Client, updateInterval time.Duration) (oracle.Oracle, error) {
o := &pdOracle{
c: pdClient,
quit: make(chan struct{}),
c: pdClient,
quit: make(chan struct{}),
lastTSUpdateInterval: atomic.Int64{},
}
err := o.SetLowResolutionTimestampUpdateInterval(updateInterval)
if err != nil {
return nil, err
}
ctx := context.TODO()
go o.updateTS(ctx, updateInterval)
go o.updateTS(ctx)
// Initialize the timestamp of the global txnScope by Get.
_, err := o.GetTimestamp(ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope})
_, err = o.GetTimestamp(ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope})
if err != nil {
o.Close()
return nil, err
Expand Down Expand Up @@ -222,8 +229,9 @@ func (o *pdOracle) getLastArrivalTS(txnScope string) (uint64, bool) {
return atomic.LoadUint64(lastArrivalTSInterface.(*uint64)), true
}

func (o *pdOracle) updateTS(ctx context.Context, interval time.Duration) {
ticker := time.NewTicker(interval)
func (o *pdOracle) updateTS(ctx context.Context) {
currentInterval := o.lastTSUpdateInterval.Load()
ticker := time.NewTicker(time.Duration(currentInterval))
defer ticker.Stop()
for {
select {
Expand All @@ -239,6 +247,11 @@ func (o *pdOracle) updateTS(ctx context.Context, interval time.Duration) {
o.setLastTS(ts, txnScope)
return true
})
newInterval := o.lastTSUpdateInterval.Load()
if newInterval != currentInterval {
currentInterval = newInterval
ticker.Reset(time.Duration(currentInterval))
}
case <-o.quit:
return
}
Expand Down Expand Up @@ -269,6 +282,16 @@ func (f lowResolutionTsFuture) Wait() (uint64, error) {
return f.ts, f.err
}

// SetLowResolutionTimestampUpdateInterval sets the refresh interval for low
// resolution timestamps.
//
// It returns an error, leaving the stored interval untouched, when
// updateInterval is not strictly positive. Otherwise the value is stored
// atomically (as nanoseconds) and nil is returned.
//
// Note that the background update loop only re-reads the interval after one of
// its ticks, so a change takes effect up to the previous update interval
// amount of time after this call.
func (o *pdOracle) SetLowResolutionTimestampUpdateInterval(updateInterval time.Duration) error {
	if updateInterval <= 0 {
		// Report the rejected value so a misconfiguration is easy to diagnose.
		return fmt.Errorf("updateInterval must be > 0, got %s", updateInterval)
	}
	o.lastTSUpdateInterval.Store(updateInterval.Nanoseconds())
	return nil
}

// GetLowResolutionTimestamp gets a new increasing time.
func (o *pdOracle) GetLowResolutionTimestamp(ctx context.Context, opt *oracle.Option) (uint64, error) {
lastTS, ok := o.getLastTS(opt.TxnScope)
Expand Down
76 changes: 76 additions & 0 deletions oracle/oracles/pd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,15 @@ package oracles_test
import (
"context"
"math"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/oracle/oracles"
pd "github.com/tikv/pd/client"
)

func TestPDOracle_UntilExpired(t *testing.T) {
Expand Down Expand Up @@ -72,3 +75,76 @@ func TestPdOracle_GetStaleTimestamp(t *testing.T) {
assert.NotNil(t, err)
assert.Regexp(t, ".*invalid prevSecond.*", err.Error())
}

// MockPdClient is a mock for pd.Client that only returns global transaction
// scoped timestamps at the same physical time with increasing logical time.
// It embeds pd.Client so that it satisfies the interface; only the methods
// declared on MockPdClient itself are backed by the mock, and calling any
// other pd.Client method on a zero-value MockPdClient would go through the
// nil embedded interface.
type MockPdClient struct {
pd.Client

// logicalTimestamp is the counter that GetTS increments on every call.
logicalTimestamp atomic.Int64
}

// GetTS returns a physical time fixed at 0 together with a logical time that
// grows by one on every call. It never returns an error.
func (c *MockPdClient) GetTS(ctx context.Context) (int64, int64, error) {
	const physical int64 = 0
	logical := c.logicalTimestamp.Add(1)
	return physical, logical, nil
}

// TestPdOracle_SetLowResolutionTimestampUpdateInterval checks that changing the
// low resolution timestamp update interval at runtime changes how often the
// background update loop refreshes the cached timestamp. It drives a PD oracle
// backed by MockPdClient and measures, with loose bounds, how long two
// consecutive refreshes take under several intervals.
func TestPdOracle_SetLowResolutionTimestampUpdateInterval(t *testing.T) {
pdClient := MockPdClient{}
o := oracles.NewPdOracleWithClient(&pdClient)
ctx := context.TODO()
wg := sync.WaitGroup{}

// The interval has to be set before the update loop is started below.
err := o.SetLowResolutionTimestampUpdateInterval(50 * time.Millisecond)
assert.Nil(t, err)

// First call to o.GetTimestamp just seeds the timestamp
_, err = o.GetTimestamp(ctx, &oracle.Option{})
assert.Nil(t, err)

// The update loop has not been started yet, so nothing refreshes the low
// resolution timestamp in the background. A fresh GetTimestamp call made
// after reading the low resolution value must therefore be strictly greater.
lowRes, err := o.GetLowResolutionTimestamp(ctx, &oracle.Option{})
assert.Nil(t, err)
ts, err := o.GetTimestamp(ctx, &oracle.Option{})
assert.Nil(t, err)
assert.Greater(t, ts, lowRes)

// waitForTimestampToChange reads the current low resolution timestamp and
// then polls every checkFrequency, for at most 5 seconds, until a larger
// low resolution timestamp is observed.
waitForTimestampToChange := func(checkFrequency time.Duration) {
currTs, err := o.GetLowResolutionTimestamp(ctx, &oracle.Option{})
assert.Nil(t, err)
assert.Eventually(t, func() bool {
nextTs, err := o.GetLowResolutionTimestamp(ctx, &oracle.Option{})
assert.Nil(t, err)
return nextTs > currTs
}, 5*time.Second, checkFrequency)
}

// Time based unit tests are inherently flaky. To reduce that, checkBounds
// waits for two consecutive timestamp changes and only asserts a loose
// lower bound (more than one interval) and upper bound (at most three
// intervals) that should not be affected by timing inconsistencies across
// platforms.
checkBounds := func(updateInterval time.Duration) {
start := time.Now()
waitForTimestampToChange(10 * time.Millisecond)
waitForTimestampToChange(10 * time.Millisecond)
elapsed := time.Since(start)
assert.Greater(t, elapsed, updateInterval)
assert.LessOrEqual(t, elapsed, 3*updateInterval)
}

oracles.StartTsUpdateLoop(o, ctx, &wg)
// Check each update interval. Note that since these are in increasing
// order the time for the new interval to take effect is always less
// than the new interval. If we iterated in opposite order, then we'd have
// to first wait for the timestamp to change before checking bounds.
for _, updateInterval := range []time.Duration{
50 * time.Millisecond,
150 * time.Millisecond,
500 * time.Millisecond} {
err = o.SetLowResolutionTimestampUpdateInterval(updateInterval)
assert.Nil(t, err)
checkBounds(updateInterval)
}

// Stop the oracle and wait for the update loop goroutine to finish.
o.Close()
wg.Wait()
}
19 changes: 16 additions & 3 deletions tikv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,9 @@ func createEtcdKV(addrs []string, tlsConfig *tls.Config) (*clientv3.Client, erro
return cli, nil
}

// update oracle's lastTS every 2000ms.
var oracleUpdateInterval = 2000
// defaultOracleUpdateInterval is how often the oracle's lastTS is refreshed by default (every 2000ms).
// Override it at startup by passing the WithUpdateInterval option to NewKVStore, or at runtime by
// calling SetLowResolutionTimestampUpdateInterval on the oracle.
var defaultOracleUpdateInterval = 2 * time.Second

// KVStore contains methods to interact with a TiKV cluster.
type KVStore struct {
Expand Down Expand Up @@ -194,6 +195,18 @@ func WithPDHTTPClient(tlsConf *tls.Config, pdaddrs []string) Option {
}
}

// WithUpdateInterval returns an Option that sets how often the store's oracle
// refreshes read timestamps from the PD client. A smaller updateInterval leads
// to more requests to PD and less staleness on reads, and vice versa.
//
// The returned Option panics if the oracle rejects updateInterval; the PD
// oracle rejects values that are not strictly positive.
func WithUpdateInterval(updateInterval time.Duration) Option {
	return func(store *KVStore) {
		if err := store.oracle.SetLowResolutionTimestampUpdateInterval(updateInterval); err != nil {
			panic(err)
		}
	}
}

// loadOption load KVStore option into KVStore.
func loadOption(store *KVStore, opt ...Option) {
for _, f := range opt {
Expand All @@ -203,7 +216,7 @@ func loadOption(store *KVStore, opt ...Option) {

// NewKVStore creates a new TiKV store instance.
func NewKVStore(uuid string, pdClient pd.Client, spkv SafePointKV, tikvclient Client, opt ...Option) (*KVStore, error) {
o, err := oracles.NewPdOracle(pdClient, time.Duration(oracleUpdateInterval)*time.Millisecond)
o, err := oracles.NewPdOracle(pdClient, defaultOracleUpdateInterval)
if err != nil {
return nil, err
}
Expand Down
3 changes: 2 additions & 1 deletion tikv/test_probe.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ package tikv
import (
"bytes"
"context"
"time"

"github.com/pingcap/kvproto/pkg/metapb"
"github.com/tikv/client-go/v2/internal/retry"
Expand Down Expand Up @@ -240,5 +241,5 @@ func (c ConfigProbe) StorePreSplitSizeThreshold(v uint32) {

// SetOracleUpdateInterval sets the interval of updating cached ts.
// v is interpreted as a number of milliseconds. It overwrites the
// package-level default interval rather than the interval of a specific
// oracle; presumably only stores created afterwards pick it up — verify
// against callers.
func (c ConfigProbe) SetOracleUpdateInterval(v int) {
oracleUpdateInterval = v
defaultOracleUpdateInterval = time.Duration(v) * time.Millisecond
}

0 comments on commit 4d4dcd3

Please sign in to comment.