Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xdsclient: deflake TestADS_ResourcesAreRequestedAfterStreamRestart #7720

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions xds/internal/xdsclient/tests/ads_stream_backoff_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,15 +434,16 @@ func (s) TestADS_ResourceRequestedBeforeStreamCreation(t *testing.T) {
func waitForResourceNames(ctx context.Context, t *testing.T, namesCh chan []string, wantNames []string) error {
t.Helper()

for ; ctx.Err() == nil; <-time.After(defaultTestShortTimeout) {
var lastRequestedNames []string
for ; ; <-time.After(defaultTestShortTimeout) {
select {
case <-ctx.Done():
return fmt.Errorf("timeout waiting for resources %v to be requested from the management server. Last requested resources: %v", wantNames, lastRequestedNames)
case gotNames := <-namesCh:
if cmp.Equal(gotNames, wantNames, cmpopts.EquateEmpty(), cmpopts.SortSlices(func(s1, s2 string) bool { return s1 < s2 })) {
return nil
}
t.Logf("Received resource names %v, want %v", gotNames, wantNames)
lastRequestedNames = gotNames
}
}
return fmt.Errorf("timeout waiting for resource to be requested from the management server")
}
39 changes: 36 additions & 3 deletions xds/internal/xdsclient/tests/ads_stream_restart_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,18 +58,24 @@ func (s) TestADS_ResourcesAreRequestedAfterStreamRestart(t *testing.T) {
mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{
Listener: lis,
OnStreamRequest: func(_ int64, req *v3discoverypb.DiscoveryRequest) error {
t.Logf("Received request for resources: %v of type %s", req.GetResourceNames(), req.GetTypeUrl())

// Drain the resource name channels before writing to them to ensure
// that the most recently requested names are made available to the
// test.
switch req.GetTypeUrl() {
case version.V3ClusterURL:
select {
case cdsResourcesCh <- req.GetResourceNames():
case <-cdsResourcesCh:
default:
}
cdsResourcesCh <- req.GetResourceNames()
case version.V3ListenerURL:
t.Logf("Received LDS request for resources: %v", req.GetResourceNames())
select {
case ldsResourcesCh <- req.GetResourceNames():
case <-ldsResourcesCh:
default:
}
ldsResourcesCh <- req.GetResourceNames()
}
return nil
},
Expand Down Expand Up @@ -130,6 +136,17 @@ func (s) TestADS_ResourcesAreRequestedAfterStreamRestart(t *testing.T) {
t.Fatal(err)
}

// Verify the update received by the watcher.
wantListenerUpdate := listenerUpdateErrTuple{
update: xdsresource.ListenerUpdate{
RouteConfigName: routeConfigName,
HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}},
},
}
if err := verifyListenerUpdate(ctx, lw.updateCh, wantListenerUpdate); err != nil {
t.Fatal(err)
}

// Cancel the watch for the above listener resource, and verify that an LDS
// request with no resource names is sent.
ldsCancel()
Expand Down Expand Up @@ -171,6 +188,11 @@ func (s) TestADS_ResourcesAreRequestedAfterStreamRestart(t *testing.T) {
}
defer ldsCancel()

// Verify the update received by the watcher.
if err := verifyListenerUpdate(ctx, lw.updateCh, wantListenerUpdate); err != nil {
t.Fatal(err)
}

// Create a cluster resource on the management server, in addition to the
// existing listener resource.
const clusterName = "cluster"
Expand All @@ -192,6 +214,17 @@ func (s) TestADS_ResourcesAreRequestedAfterStreamRestart(t *testing.T) {
t.Fatal(err)
}

// Verify the update received by the watcher.
wantClusterUpdate := clusterUpdateErrTuple{
update: xdsresource.ClusterUpdate{
ClusterName: clusterName,
EDSServiceName: clusterName,
},
}
if err := verifyClusterUpdate(ctx, cw.updateCh, wantClusterUpdate); err != nil {
t.Fatal(err)
}

// Cancel the watch for the above cluster resource, and verify that a CDS
// request with no resource names is sent.
cdsCancel()
Expand Down