Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xds: fix for delta xDS reconnect bug in LDS/CDS #12174

Merged
merged 5 commits into from
Jan 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/12174.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
xds: fix for delta xDS reconnect bug in LDS/CDS
```
24 changes: 17 additions & 7 deletions agent/xds/delta.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,12 +165,6 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove
return status.Errorf(codes.InvalidArgument, "type URL is required for ADS")
}

if handler, ok := handlers[req.TypeUrl]; ok {
if handler.Recv(req) {
generator.Logger.Trace("subscribing to type", "typeUrl", req.TypeUrl)
}
}

if node == nil && req.Node != nil {
node = req.Node
var err error
Expand All @@ -180,6 +174,12 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove
}
}

if handler, ok := handlers[req.TypeUrl]; ok {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to move this to after the version-sniff so we can use the detected version to change behavior.

if handler.Recv(req, generator.ProxyFeatures) {
generator.Logger.Trace("subscribing to type", "typeUrl", req.TypeUrl)
}
}

case cfgSnap = <-stateCh:
newRes, err := generator.allResourcesFromSnapshot(cfgSnap)
if err != nil {
Expand Down Expand Up @@ -434,7 +434,7 @@ func newDeltaType(
// Recv handles new discovery requests from envoy.
//
// Returns true the first time a type receives a request.
func (t *xDSDeltaType) Recv(req *envoy_discovery_v3.DeltaDiscoveryRequest) bool {
func (t *xDSDeltaType) Recv(req *envoy_discovery_v3.DeltaDiscoveryRequest, sf supportedProxyFeatures) bool {
if t == nil {
return false // not something we care about
}
Expand All @@ -447,6 +447,16 @@ func (t *xDSDeltaType) Recv(req *envoy_discovery_v3.DeltaDiscoveryRequest) bool
t.wildcard = len(req.ResourceNamesSubscribe) == 0
t.registered = true
registeredThisTime = true

if sf.ForceLDSandCDSToAlwaysUseWildcardsOnReconnect {
switch t.typeURL {
case ListenerType, ClusterType:
if !t.wildcard {
t.wildcard = true
logger.Trace("fixing Envoy bug fixed in 1.19.0 by inferring wildcard mode for type")
}
}
}
}

/*
Expand Down
66 changes: 66 additions & 0 deletions agent/xds/delta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -554,6 +554,72 @@ func TestServer_DeltaAggregatedResources_v3_SlowEndpointPopulation(t *testing.T)
}
}

func TestServer_DeltaAggregatedResources_v3_GetAllClusterAfterConsulRestarted(t *testing.T) {
// This illustrates a scenario related to https://github.com/hashicorp/consul/issues/11833

aclResolve := func(id string) (acl.Authorizer, error) {
// Allow all
return acl.RootAuthorizer("manage"), nil
}
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0)
_, mgr, errCh, envoy := scenario.server, scenario.mgr, scenario.errCh, scenario.envoy
envoy.EnvoyVersion = "1.18.0"

sid := structs.NewServiceID("web-sidecar-proxy", nil)

// Register the proxy to create state needed to Watch() on
mgr.RegisterProxy(t, sid)

var snap *proxycfg.ConfigSnapshot
runStep(t, "get into state after consul restarted", func(t *testing.T) {
snap = newTestSnapshot(t, nil, "")

// Send initial cluster discover.
// This is to simulate the discovery request call from envoy after disconnected from consul ads stream.
//
// We need to force it to be an older version of envoy so that the logic shifts.
envoy.SendDeltaReq(t, ClusterType, &envoy_discovery_v3.DeltaDiscoveryRequest{
ResourceNamesSubscribe: []string{
"local_app",
"db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
},
InitialResourceVersions: map[string]string{
"local_app": "a948904f2f0f479b8f8197694b30184b0d2ed1c1cd2a1ec0fb85d299a192a447",
"db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul": "5891b5b522d5df086d0ff0b110fbd9d21bb4fc7163af34d08286a2e846f6be03",
},
})

// Check no response sent yet
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)

requireProtocolVersionGauge(t, scenario, "v3", 1)

// Deliver a new snapshot
// the config contains 3 clusters: local_app, db, geo-cache.
// this is to simulate the fact that there is one additional (upstream) cluster gets added to the sidecar service
// during the time xds disconnected (consul restarted).
mgr.DeliverConfig(t, sid, snap)

assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: ClusterType,
Nonce: hexString(1),
Resources: makeTestResources(t,
makeTestCluster(t, snap, "tcp:local_app"),
makeTestCluster(t, snap, "tcp:db"),
makeTestCluster(t, snap, "tcp:geo-cache"),
),
})
})

envoy.Close()
select {
case err := <-errCh:
require.NoError(t, err)
case <-time.After(50 * time.Millisecond):
t.Fatalf("timed out waiting for handler to finish")
}
}

func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP_clusterChangesImpactEndpoints(t *testing.T) {
aclResolve := func(id string) (acl.Authorizer, error) {
// Allow all
Expand Down
33 changes: 16 additions & 17 deletions agent/xds/envoy_versioning.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@ var (
// the zero'th point release of the last element of proxysupport.EnvoyVersions.
minSupportedVersion = version.Must(version.NewVersion("1.17.0"))

// add min version constraints for associated feature flags when necessary, for example:
// minVersionAllowingEmptyGatewayClustersWithIncrementalXDS = version.Must(version.NewVersion("1.16.0"))
minVersionToForceLDSandCDSToAlwaysUseWildcardsOnReconnect = version.Must(version.NewVersion("1.19.0"))

specificUnsupportedVersions = []unsupportedVersion{}
)
Expand All @@ -26,16 +25,19 @@ type unsupportedVersion struct {
}

type supportedProxyFeatures struct {
// add version dependent feature flags here
// Older versions of Envoy incorrectly exploded a wildcard subscription for
// LDS and CDS into specific line items on incremental xDS reconnect. They
// would populate both InitialResourceVersions and ResourceNamesSubscribe
// when they SHOULD have left ResourceNamesSubscribe empty (or used an
// explicit "*" in later Envoy versions) to imply wildcard mode. On
// reconnect, Consul interpreted the lack of the wildcard attribute as
// implying that the Envoy instance should not receive updates for any
// newly created listeners and clusters for the remaining life of that
// Envoy sidecar process.
//
// For example, we previously had flags for Envoy < 1.16 called:
//
// GatewaysNeedStubClusterWhenEmptyWithIncrementalXDS
// IncrementalXDSUpdatesMustBeSerial
//
// Which then manifested in the code for checks with this struct populated.
// By dropping support for 1.15, we no longer have any special flags here
// but leaving this flagging functionality for future one-offs.
// see: https://github.com/envoyproxy/envoy/issues/16063
// see: https://github.com/envoyproxy/envoy/pull/16153
ForceLDSandCDSToAlwaysUseWildcardsOnReconnect bool
}

func determineSupportedProxyFeatures(node *envoy_core_v3.Node) (supportedProxyFeatures, error) {
Expand Down Expand Up @@ -73,12 +75,9 @@ func determineSupportedProxyFeaturesFromVersion(version *version.Version) (suppo

sf := supportedProxyFeatures{}

// add version constraints to populate feature flags here when necessary, for example:
/*
if version.LessThan(minVersionAllowingEmptyGatewayClustersWithIncrementalXDS) {
sf.GatewaysNeedStubClusterWhenEmptyWithIncrementalXDS = true
}
*/
if version.LessThan(minVersionToForceLDSandCDSToAlwaysUseWildcardsOnReconnect) {
sf.ForceLDSandCDSToAlwaysUseWildcardsOnReconnect = true
}

return sf, nil
}
Expand Down
6 changes: 6 additions & 0 deletions agent/xds/envoy_versioning_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,12 @@ func TestDetermineSupportedProxyFeaturesFromString(t *testing.T) {
for _, v := range []string{
"1.17.0", "1.17.1", "1.17.2", "1.17.3", "1.17.4",
"1.18.0", "1.18.1", "1.18.2", "1.18.3", "1.18.4",
} {
cases[v] = testcase{expect: supportedProxyFeatures{
ForceLDSandCDSToAlwaysUseWildcardsOnReconnect: true,
}}
}
for _, v := range []string{
"1.19.0", "1.19.1",
"1.20.0", "1.20.1",
} {
Expand Down
11 changes: 9 additions & 2 deletions agent/xds/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ type TestEnvoy struct {
proxyID string
token string

EnvoyVersion string

deltaStream *TestADSDeltaStream // Incremental v3
}

Expand Down Expand Up @@ -182,9 +184,14 @@ func (e *TestEnvoy) sendDeltaReq(
e.mu.Lock()
defer e.mu.Unlock()

ev, valid := stringToEnvoyVersion(proxysupport.EnvoyVersions[0])
stringVersion := e.EnvoyVersion
if stringVersion == "" {
stringVersion = proxysupport.EnvoyVersions[0]
}

ev, valid := stringToEnvoyVersion(stringVersion)
if !valid {
t.Fatal("envoy version is not valid: %s", proxysupport.EnvoyVersions[0])
t.Fatal("envoy version is not valid: %s", stringVersion)
}

if req == nil {
Expand Down