Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sinkv2(cdc): improve conflict detector slots (#7133) #7148

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions cdc/sinkv2/eventsink/txn/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,13 @@ func newTxnEvent(event *eventsink.TxnCallbackableEvent) *txnEvent {

// ConflictKeys implements causality.txnEvent interface.
//
// It returns the hash keys that genTxnKeys produces for the wrapped
// transaction, ordered for a conflict detector that has numSlots slots.
// NOTE(review): this listing is a rendered diff, so the body holds two
// genTxnKeys/sort.Slice pairs; presumably the first pair is the removed code
// and the second pair is its replacement.
func (e *txnEvent) ConflictKeys(numSlots uint64) []uint64 {
// Variant 1: genTxnKeys receives numSlots and the keys are sorted by value.
keys := genTxnKeys(e.TxnCallbackableEvent.Event, numSlots)
sort.Slice(keys, func(i, j int) bool { return keys[i] < keys[j] })
// Variant 2: genTxnKeys is called without numSlots and the keys are sorted by
// `key % numSlots`, so keys with the same remainder end up adjacent.
keys := genTxnKeys(e.TxnCallbackableEvent.Event)
sort.Slice(keys, func(i, j int) bool { return keys[i]%numSlots < keys[j]%numSlots })
return keys
}

// genTxnKeys returns hash keys for `txn`.
func genTxnKeys(txn *model.SingleTableTxn, numSlots uint64) []uint64 {
func genTxnKeys(txn *model.SingleTableTxn) []uint64 {
if len(txn.Rows) == 0 {
return nil
}
Expand All @@ -53,7 +53,7 @@ func genTxnKeys(txn *model.SingleTableTxn, numSlots uint64) []uint64 {
if n, err := hasher.Write(key); n != len(key) || err != nil {
log.Panic("transaction key hash fail")
}
hashRes[uint64(hasher.Sum32())%numSlots] = struct{}{}
hashRes[uint64(hasher.Sum32())] = struct{}{}
hasher.Reset()
}
}
Expand Down
2 changes: 1 addition & 1 deletion cdc/sinkv2/eventsink/txn/txn_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (

const (
// DefaultConflictDetectorSlots indicates the default slot count of conflict detector.
// NOTE(review): two values appear because this listing is a rendered diff;
// presumably 8 * 1024 * 1024 (8388608) is the removed default and
// 16 * 1024 (16384) is the one that replaces it.
DefaultConflictDetectorSlots uint64 = 8 * 1024 * 1024
DefaultConflictDetectorSlots uint64 = 16 * 1024
)

// Assert EventSink[E event.TableEvent] implementation
Expand Down
3 changes: 1 addition & 2 deletions cdc/sinkv2/eventsink/txn/txn_sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ package txn

import (
"context"
"math"
"sort"
"sync/atomic"
"testing"
Expand Down Expand Up @@ -230,7 +229,7 @@ func TestGenKeys(t *testing.T) {
expected: []uint64{318190470, 2095136920, 2658640457},
}}
for _, tc := range testCases {
keys := genTxnKeys(tc.txn, math.MaxUint64)
keys := genTxnKeys(tc.txn)
sort.Slice(keys, func(i, j int) bool { return keys[i] < keys[j] })
require.Equal(t, tc.expected, keys)
}
Expand Down
55 changes: 40 additions & 15 deletions pkg/causality/internal/slots.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package internal

import (
"math"
"sync"
)

Expand All @@ -39,8 +40,12 @@ type Slots[E SlotNode[E]] struct {

// NewSlots creates a new Slots.
//
// numSlots is the number of slots to allocate; the same value is stored in
// the returned Slots.
func NewSlots[E SlotNode[E]](numSlots uint64) *Slots[E] {
slots := make([]slot[E], numSlots)
// Give every slot a ready-to-use nodes map (capacity hint 8); assigning into
// a nil map would panic.
for i := uint64(0); i < numSlots; i++ {
slots[i].nodes = make(map[uint64]E, 8)
}
return &Slots[E]{
// NOTE(review): this listing is a rendered diff; presumably the next line is
// the removed initializer and `slots: slots` is its replacement.
slots: make([]slot[E], numSlots),
slots: slots,
numSlots: numSlots,
}
}
Expand All @@ -51,37 +56,57 @@ func NewSlots[E SlotNode[E]](numSlots uint64) *Slots[E] {
// dependee.
func (s *Slots[E]) Add(elem E, keys []uint64) {
// NOTE(review): this listing is a rendered diff, so removed and added lines
// are interleaved below; presumably the lines that index s.slots[key] and
// dereference `tail` are the removed ones, and the lines that go through
// slotIdx and the nodes map are their replacements.

// dependOnList collects, keyed by node ID, every node that elem must wait for.
dependOnList := make(map[int64]E, len(keys))
// lastSlot is the slot index handled by the previous iteration.
// math.MaxUint64 stands for "none yet": getSlot returns key % numSlots, which
// is always smaller than numSlots and therefore never equals that value.
var lastSlot uint64 = math.MaxUint64
for _, key := range keys {
s.slots[key].mu.Lock()
if s.slots[key].tail == nil {
s.slots[key].tail = new(E)
} else {
prevID := (*s.slots[key].tail).NodeID()
dependOnList[prevID] = *s.slots[key].tail
slotIdx := getSlot(key, s.numSlots)
// Lock a slot only when it differs from the previous key's slot. This relies
// on keys that share a slot being adjacent in keys; otherwise the same mutex
// would be locked a second time while it is still held.
if lastSlot != slotIdx {
s.slots[slotIdx].mu.Lock()
lastSlot = slotIdx
}
// A node already recorded under this key becomes a dependency of elem.
if tail, ok := s.slots[slotIdx].nodes[key]; ok {
prevID := tail.NodeID()
dependOnList[prevID] = tail
}
*s.slots[key].tail = elem
// elem replaces whatever was recorded under this key.
s.slots[slotIdx].nodes[key] = elem
}
// DependOn is called while every slot lock taken above is still held.
elem.DependOn(dependOnList)

// Lock those slots one by one and then unlock them one by one, so that
// we can avoid 2 transactions get executed interleaved.
lastSlot = math.MaxUint64
for _, key := range keys {
s.slots[key].mu.Unlock()
slotIdx := getSlot(key, s.numSlots)
// Mirror of the locking loop: each distinct run of equal slot indexes is
// unlocked exactly once.
if lastSlot != slotIdx {
s.slots[slotIdx].mu.Unlock()
lastSlot = slotIdx
}
}
}

// Free removes an element from the Slots.
func (s *Slots[E]) Free(elem E, keys []uint64) {
var lastSlot uint64 = math.MaxUint64
for _, key := range keys {
s.slots[key].mu.Lock()
if s.slots[key].tail != nil && (*s.slots[key].tail).NodeID() == elem.NodeID() {
s.slots[key].tail = nil
slotIdx := getSlot(key, s.numSlots)
if lastSlot != slotIdx {
s.slots[slotIdx].mu.Lock()
}
if tail, ok := s.slots[slotIdx].nodes[key]; ok && tail.NodeID() == elem.NodeID() {
delete(s.slots[slotIdx].nodes, key)
}
if lastSlot != slotIdx {
s.slots[slotIdx].mu.Unlock()
lastSlot = slotIdx
}
s.slots[key].mu.Unlock()
}
elem.Free()
}

// slot is one bucket of Slots: a mutex together with the per-key state that
// Add and Free access while holding it.
// NOTE(review): this listing is a rendered diff, which is why `mu` appears
// twice; presumably `tail` and the first `mu` are the removed fields and
// `nodes` plus the second `mu` are their replacements.
type slot[E SlotNode[E]] struct {
tail *E // `tail` points to the last node in the slot.
mu sync.Mutex
// nodes maps a key to the node most recently stored for it by Add.
nodes map[uint64]E
mu sync.Mutex
}

// getSlot maps a key to the index of the slot that owns it: the remainder of
// key divided by numSlots. numSlots must be non-zero, because an integer
// remainder by zero panics at run time.
func getSlot(key, numSlots uint64) uint64 {
	slotIdx := key % numSlots
	return slotIdx
}
10 changes: 5 additions & 5 deletions pkg/causality/internal/slots_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ func TestSlotsTrivial(t *testing.T) {
slots.Free(nodes[i], []uint64{1, 2, 3, 4, 5})
}

require.Equal(t, (**Node)(nil), slots.slots[1].tail)
require.Equal(t, (**Node)(nil), slots.slots[2].tail)
require.Equal(t, (**Node)(nil), slots.slots[3].tail)
require.Equal(t, (**Node)(nil), slots.slots[4].tail)
require.Equal(t, (**Node)(nil), slots.slots[5].tail)
require.Equal(t, 0, len(slots.slots[1].nodes))
require.Equal(t, 0, len(slots.slots[2].nodes))
require.Equal(t, 0, len(slots.slots[3].nodes))
require.Equal(t, 0, len(slots.slots[4].nodes))
require.Equal(t, 0, len(slots.slots[5].nodes))
}
4 changes: 2 additions & 2 deletions pkg/causality/tests/workload.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func newUniformGenerator(workingSetSize int64, batchSize int, numSlots uint64) *
func (g *uniformGenerator) Next() []uint64 {
set := make(map[uint64]struct{}, g.batchSize)
for i := 0; i < g.batchSize; i++ {
key := uint64(rand.Int63n(g.workingSetSize)) % g.numSlots
key := uint64(rand.Int63n(g.workingSetSize))
set[key] = struct{}{}
}

Expand All @@ -49,6 +49,6 @@ func (g *uniformGenerator) Next() []uint64 {
ret = append(ret, key)
}

sort.Slice(ret, func(i, j int) bool { return ret[i] < ret[j] })
sort.Slice(ret, func(i, j int) bool { return ret[i]%g.numSlots < ret[j]%g.numSlots })
return ret
}
4 changes: 2 additions & 2 deletions pkg/causality/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ type (
)

type txnEvent interface {
// ConflictKeys returns the conflict keys of the event for a detector that
// has numSlots slots.
// NOTE(review): this listing is a rendered diff, so each requirement below
// appears twice; presumably the first wording of each pair is the removed one.
// NOTE(review): the half-open bound `[0, math.MaxUint64)` excludes
// math.MaxUint64 itself; confirm whether that exclusion is intended.
//
// Keys are in range [0, numSlots) and must be deduped.
// Keys are in range [0, math.MaxUint64) and must be deduped.
//
// NOTE: if the conflict detector is accessed by multiple threads concurrently,
// ConflictKeys must also be sorted.
// ConflictKeys must also be sorted based on `key % numSlots`.
ConflictKeys(numSlots uint64) []conflictKey
}

Expand Down