Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

balancergroup: Add trigger point to gracefully switch a child #5251

Merged
merged 4 commits into from
Mar 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions internal/balancergroup/balancergroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,20 @@ func (sbc *subBalancerWrapper) resolverError(err error) {
b.ResolverError(err)
}

func (sbc *subBalancerWrapper) gracefulSwitch(builder balancer.Builder) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: move this above stopBalancer. I often treat methods close/stop as code boundaries.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

sbc.builder = builder
b := sbc.balancer
// Even if you get an add and it persists builder but doesn't start
// balancer, this would leave graceful switch being nil, in which we are
// correctly overwriting with the recent builder here as well to use later.
// The graceful switch balancer's presence is an invariant of whether the
// balancer group is closed or not (if closed, nil, if started, present).
if sbc.balancer != nil {
sbc.group.logger.Infof("Switching child policy %v to type %v", sbc.id, sbc.builder.Name())
b.SwitchTo(sbc.builder)
}
}

func (sbc *subBalancerWrapper) stopBalancer() {
sbc.balancer.Close()
sbc.balancer = nil
Expand Down Expand Up @@ -332,6 +346,23 @@ func (bg *BalancerGroup) Add(id string, builder balancer.Builder) {
bg.outgoingMu.Unlock()
}

// UpdateBuilder updates the builder for a current child, starting the Graceful
// Switch process for that child.
func (bg *BalancerGroup) UpdateBuilder(id string, builder balancer.Builder) {
bg.outgoingMu.Lock()
// This does not deal with the balancer cache because this call should come
// after an Add call for a given child balancer. If the child is removed,
// the caller will call Add if the child balancer comes back which would
// then deal with the balancer cache.
sbc := bg.idToBalancerConfig[id]
if sbc == nil {
// simply ignore it if not present, don't error
return
}
sbc.gracefulSwitch(builder)
bg.outgoingMu.Unlock()
}

// Remove removes the balancer with id from the group.
//
// But doesn't close the balancer. The balancer is kept in a cache, and will be
Expand Down
142 changes: 142 additions & 0 deletions internal/balancergroup/balancergroup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@
package balancergroup

import (
"context"
"fmt"
"testing"
"time"

"github.com/google/go-cmp/cmp"
"google.golang.org/grpc"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/roundrobin"
"google.golang.org/grpc/balancer/weightedtarget/weightedaggregator"
Expand All @@ -34,6 +36,11 @@ import (
"google.golang.org/grpc/resolver"
)

const (
defaultTestTimeout = 5 * time.Second
defaultTestShortTimeout = 10 * time.Millisecond
)

var (
rrBuilder = balancer.Get(roundrobin.Name)
testBalancerIDs = []string{"b1", "b2", "b3"}
Expand Down Expand Up @@ -534,3 +541,138 @@ func (s) TestBalancerExitIdleOne(t *testing.T) {
case <-exitIdleCh:
}
}

// TestBalancerGracefulSwitch tests the graceful switch functionality for a
// child of the balancer group. At first, the child is configured as a round
// robin load balancer, and thus should behave accordingly. The test then
// gracefully switches this child to a custom type which only creates a SubConn
// for the second passed in address and also only picks that created SubConn.
// The new aggregated picker should reflect this change for the child.
func (s) TestBalancerGracefulSwitch(t *testing.T) {
cc := testutils.NewTestClientConn(t)
gator := weightedaggregator.New(cc, nil, testutils.NewTestWRR)
gator.Start()
bg := New(cc, balancer.BuildOptions{}, gator, nil)
gator.Add(testBalancerIDs[0], 1)
bg.Add(testBalancerIDs[0], rrBuilder)
bg.UpdateClientConnState(testBalancerIDs[0], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[0:2]}})

bg.Start()

m1 := make(map[resolver.Address]balancer.SubConn)
scs := make(map[balancer.SubConn]bool)
for i := 0; i < 2; i++ {
addrs := <-cc.NewSubConnAddrsCh
sc := <-cc.NewSubConnCh
m1[addrs[0]] = sc
scs[sc] = true
bg.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
bg.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Ready})
}

