kvclient: support resolve lock for ticdc #459

Merged · 9 commits · Apr 16, 2020
Changes from 5 commits
135 changes: 121 additions & 14 deletions cdc/kv/client.go
@@ -14,9 +14,9 @@
package kv

import (
"bytes"
"context"
"fmt"
"github.com/prometheus/client_golang/prometheus"
"io"
"math/rand"
"strconv"
@@ -26,6 +26,7 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/cdcpb"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/log"
pd "github.com/pingcap/pd/v4/client"
@@ -34,6 +35,9 @@ import (
"github.com/pingcap/ticdc/pkg/util"
tidbkv "github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"golang.org/x/time/rate"
@@ -233,16 +237,18 @@ type CDCClient struct {
}

regionCache *tikv.RegionCache
kvStorage tikv.Storage
}

// NewCDCClient creates a CDCClient instance
func NewCDCClient(pd pd.Client) (c *CDCClient, err error) {
func NewCDCClient(pd pd.Client, kvStorage tikv.Storage) (c *CDCClient, err error) {
clusterID := pd.GetClusterID(context.Background())
log.Info("get clusterID", zap.Uint64("id", clusterID))

c = &CDCClient{
clusterID: clusterID,
pd: pd,
kvStorage: kvStorage,
regionCache: tikv.NewRegionCache(pd),
mu: struct {
sync.Mutex
@@ -306,7 +312,7 @@ func (c *CDCClient) newStream(ctx context.Context, addr string) (stream cdcpb.Ch
func (c *CDCClient) EventFeed(
ctx context.Context, span util.Span, ts uint64, eventCh chan<- *model.RegionFeedEvent,
) error {
s := newEventFeedSession(c, c.regionCache, span, eventCh)
s := newEventFeedSession(c, c.regionCache, c.kvStorage, span, eventCh)
return s.eventFeed(ctx, ts)
}

@@ -319,6 +325,7 @@ func allocID() uint64 {
type eventFeedSession struct {
client *CDCClient
regionCache *tikv.RegionCache
kvStorage tikv.Storage

// The whole range that is being subscribed.
totalSpan util.Span
@@ -350,13 +357,15 @@ type rangeRequestTask struct {
func newEventFeedSession(
client *CDCClient,
regionCache *tikv.RegionCache,
kvStorage tikv.Storage,
totalSpan util.Span,
eventCh chan<- *model.RegionFeedEvent,
) *eventFeedSession {
id := strconv.FormatUint(allocID(), 10)
return &eventFeedSession{
client: client,
regionCache: regionCache,
kvStorage: kvStorage,
totalSpan: totalSpan,
eventCh: eventCh,
regionCh: make(chan singleRegionInfo, 16),
@@ -1013,27 +1022,42 @@ func (s *eventFeedSession) singleEventFeed(
var initialized uint32

matcher := newMatcher()

var ticker *time.Ticker
hangTime := time.Duration(0)
advanceCheckTicker := time.NewTicker(time.Second * 5)
lastReceivedEventTime := time.Now()
var lastResolvedTs uint64

for {
ticker = time.NewTicker(time.Minute)
var event *cdcpb.Event
var ok bool
select {
case <-ctx.Done():
return atomic.LoadUint64(&checkpointTs), ctx.Err()
case <-ticker.C:
hangTime += time.Minute
log.Warn("region not receiving event from tikv for too long time",
zap.Uint64("regionID", regionID), zap.Reflect("span", span), zap.Duration("duration", hangTime))
case <-advanceCheckTicker.C:
sinceLastEvent := time.Since(lastReceivedEventTime)
if sinceLastEvent > time.Second*20 {
log.Warn("region not receiving event from tikv for too long time",
zap.Uint64("regionID", regionID), zap.Reflect("span", span), zap.Duration("duration", sinceLastEvent))
}
version, err := s.kvStorage.CurrentVersion()
if err != nil {
log.Warn("failed to get current version from PD", zap.Error(err))
continue
}
currentTimeFromPD := oracle.GetTimeFromTS(version.Ver)
sinceLastResolvedTs := currentTimeFromPD.Sub(oracle.GetTimeFromTS(lastResolvedTs))
if sinceLastResolvedTs > time.Second*20 {
log.Warn("region not receiving resolved event from tikv or resolved ts is not pushing for too long time, try to resolve lock",
zap.Uint64("regionID", regionID), zap.Reflect("span", span), zap.Duration("duration", sinceLastResolvedTs), zap.Uint64("lastResolvedTs", lastResolvedTs))
maxVersion := oracle.ComposeTS(oracle.GetPhysical(currentTimeFromPD.Add(-10*time.Second)), 0)
err = s.resolveLock(ctx, regionID, maxVersion)
if err != nil {
log.Warn("failed to resolve lock", zap.Uint64("regionID", regionID), zap.Error(err))
continue
}
}
continue
case event, ok = <-receiverCh:
}
hangTime = 0
ticker.Stop()

if !ok {
log.Debug("singleEventFeed receiver closed")
return atomic.LoadUint64(&checkpointTs), nil
@@ -1043,6 +1067,7 @@ func (s *eventFeedSession) singleEventFeed(
log.Debug("singleEventFeed closed by error")
return atomic.LoadUint64(&checkpointTs), errors.New("single event feed aborted")
}
lastReceivedEventTime = time.Now()

metricEventSize.Observe(float64(event.Event.Size()))
switch x := event.Event.(type) {
@@ -1139,6 +1164,7 @@ func (s *eventFeedSession) singleEventFeed(
case *cdcpb.Event_Error:
return atomic.LoadUint64(&checkpointTs), errors.Trace(&eventError{err: x.Error})
case *cdcpb.Event_ResolvedTs:
lastResolvedTs = x.ResolvedTs
if atomic.LoadUint32(&initialized) == 0 {
continue
}
@@ -1163,6 +1189,87 @@ func (s *eventFeedSession) singleEventFeed(
}
}

const scanLockLimit = 1024

func (s *eventFeedSession) resolveLock(ctx context.Context, regionID uint64, maxVersion uint64) error {
// TODO test whether this function will kill active transaction
req := tikvrpc.NewRequest(tikvrpc.CmdScanLock, &kvrpcpb.ScanLockRequest{
MaxVersion: maxVersion,
Limit: scanLockLimit,
})

bo := tikv.NewBackoffer(ctx, tikv.GcResolveLockMaxBackoff)
var loc *tikv.KeyLocation
var key []byte
flushRegion := func() error {
var err error
loc, err = s.kvStorage.GetRegionCache().LocateRegionByID(bo, regionID)
if err != nil {
return err
}
key = loc.StartKey
return nil
}
if err := flushRegion(); err != nil {
return errors.Trace(err)
}
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
req.ScanLock().StartKey = key
resp, err := s.kvStorage.SendReq(bo, req, loc.Region, tikv.ReadTimeoutMedium)
if err != nil {
return errors.Trace(err)
}
regionErr, err := resp.GetRegionError()
if err != nil {
return errors.Trace(err)
}
if regionErr != nil {
err = bo.Backoff(tikv.BoRegionMiss, errors.New(regionErr.String()))
if err != nil {
return errors.Trace(err)
}
if err := flushRegion(); err != nil {
return errors.Trace(err)
}
continue
}
if resp.Resp == nil {
return errors.Trace(tikv.ErrBodyMissing)
}
locksResp := resp.Resp.(*kvrpcpb.ScanLockResponse)
if locksResp.GetError() != nil {
return errors.Errorf("unexpected scanlock error: %s", locksResp)
}
locksInfo := locksResp.GetLocks()
locks := make([]*tikv.Lock, len(locksInfo))
for i := range locksInfo {
locks[i] = tikv.NewLock(locksInfo[i])
}

_, _, err1 := s.kvStorage.GetLockResolver().ResolveLocks(bo, 0, locks)
if err1 != nil {
return errors.Trace(err1)
}
if len(locks) < scanLockLimit {
key = loc.EndKey
} else {
key = locks[len(locks)-1].Key
}

if len(key) == 0 || (len(loc.EndKey) != 0 && bytes.Compare(key, loc.EndKey) >= 0) {
break
}
bo = tikv.NewBackoffer(ctx, tikv.GcResolveLockMaxBackoff)
}
log.Info("resolve lock successfully", zap.Uint64("regionID", regionID), zap.Uint64("maxVersion", maxVersion))
return nil
}

func assembleCommitEvent(entry *cdcpb.Event_Row, value []byte) (*model.RegionFeedEvent, error) {
var opType model.OpType
switch entry.GetOpType() {
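
A minimal sketch (not part of the patch) of the timestamp arithmetic behind the new advance check in singleEventFeed above. It uses only the oracle helpers this PR already imports (GetTimeFromTS, GetPhysical, ComposeTS); the 20-second staleness threshold and the 10-second safety margin mirror the constants in the diff, and everything else is illustrative:

package main

import (
	"fmt"
	"time"

	"github.com/pingcap/tidb/store/tikv/oracle"
)

// staleEnoughToResolve reports whether a region's resolved ts lags the current
// PD timestamp by more than 20s, mirroring the check added to singleEventFeed.
func staleEnoughToResolve(currentVer, lastResolvedTs uint64) bool {
	currentTimeFromPD := oracle.GetTimeFromTS(currentVer)
	return currentTimeFromPD.Sub(oracle.GetTimeFromTS(lastResolvedTs)) > 20*time.Second
}

// maxVersionForResolve steps the current PD time back by 10s and composes a TSO
// from its physical part, so resolveLock only touches locks older than that.
func maxVersionForResolve(currentVer uint64) uint64 {
	currentTimeFromPD := oracle.GetTimeFromTS(currentVer)
	return oracle.ComposeTS(oracle.GetPhysical(currentTimeFromPD.Add(-10*time.Second)), 0)
}

func main() {
	now := oracle.ComposeTS(oracle.GetPhysical(time.Now()), 0)
	stale := oracle.ComposeTS(oracle.GetPhysical(time.Now().Add(-30*time.Second)), 0)
	fmt.Println(staleEnoughToResolve(now, stale)) // true: 30s exceeds the 20s threshold
	fmt.Println(maxVersionForResolve(now) < now)  // true: maxVersion sits 10s in the past
}

The 10-second margin keeps lock resolution away from transactions that are still likely in flight, which is also the concern flagged by the TODO at the top of resolveLock.
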
2 changes: 1 addition & 1 deletion cdc/kv/client_test.go
@@ -21,7 +21,7 @@ func (s *clientSuite) TestNewClose(c *check.C) {
cluster := mocktikv.NewCluster()
pdCli := mocktikv.NewPDClient(cluster)

cli, err := NewCDCClient(pdCli)
cli, err := NewCDCClient(pdCli, nil)
c.Assert(err, check.IsNil)

err = cli.Close()
4 changes: 2 additions & 2 deletions cdc/kv/testing.go
@@ -124,7 +124,7 @@ func mustGetValue(t require.TestingT, eventCh <-chan *model.RegionFeedEvent, val
// TestSplit try split on every region, and test can get value event from
// every region after split.
func TestSplit(t require.TestingT, pdCli pd.Client, storage kv.Storage) {
cli, err := NewCDCClient(pdCli)
cli, err := NewCDCClient(pdCli, storage.(tikv.Storage))
require.NoError(t, err)
defer cli.Close()

@@ -211,7 +211,7 @@ func mustDeleteKey(t require.TestingT, storage kv.Storage, key []byte) {

// TestGetKVSimple test simple KV operations
func TestGetKVSimple(t require.TestingT, pdCli pd.Client, storage kv.Storage) {
cli, err := NewCDCClient(pdCli)
cli, err := NewCDCClient(pdCli, storage.(tikv.Storage))
require.NoError(t, err)
defer cli.Close()

2 changes: 1 addition & 1 deletion cdc/owner.go
@@ -147,7 +147,7 @@ func (o *Owner) newChangeFeed(
return nil, errors.Trace(err)
}

ddlHandler := newDDLHandler(o.pdClient, checkpointTs)
ddlHandler := newDDLHandler(o.pdClient, kvStore, checkpointTs)

existingTables := make(map[uint64]uint64)
for captureID, taskStatus := range processorsInfos {
6 changes: 4 additions & 2 deletions cdc/owner_operator.go
@@ -17,6 +17,8 @@ import (
"context"
"sync"

tidbkv "github.com/pingcap/tidb/kv"

"github.com/pingcap/errors"
timodel "github.com/pingcap/parser/model"
pd "github.com/pingcap/pd/v4/client"
@@ -38,10 +40,10 @@ type ddlHandler struct {
cancel func()
}

func newDDLHandler(pdCli pd.Client, checkpointTS uint64) *ddlHandler {
func newDDLHandler(pdCli pd.Client, kvStorage tidbkv.Storage, checkpointTS uint64) *ddlHandler {
// The key in DDL kv pair returned from TiKV is already memcomparable encoded,
// so we set `needEncode` to false.
plr := puller.NewPuller(pdCli, checkpointTS, []util.Span{util.GetDDLSpan(), util.GetAddIndexDDLSpan()}, false, nil)
plr := puller.NewPuller(pdCli, kvStorage, checkpointTS, []util.Span{util.GetDDLSpan(), util.GetAddIndexDDLSpan()}, false, nil)
ctx, cancel := context.WithCancel(context.Background())
h := &ddlHandler{
puller: plr,
19 changes: 13 additions & 6 deletions cdc/processor.go
@@ -24,6 +24,8 @@ import (
"sync/atomic"
"time"

tidbkv "github.com/pingcap/tidb/kv"

"github.com/cenkalti/backoff"
"github.com/google/uuid"
"github.com/pingcap/errors"
@@ -67,9 +69,10 @@ type processor struct {
changefeed model.ChangeFeedInfo
limitter *puller.BlurResourceLimitter

pdCli pd.Client
etcdCli kv.CDCEtcdClient
session *concurrency.Session
pdCli pd.Client
kvStorage tidbkv.Storage
etcdCli kv.CDCEtcdClient
session *concurrency.Session

sink sink.Sink

@@ -119,7 +122,10 @@ func newProcessor(
if err != nil {
return nil, errors.Annotatef(err, "create pd client failed, addr: %v", endpoints)
}

kvStorage, err := kv.CreateTiStore(strings.Join(endpoints, ","))
if err != nil {
return nil, errors.Trace(err)
}
cdcEtcdCli := kv.NewCDCEtcdClient(etcdCli)

tsRWriter, err := fNewTsRWriter(cdcEtcdCli, changefeedID, captureID)
@@ -132,7 +138,7 @@
// The key in DDL kv pair returned from TiKV is already memcomparable encoded,
// so we set `needEncode` to false.
log.Info("start processor with startts", zap.Uint64("startts", checkpointTs))
ddlPuller := puller.NewPuller(pdCli, checkpointTs, []util.Span{util.GetDDLSpan(), util.GetAddIndexDDLSpan()}, false, limitter)
ddlPuller := puller.NewPuller(pdCli, kvStorage, checkpointTs, []util.Span{util.GetDDLSpan(), util.GetAddIndexDDLSpan()}, false, limitter)
ctx = util.PutTableIDInCtx(ctx, 0)
schemaStorage, err := createSchemaStorage(endpoints, checkpointTs)
if err != nil {
@@ -146,6 +152,7 @@
changefeedID: changefeedID,
changefeed: changefeed,
pdCli: pdCli,
kvStorage: kvStorage,
etcdCli: cdcEtcdCli,
session: session,
sink: sink,
@@ -626,7 +633,7 @@ func (p *processor) addTable(ctx context.Context, tableID int64, startTs uint64)
// so we set `needEncode` to true.
span := util.GetTableSpan(tableID, true)
sorter := puller.NewEntrySorter()
puller := puller.NewPuller(p.pdCli, startTs, []util.Span{span}, true, p.limitter)
puller := puller.NewPuller(p.pdCli, p.kvStorage, startTs, []util.Span{span}, true, p.limitter)

go func() {
err := puller.Run(ctx)
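
For completeness, a sketch (under stated assumptions) of how the new storage handle flows from the capture's PD endpoints down to a table puller after this change. kv.CreateTiStore, util.GetTableSpan, and puller.NewPuller are called with the argument orders visible in the diff above; the startTablePuller wrapper, its parameters, the nil limitter, and the error handling are illustrative only:

package main

import (
	"context"
	"strings"

	"github.com/pingcap/log"
	pd "github.com/pingcap/pd/v4/client"
	"github.com/pingcap/ticdc/cdc/kv"
	"github.com/pingcap/ticdc/cdc/puller"
	"github.com/pingcap/ticdc/pkg/util"
	"go.uber.org/zap"
)

// startTablePuller (hypothetical helper) wires one TiKV storage handle into a
// table puller, the same way newProcessor and addTable do in the diff above.
func startTablePuller(ctx context.Context, pdCli pd.Client, endpoints []string, tableID int64, startTs uint64) error {
	// The storage handle is what lets the kv client call ScanLock/ResolveLock
	// when a region's resolved ts stops advancing.
	kvStorage, err := kv.CreateTiStore(strings.Join(endpoints, ","))
	if err != nil {
		return err
	}
	// Table spans need memcomparable encoding, hence needEncode = true,
	// matching processor.addTable; a nil limitter mirrors newDDLHandler.
	span := util.GetTableSpan(tableID, true)
	plr := puller.NewPuller(pdCli, kvStorage, startTs, []util.Span{span}, true, nil)
	go func() {
		if err := plr.Run(ctx); err != nil {
			log.Error("table puller exited", zap.Error(err))
		}
	}()
	return nil
}
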