Skip to content

Commit

Permalink
Review comments #1.
Browse files Browse the repository at this point in the history
  • Loading branch information
easwars committed Aug 27, 2020
1 parent 958dd7c commit 6fc6be3
Show file tree
Hide file tree
Showing 17 changed files with 137 additions and 114 deletions.
2 changes: 1 addition & 1 deletion balancer/rls/internal/balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ func (s) TestUpdateControlChannelTimeout(t *testing.T) {
lbCfg = &lbConfig{lookupService: server.Address, lookupServiceTimeout: 2 * time.Second}
t.Logf("Sending service config %+v to RLS LB policy ...", lbCfg)
rlsB.UpdateClientConnState(balancer.ClientConnState{BalancerConfig: lbCfg})
if _, err := lis.connCh.Receive(ctx); err != testutils.ErrRecvTimeout {
if _, err := lis.connCh.Receive(ctx); err != context.DeadlineExceeded {
t.Fatal("LB policy created new control channel when only lookupServiceTimeout changed")
}

Expand Down
4 changes: 2 additions & 2 deletions balancer/rls/internal/picker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ func TestPick_DataCacheMiss_PendingCacheHit(t *testing.T) {
// Make sure that no RLS request was sent out.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if _, err := rlsCh.Receive(ctx); err != testutils.ErrRecvTimeout {
if _, err := rlsCh.Receive(ctx); err != context.DeadlineExceeded {
t.Fatalf("RLS request sent out when pending entry exists")
}
})
Expand Down Expand Up @@ -598,7 +598,7 @@ func TestPick_DataCacheHit_PendingCacheHit(t *testing.T) {
// Make sure that no RLS request was sent out.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if _, err := rlsCh.Receive(ctx); err != testutils.ErrRecvTimeout {
if _, err := rlsCh.Receive(ctx); err != context.DeadlineExceeded {
t.Fatalf("RLS request sent out when pending entry exists")
}
if test.wantErr != nil {
Expand Down
2 changes: 1 addition & 1 deletion credentials/sts/sts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,7 @@ func (s) TestGetRequestMetadataBadSubjectTokenRead(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()

if _, err := fc.reqCh.Receive(ctx); err != testutils.ErrRecvTimeout {
if _, err := fc.reqCh.Receive(ctx); err != context.DeadlineExceeded {
errCh <- err
return
}
Expand Down
24 changes: 11 additions & 13 deletions internal/testutils/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,8 @@ package testutils

import (
"context"
"errors"
)

// ErrRecvTimeout is an error to indicate that a receive operation on the
// channel timed out.
var ErrRecvTimeout = errors.New("timed out when waiting for value on channel")

// DefaultChanBufferSize is the default buffer size of the underlying channel.
const DefaultChanBufferSize = 1

Expand All @@ -39,12 +34,12 @@ func (cwt *Channel) Send(value interface{}) {
cwt.ch <- value
}

// Receive returns the value received on the underlying channel, or
// ErrRecvTimeout if the context expires.
// Receive returns the value received on the underlying channel, or the error
// returned by ctx if it is closed or cancelled.
func (cwt *Channel) Receive(ctx context.Context) (interface{}, error) {
select {
case <-ctx.Done():
return nil, ErrRecvTimeout
return nil, ctx.Err()
case got := <-cwt.ch:
return got, nil
}
Expand All @@ -53,13 +48,16 @@ func (cwt *Channel) Receive(ctx context.Context) (interface{}, error) {
// Replace clears the value on the underlying channel, and sends the new value.
//
// It's expected to be used with a size-1 channel, to only keep the most
// up-to-date item.
// up-to-date item. This method is inherently racy when invoked concurrently
// from multiple goroutines.
func (cwt *Channel) Replace(value interface{}) {
select {
case <-cwt.ch:
default:
for {
select {
case cwt.ch <- value:
return
case <-cwt.ch:
}
}
cwt.ch <- value
}

// NewChannel returns a new Channel.
Expand Down
45 changes: 33 additions & 12 deletions xds/internal/balancer/cdsbalancer/cdsbalancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,10 @@ func setupWithWatch(t *testing.T) (*fakeclient.Client, *cdsBalancer, *testEDSBal
if err := cdsB.UpdateClientConnState(cdsCCS(clusterName, xdsC)); err != nil {
t.Fatalf("cdsBalancer.UpdateClientConnState failed with error: %v", err)
}
gotCluster, err := xdsC.WaitForWatchCluster()

ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer ctxCancel()
gotCluster, err := xdsC.WaitForWatchCluster(ctx)
if err != nil {
t.Fatalf("xdsClient.WatchCDS failed with error: %v", err)
}
Expand Down Expand Up @@ -326,7 +329,9 @@ func (s) TestUpdateClientConnState(t *testing.T) {
// When we wanted an error and got it, we should return early.
return
}
gotCluster, err := xdsC.WaitForWatchCluster()
ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer ctxCancel()
gotCluster, err := xdsC.WaitForWatchCluster(ctx)
if err != nil {
t.Fatalf("xdsClient.WatchCDS failed with error: %v", err)
}
Expand Down Expand Up @@ -362,7 +367,9 @@ func (s) TestUpdateClientConnStateWithSameState(t *testing.T) {
if err := cdsB.UpdateClientConnState(cdsCCS(clusterName, xdsC)); err != nil {
t.Fatalf("cdsBalancer.UpdateClientConnState failed with error: %v", err)
}
if _, err := xdsC.WaitForWatchCluster(); err != testutils.ErrRecvTimeout {
ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer ctxCancel()
if _, err := xdsC.WaitForWatchCluster(ctx); err != context.DeadlineExceeded {
t.Fatalf("waiting for WatchCluster() should have timed out, but returned error: %v", err)
}
}
Expand Down Expand Up @@ -420,13 +427,15 @@ func (s) TestHandleClusterUpdateError(t *testing.T) {
// And this is not a resource not found error, watch shouldn't be canceled.
err1 := errors.New("cdsBalancer resolver error 1")
xdsC.InvokeWatchClusterCallback(xdsclient.ClusterUpdate{}, err1)
if err := xdsC.WaitForCancelClusterWatch(); err == nil {
ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer ctxCancel()
if err := xdsC.WaitForCancelClusterWatch(ctx); err == nil {
t.Fatal("watch was canceled, want not canceled (timeout error)")
}
if err := edsB.waitForResolverError(err1); err == nil {
t.Fatal("eds balancer shouldn't get error (shouldn't be built yet)")
}
ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout)
ctx, ctxCancel = context.WithTimeout(context.Background(), defaultTestTimeout)
defer ctxCancel()
state, err := tcc.newPickerCh.Receive(ctx)
if err != nil {
Expand All @@ -447,7 +456,9 @@ func (s) TestHandleClusterUpdateError(t *testing.T) {
// is not a resource not found error, watch shouldn't be canceled
err2 := errors.New("cdsBalancer resolver error 2")
xdsC.InvokeWatchClusterCallback(xdsclient.ClusterUpdate{}, err2)
if err := xdsC.WaitForCancelClusterWatch(); err == nil {
ctx, ctxCancel = context.WithTimeout(context.Background(), defaultTestTimeout)
defer ctxCancel()
if err := xdsC.WaitForCancelClusterWatch(ctx); err == nil {
t.Fatal("watch was canceled, want not canceled (timeout error)")
}
if err := edsB.waitForResolverError(err2); err != nil {
Expand All @@ -458,7 +469,9 @@ func (s) TestHandleClusterUpdateError(t *testing.T) {
// means CDS resource is removed, and eds should receive the error.
resourceErr := xdsclient.NewErrorf(xdsclient.ErrorTypeResourceNotFound, "cdsBalancer resource not found error")
xdsC.InvokeWatchClusterCallback(xdsclient.ClusterUpdate{}, resourceErr)
if err := xdsC.WaitForCancelClusterWatch(); err == nil {
ctx, ctxCancel = context.WithTimeout(context.Background(), defaultTestTimeout)
defer ctxCancel()
if err := xdsC.WaitForCancelClusterWatch(ctx); err == nil {
t.Fatalf("want watch to be not canceled, watchForCancel should timeout")
}
if err := edsB.waitForResolverError(resourceErr); err != nil {
Expand All @@ -479,13 +492,15 @@ func (s) TestResolverError(t *testing.T) {
// Not a resource not found error, watch shouldn't be canceled.
err1 := errors.New("cdsBalancer resolver error 1")
cdsB.ResolverError(err1)
if err := xdsC.WaitForCancelClusterWatch(); err == nil {
ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer ctxCancel()
if err := xdsC.WaitForCancelClusterWatch(ctx); err == nil {
t.Fatal("watch was canceled, want not canceled (timeout error)")
}
if err := edsB.waitForResolverError(err1); err == nil {
t.Fatal("eds balancer shouldn't get error (shouldn't be built yet)")
}
ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout)
ctx, ctxCancel = context.WithTimeout(context.Background(), defaultTestTimeout)
defer ctxCancel()
state, err := tcc.newPickerCh.Receive(ctx)
if err != nil {
Expand All @@ -506,7 +521,9 @@ func (s) TestResolverError(t *testing.T) {
// should receive the error.
err2 := errors.New("cdsBalancer resolver error 2")
cdsB.ResolverError(err2)
if err := xdsC.WaitForCancelClusterWatch(); err == nil {
ctx, ctxCancel = context.WithTimeout(context.Background(), defaultTestTimeout)
defer ctxCancel()
if err := xdsC.WaitForCancelClusterWatch(ctx); err == nil {
t.Fatal("watch was canceled, want not canceled (timeout error)")
}
if err := edsB.waitForResolverError(err2); err != nil {
Expand All @@ -517,7 +534,9 @@ func (s) TestResolverError(t *testing.T) {
// receive the error.
resourceErr := xdsclient.NewErrorf(xdsclient.ErrorTypeResourceNotFound, "cdsBalancer resource not found error")
cdsB.ResolverError(resourceErr)
if err := xdsC.WaitForCancelClusterWatch(); err != nil {
ctx, ctxCancel = context.WithTimeout(context.Background(), defaultTestTimeout)
defer ctxCancel()
if err := xdsC.WaitForCancelClusterWatch(ctx); err != nil {
t.Fatalf("want watch to be canceled, watchForCancel failed: %v", err)
}
if err := edsB.waitForResolverError(resourceErr); err != nil {
Expand Down Expand Up @@ -561,7 +580,9 @@ func (s) TestClose(t *testing.T) {
}

cdsB.Close()
if err := xdsC.WaitForCancelClusterWatch(); err != nil {
ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer ctxCancel()
if err := xdsC.WaitForCancelClusterWatch(ctx); err != nil {
t.Fatal(err)
}
if err := edsB.waitForClose(); err != nil {
Expand Down
32 changes: 23 additions & 9 deletions xds/internal/balancer/edsbalancer/eds_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ func waitForNewXDSClientWithEDSWatch(t *testing.T, ch *testutils.Channel, wantNa
t.Fatalf("xdsClient created to balancer: %v, want %v", xdsC.Name(), wantName)
return nil
}
_, err = xdsC.WaitForWatchEDS()
_, err = xdsC.WaitForWatchEDS(ctx)
if err != nil {
t.Fatalf("xdsClient.WatchEndpoints failed with error: %v", err)
return nil
Expand Down Expand Up @@ -457,12 +457,14 @@ func (s) TestErrorFromXDSClientUpdate(t *testing.T) {
}
defer edsB.Close()

edsB.UpdateClientConnState(balancer.ClientConnState{
if err := edsB.UpdateClientConnState(balancer.ClientConnState{
BalancerConfig: &EDSConfig{
BalancerName: testBalancerNameFooBar,
EDSServiceName: testEDSClusterName,
},
})
}); err != nil {
t.Fatal(err)
}

xdsC := waitForNewXDSClientWithEDSWatch(t, xdsClientCh, testBalancerNameFooBar)
xdsC.InvokeWatchEDSCallback(xdsclient.EndpointsUpdate{}, nil)
Expand All @@ -473,7 +475,10 @@ func (s) TestErrorFromXDSClientUpdate(t *testing.T) {

connectionErr := xdsclient.NewErrorf(xdsclient.ErrorTypeConnection, "connection error")
xdsC.InvokeWatchEDSCallback(xdsclient.EndpointsUpdate{}, connectionErr)
if err := xdsC.WaitForCancelEDSWatch(); err == nil {

ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer ctxCancel()
if err := xdsC.WaitForCancelEDSWatch(ctx); err == nil {
t.Fatal("watch was canceled, want not canceled (timeout error)")
}
if err := edsLB.waitForEDSResponse(xdsclient.EndpointsUpdate{}); err == nil {
Expand All @@ -485,7 +490,9 @@ func (s) TestErrorFromXDSClientUpdate(t *testing.T) {
// Even if error is resource not found, watch shouldn't be canceled, because
// this is an EDS resource removed (and xds client actually never sends this
// error, but we still handles it).
if err := xdsC.WaitForCancelEDSWatch(); err == nil {
ctx, ctxCancel = context.WithTimeout(context.Background(), defaultTestTimeout)
defer ctxCancel()
if err := xdsC.WaitForCancelEDSWatch(ctx); err == nil {
t.Fatal("watch was canceled, want not canceled (timeout error)")
}
if err := edsLB.waitForEDSResponse(xdsclient.EndpointsUpdate{}); err != nil {
Expand Down Expand Up @@ -514,12 +521,14 @@ func (s) TestErrorFromResolver(t *testing.T) {
}
defer edsB.Close()

edsB.UpdateClientConnState(balancer.ClientConnState{
if err := edsB.UpdateClientConnState(balancer.ClientConnState{
BalancerConfig: &EDSConfig{
BalancerName: testBalancerNameFooBar,
EDSServiceName: testEDSClusterName,
},
})
}); err != nil {
t.Fatal(err)
}

xdsC := waitForNewXDSClientWithEDSWatch(t, xdsClientCh, testBalancerNameFooBar)
xdsC.InvokeWatchEDSCallback(xdsclient.EndpointsUpdate{}, nil)
Expand All @@ -530,7 +539,10 @@ func (s) TestErrorFromResolver(t *testing.T) {

connectionErr := xdsclient.NewErrorf(xdsclient.ErrorTypeConnection, "connection error")
edsB.ResolverError(connectionErr)
if err := xdsC.WaitForCancelEDSWatch(); err == nil {

ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer ctxCancel()
if err := xdsC.WaitForCancelEDSWatch(ctx); err == nil {
t.Fatal("watch was canceled, want not canceled (timeout error)")
}
if err := edsLB.waitForEDSResponse(xdsclient.EndpointsUpdate{}); err == nil {
Expand All @@ -539,7 +551,9 @@ func (s) TestErrorFromResolver(t *testing.T) {

resourceErr := xdsclient.NewErrorf(xdsclient.ErrorTypeResourceNotFound, "edsBalancer resource not found error")
edsB.ResolverError(resourceErr)
if err := xdsC.WaitForCancelEDSWatch(); err != nil {
ctx, ctxCancel = context.WithTimeout(context.Background(), defaultTestTimeout)
defer ctxCancel()
if err := xdsC.WaitForCancelEDSWatch(ctx); err != nil {
t.Fatalf("want watch to be canceled, waitForCancel failed: %v", err)
}
if err := edsLB.waitForEDSResponse(xdsclient.EndpointsUpdate{}); err != nil {
Expand Down
13 changes: 8 additions & 5 deletions xds/internal/balancer/edsbalancer/xds_client_wrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func (s) TestClientWrapperHandleUpdateError(t *testing.T) {

xdsC := fakeclient.NewClient()
cw.handleUpdate(&EDSConfig{EDSServiceName: testEDSClusterName}, attributes.New(xdsinternal.XDSClientID, xdsC))
gotCluster, err := xdsC.WaitForWatchEDS()
gotCluster, err := xdsC.WaitForWatchEDS(context.Background())
if err != nil {
t.Fatalf("xdsClient.WatchEndpoints failed with error: %v", err)
}
Expand Down Expand Up @@ -207,10 +207,13 @@ func (s) TestClientWrapperGetsXDSClientInAttributes(t *testing.T) {
cw := newXDSClientWrapper(nil, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}, nil, nil)
defer cw.close()

// Verify that the eds watch is registered for the expected resource name.
xdsC1 := fakeclient.NewClient()
cw.handleUpdate(&EDSConfig{EDSServiceName: testEDSClusterName}, attributes.New(xdsinternal.XDSClientID, xdsC1))
gotCluster, err := xdsC1.WaitForWatchEDS()

// Verify that the eds watch is registered for the expected resource name.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
gotCluster, err := xdsC1.WaitForWatchEDS(ctx)
if err != nil {
t.Fatalf("xdsClient.WatchEndpoints failed with error: %v", err)
}
Expand All @@ -224,15 +227,15 @@ func (s) TestClientWrapperGetsXDSClientInAttributes(t *testing.T) {
// close client that are passed through attributes).
xdsC2 := fakeclient.NewClient()
cw.handleUpdate(&EDSConfig{EDSServiceName: testEDSClusterName}, attributes.New(xdsinternal.XDSClientID, xdsC2))
gotCluster, err = xdsC2.WaitForWatchEDS()
gotCluster, err = xdsC2.WaitForWatchEDS(ctx)
if err != nil {
t.Fatalf("xdsClient.WatchEndpoints failed with error: %v", err)
}
if gotCluster != testEDSClusterName {
t.Fatalf("xdsClient.WatchEndpoints() called with cluster: %v, want %v", gotCluster, testEDSClusterName)
}

if err := xdsC1.WaitForClose(); err != testutils.ErrRecvTimeout {
if err := xdsC1.WaitForClose(ctx); err != context.DeadlineExceeded {
t.Fatalf("clientWrapper closed xdsClient received in attributes")
}
}
5 changes: 3 additions & 2 deletions xds/internal/balancer/edsbalancer/xds_lrs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package edsbalancer

import (
context2 "context"
"testing"

"google.golang.org/grpc/attributes"
Expand Down Expand Up @@ -46,15 +47,15 @@ func (s) TestXDSLoadReporting(t *testing.T) {
BalancerConfig: &EDSConfig{LrsLoadReportingServerName: new(string)},
})

gotCluster, err := xdsC.WaitForWatchEDS()
gotCluster, err := xdsC.WaitForWatchEDS(context2.Background())
if err != nil {
t.Fatalf("xdsClient.WatchEndpoints failed with error: %v", err)
}
if gotCluster != testEDSClusterName {
t.Fatalf("xdsClient.WatchEndpoints() called with cluster: %v, want %v", gotCluster, testEDSClusterName)
}

got, err := xdsC.WaitForReportLoad()
got, err := xdsC.WaitForReportLoad(context2.Background())
if err != nil {
t.Fatalf("xdsClient.ReportLoad failed with error: %v", err)
}
Expand Down
Loading

0 comments on commit 6fc6be3

Please sign in to comment.