p1 := <-cc.NewPickerCh
want := []balancer.SubConn{
m1[testBackendAddrs[0]], m1[testBackendAddrs[1]],
}
if err := testutils.IsRoundRobin(want, subConnFromPicker(p1)); err != nil {
t.Fatalf("want %v, got %v", want, err)
}

// The balancer type for testBalancersIDs[0] is currently Round Robin. Now,
// change it to a balancer that has separate behavior logically (creating
// SubConn for second address in address list and always picking that
// SubConn), and see if the downstream behavior reflects that change.
bg.UpdateBuilder(testBalancerIDs[0], wrappedPickFirstBalancerBuilder{})
if err := bg.UpdateClientConnState(testBalancerIDs[0], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[2:4]}}); err != nil {
t.Fatalf("error updating ClientConn state: %v", err)
}

addrs := <-cc.NewSubConnAddrsCh
if addrs[0].Addr != testBackendAddrs[3].Addr {
// Verifies forwarded to new created balancer, as the wrapped pick first
// balancer will delete first address.
t.Fatalf("newSubConn called with wrong address, want: %v, got : %v", testBackendAddrs[3].Addr, addrs[0].Addr)
}
sc := <-cc.NewSubConnCh

// Update the pick first balancers SubConn as CONNECTING. This will cause
// the pick first balancer to UpdateState() with CONNECTING, which shouldn't send
// a Picker update back, as the Graceful Switch process is not complete.
bg.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
ctx, cancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
defer cancel()
select {
case <-cc.NewPickerCh:
t.Fatalf("No new picker should have been sent due to the Graceful Switch process not completing")
case <-ctx.Done():
}

// Update the pick first balancers SubConn as READY. This will cause
// the pick first balancer to UpdateState() with READY, which should send a
// Picker update back, as the Graceful Switch process is complete. This
// Picker should always pick the pick first's created SubConn which
// corresponds to address 3.
bg.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Ready})
p2 := <-cc.NewPickerCh
pr, err := p2.Pick(balancer.PickInfo{})
if err != nil {
t.Fatalf("error picking: %v", err)
}
if pr.SubConn != sc {
t.Fatalf("picker.Pick(), want %v, got %v", sc, pr.SubConn)
}

// The Graceful Switch process completing for the child should cause the
// SubConns for the balancer being gracefully switched from to get deleted.
ctx, cancel = context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
for i := 0; i < 2; i++ {
select {
case <-ctx.Done():
t.Fatalf("error waiting for RemoveSubConn()")
case sc := <-cc.RemoveSubConnCh:
// The SubConn removed should have been one of the two created
// SubConns, and both should be deleted.
if ok := scs[sc]; ok {
delete(scs, sc)
continue
} else {
t.Fatalf("RemoveSubConn called for wrong SubConn %v, want in %v", sc, scs)
}
}
}
}

type wrappedPickFirstBalancerBuilder struct{}

func (wrappedPickFirstBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
builder := balancer.Get(grpc.PickFirstBalancerName)
wpfb := &wrappedPickFirstBalancer{
ClientConn: cc,
}
pf := builder.Build(wpfb, opts)
wpfb.Balancer = pf
return wpfb
}

func (wrappedPickFirstBalancerBuilder) Name() string {
return "wrappedPickFirstBalancer"
}

type wrappedPickFirstBalancer struct {
balancer.Balancer
balancer.ClientConn
}

func (wb *wrappedPickFirstBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
s.ResolverState.Addresses = s.ResolverState.Addresses[1:]
return wb.Balancer.UpdateClientConnState(s)
}

func (wb *wrappedPickFirstBalancer) UpdateState(state balancer.State) {
// Eat it if IDLE - allows it to switch over only on a READY SubConn.
if state.ConnectivityState == connectivity.Idle {
return
}
wb.ClientConn.UpdateState(state)
}