Skip to content

Commit

Permalink
feat: scheduler blocks steal peers (#1224)
Browse files Browse the repository at this point in the history
Signed-off-by: Gaius <[email protected]>
  • Loading branch information
gaius-qi committed Jun 28, 2023
1 parent ccb6495 commit 56cd51a
Show file tree
Hide file tree
Showing 7 changed files with 181 additions and 40 deletions.
7 changes: 7 additions & 0 deletions pkg/container/set/safe_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type SafeSet interface {
Contains(...interface{}) bool
Len() uint
Range(func(interface{}) bool)
Clear()
}

type safeSet struct {
Expand Down Expand Up @@ -99,3 +100,9 @@ func (s *safeSet) Range(fn func(interface{}) bool) {
}
}
}

func (s *safeSet) Clear() {
s.mu.Lock()
defer s.mu.Unlock()
s.data = make(map[interface{}]struct{})
}
96 changes: 77 additions & 19 deletions pkg/container/set/safe_set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@ func TestSafeSetAdd(t *testing.T) {
tests := []struct {
name string
value interface{}
expect func(t *testing.T, ok bool, s Set, value interface{})
expect func(t *testing.T, ok bool, s SafeSet, value interface{})
}{
{
name: "add value",
value: "foo",
expect: func(t *testing.T, ok bool, s Set, value interface{}) {
expect: func(t *testing.T, ok bool, s SafeSet, value interface{}) {
assert := assert.New(t)
assert.Equal(ok, true)
assert.Equal(s.Values(), []interface{}{value})
Expand All @@ -45,7 +45,7 @@ func TestSafeSetAdd(t *testing.T) {
{
name: "add value failed",
value: "foo",
expect: func(t *testing.T, _ bool, s Set, value interface{}) {
expect: func(t *testing.T, _ bool, s SafeSet, value interface{}) {
assert := assert.New(t)
ok := s.Add("foo")
assert.Equal(ok, false)
Expand Down Expand Up @@ -89,12 +89,12 @@ func TestSafeSetDelete(t *testing.T) {
tests := []struct {
name string
value interface{}
expect func(t *testing.T, s Set, value interface{})
expect func(t *testing.T, s SafeSet, value interface{})
}{
{
name: "delete value",
value: "foo",
expect: func(t *testing.T, s Set, value interface{}) {
expect: func(t *testing.T, s SafeSet, value interface{}) {
assert := assert.New(t)
s.Delete(value)
assert.Equal(s.Len(), uint(0))
Expand All @@ -103,7 +103,7 @@ func TestSafeSetDelete(t *testing.T) {
{
name: "delete value does not exist",
value: "foo",
expect: func(t *testing.T, s Set, _ interface{}) {
expect: func(t *testing.T, s SafeSet, _ interface{}) {
assert := assert.New(t)
s.Delete("bar")
assert.Equal(s.Len(), uint(1))
Expand Down Expand Up @@ -148,20 +148,20 @@ func TestSafeSetContains(t *testing.T) {
tests := []struct {
name string
value interface{}
expect func(t *testing.T, s Set, value interface{})
expect func(t *testing.T, s SafeSet, value interface{})
}{
{
name: "contains value",
value: "foo",
expect: func(t *testing.T, s Set, value interface{}) {
expect: func(t *testing.T, s SafeSet, value interface{}) {
assert := assert.New(t)
assert.Equal(s.Contains(value), true)
},
},
{
name: "contains value does not exist",
value: "foo",
expect: func(t *testing.T, s Set, _ interface{}) {
expect: func(t *testing.T, s SafeSet, _ interface{}) {
assert := assert.New(t)
assert.Equal(s.Contains("bar"), false)
},
Expand Down Expand Up @@ -202,19 +202,19 @@ func TestSafeSetContains_Concurrent(t *testing.T) {
func TestSetSafeLen(t *testing.T) {
tests := []struct {
name string
expect func(t *testing.T, s Set)
expect func(t *testing.T, s SafeSet)
}{
{
name: "get length",
expect: func(t *testing.T, s Set) {
expect: func(t *testing.T, s SafeSet) {
assert := assert.New(t)
s.Add("foo")
assert.Equal(s.Len(), uint(1))
},
},
{
name: "get empty set length",
expect: func(t *testing.T, s Set) {
expect: func(t *testing.T, s SafeSet) {
assert := assert.New(t)
assert.Equal(s.Len(), uint(0))
},
Expand Down Expand Up @@ -256,26 +256,26 @@ func TestSafeSetLen_Concurrent(t *testing.T) {
func TestSafeSetValues(t *testing.T) {
tests := []struct {
name string
expect func(t *testing.T, s Set)
expect func(t *testing.T, s SafeSet)
}{
{
name: "get values",
expect: func(t *testing.T, s Set) {
expect: func(t *testing.T, s SafeSet) {
assert := assert.New(t)
s.Add("foo")
assert.Equal(s.Values(), []interface{}{"foo"})
},
},
{
name: "get empty values",
expect: func(t *testing.T, s Set) {
expect: func(t *testing.T, s SafeSet) {
assert := assert.New(t)
assert.Equal(s.Values(), []interface{}(nil))
},
},
{
name: "get multi values",
expect: func(t *testing.T, s Set) {
expect: func(t *testing.T, s SafeSet) {
assert := assert.New(t)
s.Add("foo")
s.Add("bar")
Expand Down Expand Up @@ -320,11 +320,11 @@ func TestSafeSetValues_Concurrent(t *testing.T) {
func TestSafeSetRange(t *testing.T) {
tests := []struct {
name string
expect func(t *testing.T, s Set)
expect func(t *testing.T, s SafeSet)
}{
{
name: "range values",
expect: func(t *testing.T, s Set) {
expect: func(t *testing.T, s SafeSet) {
assert := assert.New(t)
s.Add("foo")
s.Range(func(v interface{}) bool {
Expand All @@ -335,7 +335,7 @@ func TestSafeSetRange(t *testing.T) {
},
{
name: "range values failed",
expect: func(t *testing.T, s Set) {
expect: func(t *testing.T, s SafeSet) {
assert := assert.New(t)
s.Add("foo")
s.Add("bar")
Expand Down Expand Up @@ -380,3 +380,61 @@ func TestSafeSetRange_Concurrent(t *testing.T) {
}
wg.Wait()
}

func TestSafeSetClear(t *testing.T) {
tests := []struct {
name string
expect func(t *testing.T, s SafeSet)
}{
{
name: "clear empty set",
expect: func(t *testing.T, s SafeSet) {
assert := assert.New(t)
s.Clear()
assert.Equal(s.Values(), []interface{}(nil))
},
},
{
name: "clear set",
expect: func(t *testing.T, s SafeSet) {
assert := assert.New(t)
assert.Equal(s.Add("foo"), true)
s.Clear()
assert.Equal(s.Values(), []interface{}(nil))
assert.Equal(s.Add("foo"), true)
assert.Equal(s.Values(), []interface{}{"foo"})
},
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
s := NewSafeSet()
tc.expect(t, s)
})
}
}

func TestSafeSetClear_Concurrent(t *testing.T) {
runtime.GOMAXPROCS(2)

s := NewSafeSet()
nums := rand.Perm(N)

var wg sync.WaitGroup
wg.Add(len(nums))
for i := 0; i < len(nums); i++ {
go func(i int) {
s.Add(i)
s.Clear()
wg.Done()
}(i)
}

wg.Wait()
for _, n := range nums {
if s.Contains(n) {
t.Errorf("SafeSet contains element: %v", n)
}
}
}
5 changes: 5 additions & 0 deletions pkg/container/set/set.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type Set interface {
Contains(...interface{}) bool
Len() uint
Range(func(interface{}) bool)
Clear()
}

type set map[interface{}]struct{}
Expand Down Expand Up @@ -76,3 +77,7 @@ func (s *set) Range(fn func(interface{}) bool) {
}
}
}

func (s *set) Clear() {
*s = set{}
}
34 changes: 34 additions & 0 deletions pkg/container/set/set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,3 +232,37 @@ func TestSetRange(t *testing.T) {
})
}
}

func TestSetClear(t *testing.T) {
tests := []struct {
name string
expect func(t *testing.T, s Set)
}{
{
name: "clear empty set",
expect: func(t *testing.T, s Set) {
assert := assert.New(t)
s.Clear()
assert.Equal(s.Values(), []interface{}(nil))
},
},
{
name: "clear set",
expect: func(t *testing.T, s Set) {
assert := assert.New(t)
assert.Equal(s.Add("foo"), true)
s.Clear()
assert.Equal(s.Values(), []interface{}(nil))
assert.Equal(s.Add("foo"), true)
assert.Equal(s.Values(), []interface{}{"foo"})
},
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
s := New()
tc.expect(t, s)
})
}
}
4 changes: 4 additions & 0 deletions scheduler/resource/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,9 @@ type Peer struct {
// ChildCount is child count
ChildCount *atomic.Int32

// StealPeers is steal peer ids
StealPeers set.SafeSet

// BlockPeers is bad peer ids
BlockPeers set.SafeSet

Expand Down Expand Up @@ -172,6 +175,7 @@ func NewPeer(id string, task *Task, host *Host, options ...PeerOption) *Peer {
Parent: &atomic.Value{},
Children: &sync.Map{},
ChildCount: atomic.NewInt32(0),
StealPeers: set.NewSafeSet(),
BlockPeers: set.NewSafeSet(),
CreateAt: atomic.NewTime(time.Now()),
UpdateAt: atomic.NewTime(time.Now()),
Expand Down
15 changes: 14 additions & 1 deletion scheduler/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,16 @@ func (s *scheduler) NotifyAndFindParent(ctx context.Context, peer *resource.Peer
return []*resource.Peer{}, false
}

// Add steal peers to current peer
peer.StealPeers.Clear()
for _, parent := range parents[1:] {
peer.StealPeers.Add(parent.ID)
}

// Replace peer's parent with scheduled parent
peer.ReplaceParent(parents[0])
peer.Log.Infof("schedule parent successful, replace parent to %s", parents[0].ID)
peer.Log.Infof("schedule parent successful, replace parent to %s and steal peers is %v",
parents[0].ID, peer.StealPeers.Values())
return parents, true
}

Expand Down Expand Up @@ -239,6 +247,11 @@ func (s *scheduler) filterParents(peer *resource.Peer, blocklist set.SafeSet) []
return true
}

if parent.StealPeers.Contains(peer.ID) {
peer.Log.Debugf("parent %s is not selected because it is in steal peers", parent.ID)
return true
}

if parent.ID == peer.ID {
peer.Log.Debug("parent is not selected because it is same")
return true
Expand Down
Loading

0 comments on commit 56cd51a

Please sign in to comment.