From 1dffb9cd70cc3713d91b510af888c70431db9fbc Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Wed, 21 Sep 2022 18:33:02 +0800 Subject: [PATCH] sinkv2(cdc): improve conflict detector slots (#7133) (#7148) close pingcap/tiflow#7139 --- cdc/sinkv2/eventsink/txn/event.go | 8 ++-- cdc/sinkv2/eventsink/txn/txn_sink.go | 2 +- cdc/sinkv2/eventsink/txn/txn_sink_test.go | 3 +- pkg/causality/internal/slots.go | 55 ++++++++++++++++------- pkg/causality/internal/slots_test.go | 10 ++--- pkg/causality/tests/workload.go | 4 +- pkg/causality/worker.go | 4 +- 7 files changed, 55 insertions(+), 31 deletions(-) diff --git a/cdc/sinkv2/eventsink/txn/event.go b/cdc/sinkv2/eventsink/txn/event.go index 5b3698f4905..4913f4860af 100644 --- a/cdc/sinkv2/eventsink/txn/event.go +++ b/cdc/sinkv2/eventsink/txn/event.go @@ -36,13 +36,13 @@ func newTxnEvent(event *eventsink.TxnCallbackableEvent) *txnEvent { // ConflictKeys implements causality.txnEvent interface. func (e *txnEvent) ConflictKeys(numSlots uint64) []uint64 { - keys := genTxnKeys(e.TxnCallbackableEvent.Event, numSlots) - sort.Slice(keys, func(i, j int) bool { return keys[i] < keys[j] }) + keys := genTxnKeys(e.TxnCallbackableEvent.Event) + sort.Slice(keys, func(i, j int) bool { return keys[i]%numSlots < keys[j]%numSlots }) return keys } // genTxnKeys returns hash keys for `txn`. -func genTxnKeys(txn *model.SingleTableTxn, numSlots uint64) []uint64 { +func genTxnKeys(txn *model.SingleTableTxn) []uint64 { if len(txn.Rows) == 0 { return nil } @@ -53,7 +53,7 @@ func genTxnKeys(txn *model.SingleTableTxn, numSlots uint64) []uint64 { if n, err := hasher.Write(key); n != len(key) || err != nil { log.Panic("transaction key hash fail") } - hashRes[uint64(hasher.Sum32())%numSlots] = struct{}{} + hashRes[uint64(hasher.Sum32())] = struct{}{} hasher.Reset() } } diff --git a/cdc/sinkv2/eventsink/txn/txn_sink.go b/cdc/sinkv2/eventsink/txn/txn_sink.go index 03745b326c9..7dbf5e89d07 100644 --- a/cdc/sinkv2/eventsink/txn/txn_sink.go +++ b/cdc/sinkv2/eventsink/txn/txn_sink.go @@ -32,7 +32,7 @@ import ( const ( // DefaultConflictDetectorSlots indicates the default slot count of conflict detector. - DefaultConflictDetectorSlots uint64 = 8 * 1024 * 1024 + DefaultConflictDetectorSlots uint64 = 16 * 1024 ) // Assert EventSink[E event.TableEvent] implementation diff --git a/cdc/sinkv2/eventsink/txn/txn_sink_test.go b/cdc/sinkv2/eventsink/txn/txn_sink_test.go index 9cdc1f8eecf..2c2d2bfb2a4 100644 --- a/cdc/sinkv2/eventsink/txn/txn_sink_test.go +++ b/cdc/sinkv2/eventsink/txn/txn_sink_test.go @@ -15,7 +15,6 @@ package txn import ( "context" - "math" "sort" "sync/atomic" "testing" @@ -230,7 +229,7 @@ func TestGenKeys(t *testing.T) { expected: []uint64{318190470, 2095136920, 2658640457}, }} for _, tc := range testCases { - keys := genTxnKeys(tc.txn, math.MaxUint64) + keys := genTxnKeys(tc.txn) sort.Slice(keys, func(i, j int) bool { return keys[i] < keys[j] }) require.Equal(t, tc.expected, keys) } diff --git a/pkg/causality/internal/slots.go b/pkg/causality/internal/slots.go index 9153a3fa66f..6c8e6f02d18 100644 --- a/pkg/causality/internal/slots.go +++ b/pkg/causality/internal/slots.go @@ -14,6 +14,7 @@ package internal import ( + "math" "sync" ) @@ -39,8 +40,12 @@ type Slots[E SlotNode[E]] struct { // NewSlots creates a new Slots. func NewSlots[E SlotNode[E]](numSlots uint64) *Slots[E] { + slots := make([]slot[E], numSlots) + for i := uint64(0); i < numSlots; i++ { + slots[i].nodes = make(map[uint64]E, 8) + } return &Slots[E]{ - slots: make([]slot[E], numSlots), + slots: slots, numSlots: numSlots, } } @@ -51,37 +56,57 @@ func NewSlots[E SlotNode[E]](numSlots uint64) *Slots[E] { // dependee. func (s *Slots[E]) Add(elem E, keys []uint64) { dependOnList := make(map[int64]E, len(keys)) + var lastSlot uint64 = math.MaxUint64 for _, key := range keys { - s.slots[key].mu.Lock() - if s.slots[key].tail == nil { - s.slots[key].tail = new(E) - } else { - prevID := (*s.slots[key].tail).NodeID() - dependOnList[prevID] = *s.slots[key].tail + slotIdx := getSlot(key, s.numSlots) + if lastSlot != slotIdx { + s.slots[slotIdx].mu.Lock() + lastSlot = slotIdx + } + if tail, ok := s.slots[slotIdx].nodes[key]; ok { + prevID := tail.NodeID() + dependOnList[prevID] = tail } - *s.slots[key].tail = elem + s.slots[slotIdx].nodes[key] = elem } elem.DependOn(dependOnList) + // Lock those slots one by one and then unlock them one by one, so that // we can avoid 2 transactions get executed interleaved. + lastSlot = math.MaxUint64 for _, key := range keys { - s.slots[key].mu.Unlock() + slotIdx := getSlot(key, s.numSlots) + if lastSlot != slotIdx { + s.slots[slotIdx].mu.Unlock() + lastSlot = slotIdx + } } } // Free removes an element from the Slots. func (s *Slots[E]) Free(elem E, keys []uint64) { + var lastSlot uint64 = math.MaxUint64 for _, key := range keys { - s.slots[key].mu.Lock() - if s.slots[key].tail != nil && (*s.slots[key].tail).NodeID() == elem.NodeID() { - s.slots[key].tail = nil + slotIdx := getSlot(key, s.numSlots) + if lastSlot != slotIdx { + s.slots[slotIdx].mu.Lock() + } + if tail, ok := s.slots[slotIdx].nodes[key]; ok && tail.NodeID() == elem.NodeID() { + delete(s.slots[slotIdx].nodes, key) + } + if lastSlot != slotIdx { + s.slots[slotIdx].mu.Unlock() + lastSlot = slotIdx } - s.slots[key].mu.Unlock() } elem.Free() } type slot[E SlotNode[E]] struct { - tail *E // `tail` points to the last node in the slot. - mu sync.Mutex + nodes map[uint64]E + mu sync.Mutex +} + +func getSlot(key, numSlots uint64) uint64 { + return key % numSlots } diff --git a/pkg/causality/internal/slots_test.go b/pkg/causality/internal/slots_test.go index 8b768249dbc..ea5077d97ab 100644 --- a/pkg/causality/internal/slots_test.go +++ b/pkg/causality/internal/slots_test.go @@ -45,9 +45,9 @@ func TestSlotsTrivial(t *testing.T) { slots.Free(nodes[i], []uint64{1, 2, 3, 4, 5}) } - require.Equal(t, (**Node)(nil), slots.slots[1].tail) - require.Equal(t, (**Node)(nil), slots.slots[2].tail) - require.Equal(t, (**Node)(nil), slots.slots[3].tail) - require.Equal(t, (**Node)(nil), slots.slots[4].tail) - require.Equal(t, (**Node)(nil), slots.slots[5].tail) + require.Equal(t, 0, len(slots.slots[1].nodes)) + require.Equal(t, 0, len(slots.slots[2].nodes)) + require.Equal(t, 0, len(slots.slots[3].nodes)) + require.Equal(t, 0, len(slots.slots[4].nodes)) + require.Equal(t, 0, len(slots.slots[5].nodes)) } diff --git a/pkg/causality/tests/workload.go b/pkg/causality/tests/workload.go index 1116aacd0cb..c533fbce8ac 100644 --- a/pkg/causality/tests/workload.go +++ b/pkg/causality/tests/workload.go @@ -40,7 +40,7 @@ func newUniformGenerator(workingSetSize int64, batchSize int, numSlots uint64) * func (g *uniformGenerator) Next() []uint64 { set := make(map[uint64]struct{}, g.batchSize) for i := 0; i < g.batchSize; i++ { - key := uint64(rand.Int63n(g.workingSetSize)) % g.numSlots + key := uint64(rand.Int63n(g.workingSetSize)) set[key] = struct{}{} } @@ -49,6 +49,6 @@ func (g *uniformGenerator) Next() []uint64 { ret = append(ret, key) } - sort.Slice(ret, func(i, j int) bool { return ret[i] < ret[j] }) + sort.Slice(ret, func(i, j int) bool { return ret[i]%g.numSlots < ret[j]%g.numSlots }) return ret } diff --git a/pkg/causality/worker.go b/pkg/causality/worker.go index e3adef925f4..b50e7445645 100644 --- a/pkg/causality/worker.go +++ b/pkg/causality/worker.go @@ -18,10 +18,10 @@ type ( ) type txnEvent interface { - // Keys are in range [0, numSlots) and must be deduped. + // Keys are in range [0, math.MaxUint64) and must be deduped. // // NOTE: if the conflict detector is accessed by multiple threads concurrently, - // ConflictKeys must also be sorted. + // ConflictKeys must also be sorted based on `key % numSlots`. ConflictKeys(numSlots uint64) []conflictKey }