From f9968daf34565756c4eeb5eca96b01ab980b23d0 Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Mon, 31 Oct 2016 14:46:26 -0700 Subject: [PATCH 1/8] grpclb: Support server list expiration --- grpclb/grpclb.go | 66 +++++++++++++++++----- grpclb/grpclb_test.go | 124 +++++++++++++++++++++++++++++++++++++----- 2 files changed, 162 insertions(+), 28 deletions(-) diff --git a/grpclb/grpclb.go b/grpclb/grpclb.go index 996d27aeb7e9..7db6ab1e8794 100644 --- a/grpclb/grpclb.go +++ b/grpclb/grpclb.go @@ -40,6 +40,7 @@ import ( "errors" "fmt" "sync" + "time" "golang.org/x/net/context" "google.golang.org/grpc" @@ -93,16 +94,17 @@ type addrInfo struct { } type balancer struct { - r naming.Resolver - mu sync.Mutex - seq int // a sequence number to make sure addrCh does not get stale addresses. - w naming.Watcher - addrCh chan []grpc.Address - rbs []remoteBalancerInfo - addrs []*addrInfo - next int - waitCh chan struct{} - done bool + r naming.Resolver + mu sync.Mutex + seq int // a sequence number to make sure addrCh does not get stale addresses. + w naming.Watcher + addrCh chan []grpc.Address + rbs []remoteBalancerInfo + addrs []*addrInfo + next int + waitCh chan struct{} + done bool + expTimer *time.Timer } func (b *balancer) watchAddrUpdates(w naming.Watcher, ch chan remoteBalancerInfo) error { @@ -180,14 +182,36 @@ func (b *balancer) watchAddrUpdates(w naming.Watcher, ch chan remoteBalancerInfo return nil } +func (b *balancer) serverListExpire(seq int) { + b.mu.Lock() + defer b.mu.Unlock() + if b.done || seq < b.seq { + return + } + b.next = 0 + b.addrs = nil + // Ask grpc internals to close the all corresponding connections. 
+ b.addrCh <- nil +} + +func convertDuration(d *lbpb.Duration) time.Duration { + if d == nil { + return 0 + } + return time.Duration(d.Seconds)*time.Second + time.Duration(d.Nanos)*time.Nanosecond +} + func (b *balancer) processServerList(l *lbpb.ServerList, seq int) { + if l == nil { + return + } servers := l.GetServers() + expiration := convertDuration(l.GetExpirationInterval()) var ( sl []*addrInfo addrs []grpc.Address ) for _, s := range servers { - // TODO: Support ExpirationInterval md := metadata.Pairs("lb-token", s.LoadBalanceToken) addr := grpc.Address{ Addr: fmt.Sprintf("%s:%d", s.IpAddress, s.Port), @@ -209,6 +233,15 @@ func (b *balancer) processServerList(l *lbpb.ServerList, seq int) { b.next = 0 b.addrs = sl b.addrCh <- addrs + if expiration > 0 { + expF := func() { + b.serverListExpire(seq) + } + if b.expTimer != nil { + b.expTimer.Stop() + } + b.expTimer = time.AfterFunc(expiration, expF) + } } return } @@ -226,8 +259,8 @@ func (b *balancer) callRemoteBalancer(lbc lbpb.LoadBalancerClient) (retry bool) b.mu.Unlock() return } - b.seq++ - seq := b.seq + //b.seq++ + //seq := b.seq b.mu.Unlock() initReq := &lbpb.LoadBalanceRequest{ LoadBalanceRequestType: &lbpb.LoadBalanceRequest_InitialRequest{ @@ -260,6 +293,10 @@ func (b *balancer) callRemoteBalancer(lbc lbpb.LoadBalancerClient) (retry bool) if err != nil { break } + b.mu.Lock() + b.seq++ + seq := b.seq + b.mu.Unlock() if serverList := reply.GetServerList(); serverList != nil { b.processServerList(serverList, seq) } @@ -497,6 +534,9 @@ func (b *balancer) Close() error { b.mu.Lock() defer b.mu.Unlock() b.done = true + if b.expTimer != nil { + b.expTimer.Stop() + } if b.waitCh != nil { close(b.waitCh) } diff --git a/grpclb/grpclb_test.go b/grpclb/grpclb_test.go index 3215beafc33e..1d0395f32542 100644 --- a/grpclb/grpclb_test.go +++ b/grpclb/grpclb_test.go @@ -162,14 +162,16 @@ func (c *serverNameCheckCreds) OverrideServerName(s string) error { } type remoteBalancer struct { - servers *lbpb.ServerList - 
done chan struct{} + sls []*lbpb.ServerList + intervals []time.Duration + done chan struct{} } -func newRemoteBalancer(servers *lbpb.ServerList) *remoteBalancer { +func newRemoteBalancer(sls []*lbpb.ServerList, internals []time.Duration) *remoteBalancer { return &remoteBalancer{ - servers: servers, - done: make(chan struct{}), + sls: sls, + intervals: internals, + done: make(chan struct{}), } } @@ -186,13 +188,16 @@ func (b *remoteBalancer) BalanceLoad(stream lbpb.LoadBalancer_BalanceLoadServer) if err := stream.Send(resp); err != nil { return err } - resp = &lbpb.LoadBalanceResponse{ - LoadBalanceResponseType: &lbpb.LoadBalanceResponse_ServerList{ - ServerList: b.servers, - }, - } - if err := stream.Send(resp); err != nil { - return err + for k, v := range b.sls { + time.Sleep(b.intervals[k]) + resp = &lbpb.LoadBalanceResponse{ + LoadBalanceResponseType: &lbpb.LoadBalanceResponse_ServerList{ + ServerList: v, + }, + } + if err := stream.Send(resp); err != nil { + return err + } } <-b.done return nil @@ -268,7 +273,9 @@ func TestGRPCLB(t *testing.T) { sl := &lbpb.ServerList{ Servers: bes, } - ls := newRemoteBalancer(sl) + sls := []*lbpb.ServerList{sl} + intervals := []time.Duration{0} + ls := newRemoteBalancer(sls, intervals) lbpb.RegisterLoadBalancerServer(lb, ls) go func() { lb.Serve(lbLis) @@ -343,7 +350,9 @@ func TestDropRequest(t *testing.T) { sl := &lbpb.ServerList{ Servers: bes, } - ls := newRemoteBalancer(sl) + sls := []*lbpb.ServerList{sl} + intervals := []time.Duration{0} + ls := newRemoteBalancer(sls, intervals) lbpb.RegisterLoadBalancerServer(lb, ls) go func() { lb.Serve(lbLis) @@ -413,7 +422,9 @@ func TestDropRequestFailedNonFailFast(t *testing.T) { sl := &lbpb.ServerList{ Servers: bes, } - ls := newRemoteBalancer(sl) + sls := []*lbpb.ServerList{sl} + intervals := []time.Duration{0} + ls := newRemoteBalancer(sls, intervals) lbpb.RegisterLoadBalancerServer(lb, ls) go func() { lb.Serve(lbLis) @@ -439,3 +450,86 @@ func TestDropRequestFailedNonFailFast(t 
*testing.T) { } cc.Close() } + +func TestServerExpiration(t *testing.T) { + // Start a backend. + beLis, err := net.Listen("tcp", "localhost:0") + if err != nil { + t.Fatalf("Failed to listen %v", err) + } + beAddr := strings.Split(beLis.Addr().String(), ":") + bePort, err := strconv.Atoi(beAddr[1]) + backends := startBackends(t, besn, beLis) + defer stopBackends(backends) + + // Start a load balancer. + lbLis, err := net.Listen("tcp", "localhost:0") + if err != nil { + t.Fatalf("Failed to create the listener for the load balancer %v", err) + } + lbCreds := &serverNameCheckCreds{ + sn: lbsn, + } + lb := grpc.NewServer(grpc.Creds(lbCreds)) + if err != nil { + t.Fatalf("Failed to generate the port number %v", err) + } + be := &lbpb.Server{ + IpAddress: []byte(beAddr[0]), + Port: int32(bePort), + LoadBalanceToken: lbToken, + } + var bes []*lbpb.Server + bes = append(bes, be) + exp := &lbpb.Duration{ + Seconds: 0, + Nanos: 100000000, // 100ms + } + var sls []*lbpb.ServerList + sl := &lbpb.ServerList{ + Servers: bes, + ExpirationInterval: exp, + } + sls = append(sls, sl) + sl = &lbpb.ServerList{ + Servers: bes, + } + sls = append(sls, sl) + var intervals []time.Duration + intervals = append(intervals, 0) + intervals = append(intervals, 500*time.Millisecond) + ls := newRemoteBalancer(sls, intervals) + lbpb.RegisterLoadBalancerServer(lb, ls) + go func() { + lb.Serve(lbLis) + }() + defer func() { + ls.stop() + lb.Stop() + }() + creds := serverNameCheckCreds{ + expected: besn, + } + ctx, _ := context.WithTimeout(context.Background(), 10*time.Second) + cc, err := grpc.DialContext(ctx, besn, grpc.WithBalancer(Balancer(&testNameResolver{ + addr: lbLis.Addr().String(), + })), grpc.WithBlock(), grpc.WithTransportCredentials(&creds)) + if err != nil { + t.Fatalf("Failed to dial to the backend %v", err) + } + helloC := hwpb.NewGreeterClient(cc) + if _, err := helloC.SayHello(context.Background(), &hwpb.HelloRequest{Name: "grpc"}); err != nil { + t.Fatalf("%v.SayHello(_, _) = _, 
%v, want _, ", helloC, err) + } + // Sleep and wake up when the first server list gets expired. + time.Sleep(150 * time.Millisecond) + if _, err := helloC.SayHello(context.Background(), &hwpb.HelloRequest{Name: "grpc"}); grpc.Code(err) != codes.Unavailable { + t.Fatalf("%v.SayHello(_, _) = _, %v, want _, %s", helloC, err, codes.Unavailable) + } + // A non-failfast rpc should be succeeded after the second server list is received from + // the remote load balancer. + if _, err := helloC.SayHello(context.Background(), &hwpb.HelloRequest{Name: "grpc"}, grpc.FailFast(false)); err != nil { + t.Fatalf("%v.SayHello(_, _) = _, %v, want _, ", helloC, err) + } + cc.Close() +} From f9a8b0f5d0655df833c6e42e1eac602f52077dac Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Mon, 31 Oct 2016 14:58:59 -0700 Subject: [PATCH 2/8] minor fix --- grpclb/grpclb.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/grpclb/grpclb.go b/grpclb/grpclb.go index 7db6ab1e8794..6e47cd00e0b3 100644 --- a/grpclb/grpclb.go +++ b/grpclb/grpclb.go @@ -190,7 +190,7 @@ func (b *balancer) serverListExpire(seq int) { } b.next = 0 b.addrs = nil - // Ask grpc internals to close the all corresponding connections. + // Ask grpc internals to close all the corresponding connections. 
b.addrCh <- nil } From 39c82afac6638676182f7b656d48f9c86cd35dcf Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Mon, 31 Oct 2016 15:01:34 -0700 Subject: [PATCH 3/8] remove debug info --- grpclb/grpclb.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/grpclb/grpclb.go b/grpclb/grpclb.go index 6e47cd00e0b3..f78a3b0b5b9b 100644 --- a/grpclb/grpclb.go +++ b/grpclb/grpclb.go @@ -259,8 +259,6 @@ func (b *balancer) callRemoteBalancer(lbc lbpb.LoadBalancerClient) (retry bool) b.mu.Unlock() return } - //b.seq++ - //seq := b.seq b.mu.Unlock() initReq := &lbpb.LoadBalanceRequest{ LoadBalanceRequestType: &lbpb.LoadBalanceRequest_InitialRequest{ From 9ee8af5c41b8dbfa4f22f4848c111ceb97ded7b0 Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Fri, 11 Nov 2016 11:19:51 -0800 Subject: [PATCH 4/8] fix seq issue --- grpclb/grpclb.go | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/grpclb/grpclb.go b/grpclb/grpclb.go index f78a3b0b5b9b..3b8c668c78b5 100644 --- a/grpclb/grpclb.go +++ b/grpclb/grpclb.go @@ -246,7 +246,7 @@ func (b *balancer) processServerList(l *lbpb.ServerList, seq int) { return } -func (b *balancer) callRemoteBalancer(lbc lbpb.LoadBalancerClient) (retry bool) { +func (b *balancer) callRemoteBalancer(lbc lbpb.LoadBalancerClient, seq int) (retry bool) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() stream, err := lbc.BalanceLoad(ctx, grpc.FailFast(false)) @@ -292,8 +292,12 @@ func (b *balancer) callRemoteBalancer(lbc lbpb.LoadBalancerClient) (retry bool) break } b.mu.Lock() + if b.done || seq < b.seq { + b.mu.Unlock() + return + } b.seq++ - seq := b.seq + seq = b.seq b.mu.Unlock() if serverList := reply.GetServerList(); serverList != nil { b.processServerList(serverList, seq) @@ -364,7 +368,10 @@ func (b *balancer) Start(target string, config grpc.BalancerConfig) error { go func(cc *grpc.ClientConn) { lbc := lbpb.NewLoadBalancerClient(cc) for { - if retry := b.callRemoteBalancer(lbc); !retry { + b.mu.Lock() + seq 
:= b.seq + b.mu.Unlock() + if retry := b.callRemoteBalancer(lbc, seq); !retry { cc.Close() return } From c6542e95815674ce771c7e10b5f53206c98e2c54 Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Mon, 14 Nov 2016 11:16:03 -0800 Subject: [PATCH 5/8] fix b.seq ticking --- grpclb/grpclb.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/grpclb/grpclb.go b/grpclb/grpclb.go index 3b8c668c78b5..0bcc1e09667d 100644 --- a/grpclb/grpclb.go +++ b/grpclb/grpclb.go @@ -296,7 +296,7 @@ func (b *balancer) callRemoteBalancer(lbc lbpb.LoadBalancerClient, seq int) (ret b.mu.Unlock() return } - b.seq++ + b.seq++ // tick when receiving a new list of servers. seq = b.seq b.mu.Unlock() if serverList := reply.GetServerList(); serverList != nil { @@ -365,12 +365,13 @@ func (b *balancer) Start(target string, config grpc.BalancerConfig) error { grpclog.Printf("Failed to setup a connection to the remote balancer %v: %v", rb.addr, err) return } + b.mu.Lock() + b.seq++ // tick when get a new balancer address + seq := b.seq + b.mu.Unlock() go func(cc *grpc.ClientConn) { lbc := lbpb.NewLoadBalancerClient(cc) for { - b.mu.Lock() - seq := b.seq - b.mu.Unlock() if retry := b.callRemoteBalancer(lbc, seq); !retry { cc.Close() return From 7d25886ea77bd11fcc6060ff336128b39053f713 Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Mon, 14 Nov 2016 17:08:33 -0800 Subject: [PATCH 6/8] deal with the case where the server list expiration races with the new load balancer (no addrs available yet). 
--- grpclb/grpclb.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/grpclb/grpclb.go b/grpclb/grpclb.go index 0bcc1e09667d..912084e39413 100644 --- a/grpclb/grpclb.go +++ b/grpclb/grpclb.go @@ -366,8 +366,14 @@ func (b *balancer) Start(target string, config grpc.BalancerConfig) error { return } b.mu.Lock() - b.seq++ // tick when get a new balancer address + b.seq++ // tick when getting a new balancer address seq := b.seq + b.next = 0 + if b.addrs != nil { + b.addrs = nil + // Ask grpc internals to close all the connections. + b.addrCh <- nil + } b.mu.Unlock() go func(cc *grpc.ClientConn) { lbc := lbpb.NewLoadBalancerClient(cc) From e6fa063bfaded7068149be6e743ff6f17d1236ca Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Mon, 14 Nov 2016 17:30:10 -0800 Subject: [PATCH 7/8] add TODO --- grpclb/grpclb.go | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/grpclb/grpclb.go b/grpclb/grpclb.go index 912084e39413..1cb44cf3c0fa 100644 --- a/grpclb/grpclb.go +++ b/grpclb/grpclb.go @@ -185,6 +185,9 @@ func (b *balancer) watchAddrUpdates(w naming.Watcher, ch chan remoteBalancerInfo func (b *balancer) serverListExpire(seq int) { b.mu.Lock() defer b.mu.Unlock() + // TODO: gRPC internals do not clear the connections when the server list is stale. + // This means RPCs will keep using the existing server list until b receives new + // server list even though the list is expired. Revisit this behavior later. if b.done || seq < b.seq { return } @@ -369,11 +372,6 @@ func (b *balancer) Start(target string, config grpc.BalancerConfig) error { b.seq++ // tick when getting a new balancer address seq := b.seq b.next = 0 - if b.addrs != nil { - b.addrs = nil - // Ask grpc internals to close all the connections. 
- b.addrCh <- nil - } b.mu.Unlock() go func(cc *grpc.ClientConn) { lbc := lbpb.NewLoadBalancerClient(cc) From ba7dfa2dc0f2acab00ad5d411da2d593b2e94025 Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Wed, 16 Nov 2016 11:25:14 -0800 Subject: [PATCH 8/8] refactor a bit --- grpclb/grpclb.go | 12 ++++++------ grpclb/grpclb_test.go | 4 ++-- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/grpclb/grpclb.go b/grpclb/grpclb.go index 1cb44cf3c0fa..d9a1a8b6fb53 100644 --- a/grpclb/grpclb.go +++ b/grpclb/grpclb.go @@ -236,14 +236,14 @@ func (b *balancer) processServerList(l *lbpb.ServerList, seq int) { b.next = 0 b.addrs = sl b.addrCh <- addrs + if b.expTimer != nil { + b.expTimer.Stop() + b.expTimer = nil + } if expiration > 0 { - expF := func() { + b.expTimer = time.AfterFunc(expiration, func() { b.serverListExpire(seq) - } - if b.expTimer != nil { - b.expTimer.Stop() - } - b.expTimer = time.AfterFunc(expiration, expF) + }) } } return diff --git a/grpclb/grpclb_test.go b/grpclb/grpclb_test.go index 1d0395f32542..f034b6ba952a 100644 --- a/grpclb/grpclb_test.go +++ b/grpclb/grpclb_test.go @@ -167,10 +167,10 @@ type remoteBalancer struct { done chan struct{} } -func newRemoteBalancer(sls []*lbpb.ServerList, internals []time.Duration) *remoteBalancer { +func newRemoteBalancer(sls []*lbpb.ServerList, intervals []time.Duration) *remoteBalancer { return &remoteBalancer{ sls: sls, - intervals: internals, + intervals: intervals, done: make(chan struct{}), } }