Skip to content

Commit

Permalink
add timeout before remove capture
Browse files Browse the repository at this point in the history
  • Loading branch information
CharlesCheung96 committed Jul 27, 2023
1 parent 9f71ecd commit c8721d3
Show file tree
Hide file tree
Showing 7 changed files with 78 additions and 18 deletions.
6 changes: 3 additions & 3 deletions cdc/capture/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ func (c *captureImpl) run(stdCtx context.Context) error {
}()
processorFlushInterval := time.Duration(c.config.ProcessorFlushInterval)

globalState := orchestrator.NewGlobalState(c.EtcdClient.GetClusterID())
globalState := orchestrator.NewGlobalStateWithTTL(c.EtcdClient.GetClusterID(), c.config.CaptureSessionTTL)

globalState.SetOnCaptureAdded(func(captureID model.CaptureID, addr string) {
c.MessageRouter.AddPeer(captureID, addr)
Expand Down Expand Up @@ -488,7 +488,7 @@ func (c *captureImpl) campaignOwner(ctx cdcContext.Context) error {
c.setOwner(owner)
c.setController(controller)

globalState := orchestrator.NewGlobalState(c.EtcdClient.GetClusterID())
globalState := orchestrator.NewGlobalStateWithTTL(c.EtcdClient.GetClusterID(), c.config.CaptureSessionTTL)

globalState.SetOnCaptureAdded(func(captureID model.CaptureID, addr string) {
c.MessageRouter.AddPeer(captureID, addr)
Expand All @@ -511,7 +511,7 @@ func (c *captureImpl) campaignOwner(ctx cdcContext.Context) error {
ownerCtx := cdcContext.NewContext(ctx, newGlobalVars)
g.Go(func() error {
return c.runEtcdWorker(ownerCtx, owner,
orchestrator.NewGlobalState(c.EtcdClient.GetClusterID()),
orchestrator.NewGlobalStateWithTTL(c.EtcdClient.GetClusterID(), c.config.CaptureSessionTTL),
ownerFlushInterval, util.RoleOwner.String())
})
g.Go(func() error {
Expand Down
1 change: 1 addition & 0 deletions pkg/orchestrator/etcd_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -512,6 +512,7 @@ func (worker *EtcdWorker) applyUpdates() error {
return errors.Trace(err)
}
}
worker.state.UpdatePendingChange()

worker.pendingUpdates = worker.pendingUpdates[:0]
return nil
Expand Down
4 changes: 4 additions & 0 deletions pkg/orchestrator/etcd_worker_bank_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ type bankReactorState struct {

const bankTestPrefix = "/ticdc/test/bank/"

// UpdatePendingChange is a no-op for bankReactorState; it exists so the type
// provides the UpdatePendingChange method declared by ReactorState.
func (b *bankReactorState) UpdatePendingChange() {}

func (b *bankReactorState) Update(key util.EtcdKey, value []byte, isInit bool) error {
require.True(b.t, strings.HasPrefix(key.String(), bankTestPrefix))
indexStr := key.String()[len(bankTestPrefix):]
Expand Down
12 changes: 12 additions & 0 deletions pkg/orchestrator/etcd_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,10 @@ func (s *simpleReactorState) SetSum(sum int) {
s.patches = append(s.patches, patch)
}

// UpdatePendingChange is a no-op for simpleReactorState; it exists so the type
// provides the UpdatePendingChange method declared by ReactorState.
func (s *simpleReactorState) UpdatePendingChange() {}

func (s *simpleReactorState) Update(key util.EtcdKey, value []byte, isInit bool) error {
subMatches := keyParseRegexp.FindSubmatch(key.Bytes())
if len(subMatches) != 2 {
Expand Down Expand Up @@ -283,6 +287,10 @@ type intReactorState struct {
lastVal int
}

// UpdatePendingChange is a no-op for intReactorState; it exists so the type
// provides the UpdatePendingChange method declared by ReactorState.
func (s *intReactorState) UpdatePendingChange() {}

func (s *intReactorState) Update(key util.EtcdKey, value []byte, isInit bool) error {
var err error
s.val, err = strconv.Atoi(string(value))
Expand Down Expand Up @@ -372,6 +380,10 @@ type commonReactorState struct {
pendingPatches []DataPatch
}

// UpdatePendingChange is a no-op for commonReactorState; it exists so the type
// provides the UpdatePendingChange method declared by ReactorState.
func (s *commonReactorState) UpdatePendingChange() {}

func (s *commonReactorState) Update(key util.EtcdKey, value []byte, isInit bool) error {
s.state[key.String()] = string(value)
return nil
Expand Down
3 changes: 3 additions & 0 deletions pkg/orchestrator/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ type ReactorState interface {
// Update is called by EtcdWorker to notify the Reactor of a latest change to the Etcd state.
Update(key util.EtcdKey, value []byte, isInit bool) error

// UpdatePendingChange is called by EtcdWorker to notify the Reactor to apply the pending changes.
UpdatePendingChange()

// GetPatches is called by EtcdWorker, and should return many slices of data patches that represent the changes
// that a Reactor wants to apply to Etcd.
// a slice of DataPatch will be committed as one ETCD txn
Expand Down
49 changes: 38 additions & 11 deletions pkg/orchestrator/reactor_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package orchestrator

import (
"reflect"
"time"

"github.com/goccy/go-json"
"github.com/pingcap/errors"
Expand All @@ -39,16 +40,39 @@ type GlobalReactorState struct {
// to be called when captures are added and removed.
onCaptureAdded func(captureID model.CaptureID, addr string)
onCaptureRemoved func(captureID model.CaptureID)

captureSessionTTL int
toRemoveCaptures map[model.CaptureID]time.Time
}

// NewGlobalStateWithTTL creates a new global state for the given cluster with
// all of its maps initialised and empty. captureSessionTTL is stored on the
// state; UpdatePendingChange interprets it as a number of seconds when
// deciding how long a capture stays marked before it is actually removed.
func NewGlobalStateWithTTL(clusterID string, captureSessionTTL int) *GlobalReactorState {
	state := new(GlobalReactorState)

	state.ClusterID = clusterID
	state.captureSessionTTL = captureSessionTTL

	// Every map is allocated up front so callers can write to it immediately.
	state.Owner = make(map[string]struct{})
	state.Captures = make(map[model.CaptureID]*model.CaptureInfo)
	state.Upstreams = make(map[model.UpstreamID]*model.UpstreamInfo)
	state.Changefeeds = make(map[model.ChangeFeedID]*ChangefeedReactorState)
	state.toRemoveCaptures = make(map[model.CaptureID]time.Time)

	return state
}

// NewGlobalState creates a new global state for the given cluster ID.
// NOTE(review): this view interleaves the removed struct-literal body with the
// added delegation to NewGlobalStateWithTTL(clusterID, 0); confirm the final
// body against the committed file.
func NewGlobalState(clusterID string) *GlobalReactorState {
return &GlobalReactorState{
ClusterID: clusterID,
Owner: map[string]struct{}{},
Captures: make(map[model.CaptureID]*model.CaptureInfo),
Upstreams: make(map[model.UpstreamID]*model.UpstreamInfo),
Changefeeds: make(map[model.ChangeFeedID]*ChangefeedReactorState),
return NewGlobalStateWithTTL(clusterID, 0)
}

// UpdatePendingChange implements the ReactorState interface.
//
// It walks the captures recorded in toRemoveCaptures and finalises the removal
// of every entry that has been pending for at least captureSessionTTL+5
// seconds: the capture is deleted from Captures, onCaptureRemoved is invoked
// when it is set, and the pending entry is dropped. Younger entries are left
// untouched for a later call.
func (s *GlobalReactorState) UpdatePendingChange() {
	// Minimum time an entry must have been pending before it is removed.
	gracePeriod := time.Duration(s.captureSessionTTL+5) * time.Second

	for captureID, markedAt := range s.toRemoveCaptures {
		if time.Since(markedAt) < gracePeriod {
			continue
		}

		log.Info("remote capture offline", zap.Any("info", s.Captures[captureID]))
		delete(s.Captures, captureID)
		if s.onCaptureRemoved != nil {
			s.onCaptureRemoved(captureID)
		}
		// Deleting the current key while ranging over a map is permitted in Go.
		delete(s.toRemoveCaptures, captureID)
	}
}

Expand All @@ -59,6 +83,7 @@ func (s *GlobalReactorState) Update(key util.EtcdKey, value []byte, _ bool) erro
if err != nil {
return errors.Trace(err)
}

switch k.Tp {
case etcd.CDCKeyTypeOwner:
if value != nil {
Expand All @@ -69,11 +94,8 @@ func (s *GlobalReactorState) Update(key util.EtcdKey, value []byte, _ bool) erro
return nil
case etcd.CDCKeyTypeCapture:
if value == nil {
log.Info("remote capture offline", zap.Any("info", s.Captures[k.CaptureID]))
delete(s.Captures, k.CaptureID)
if s.onCaptureRemoved != nil {
s.onCaptureRemoved(k.CaptureID)
}
log.Info("remote capture offline detected", zap.Any("info", s.Captures[k.CaptureID]))
s.toRemoveCaptures[k.CaptureID] = time.Now()
return nil
}

Expand Down Expand Up @@ -174,6 +196,11 @@ func NewChangefeedReactorState(clusterID string,
}
}

// UpdatePendingChange implements the ReactorState interface.
// It is a no-op for ChangefeedReactorState.
func (s *ChangefeedReactorState) UpdatePendingChange() {}

// Update implements the ReactorState interface
func (s *ChangefeedReactorState) Update(key util.EtcdKey, value []byte, _ bool) error {
k := new(etcd.CDCKey)
Expand Down
21 changes: 17 additions & 4 deletions pkg/orchestrator/reactor_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -442,10 +442,13 @@ func TestPatchTaskPosition(t *testing.T) {
}

func TestGlobalStateUpdate(t *testing.T) {
t.Parallel()

testCases := []struct {
updateKey []string
updateValue []string
expected GlobalReactorState
timeout int
}{
{ // common case
updateKey: []string{
Expand Down Expand Up @@ -527,13 +530,14 @@ func TestGlobalStateUpdate(t *testing.T) {
`55551111`,
`{"id":"6bbc01c8-0605-4f86-a0f9-b3119109b225","address":"127.0.0.1:8300"}`,
`{"resolved-ts":421980720003809281,"checkpoint-ts":421980719742451713,
"admin-job-type":0}`,
"admin-job-type":0}`,
`{"resolved-ts":421980720003809281,"checkpoint-ts":421980719742451713,
"admin-job-type":0}`,
"admin-job-type":0}`,
``,
``,
``,
},
timeout: 6,
expected: GlobalReactorState{
ClusterID: etcd.DefaultCDCClusterID,
Owner: map[string]struct{}{"22317526c4fc9a38": {}},
Expand Down Expand Up @@ -564,12 +568,16 @@ func TestGlobalStateUpdate(t *testing.T) {
err := state.Update(util.NewEtcdKey(k), value, false)
require.Nil(t, err)
}
time.Sleep(time.Duration(tc.timeout) * time.Second)
state.UpdatePendingChange()
require.True(t, cmp.Equal(state, &tc.expected, cmpopts.IgnoreUnexported(GlobalReactorState{}, ChangefeedReactorState{})),
cmp.Diff(state, &tc.expected, cmpopts.IgnoreUnexported(GlobalReactorState{}, ChangefeedReactorState{})))
}
}

func TestCaptureChangeHooks(t *testing.T) {
t.Parallel()

state := NewGlobalState(etcd.DefaultCDCClusterID)

var callCount int
Expand All @@ -594,13 +602,18 @@ func TestCaptureChangeHooks(t *testing.T) {
etcd.CaptureInfoKeyPrefix(etcd.DefaultCDCClusterID)+"/capture-1"),
captureInfoBytes, false)
require.Nil(t, err)
require.Equal(t, callCount, 1)
require.Eventually(t, func() bool {
return callCount == 1
}, time.Second*3, 10*time.Millisecond)

err = state.Update(util.NewEtcdKey(
etcd.CaptureInfoKeyPrefix(etcd.DefaultCDCClusterID)+"/capture-1"),
nil /* delete */, false)
require.Nil(t, err)
require.Equal(t, callCount, 2)
require.Eventually(t, func() bool {
state.UpdatePendingChange()
return callCount == 2
}, time.Second*10, 10*time.Millisecond)
}

func TestCheckChangefeedNormal(t *testing.T) {
Expand Down

0 comments on commit c8721d3

Please sign in to comment.