Skip to content

Commit

Permalink
redo
Browse files Browse the repository at this point in the history
  • Loading branch information
liuzix committed Jul 29, 2020
1 parent df68f62 commit 5c722b0
Showing 1 changed file with 100 additions and 2 deletions.
102 changes: 100 additions & 2 deletions cdc/puller/puller.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package puller

import (
"context"
"github.com/pingcap/ticdc/pkg/notify"
"sync/atomic"
"time"

Expand All @@ -35,8 +36,9 @@ import (
)

const (
defaultPullerEventChanSize = 128000
defaultPullerOutputChanSize = 128000
defaultPullerEventChanSize = 128000
defaultPullerOutputChanSize = 128000
currentVersionUpdateInterval = 2 * time.Second
)

// Puller pull data from tikv and push changes into a buffer
Expand All @@ -59,6 +61,101 @@ type pullerImpl struct {
resolvedTs uint64
}

type kvStorageWithVersionCache struct {
tidbkv.Storage
currentVersionCache model.Ts
closeCh chan struct{}
updateNotifier *notify.Notifier
running int32
}

func newKvStorageWithVersionCache(storage tidbkv.Storage) *kvStorageWithVersionCache {
return &kvStorageWithVersionCache{
Storage: storage,
currentVersionCache: 0,
closeCh: make(chan struct{}),
updateNotifier: new(notify.Notifier),
running: 0,
}
}

func (s *kvStorageWithVersionCache) startUpdating() {
// make sure "go doUpdating()" is launched at most once
if atomic.SwapInt32(&s.running, 1) == 1 {
return
}

doUpdating := func() {
ticker := time.NewTicker(currentVersionUpdateInterval)
for {
select {
case <-s.closeCh:
// signal exit by writing 0
atomic.StoreUint64(&s.currentVersionCache, 0)
return
case <-ticker.C:
version, err := s.Storage.CurrentVersion()
if err != nil {
log.Warn("Getting CurrentVersion from kvStorage failed", zap.Error(err))
continue
}
atomic.StoreUint64(&s.currentVersionCache, version.Ver)
// Signal the completion of one update
s.updateNotifier.Notify()
}
}
}

go doUpdating()
}

func (s *kvStorageWithVersionCache) CurrentVersion() (tidbkv.Version, error) {
ver := atomic.LoadUint64(&s.currentVersionCache)
if ver == 0 {
// ver == 0 means either the goroutine has not been started,
// or that it has exited
s.startUpdating()
r := s.updateNotifier.NewReceiver(0)
defer r.Stop()
loop:
for {
select {
case <-s.closeCh:
// goroutine has exited
return tidbkv.Version{Ver: 0}, errors.New("kvStorage closed")
case <-r.C:
// the cached value has been updated once
ver = atomic.LoadUint64(&s.currentVersionCache)
break loop
}
}
}
return tidbkv.Version{Ver: ver}, nil
}

func (s *kvStorageWithVersionCache) Close() error {
select {
case <-s.closeCh:
return errors.New("kvStorage already closed")
default:
}
close(s.closeCh)

// close asynchronously to avoid blocking the caller
go func() {
r := s.updateNotifier.NewReceiver(2 * time.Second)
defer r.Stop()
for {
if atomic.SwapInt32(&s.running, 1) == 0 || atomic.LoadUint64(&s.currentVersionCache) == 0 {
break
}
<-r.C
}
s.updateNotifier.Close()
}()
return nil
}

// NewPuller create a new Puller fetch event start from checkpointTs
// and put into buf.
func NewPuller(
Expand All @@ -69,6 +166,7 @@ func NewPuller(
spans []regionspan.Span,
limitter *BlurResourceLimitter,
) Puller {
kvStorage = newKvStorageWithVersionCache(kvStorage)
tikvStorage, ok := kvStorage.(tikv.Storage)
if !ok {
log.Fatal("can't create puller for non-tikv storage")
Expand Down

0 comments on commit 5c722b0

Please sign in to comment.