Skip to content

Commit

Permalink
sinkv2(cdc): improve conflict detector slots (#7133) (#7148)
Browse files Browse the repository at this point in the history
close #7139
  • Loading branch information
ti-chi-bot authored Sep 21, 2022
1 parent 1ddf881 commit 1dffb9c
Show file tree
Hide file tree
Showing 7 changed files with 55 additions and 31 deletions.
8 changes: 4 additions & 4 deletions cdc/sinkv2/eventsink/txn/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,13 @@ func newTxnEvent(event *eventsink.TxnCallbackableEvent) *txnEvent {

// ConflictKeys implements causality.txnEvent interface.
func (e *txnEvent) ConflictKeys(numSlots uint64) []uint64 {
keys := genTxnKeys(e.TxnCallbackableEvent.Event, numSlots)
sort.Slice(keys, func(i, j int) bool { return keys[i] < keys[j] })
keys := genTxnKeys(e.TxnCallbackableEvent.Event)
sort.Slice(keys, func(i, j int) bool { return keys[i]%numSlots < keys[j]%numSlots })
return keys
}

// genTxnKeys returns hash keys for `txn`.
func genTxnKeys(txn *model.SingleTableTxn, numSlots uint64) []uint64 {
func genTxnKeys(txn *model.SingleTableTxn) []uint64 {
if len(txn.Rows) == 0 {
return nil
}
Expand All @@ -53,7 +53,7 @@ func genTxnKeys(txn *model.SingleTableTxn, numSlots uint64) []uint64 {
if n, err := hasher.Write(key); n != len(key) || err != nil {
log.Panic("transaction key hash fail")
}
hashRes[uint64(hasher.Sum32())%numSlots] = struct{}{}
hashRes[uint64(hasher.Sum32())] = struct{}{}
hasher.Reset()
}
}
Expand Down
2 changes: 1 addition & 1 deletion cdc/sinkv2/eventsink/txn/txn_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (

const (
// DefaultConflictDetectorSlots indicates the default slot count of conflict detector.
DefaultConflictDetectorSlots uint64 = 8 * 1024 * 1024
DefaultConflictDetectorSlots uint64 = 16 * 1024
)

// Assert EventSink[E event.TableEvent] implementation
Expand Down
3 changes: 1 addition & 2 deletions cdc/sinkv2/eventsink/txn/txn_sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ package txn

import (
"context"
"math"
"sort"
"sync/atomic"
"testing"
Expand Down Expand Up @@ -230,7 +229,7 @@ func TestGenKeys(t *testing.T) {
expected: []uint64{318190470, 2095136920, 2658640457},
}}
for _, tc := range testCases {
keys := genTxnKeys(tc.txn, math.MaxUint64)
keys := genTxnKeys(tc.txn)
sort.Slice(keys, func(i, j int) bool { return keys[i] < keys[j] })
require.Equal(t, tc.expected, keys)
}
Expand Down
55 changes: 40 additions & 15 deletions pkg/causality/internal/slots.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package internal

import (
"math"
"sync"
)

Expand All @@ -39,8 +40,12 @@ type Slots[E SlotNode[E]] struct {

// NewSlots creates a new Slots.
func NewSlots[E SlotNode[E]](numSlots uint64) *Slots[E] {
slots := make([]slot[E], numSlots)
for i := uint64(0); i < numSlots; i++ {
slots[i].nodes = make(map[uint64]E, 8)
}
return &Slots[E]{
slots: make([]slot[E], numSlots),
slots: slots,
numSlots: numSlots,
}
}
Expand All @@ -51,37 +56,57 @@ func NewSlots[E SlotNode[E]](numSlots uint64) *Slots[E] {
// dependee.
func (s *Slots[E]) Add(elem E, keys []uint64) {
dependOnList := make(map[int64]E, len(keys))
var lastSlot uint64 = math.MaxUint64
for _, key := range keys {
s.slots[key].mu.Lock()
if s.slots[key].tail == nil {
s.slots[key].tail = new(E)
} else {
prevID := (*s.slots[key].tail).NodeID()
dependOnList[prevID] = *s.slots[key].tail
slotIdx := getSlot(key, s.numSlots)
if lastSlot != slotIdx {
s.slots[slotIdx].mu.Lock()
lastSlot = slotIdx
}
if tail, ok := s.slots[slotIdx].nodes[key]; ok {
prevID := tail.NodeID()
dependOnList[prevID] = tail
}
*s.slots[key].tail = elem
s.slots[slotIdx].nodes[key] = elem
}
elem.DependOn(dependOnList)

// Lock those slots one by one and then unlock them one by one, so that
// we can avoid 2 transactions get executed interleaved.
lastSlot = math.MaxUint64
for _, key := range keys {
s.slots[key].mu.Unlock()
slotIdx := getSlot(key, s.numSlots)
if lastSlot != slotIdx {
s.slots[slotIdx].mu.Unlock()
lastSlot = slotIdx
}
}
}

// Free removes an element from the Slots.
func (s *Slots[E]) Free(elem E, keys []uint64) {
var lastSlot uint64 = math.MaxUint64
for _, key := range keys {
s.slots[key].mu.Lock()
if s.slots[key].tail != nil && (*s.slots[key].tail).NodeID() == elem.NodeID() {
s.slots[key].tail = nil
slotIdx := getSlot(key, s.numSlots)
if lastSlot != slotIdx {
s.slots[slotIdx].mu.Lock()
}
if tail, ok := s.slots[slotIdx].nodes[key]; ok && tail.NodeID() == elem.NodeID() {
delete(s.slots[slotIdx].nodes, key)
}
if lastSlot != slotIdx {
s.slots[slotIdx].mu.Unlock()
lastSlot = slotIdx
}
s.slots[key].mu.Unlock()
}
elem.Free()
}

type slot[E SlotNode[E]] struct {
tail *E // `tail` points to the last node in the slot.
mu sync.Mutex
nodes map[uint64]E
mu sync.Mutex
}

func getSlot(key, numSlots uint64) uint64 {
return key % numSlots
}
10 changes: 5 additions & 5 deletions pkg/causality/internal/slots_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ func TestSlotsTrivial(t *testing.T) {
slots.Free(nodes[i], []uint64{1, 2, 3, 4, 5})
}

require.Equal(t, (**Node)(nil), slots.slots[1].tail)
require.Equal(t, (**Node)(nil), slots.slots[2].tail)
require.Equal(t, (**Node)(nil), slots.slots[3].tail)
require.Equal(t, (**Node)(nil), slots.slots[4].tail)
require.Equal(t, (**Node)(nil), slots.slots[5].tail)
require.Equal(t, 0, len(slots.slots[1].nodes))
require.Equal(t, 0, len(slots.slots[2].nodes))
require.Equal(t, 0, len(slots.slots[3].nodes))
require.Equal(t, 0, len(slots.slots[4].nodes))
require.Equal(t, 0, len(slots.slots[5].nodes))
}
4 changes: 2 additions & 2 deletions pkg/causality/tests/workload.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func newUniformGenerator(workingSetSize int64, batchSize int, numSlots uint64) *
func (g *uniformGenerator) Next() []uint64 {
set := make(map[uint64]struct{}, g.batchSize)
for i := 0; i < g.batchSize; i++ {
key := uint64(rand.Int63n(g.workingSetSize)) % g.numSlots
key := uint64(rand.Int63n(g.workingSetSize))
set[key] = struct{}{}
}

Expand All @@ -49,6 +49,6 @@ func (g *uniformGenerator) Next() []uint64 {
ret = append(ret, key)
}

sort.Slice(ret, func(i, j int) bool { return ret[i] < ret[j] })
sort.Slice(ret, func(i, j int) bool { return ret[i]%g.numSlots < ret[j]%g.numSlots })
return ret
}
4 changes: 2 additions & 2 deletions pkg/causality/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ type (
)

type txnEvent interface {
// Keys are in range [0, numSlots) and must be deduped.
// Keys are in range [0, math.MaxUint64) and must be deduped.
//
// NOTE: if the conflict detector is accessed by multiple threads concurrently,
// ConflictKeys must also be sorted.
// ConflictKeys must also be sorted based on `key % numSlots`.
ConflictKeys(numSlots uint64) []conflictKey
}

Expand Down

0 comments on commit 1dffb9c

Please sign in to comment.