-
Notifications
You must be signed in to change notification settings - Fork 289
/
Copy pathinterfaces.go
77 lines (66 loc) · 2.53 KB
/
interfaces.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
// Copyright 2020 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package orchestrator
import (
"context"
"github.com/pingcap/ticdc/pkg/orchestrator/util"
)
// Reactor is a stateful transform of states.
// It models Owner and Processor, which reacts according to updates in Etcd.
type Reactor interface {
Tick(ctx context.Context, state ReactorState) (nextState ReactorState, err error)
}
// DataPatch represents an update of state
type DataPatch interface {
Patch(valueMap map[util.EtcdKey][]byte, changedSet map[util.EtcdKey]struct{}) error
}
// ReactorState models the Etcd state of a reactor
type ReactorState interface {
// Update is called by EtcdWorker to notify the Reactor of a latest change to the Etcd state.
Update(key util.EtcdKey, value []byte, isInit bool) error
// GetPatches is called by EtcdWorker, and should return many slices of data patches that represents the changes
// that a Reactor wants to apply to Etcd.
// a slice of DataPatch will be committed as one ETCD txn
GetPatches() [][]DataPatch
}
// SingleDataPatch represents an update to a given Etcd key
type SingleDataPatch struct {
Key util.EtcdKey
// Func should be a pure function that returns a new value given the old value.
// The function is called each time the EtcdWorker initiates an Etcd transaction.
Func func(old []byte) (newValue []byte, changed bool, err error)
}
// Patch implements the DataPatch interface
func (s *SingleDataPatch) Patch(valueMap map[util.EtcdKey][]byte, changedSet map[util.EtcdKey]struct{}) error {
value := valueMap[s.Key]
newValue, changed, err := s.Func(value)
if err != nil {
return err
}
if !changed {
return nil
}
changedSet[s.Key] = struct{}{}
if newValue == nil {
delete(valueMap, s.Key)
} else {
valueMap[s.Key] = newValue
}
return nil
}
// MultiDatePatch represents an update to many keys
type MultiDatePatch func(valueMap map[util.EtcdKey][]byte, changedSet map[util.EtcdKey]struct{}) error
// Patch implements the DataPatch interface
func (m MultiDatePatch) Patch(valueMap map[util.EtcdKey][]byte, changedSet map[util.EtcdKey]struct{}) error {
return m(valueMap, changedSet)
}