From 60843b10662720f0b1c8b5f64856bafe21656b29 Mon Sep 17 00:00:00 2001 From: Doug Fawley Date: Thu, 25 Feb 2021 14:04:15 -0800 Subject: [PATCH] xds: add support for HTTP filters (gRFC A39) (#4206) --- internal/resolver/config_selector.go | 32 ++ internal/transport/http2_client.go | 4 + internal/transport/transport.go | 3 + stream.go | 14 +- xds/internal/client/client.go | 42 ++- xds/internal/client/lds_test.go | 317 +++++++++++++++++-- xds/internal/client/rds_test.go | 203 ++++++++++-- xds/internal/client/v2/rds_test.go | 4 +- xds/internal/client/watchers_route_test.go | 10 +- xds/internal/client/xds.go | 160 ++++++++-- xds/internal/env/env.go | 4 + xds/internal/httpfilter/httpfilter.go | 102 ++++++ xds/internal/httpfilter/router/router.go | 95 ++++++ xds/internal/resolver/serviceconfig.go | 111 ++++++- xds/internal/resolver/watch_service.go | 23 +- xds/internal/resolver/watch_service_test.go | 52 ++-- xds/internal/resolver/xds_resolver.go | 5 + xds/internal/resolver/xds_resolver_test.go | 326 ++++++++++++++++++-- 18 files changed, 1342 insertions(+), 165 deletions(-) create mode 100644 xds/internal/httpfilter/httpfilter.go create mode 100644 xds/internal/httpfilter/router/router.go diff --git a/internal/resolver/config_selector.go b/internal/resolver/config_selector.go index e69900400564..1b20c7592e33 100644 --- a/internal/resolver/config_selector.go +++ b/internal/resolver/config_selector.go @@ -51,6 +51,38 @@ type RPCConfig struct { Context context.Context MethodConfig serviceconfig.MethodConfig // configuration to use for this RPC OnCommitted func() // Called when the RPC has been committed (retries no longer possible) + Interceptor ClientInterceptor +} + +// ClientStream will ultimately be a superset of grpc.ClientStream as +// operations become necessary to support. +type ClientStream interface { + // Done is invoked when the RPC is finished using its connection, or could + // not be assigned a connection. RPC operations may still occur on + // ClientStream after done is called, since the interceptor is invoked by + // application-layer operations. + Done() +} + +// NOPClientStream is a ClientStream that does nothing +type NOPClientStream struct{} + +// Done is a nop. +func (NOPClientStream) Done() {} + +var _ ClientStream = NOPClientStream{} + +// ClientInterceptor is an interceptor for gRPC client streams. +type ClientInterceptor interface { + // NewStream can intercept ClientStream calls. The provided ClientStream + // should not be used during NewStream. RPCInfo.Context should not be used + // (will be nil). + NewStream(context.Context, RPCInfo, ClientStream) (context.Context, ClientStream, error) +} + +// ServerInterceptor is unimplementable; do not use. +type ServerInterceptor interface { + notDefined() } type csKeyType string diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index 8902b7f90d9d..d5bbe720db54 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -414,6 +414,7 @@ func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream { buf: newRecvBuffer(), headerChan: make(chan struct{}), contentSubtype: callHdr.ContentSubtype, + doneFunc: callHdr.DoneFunc, } s.wq = newWriteQuota(defaultWriteQuota, s.done) s.requestRead = func(n int) { @@ -832,6 +833,9 @@ func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2. t.controlBuf.executeAndPut(addBackStreamQuota, cleanup) // This will unblock write. close(s.done) + if s.doneFunc != nil { + s.doneFunc() + } } // Close kicks off the shutdown process of the transport. This should be called diff --git a/internal/transport/transport.go b/internal/transport/transport.go index 9c8f79cb4b29..5cf7c5f80fe1 100644 --- a/internal/transport/transport.go +++ b/internal/transport/transport.go @@ -241,6 +241,7 @@ type Stream struct { ctx context.Context // the associated context of the stream cancel context.CancelFunc // always nil for client side Stream done chan struct{} // closed at the end of stream to unblock writers. On the client side. + doneFunc func() // invoked at the end of stream on client side. ctxDone <-chan struct{} // same as done chan but for server side. Cache of ctx.Done() (for performance) method string // the associated RPC method of the stream recvCompress string @@ -611,6 +612,8 @@ type CallHdr struct { ContentSubtype string PreviousAttempts int // value of grpc-previous-rpc-attempts header to set + + DoneFunc func() // called when the stream is finished } // ClientTransport is the common interface for all gRPC client-side transport diff --git a/stream.go b/stream.go index eda1248d60ce..ebc74cd030fb 100644 --- a/stream.go +++ b/stream.go @@ -175,16 +175,27 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth var mc serviceconfig.MethodConfig var onCommit func() - rpcConfig, err := cc.safeConfigSelector.SelectConfig(iresolver.RPCInfo{Context: ctx, Method: method}) + rpcInfo := iresolver.RPCInfo{Context: ctx, Method: method} + rpcConfig, err := cc.safeConfigSelector.SelectConfig(rpcInfo) if err != nil { return nil, status.Convert(err).Err() } + var doneFunc func() if rpcConfig != nil { if rpcConfig.Context != nil { ctx = rpcConfig.Context } mc = rpcConfig.MethodConfig onCommit = rpcConfig.OnCommitted + if rpcConfig.Interceptor != nil { + rpcInfo.Context = nil + newCtx, cs, err := rpcConfig.Interceptor.NewStream(ctx, rpcInfo, iresolver.NOPClientStream{}) + if err != nil { + return nil, status.Convert(err).Err() + } + ctx = newCtx + doneFunc = cs.Done + } } if mc.WaitForReady != nil { @@ -223,6 +234,7 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth Host: cc.authority, Method: method, ContentSubtype: c.contentSubtype, + DoneFunc: doneFunc, } // Set our outgoing compression according to the UseCompressor CallOption, if diff --git a/xds/internal/client/client.go b/xds/internal/client/client.go index 7c7ebf3e4cd2..21881cc6eae6 100644 --- a/xds/internal/client/client.go +++ b/xds/internal/client/client.go @@ -16,8 +16,8 @@ * */ -// Package client implementation a full fledged gRPC client for the xDS API -// used by the xds resolver and balancer implementations. +// Package client implements a full fledged gRPC client for the xDS API used by +// the xds resolver and balancer implementations. package client import ( @@ -33,6 +33,7 @@ import ( "google.golang.org/protobuf/types/known/anypb" "google.golang.org/grpc/xds/internal/client/load" + "google.golang.org/grpc/xds/internal/httpfilter" "google.golang.org/grpc" "google.golang.org/grpc/internal/backoff" @@ -199,11 +200,27 @@ type ListenerUpdate struct { // common_http_protocol_options.max_stream_duration field, or zero if // unset. MaxStreamDuration time.Duration + // HTTPFilters is a list of HTTP filters (name, config) from the LDS + // response. + HTTPFilters []HTTPFilter // Raw is the resource from the xds response. Raw *anypb.Any } +// HTTPFilter represents one HTTP filter from an LDS response's HTTP connection +// manager field. +type HTTPFilter struct { + // Name is an arbitrary name of the filter. Used for applying override + // settings in virtual host / route / weighted cluster configuration (not + // yet supported). + Name string + // Filter is the HTTP filter found in the registry for the config type. + Filter httpfilter.Filter + // Config contains the filter's configuration + Config httpfilter.FilterConfig +} + func (lu *ListenerUpdate) String() string { return fmt.Sprintf("{RouteConfigName: %q, SecurityConfig: %+v", lu.RouteConfigName, lu.SecurityCfg) } @@ -226,6 +243,11 @@ type VirtualHost struct { // Routes contains a list of routes, each containing matchers and // corresponding action. Routes []*Route + // HTTPFilterConfigOverride contains any HTTP filter config overrides for + // the virtual host which may be present. An individual filter's override + // may be unused if the matching Route contains an override for that + // filter. + HTTPFilterConfigOverride map[string]httpfilter.FilterConfig } // Route is both a specification of how to match a request as well as an @@ -239,13 +261,27 @@ type Route struct { Fraction *uint32 // If the matchers above indicate a match, the below configuration is used. - Action map[string]uint32 // action is weighted clusters. + WeightedClusters map[string]WeightedCluster // If MaxStreamDuration is nil, it indicates neither of the route action's // max_stream_duration fields (grpc_timeout_header_max nor // max_stream_duration) were set. In this case, the ListenerUpdate's // MaxStreamDuration field should be used. If MaxStreamDuration is set to // an explicit zero duration, the application's deadline should be used. MaxStreamDuration *time.Duration + // HTTPFilterConfigOverride contains any HTTP filter config overrides for + // the route which may be present. An individual filter's override may be + // unused if the matching WeightedCluster contains an override for that + // filter. + HTTPFilterConfigOverride map[string]httpfilter.FilterConfig +} + +// WeightedCluster contains settings for an xds RouteAction.WeightedCluster. +type WeightedCluster struct { + // Weight is the relative weight of the cluster. It will never be zero. + Weight uint32 + // HTTPFilterConfigOverride contains any HTTP filter config overrides for + // the weighted cluster which may be present. + HTTPFilterConfigOverride map[string]httpfilter.FilterConfig } // HeaderMatcher represents header matchers. diff --git a/xds/internal/client/lds_test.go b/xds/internal/client/lds_test.go index 2a31b7e2e508..bdce958f30ca 100644 --- a/xds/internal/client/lds_test.go +++ b/xds/internal/client/lds_test.go @@ -19,10 +19,22 @@ package client import ( + "fmt" "strings" "testing" "time" + "github.com/golang/protobuf/proto" + "github.com/golang/protobuf/ptypes" + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + "google.golang.org/grpc/xds/internal/env" + "google.golang.org/grpc/xds/internal/httpfilter" + "google.golang.org/grpc/xds/internal/version" + "google.golang.org/protobuf/testing/protocmp" + "google.golang.org/protobuf/types/known/durationpb" + + v1typepb "github.com/cncf/udpa/go/udpa/type/v1" v2xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2" v2corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" @@ -31,14 +43,9 @@ import ( v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" v3httppb "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3" v3tlspb "github.com/envoyproxy/go-control-plane/envoy/extensions/transport_sockets/tls/v3" - "github.com/golang/protobuf/proto" - "github.com/golang/protobuf/ptypes" anypb "github.com/golang/protobuf/ptypes/any" + spb "github.com/golang/protobuf/ptypes/struct" wrapperspb "github.com/golang/protobuf/ptypes/wrappers" - "github.com/google/go-cmp/cmp" - "github.com/google/go-cmp/cmp/cmpopts" - "google.golang.org/grpc/xds/internal/version" - "google.golang.org/protobuf/types/known/durationpb" ) func (s) TestUnmarshalListener_ClientSide(t *testing.T) { @@ -77,32 +84,59 @@ func (s) TestUnmarshalListener_ClientSide(t *testing.T) { return mLis }(), } - v3Lis = &anypb.Any{ - TypeUrl: version.V3ListenerURL, - Value: func() []byte { - cm := &v3httppb.HttpConnectionManager{ - RouteSpecifier: &v3httppb.HttpConnectionManager_Rds{ - Rds: &v3httppb.Rds{ - ConfigSource: &v3corepb.ConfigSource{ - ConfigSourceSpecifier: &v3corepb.ConfigSource_Ads{Ads: &v3corepb.AggregatedConfigSource{}}, - }, - RouteConfigName: v3RouteConfigName, + customFilter = &v3httppb.HttpFilter{ + Name: "customFilter", + ConfigType: &v3httppb.HttpFilter_TypedConfig{TypedConfig: customFilterConfig}, + } + typedStructFilter = &v3httppb.HttpFilter{ + Name: "customFilter", + ConfigType: &v3httppb.HttpFilter_TypedConfig{TypedConfig: wrappedCustomFilterTypedStructConfig}, + } + customFilter2 = &v3httppb.HttpFilter{ + Name: "customFilter2", + ConfigType: &v3httppb.HttpFilter_TypedConfig{TypedConfig: customFilterConfig}, + } + errFilter = &v3httppb.HttpFilter{ + Name: "errFilter", + ConfigType: &v3httppb.HttpFilter_TypedConfig{TypedConfig: errFilterConfig}, + } + clientOnlyCustomFilter = &v3httppb.HttpFilter{ + Name: "clientOnlyCustomFilter", + ConfigType: &v3httppb.HttpFilter_TypedConfig{TypedConfig: clientOnlyCustomFilterConfig}, + } + serverOnlyCustomFilter = &v3httppb.HttpFilter{ + Name: "serverOnlyCustomFilter", + ConfigType: &v3httppb.HttpFilter_TypedConfig{TypedConfig: serverOnlyCustomFilterConfig}, + } + v3LisWithFilters = func(fs ...*v3httppb.HttpFilter) *anypb.Any { + hcm := &v3httppb.HttpConnectionManager{ + RouteSpecifier: &v3httppb.HttpConnectionManager_Rds{ + Rds: &v3httppb.Rds{ + ConfigSource: &v3corepb.ConfigSource{ + ConfigSourceSpecifier: &v3corepb.ConfigSource_Ads{Ads: &v3corepb.AggregatedConfigSource{}}, }, + RouteConfigName: v3RouteConfigName, }, - CommonHttpProtocolOptions: &v3corepb.HttpProtocolOptions{ - MaxStreamDuration: durationpb.New(time.Second), - }, - } - mcm, _ := ptypes.MarshalAny(cm) - lis := &v3listenerpb.Listener{ - Name: v3LDSTarget, - ApiListener: &v3listenerpb.ApiListener{ - ApiListener: mcm, - }, - } - mLis, _ := proto.Marshal(lis) - return mLis - }(), + }, + CommonHttpProtocolOptions: &v3corepb.HttpProtocolOptions{ + MaxStreamDuration: durationpb.New(time.Second), + }, + HttpFilters: fs, + } + return &anypb.Any{ + TypeUrl: version.V3ListenerURL, + Value: func() []byte { + mcm, _ := ptypes.MarshalAny(hcm) + lis := &v3listenerpb.Listener{ + Name: v3LDSTarget, + ApiListener: &v3listenerpb.ApiListener{ + ApiListener: mcm, + }, + } + mLis, _ := proto.Marshal(lis) + return mLis + }(), + } } ) @@ -111,6 +145,7 @@ func (s) TestUnmarshalListener_ClientSide(t *testing.T) { resources []*anypb.Any wantUpdate map[string]ListenerUpdate wantErr bool + disableFI bool // disable fault injection }{ { name: "non-listener resource", @@ -272,6 +307,104 @@ func (s) TestUnmarshalListener_ClientSide(t *testing.T) { { name: "empty resource list", }, + { + name: "v3 with no filters", + resources: []*anypb.Any{v3LisWithFilters()}, + wantUpdate: map[string]ListenerUpdate{ + v3LDSTarget: {RouteConfigName: v3RouteConfigName, MaxStreamDuration: time.Second}, + }, + }, + { + name: "v3 with custom filter", + resources: []*anypb.Any{v3LisWithFilters(customFilter)}, + wantUpdate: map[string]ListenerUpdate{ + v3LDSTarget: { + RouteConfigName: v3RouteConfigName, MaxStreamDuration: time.Second, + HTTPFilters: []HTTPFilter{{ + Name: "customFilter", + Filter: httpFilter{}, + Config: filterConfig{Cfg: customFilterConfig}, + }}, + }, + }, + }, + { + name: "v3 with custom filter in typed struct", + resources: []*anypb.Any{v3LisWithFilters(typedStructFilter)}, + wantUpdate: map[string]ListenerUpdate{ + v3LDSTarget: { + RouteConfigName: v3RouteConfigName, MaxStreamDuration: time.Second, + HTTPFilters: []HTTPFilter{{ + Name: "customFilter", + Filter: httpFilter{}, + Config: filterConfig{Cfg: customFilterTypedStructConfig}, + }}, + }, + }, + }, + { + name: "v3 with custom filter, fault injection disabled", + resources: []*anypb.Any{v3LisWithFilters(customFilter)}, + wantUpdate: map[string]ListenerUpdate{ + v3LDSTarget: {RouteConfigName: v3RouteConfigName, MaxStreamDuration: time.Second}, + }, + disableFI: true, + }, + { + name: "v3 with two filters with same name", + resources: []*anypb.Any{v3LisWithFilters(customFilter, customFilter)}, + wantErr: true, + }, + { + name: "v3 with two filters - same type different name", + resources: []*anypb.Any{v3LisWithFilters(customFilter, customFilter2)}, + wantUpdate: map[string]ListenerUpdate{ + v3LDSTarget: { + RouteConfigName: v3RouteConfigName, MaxStreamDuration: time.Second, + HTTPFilters: []HTTPFilter{{ + Name: "customFilter", + Filter: httpFilter{}, + Config: filterConfig{Cfg: customFilterConfig}, + }, { + Name: "customFilter2", + Filter: httpFilter{}, + Config: filterConfig{Cfg: customFilterConfig}, + }}, + }, + }, + }, + { + name: "v3 with server-only filter", + resources: []*anypb.Any{v3LisWithFilters(serverOnlyCustomFilter)}, + wantErr: true, + }, + { + name: "v3 with client-only filter", + resources: []*anypb.Any{v3LisWithFilters(clientOnlyCustomFilter)}, + wantUpdate: map[string]ListenerUpdate{ + v3LDSTarget: { + RouteConfigName: v3RouteConfigName, MaxStreamDuration: time.Second, + HTTPFilters: []HTTPFilter{{ + Name: "clientOnlyCustomFilter", + Filter: clientOnlyHTTPFilter{}, + Config: filterConfig{Cfg: clientOnlyCustomFilterConfig}, + }}, + }, + }, + }, + { + name: "v3 with err filter", + resources: []*anypb.Any{v3LisWithFilters(errFilter)}, + wantErr: true, + }, + { + name: "v3 with error filter, fault injection disabled", + resources: []*anypb.Any{v3LisWithFilters(errFilter)}, + wantUpdate: map[string]ListenerUpdate{ + v3LDSTarget: {RouteConfigName: v3RouteConfigName, MaxStreamDuration: time.Second}, + }, + disableFI: true, + }, { name: "v2 listener resource", resources: []*anypb.Any{v2Lis}, @@ -281,14 +414,14 @@ func (s) TestUnmarshalListener_ClientSide(t *testing.T) { }, { name: "v3 listener resource", - resources: []*anypb.Any{v3Lis}, + resources: []*anypb.Any{v3LisWithFilters()}, wantUpdate: map[string]ListenerUpdate{ v3LDSTarget: {RouteConfigName: v3RouteConfigName, MaxStreamDuration: time.Second}, }, }, { name: "multiple listener resources", - resources: []*anypb.Any{v2Lis, v3Lis}, + resources: []*anypb.Any{v2Lis, v3LisWithFilters()}, wantUpdate: map[string]ListenerUpdate{ v2LDSTarget: {RouteConfigName: v2RouteConfigName}, v3LDSTarget: {RouteConfigName: v3RouteConfigName, MaxStreamDuration: time.Second}, @@ -298,10 +431,16 @@ func (s) TestUnmarshalListener_ClientSide(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { + oldFI := env.FaultInjectionSupport + env.FaultInjectionSupport = !test.disableFI + update, _, err := UnmarshalListener("", test.resources, nil) - if ((err != nil) != test.wantErr) || !cmp.Equal(update, test.wantUpdate, cmpopts.EquateEmpty()) { + if ((err != nil) != test.wantErr) || + !cmp.Equal(update, test.wantUpdate, cmpopts.EquateEmpty(), protocmp.Transform()) { t.Errorf("UnmarshalListener(%v) = (%v, %v) want (%v, %v)", test.resources, update, err, test.wantUpdate, test.wantErr) } + + env.FaultInjectionSupport = oldFI }) } } @@ -939,3 +1078,115 @@ func (s) TestUnmarshalListener_ServerSide(t *testing.T) { }) } } + +type filterConfig struct { + httpfilter.FilterConfig + Cfg proto.Message + Override proto.Message +} + +// httpFilter allows testing the http filter registry and parsing functionality. +type httpFilter struct { + httpfilter.ClientInterceptorBuilder + httpfilter.ServerInterceptorBuilder +} + +func (httpFilter) TypeURLs() []string { return []string{"custom.filter"} } + +func (httpFilter) ParseFilterConfig(cfg proto.Message) (httpfilter.FilterConfig, error) { + return filterConfig{Cfg: cfg}, nil +} + +func (httpFilter) ParseFilterConfigOverride(override proto.Message) (httpfilter.FilterConfig, error) { + return filterConfig{Override: override}, nil +} + +// errHTTPFilter returns errors no matter what is passed to ParseFilterConfig. +type errHTTPFilter struct { + httpfilter.ClientInterceptorBuilder +} + +func (errHTTPFilter) TypeURLs() []string { return []string{"err.custom.filter"} } + +func (errHTTPFilter) ParseFilterConfig(cfg proto.Message) (httpfilter.FilterConfig, error) { + return nil, fmt.Errorf("error from ParseFilterConfig") +} + +func (errHTTPFilter) ParseFilterConfigOverride(override proto.Message) (httpfilter.FilterConfig, error) { + return nil, fmt.Errorf("error from ParseFilterConfigOverride") +} + +func init() { + httpfilter.Register(httpFilter{}) + httpfilter.Register(errHTTPFilter{}) + httpfilter.Register(serverOnlyHTTPFilter{}) + httpfilter.Register(clientOnlyHTTPFilter{}) +} + +// serverOnlyHTTPFilter does not implement ClientInterceptorBuilder +type serverOnlyHTTPFilter struct { + httpfilter.ServerInterceptorBuilder +} + +func (serverOnlyHTTPFilter) TypeURLs() []string { return []string{"serverOnly.custom.filter"} } + +func (serverOnlyHTTPFilter) ParseFilterConfig(cfg proto.Message) (httpfilter.FilterConfig, error) { + return filterConfig{Cfg: cfg}, nil +} + +func (serverOnlyHTTPFilter) ParseFilterConfigOverride(override proto.Message) (httpfilter.FilterConfig, error) { + return filterConfig{Override: override}, nil +} + +// clientOnlyHTTPFilter does not implement ServerInterceptorBuilder +type clientOnlyHTTPFilter struct { + httpfilter.ClientInterceptorBuilder +} + +func (clientOnlyHTTPFilter) TypeURLs() []string { return []string{"clientOnly.custom.filter"} } + +func (clientOnlyHTTPFilter) ParseFilterConfig(cfg proto.Message) (httpfilter.FilterConfig, error) { + return filterConfig{Cfg: cfg}, nil +} + +func (clientOnlyHTTPFilter) ParseFilterConfigOverride(override proto.Message) (httpfilter.FilterConfig, error) { + return filterConfig{Override: override}, nil +} + +var customFilterConfig = &anypb.Any{ + TypeUrl: "custom.filter", + Value: []byte{1, 2, 3}, +} + +var errFilterConfig = &anypb.Any{ + TypeUrl: "err.custom.filter", + Value: []byte{1, 2, 3}, +} + +var serverOnlyCustomFilterConfig = &anypb.Any{ + TypeUrl: "serverOnly.custom.filter", + Value: []byte{1, 2, 3}, +} + +var clientOnlyCustomFilterConfig = &anypb.Any{ + TypeUrl: "clientOnly.custom.filter", + Value: []byte{1, 2, 3}, +} + +var customFilterTypedStructConfig = &v1typepb.TypedStruct{ + TypeUrl: "custom.filter", + Value: &spb.Struct{ + Fields: map[string]*spb.Value{ + "foo": {Kind: &spb.Value_StringValue{StringValue: "bar"}}, + }, + }, +} +var wrappedCustomFilterTypedStructConfig *anypb.Any + +func init() { + var err error + wrappedCustomFilterTypedStructConfig, err = ptypes.MarshalAny(customFilterTypedStructConfig) + if err != nil { + panic(err.Error()) + } +} diff --git a/xds/internal/client/rds_test.go b/xds/internal/client/rds_test.go index 1a82e0304e74..1a33c90a7dbc 100644 --- a/xds/internal/client/rds_test.go +++ b/xds/internal/client/rds_test.go @@ -19,6 +19,7 @@ package client import ( + "fmt" "testing" "time" @@ -33,6 +34,8 @@ import ( "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" + "google.golang.org/grpc/xds/internal/env" + "google.golang.org/grpc/xds/internal/httpfilter" "google.golang.org/grpc/xds/internal/version" "google.golang.org/protobuf/types/known/durationpb" ) @@ -46,11 +49,42 @@ func (s) TestRDSGenerateRDSUpdateFromRouteConfiguration(t *testing.T) { clusterName = "clusterName" ) + var ( + goodRouteConfigWithFilterConfigs = func(cfgs map[string]*anypb.Any) *v3routepb.RouteConfiguration { + return &v3routepb.RouteConfiguration{ + Name: routeName, + VirtualHosts: []*v3routepb.VirtualHost{{ + Domains: []string{ldsTarget}, + Routes: []*v3routepb.Route{{ + Match: &v3routepb.RouteMatch{PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/"}}, + Action: &v3routepb.Route_Route{ + Route: &v3routepb.RouteAction{ClusterSpecifier: &v3routepb.RouteAction_Cluster{Cluster: clusterName}}, + }, + }}, + TypedPerFilterConfig: cfgs, + }}, + } + } + goodUpdateWithFilterConfigs = func(cfgs map[string]httpfilter.FilterConfig) RouteConfigUpdate { + return RouteConfigUpdate{ + VirtualHosts: []*VirtualHost{{ + Domains: []string{ldsTarget}, + Routes: []*Route{{ + Prefix: newStringP("/"), + WeightedClusters: map[string]WeightedCluster{clusterName: {Weight: 1}}, + }}, + HTTPFilterConfigOverride: cfgs, + }}, + } + } + ) + tests := []struct { name string rc *v3routepb.RouteConfiguration wantUpdate RouteConfigUpdate wantError bool + disableFI bool // disable fault injection }{ { name: "default-route-match-field-is-nil", @@ -141,7 +175,7 @@ func (s) TestRDSGenerateRDSUpdateFromRouteConfiguration(t *testing.T) { VirtualHosts: []*VirtualHost{ { Domains: []string{ldsTarget}, - Routes: []*Route{{Prefix: newStringP("/"), CaseInsensitive: true, Action: map[string]uint32{clusterName: 1}}}, + Routes: []*Route{{Prefix: newStringP("/"), CaseInsensitive: true, WeightedClusters: map[string]WeightedCluster{clusterName: {Weight: 1}}}}, }, }, }, @@ -183,11 +217,11 @@ func (s) TestRDSGenerateRDSUpdateFromRouteConfiguration(t *testing.T) { VirtualHosts: []*VirtualHost{ { Domains: []string{uninterestingDomain}, - Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{uninterestingClusterName: 1}}}, + Routes: []*Route{{Prefix: newStringP(""), WeightedClusters: map[string]WeightedCluster{uninterestingClusterName: {Weight: 1}}}}, }, { Domains: []string{ldsTarget}, - Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{clusterName: 1}}}, + Routes: []*Route{{Prefix: newStringP(""), WeightedClusters: map[string]WeightedCluster{clusterName: {Weight: 1}}}}, }, }, }, @@ -217,7 +251,7 @@ func (s) TestRDSGenerateRDSUpdateFromRouteConfiguration(t *testing.T) { VirtualHosts: []*VirtualHost{ { Domains: []string{ldsTarget}, - Routes: []*Route{{Prefix: newStringP("/"), Action: map[string]uint32{clusterName: 1}}}, + Routes: []*Route{{Prefix: newStringP("/"), WeightedClusters: map[string]WeightedCluster{clusterName: {Weight: 1}}}}, }, }, }, @@ -287,7 +321,14 @@ func (s) TestRDSGenerateRDSUpdateFromRouteConfiguration(t *testing.T) { VirtualHosts: []*VirtualHost{ { Domains: []string{ldsTarget}, - Routes: []*Route{{Prefix: newStringP("/"), Action: map[string]uint32{"a": 2, "b": 3, "c": 5}}}, + Routes: []*Route{{ + Prefix: newStringP("/"), + WeightedClusters: map[string]WeightedCluster{ + "a": {Weight: 2}, + "b": {Weight: 3}, + "c": {Weight: 5}, + }, + }}, }, }, }, @@ -317,7 +358,11 @@ func (s) TestRDSGenerateRDSUpdateFromRouteConfiguration(t *testing.T) { VirtualHosts: []*VirtualHost{ { Domains: []string{ldsTarget}, - Routes: []*Route{{Prefix: newStringP("/"), Action: map[string]uint32{clusterName: 1}, MaxStreamDuration: newDurationP(time.Second)}}, + Routes: []*Route{{ + Prefix: newStringP("/"), + WeightedClusters: map[string]WeightedCluster{clusterName: {Weight: 1}}, + MaxStreamDuration: newDurationP(time.Second), + }}, }, }, }, @@ -347,7 +392,11 @@ func (s) TestRDSGenerateRDSUpdateFromRouteConfiguration(t *testing.T) { VirtualHosts: []*VirtualHost{ { Domains: []string{ldsTarget}, - Routes: []*Route{{Prefix: newStringP("/"), Action: map[string]uint32{clusterName: 1}, MaxStreamDuration: newDurationP(time.Second)}}, + Routes: []*Route{{ + Prefix: newStringP("/"), + WeightedClusters: map[string]WeightedCluster{clusterName: {Weight: 1}}, + MaxStreamDuration: newDurationP(time.Second), + }}, }, }, }, @@ -377,18 +426,52 @@ func (s) TestRDSGenerateRDSUpdateFromRouteConfiguration(t *testing.T) { VirtualHosts: []*VirtualHost{ { Domains: []string{ldsTarget}, - Routes: []*Route{{Prefix: newStringP("/"), Action: map[string]uint32{clusterName: 1}, MaxStreamDuration: newDurationP(0)}}, + Routes: []*Route{{ + Prefix: newStringP("/"), + WeightedClusters: map[string]WeightedCluster{clusterName: {Weight: 1}}, + MaxStreamDuration: newDurationP(0), + }}, }, }, }, }, + { + name: "good-route-config-with-http-filter-config", + rc: goodRouteConfigWithFilterConfigs(map[string]*anypb.Any{"foo": customFilterConfig}), + wantUpdate: goodUpdateWithFilterConfigs(map[string]httpfilter.FilterConfig{"foo": filterConfig{Override: customFilterConfig}}), + }, + { + name: "good-route-config-with-http-filter-config-typed-struct", + rc: goodRouteConfigWithFilterConfigs(map[string]*anypb.Any{"foo": wrappedCustomFilterTypedStructConfig}), + wantUpdate: goodUpdateWithFilterConfigs(map[string]httpfilter.FilterConfig{"foo": filterConfig{Override: customFilterTypedStructConfig}}), + }, + { + name: "good-route-config-with-http-err-filter-config", + rc: goodRouteConfigWithFilterConfigs(map[string]*anypb.Any{"foo": errFilterConfig}), + wantError: true, + }, + { + name: "good-route-config-with-http-err-filter-config-fi-disabled", + disableFI: true, + rc: goodRouteConfigWithFilterConfigs(map[string]*anypb.Any{"foo": errFilterConfig}), + wantUpdate: goodUpdateWithFilterConfigs(nil), + }, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { - gotUpdate, gotError := generateRDSUpdateFromRouteConfiguration(test.rc, nil) - if (gotError != nil) != test.wantError || !cmp.Equal(gotUpdate, test.wantUpdate, cmpopts.EquateEmpty()) { + oldFI := env.FaultInjectionSupport + env.FaultInjectionSupport = !test.disableFI + + gotUpdate, gotError := generateRDSUpdateFromRouteConfiguration(test.rc, nil, false) + if (gotError != nil) != test.wantError || + !cmp.Equal(gotUpdate, test.wantUpdate, cmpopts.EquateEmpty(), + cmp.Transformer("FilterConfig", func(fc httpfilter.FilterConfig) string { + return fmt.Sprint(fc) + })) { t.Errorf("generateRDSUpdateFromRouteConfiguration(%+v, %v) returned unexpected, diff (-want +got):\\n%s", test.rc, ldsTarget, cmp.Diff(test.wantUpdate, gotUpdate, cmpopts.EquateEmpty())) + + env.FaultInjectionSupport = oldFI } }) } @@ -518,11 +601,11 @@ func (s) TestUnmarshalRouteConfig(t *testing.T) { VirtualHosts: []*VirtualHost{ { Domains: []string{uninterestingDomain}, - Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{uninterestingClusterName: 1}}}, + Routes: []*Route{{Prefix: newStringP(""), WeightedClusters: map[string]WeightedCluster{uninterestingClusterName: {Weight: 1}}}}, }, { Domains: []string{ldsTarget}, - Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{v2ClusterName: 1}}}, + Routes: []*Route{{Prefix: newStringP(""), WeightedClusters: map[string]WeightedCluster{v2ClusterName: {Weight: 1}}}}, }, }, }, @@ -536,11 +619,11 @@ func (s) TestUnmarshalRouteConfig(t *testing.T) { VirtualHosts: []*VirtualHost{ { Domains: []string{uninterestingDomain}, - Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{uninterestingClusterName: 1}}}, + Routes: []*Route{{Prefix: newStringP(""), WeightedClusters: map[string]WeightedCluster{uninterestingClusterName: {Weight: 1}}}}, }, { Domains: []string{ldsTarget}, - Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{v3ClusterName: 1}}}, + Routes: []*Route{{Prefix: newStringP(""), WeightedClusters: map[string]WeightedCluster{v3ClusterName: {Weight: 1}}}}, }, }, }, @@ -554,11 +637,11 @@ func (s) TestUnmarshalRouteConfig(t *testing.T) { VirtualHosts: []*VirtualHost{ { Domains: []string{uninterestingDomain}, - Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{uninterestingClusterName: 1}}}, + Routes: []*Route{{Prefix: newStringP(""), WeightedClusters: map[string]WeightedCluster{uninterestingClusterName: {Weight: 1}}}}, }, { Domains: []string{ldsTarget}, - Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{v3ClusterName: 1}}}, + Routes: []*Route{{Prefix: newStringP(""), WeightedClusters: map[string]WeightedCluster{v3ClusterName: {Weight: 1}}}}, }, }, }, @@ -566,11 +649,11 @@ func (s) TestUnmarshalRouteConfig(t *testing.T) { VirtualHosts: []*VirtualHost{ { Domains: []string{uninterestingDomain}, - Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{uninterestingClusterName: 1}}}, + Routes: []*Route{{Prefix: newStringP(""), WeightedClusters: map[string]WeightedCluster{uninterestingClusterName: {Weight: 1}}}}, }, { Domains: []string{ldsTarget}, - Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{v2ClusterName: 1}}}, + Routes: []*Route{{Prefix: newStringP(""), WeightedClusters: map[string]WeightedCluster{v2ClusterName: {Weight: 1}}}}, }, }, }, @@ -588,11 +671,44 @@ func (s) TestUnmarshalRouteConfig(t *testing.T) { } func (s) TestRoutesProtoToSlice(t *testing.T) { + var ( + goodRouteWithFilterConfigs = func(cfgs map[string]*anypb.Any) []*v3routepb.Route { + // Sets per-filter config in cluster "B" and in the route. + return []*v3routepb.Route{{ + Match: &v3routepb.RouteMatch{ + PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/"}, + CaseSensitive: &wrapperspb.BoolValue{Value: false}, + }, + Action: &v3routepb.Route_Route{ + Route: &v3routepb.RouteAction{ + ClusterSpecifier: &v3routepb.RouteAction_WeightedClusters{ + WeightedClusters: &v3routepb.WeightedCluster{ + Clusters: []*v3routepb.WeightedCluster_ClusterWeight{ + {Name: "B", Weight: &wrapperspb.UInt32Value{Value: 60}, TypedPerFilterConfig: cfgs}, + {Name: "A", Weight: &wrapperspb.UInt32Value{Value: 40}}, + }, + TotalWeight: &wrapperspb.UInt32Value{Value: 100}, + }}}}, + TypedPerFilterConfig: cfgs, + }} + } + goodUpdateWithFilterConfigs = func(cfgs map[string]httpfilter.FilterConfig) []*Route { + // Sets per-filter config in cluster "B" and in the route. + return []*Route{{ + Prefix: newStringP("/"), + CaseInsensitive: true, + WeightedClusters: map[string]WeightedCluster{"A": {Weight: 40}, "B": {Weight: 60, HTTPFilterConfigOverride: cfgs}}, + HTTPFilterConfigOverride: cfgs, + }} + } + ) + tests := []struct { name string routes []*v3routepb.Route wantRoutes []*Route wantErr bool + disableFI bool // disable fault injection }{ { name: "no path", @@ -620,9 +736,9 @@ func (s) TestRoutesProtoToSlice(t *testing.T) { }}}}, }}, wantRoutes: []*Route{{ - Prefix: newStringP("/"), - CaseInsensitive: true, - Action: map[string]uint32{"A": 40, "B": 60}, + Prefix: newStringP("/"), + CaseInsensitive: true, + WeightedClusters: map[string]WeightedCluster{"A": {Weight: 40}, "B": {Weight: 60}}, }}, }, { @@ -668,8 +784,8 @@ func (s) TestRoutesProtoToSlice(t *testing.T) { PrefixMatch: newStringP("tv"), }, }, - Fraction: newUInt32P(10000), - Action: map[string]uint32{"A": 40, "B": 60}, + Fraction: newUInt32P(10000), + WeightedClusters: map[string]WeightedCluster{"A": {Weight: 40}, "B": {Weight: 60}}, }}, wantErr: false, }, @@ -702,8 +818,8 @@ func (s) TestRoutesProtoToSlice(t *testing.T) { // Only one route in the result, because the second one with query // parameters is ignored. wantRoutes: []*Route{{ - Prefix: newStringP("/a/"), - Action: map[string]uint32{"A": 40, "B": 60}, + Prefix: newStringP("/a/"), + WeightedClusters: map[string]WeightedCluster{"A": {Weight: 40}, "B": {Weight: 60}}, }}, wantErr: false, }, @@ -771,16 +887,49 @@ func (s) TestRoutesProtoToSlice(t *testing.T) { }, wantErr: true, }, + { + name: "with custom HTTP filter config", + routes: goodRouteWithFilterConfigs(map[string]*anypb.Any{"foo": customFilterConfig}), + wantRoutes: goodUpdateWithFilterConfigs(map[string]httpfilter.FilterConfig{"foo": filterConfig{Override: customFilterConfig}}), + }, + { + name: "with custom HTTP filter config in typed struct", + routes: goodRouteWithFilterConfigs(map[string]*anypb.Any{"foo": wrappedCustomFilterTypedStructConfig}), + wantRoutes: goodUpdateWithFilterConfigs(map[string]httpfilter.FilterConfig{"foo": filterConfig{Override: customFilterTypedStructConfig}}), + }, + { + name: "with custom HTTP filter config, FI disabled", + disableFI: true, + routes: goodRouteWithFilterConfigs(map[string]*anypb.Any{"foo": customFilterConfig}), + wantRoutes: goodUpdateWithFilterConfigs(nil), + }, + { + name: "with erroring custom HTTP filter config", + routes: goodRouteWithFilterConfigs(map[string]*anypb.Any{"foo": errFilterConfig}), + wantErr: true, + }, + { + name: "with erroring custom HTTP filter config, FI disabled", + disableFI: true, + routes: goodRouteWithFilterConfigs(map[string]*anypb.Any{"foo": errFilterConfig}), + wantRoutes: goodUpdateWithFilterConfigs(nil), + }, } cmpOpts := []cmp.Option{ cmp.AllowUnexported(Route{}, HeaderMatcher{}, Int64Range{}), cmpopts.EquateEmpty(), + cmp.Transformer("FilterConfig", func(fc httpfilter.FilterConfig) string { + return fmt.Sprint(fc) + }), } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got, err := routesProtoToSlice(tt.routes, nil) + oldFI := env.FaultInjectionSupport + env.FaultInjectionSupport = !tt.disableFI + + got, err := routesProtoToSlice(tt.routes, nil, false) if (err != nil) != tt.wantErr { t.Errorf("routesProtoToSlice() error = %v, wantErr %v", err, tt.wantErr) return @@ -788,6 +937,8 @@ func (s) TestRoutesProtoToSlice(t *testing.T) { if !cmp.Equal(got, tt.wantRoutes, cmpOpts...) { t.Errorf("routesProtoToSlice() got = %v, want %v, diff: %v", got, tt.wantRoutes, cmp.Diff(got, tt.wantRoutes, cmpOpts...)) } + + env.FaultInjectionSupport = oldFI }) } } diff --git a/xds/internal/client/v2/rds_test.go b/xds/internal/client/v2/rds_test.go index 94508f9e5cf1..ed5058836ce6 100644 --- a/xds/internal/client/v2/rds_test.go +++ b/xds/internal/client/v2/rds_test.go @@ -97,11 +97,11 @@ func (s) TestRDSHandleResponseWithRouting(t *testing.T) { VirtualHosts: []*xdsclient.VirtualHost{ { Domains: []string{uninterestingDomain}, - Routes: []*xdsclient.Route{{Prefix: newStringP(""), Action: map[string]uint32{uninterestingClusterName: 1}}}, + Routes: []*xdsclient.Route{{Prefix: newStringP(""), WeightedClusters: map[string]xdsclient.WeightedCluster{uninterestingClusterName: {Weight: 1}}}}, }, { Domains: []string{goodLDSTarget1}, - Routes: []*xdsclient.Route{{Prefix: newStringP(""), Action: map[string]uint32{goodClusterName1: 1}}}, + Routes: []*xdsclient.Route{{Prefix: newStringP(""), WeightedClusters: map[string]xdsclient.WeightedCluster{goodClusterName1: {Weight: 1}}}}, }, }, }, diff --git a/xds/internal/client/watchers_route_test.go b/xds/internal/client/watchers_route_test.go index 93a47e9a3423..5f44e5493330 100644 --- a/xds/internal/client/watchers_route_test.go +++ b/xds/internal/client/watchers_route_test.go @@ -66,7 +66,7 @@ func (s) TestRDSWatch(t *testing.T) { VirtualHosts: []*VirtualHost{ { Domains: []string{testLDSName}, - Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}}, + Routes: []*Route{{Prefix: newStringP(""), WeightedClusters: map[string]WeightedCluster{testCDSName: {Weight: 1}}}}, }, }, } @@ -138,7 +138,7 @@ func (s) TestRDSTwoWatchSameResourceName(t *testing.T) { VirtualHosts: []*VirtualHost{ { Domains: []string{testLDSName}, - Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}}, + Routes: []*Route{{Prefix: newStringP(""), WeightedClusters: map[string]WeightedCluster{testCDSName: {Weight: 1}}}}, }, }, } @@ -217,7 +217,7 @@ func (s) TestRDSThreeWatchDifferentResourceName(t *testing.T) { VirtualHosts: []*VirtualHost{ { Domains: []string{testLDSName}, - Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName + "1": 1}}}, + Routes: []*Route{{Prefix: newStringP(""), WeightedClusters: map[string]WeightedCluster{testCDSName + "1": {Weight: 1}}}}, }, }, } @@ -225,7 +225,7 @@ func (s) TestRDSThreeWatchDifferentResourceName(t *testing.T) { VirtualHosts: []*VirtualHost{ { Domains: []string{testLDSName}, - Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName + "2": 1}}}, + Routes: []*Route{{Prefix: newStringP(""), WeightedClusters: map[string]WeightedCluster{testCDSName + "2": {Weight: 1}}}}, }, }, } @@ -276,7 +276,7 @@ func (s) TestRDSWatchAfterCache(t *testing.T) { VirtualHosts: []*VirtualHost{ { Domains: []string{testLDSName}, - Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}}, + Routes: []*Route{{Prefix: newStringP(""), WeightedClusters: map[string]WeightedCluster{testCDSName: {Weight: 1}}}}, }, }, } diff --git a/xds/internal/client/xds.go b/xds/internal/client/xds.go index caeb42856e09..055a782d468b 100644 --- a/xds/internal/client/xds.go +++ b/xds/internal/client/xds.go @@ -25,6 +25,7 @@ import ( "strconv" "strings" + v1typepb "github.com/cncf/udpa/go/udpa/type/v1" v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3" @@ -34,11 +35,13 @@ import ( v3tlspb "github.com/envoyproxy/go-control-plane/envoy/extensions/transport_sockets/tls/v3" v3typepb "github.com/envoyproxy/go-control-plane/envoy/type/v3" "github.com/golang/protobuf/proto" - anypb "github.com/golang/protobuf/ptypes/any" + "github.com/golang/protobuf/ptypes" + "google.golang.org/protobuf/types/known/anypb" "google.golang.org/grpc/internal/grpclog" "google.golang.org/grpc/xds/internal" "google.golang.org/grpc/xds/internal/env" + "google.golang.org/grpc/xds/internal/httpfilter" "google.golang.org/grpc/xds/internal/version" ) @@ -49,19 +52,21 @@ const transportSocketName = "envoy.transport_sockets.tls" // UnmarshalListener processes resources received in an LDS response, validates // them, and transforms them into a native struct which contains only fields we // are interested in. -func UnmarshalListener(version string, resources []*anypb.Any, logger *grpclog.PrefixLogger) (map[string]ListenerUpdate, UpdateMetadata, error) { +func UnmarshalListener(_ string, resources []*anypb.Any, logger *grpclog.PrefixLogger) (map[string]ListenerUpdate, UpdateMetadata, error) { update := make(map[string]ListenerUpdate) for _, r := range resources { if !IsListenerResource(r.GetTypeUrl()) { return nil, UpdateMetadata{}, fmt.Errorf("xds: unexpected resource type: %q in LDS response", r.GetTypeUrl()) } + // TODO: Pass version.TransportAPI instead of relying upon the type URL + v2 := r.GetTypeUrl() == version.V2ListenerURL lis := &v3listenerpb.Listener{} if err := proto.Unmarshal(r.GetValue(), lis); err != nil { return nil, UpdateMetadata{}, fmt.Errorf("xds: failed to unmarshal resource in LDS response: %v", err) } logger.Infof("Resource with name: %v, type: %T, contains: %v", lis.GetName(), lis, lis) - lu, err := processListener(lis) + lu, err := processListener(lis, v2) if err != nil { return nil, UpdateMetadata{}, err } @@ -70,16 +75,16 @@ func UnmarshalListener(version string, resources []*anypb.Any, logger *grpclog.P return update, UpdateMetadata{}, nil } -func processListener(lis *v3listenerpb.Listener) (*ListenerUpdate, error) { +func processListener(lis *v3listenerpb.Listener, v2 bool) (*ListenerUpdate, error) { if lis.GetApiListener() != nil { - return processClientSideListener(lis) + return processClientSideListener(lis, v2) } return processServerSideListener(lis) } // processClientSideListener checks if the provided Listener proto meets // the expected criteria. If so, it returns a non-empty routeConfigName. -func processClientSideListener(lis *v3listenerpb.Listener) (*ListenerUpdate, error) { +func processClientSideListener(lis *v3listenerpb.Listener, v2 bool) (*ListenerUpdate, error) { update := &ListenerUpdate{} apiLisAny := lis.GetApiListener().GetApiListener() @@ -111,11 +116,106 @@ func processClientSideListener(lis *v3listenerpb.Listener) (*ListenerUpdate, err return nil, fmt.Errorf("xds: unsupported type %T for RouteSpecifier in received LDS response", apiLis.RouteSpecifier) } + if v2 { + return update, nil + } + + // The following checks and fields only apply to xDS protocol versions v3+. + update.MaxStreamDuration = apiLis.GetCommonHttpProtocolOptions().GetMaxStreamDuration().AsDuration() + var err error + if update.HTTPFilters, err = processHTTPFilters(apiLis.GetHttpFilters(), false); err != nil { + return nil, fmt.Errorf("xds: %v", err) + } + return update, nil } +func unwrapHTTPFilterConfig(config *anypb.Any) (proto.Message, string, error) { + if typeURL := config.GetTypeUrl(); typeURL != "type.googleapis.com/udpa.type.v1.TypedStruct" { + return config, typeURL, nil + } + // The real type name is inside the TypedStruct. + s := new(v1typepb.TypedStruct) + if err := ptypes.UnmarshalAny(config, s); err != nil { + return nil, "", fmt.Errorf("error unmarshalling TypedStruct filter config: %v", err) + } + return s, s.GetTypeUrl(), nil +} + +func validateHTTPFilterConfig(cfg *anypb.Any, lds bool) (httpfilter.Filter, httpfilter.FilterConfig, error) { + config, typeURL, err := unwrapHTTPFilterConfig(cfg) + if err != nil { + return nil, nil, err + } + filterBuilder := httpfilter.Get(typeURL) + if filterBuilder == nil { + return nil, nil, fmt.Errorf("no filter implementation found for %q", typeURL) + } + parseFunc := filterBuilder.ParseFilterConfig + if !lds { + parseFunc = filterBuilder.ParseFilterConfigOverride + } + filterConfig, err := parseFunc(config) + if err != nil { + return nil, nil, fmt.Errorf("error parsing config for filter %q: %v", typeURL, err) + } + return filterBuilder, filterConfig, nil +} + +func processHTTPFilterOverrides(cfgs map[string]*anypb.Any) (map[string]httpfilter.FilterConfig, error) { + if !env.FaultInjectionSupport || len(cfgs) == 0 { + return nil, nil + } + m := make(map[string]httpfilter.FilterConfig) + for name, cfg := range cfgs { + _, config, err := validateHTTPFilterConfig(cfg, false) + if err != nil { + return nil, err + } + m[name] = config + } + return m, nil +} + +func processHTTPFilters(filters []*v3httppb.HttpFilter, server bool) ([]HTTPFilter, error) { + if !env.FaultInjectionSupport { + return nil, nil + } + + ret := make([]HTTPFilter, 0, len(filters)) + seenNames := make(map[string]bool, len(filters)) + for _, filter := range filters { + name := filter.GetName() + if name == "" { + return nil, errors.New("filter missing name field") + } + if seenNames[name] { + return nil, fmt.Errorf("duplicate filter name %q", name) + } + seenNames[name] = true + + httpFilter, config, err := validateHTTPFilterConfig(filter.GetTypedConfig(), true) + if err != nil { + return nil, err + } + if server { + if _, ok := httpFilter.(httpfilter.ServerInterceptorBuilder); !ok { + return nil, fmt.Errorf("httpFilter %q not supported server-side", name) + } + } else { + if _, ok := httpFilter.(httpfilter.ClientInterceptorBuilder); !ok { + return nil, fmt.Errorf("httpFilter %q not supported client-side", name) + } + } + + // Save name/config + ret = append(ret, HTTPFilter{Name: name, Filter: httpFilter, Config: config}) + } + return ret, nil +} + func processServerSideListener(lis *v3listenerpb.Listener) (*ListenerUpdate, error) { // Make sure that an address encoded in the received listener resource, and // that it matches the one specified in the name. Listener names on the @@ -196,7 +296,7 @@ func getAddressFromName(name string) (host string, port string, err error) { // validates them, and transforms them into a native struct which contains only // fields we are interested in. The provided hostname determines the route // configuration resources of interest. -func UnmarshalRouteConfig(version string, resources []*anypb.Any, logger *grpclog.PrefixLogger) (map[string]RouteConfigUpdate, UpdateMetadata, error) { +func UnmarshalRouteConfig(_ string, resources []*anypb.Any, logger *grpclog.PrefixLogger) (map[string]RouteConfigUpdate, UpdateMetadata, error) { update := make(map[string]RouteConfigUpdate) for _, r := range resources { if !IsRouteConfigResource(r.GetTypeUrl()) { @@ -208,8 +308,10 @@ func UnmarshalRouteConfig(version string, resources []*anypb.Any, logger *grpclo } logger.Infof("Resource with name: %v, type: %T, contains: %v.", rc.GetName(), rc, rc) + // TODO: Pass version.TransportAPI instead of relying upon the type URL + v2 := r.GetTypeUrl() == version.V2RouteConfigURL // Use the hostname (resourceName for LDS) to find the routes. - u, err := generateRDSUpdateFromRouteConfiguration(rc, logger) + u, err := generateRDSUpdateFromRouteConfiguration(rc, logger, v2) if err != nil { return nil, UpdateMetadata{}, fmt.Errorf("xds: received invalid RouteConfiguration in RDS response: %+v with err: %v", rc, err) } @@ -234,22 +336,30 @@ func UnmarshalRouteConfig(version string, resources []*anypb.Any, logger *grpclo // field must be empty and whose route field must be set. Inside that route // message, the cluster field will contain the clusterName or weighted clusters // we are looking for. -func generateRDSUpdateFromRouteConfiguration(rc *v3routepb.RouteConfiguration, logger *grpclog.PrefixLogger) (RouteConfigUpdate, error) { +func generateRDSUpdateFromRouteConfiguration(rc *v3routepb.RouteConfiguration, logger *grpclog.PrefixLogger, v2 bool) (RouteConfigUpdate, error) { var vhs []*VirtualHost for _, vh := range rc.GetVirtualHosts() { - routes, err := routesProtoToSlice(vh.Routes, logger) + routes, err := routesProtoToSlice(vh.Routes, logger, v2) if err != nil { return RouteConfigUpdate{}, fmt.Errorf("received route is invalid: %v", err) } - vhs = append(vhs, &VirtualHost{ + vhOut := &VirtualHost{ Domains: vh.GetDomains(), Routes: routes, - }) + } + if !v2 { + cfgs, err := processHTTPFilterOverrides(vh.GetTypedPerFilterConfig()) + if err != nil { + return RouteConfigUpdate{}, fmt.Errorf("virtual host %+v: %v", vh, err) + } + vhOut.HTTPFilterConfigOverride = cfgs + } + vhs = append(vhs, vhOut) } return RouteConfigUpdate{VirtualHosts: vhs}, nil } -func routesProtoToSlice(routes []*v3routepb.Route, logger *grpclog.PrefixLogger) ([]*Route, error) { +func routesProtoToSlice(routes []*v3routepb.Route, logger *grpclog.PrefixLogger, v2 bool) ([]*Route, error) { var routesRet []*Route for _, r := range routes { @@ -325,11 +435,11 @@ func routesProtoToSlice(routes []*v3routepb.Route, logger *grpclog.PrefixLogger) route.Fraction = &n } - clusters := make(map[string]uint32) + route.WeightedClusters = make(map[string]WeightedCluster) action := r.GetRoute() switch a := action.GetClusterSpecifier().(type) { case *v3routepb.RouteAction_Cluster: - clusters[a.Cluster] = 1 + route.WeightedClusters[a.Cluster] = WeightedCluster{Weight: 1} case *v3routepb.RouteAction_WeightedClusters: wcs := a.WeightedClusters var totalWeight uint32 @@ -338,7 +448,15 @@ func routesProtoToSlice(routes []*v3routepb.Route, logger *grpclog.PrefixLogger) if w == 0 { continue } - clusters[c.GetName()] = w + wc := WeightedCluster{Weight: w} + if !v2 { + cfgs, err := processHTTPFilterOverrides(c.GetTypedPerFilterConfig()) + if err != nil { + return nil, fmt.Errorf("route %+v, action %+v: %v", r, a, err) + } + wc.HTTPFilterConfigOverride = cfgs + } + route.WeightedClusters[c.GetName()] = wc totalWeight += w } if totalWeight != wcs.GetTotalWeight().GetValue() { @@ -351,8 +469,6 @@ func routesProtoToSlice(routes []*v3routepb.Route, logger *grpclog.PrefixLogger) continue } - route.Action = clusters - msd := action.GetMaxStreamDuration() // Prefer grpc_timeout_header_max, if set. dur := msd.GetGrpcTimeoutHeaderMax() @@ -363,6 +479,14 @@ func routesProtoToSlice(routes []*v3routepb.Route, logger *grpclog.PrefixLogger) d := dur.AsDuration() route.MaxStreamDuration = &d } + + if !v2 { + cfgs, err := processHTTPFilterOverrides(r.GetTypedPerFilterConfig()) + if err != nil { + return nil, fmt.Errorf("route %+v: %v", r, err) + } + route.HTTPFilterConfigOverride = cfgs + } routesRet = append(routesRet, &route) } return routesRet, nil diff --git a/xds/internal/env/env.go b/xds/internal/env/env.go index 3d75bc3e86f4..065ce6145638 100644 --- a/xds/internal/env/env.go +++ b/xds/internal/env/env.go @@ -40,6 +40,7 @@ const ( BootstrapFileContentEnv = "GRPC_XDS_BOOTSTRAP_CONFIG" circuitBreakingSupportEnv = "GRPC_XDS_EXPERIMENTAL_CIRCUIT_BREAKING" timeoutSupportEnv = "GRPC_XDS_EXPERIMENTAL_ENABLE_TIMEOUT" + faultInjectionSupportEnv = "GRPC_XDS_EXPERIMENTAL_FAULT_INJECTION" ) var ( @@ -63,4 +64,7 @@ var ( // route actions is enabled. This can be enabled by setting the // environment variable "GRPC_XDS_EXPERIMENTAL_ENABLE_TIMEOUT" to "true". TimeoutSupport = strings.EqualFold(os.Getenv(timeoutSupportEnv), "true") + // FaultInjectionSupport is used to control both fault injection and HTTP + // filter support. + FaultInjectionSupport = strings.EqualFold(os.Getenv(faultInjectionSupportEnv), "true") ) diff --git a/xds/internal/httpfilter/httpfilter.go b/xds/internal/httpfilter/httpfilter.go new file mode 100644 index 000000000000..6650241fab71 --- /dev/null +++ b/xds/internal/httpfilter/httpfilter.go @@ -0,0 +1,102 @@ +/* + * + * Copyright 2021 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +// Package httpfilter contains the HTTPFilter interface and a registry for +// storing and retrieving their implementations. +package httpfilter + +import ( + "github.com/golang/protobuf/proto" + iresolver "google.golang.org/grpc/internal/resolver" +) + +// FilterConfig represents an opaque data structure holding configuration for a +// filter. Embed this interface to implement it. +type FilterConfig interface { + isFilterConfig() +} + +// Filter defines the parsing functionality of an HTTP filter. A Filter may +// optionally implement either ClientInterceptorBuilder or +// ServerInterceptorBuilder or both, indicating it is capable of working on the +// client side or server side or both, respectively. +type Filter interface { + // TypeURLs are the proto message types supported by this filter. A filter + // will be registered by each of its supported message types. + TypeURLs() []string + // ParseFilterConfig parses the provided configuration proto.Message from + // the LDS configuration of this filter. This may be an anypb.Any or a + // udpa.type.v1.TypedStruct for filters that do not accept a custom type. + // The resulting FilterConfig will later be passed to Build. + ParseFilterConfig(proto.Message) (FilterConfig, error) + // ParseFilterConfigOverride parses the provided override configuration + // proto.Message from the RDS override configuration of this filter. This + // may be an anypb.Any or a udpa.type.v1.TypedStruct for filters that do + // not accept a custom type. The resulting FilterConfig will later be + // passed to Build. + ParseFilterConfigOverride(proto.Message) (FilterConfig, error) +} + +// ClientInterceptorBuilder constructs a Client Interceptor. If this type is +// implemented by a Filter, it is capable of working on a client. +type ClientInterceptorBuilder interface { + // BuildClientInterceptor uses the FilterConfigs produced above to produce + // an HTTP filter interceptor for clients. config will always be non-nil, + // but override may be nil if no override config exists for the filter. It + // is valid for Build to return a nil Interceptor and a nil error. In this + // case, the RPC will not be intercepted by this filter. + BuildClientInterceptor(config, override FilterConfig) (iresolver.ClientInterceptor, error) +} + +// ServerInterceptorBuilder constructs a Server Interceptor. If this type is +// implemented by a Filter, it is capable of working on a server. +// +// Server side filters are not currently supported, but this interface is +// defined for clarity. +type ServerInterceptorBuilder interface { + // BuildServerInterceptor uses the FilterConfigs produced above to produce + // an HTTP filter interceptor for servers. config will always be non-nil, + // but override may be nil if no override config exists for the filter. It + // is valid for Build to return a nil Interceptor and a nil error. In this + // case, the RPC will not be intercepted by this filter. + BuildServerInterceptor(config, override FilterConfig) (iresolver.ServerInterceptor, error) +} + +var ( + // m is a map from scheme to filter. + m = make(map[string]Filter) +) + +// Register registers the HTTP filter Builder to the filter map. b.TypeURLs() +// will be used as the types for this filter. +// +// NOTE: this function must only be called during initialization time (i.e. in +// an init() function), and is not thread-safe. If multiple filters are +// registered with the same type URL, the one registered last will take effect. +func Register(b Filter) { + for _, u := range b.TypeURLs() { + m[u] = b + } +} + +// Get returns the HTTPFilter registered with typeURL. +// +// If no filter is register with typeURL, nil will be returned. +func Get(typeURL string) Filter { + return m[typeURL] +} diff --git a/xds/internal/httpfilter/router/router.go b/xds/internal/httpfilter/router/router.go new file mode 100644 index 000000000000..26e3acb5a4f4 --- /dev/null +++ b/xds/internal/httpfilter/router/router.go @@ -0,0 +1,95 @@ +/* + * + * Copyright 2021 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +// Package router implements the Envoy Router HTTP filter. +package router + +import ( + "fmt" + + "github.com/golang/protobuf/proto" + "github.com/golang/protobuf/ptypes" + iresolver "google.golang.org/grpc/internal/resolver" + "google.golang.org/grpc/xds/internal/httpfilter" + "google.golang.org/protobuf/types/known/anypb" + + pb "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/router/v3" +) + +// TypeURL is the message type for the Router configuration. +const TypeURL = "type.googleapis.com/envoy.extensions.filters.http.router.v3.Router" + +func init() { + httpfilter.Register(builder{}) +} + +// IsRouterFilter returns true iff a HTTP filter is a Router filter. +func IsRouterFilter(b httpfilter.Filter) bool { + _, ok := b.(builder) + return ok +} + +type builder struct { +} + +func (builder) TypeURLs() []string { return []string{TypeURL} } + +func (builder) ParseFilterConfig(cfg proto.Message) (httpfilter.FilterConfig, error) { + // The gRPC router filter does not currently use any fields from the + // config. Verify type only. + if cfg == nil { + return nil, fmt.Errorf("router: nil configuration message provided") + } + any, ok := cfg.(*anypb.Any) + if !ok { + return nil, fmt.Errorf("router: error parsing config %v: unknown type %T", cfg, cfg) + } + msg := new(pb.Router) + if err := ptypes.UnmarshalAny(any, msg); err != nil { + return nil, fmt.Errorf("router: error parsing config %v: %v", cfg, err) + } + return config{}, nil +} + +func (builder) ParseFilterConfigOverride(override proto.Message) (httpfilter.FilterConfig, error) { + if override != nil { + return nil, fmt.Errorf("router: unexpected config override specified: %v", override) + } + return config{}, nil +} + +var _ httpfilter.ClientInterceptorBuilder = builder{} + +func (builder) BuildClientInterceptor(cfg, override httpfilter.FilterConfig) (iresolver.ClientInterceptor, error) { + if _, ok := cfg.(config); !ok { + return nil, fmt.Errorf("router: incorrect config type provided (%T): %v", cfg, cfg) + } + if override != nil { + return nil, fmt.Errorf("router: unexpected override configuration specified: %v", override) + } + // The gRPC router is implemented within the xds resolver's config + // selector, not as a separate plugin. So we return a nil HTTPFilter, + // which will not be invoked. + return nil, nil +} + +// The gRPC router filter does not currently support any configuration. Verify +// type only. +type config struct { + httpfilter.FilterConfig +} diff --git a/xds/internal/resolver/serviceconfig.go b/xds/internal/resolver/serviceconfig.go index d61a546a04e2..95025fc8a423 100644 --- a/xds/internal/resolver/serviceconfig.go +++ b/xds/internal/resolver/serviceconfig.go @@ -19,6 +19,7 @@ package resolver import ( + "context" "encoding/json" "fmt" "sync/atomic" @@ -29,7 +30,10 @@ import ( "google.golang.org/grpc/internal/wrr" "google.golang.org/grpc/status" "google.golang.org/grpc/xds/internal/balancer/clustermanager" + xdsclient "google.golang.org/grpc/xds/internal/client" "google.golang.org/grpc/xds/internal/env" + "google.golang.org/grpc/xds/internal/httpfilter" + "google.golang.org/grpc/xds/internal/httpfilter/router" ) const ( @@ -94,10 +98,24 @@ func serviceConfigJSON(activeClusters map[string]*clusterInfo) (string, error) { return string(bs), nil } +type virtualHost struct { + // map from filter name to its config + httpFilterConfigOverride map[string]httpfilter.FilterConfig +} + +// routeCluster holds information about a cluster as referenced by a route. +type routeCluster struct { + name string + // map from filter name to its config + httpFilterConfigOverride map[string]httpfilter.FilterConfig +} + type route struct { m *compositeMatcher // converted from route matchers - clusters wrr.WRR + clusters wrr.WRR // holds *routeCluster entries maxStreamDuration time.Duration + // map from filter name to its config + httpFilterConfigOverride map[string]httpfilter.FilterConfig } func (r route) String() string { @@ -105,9 +123,11 @@ func (r route) String() string { } type configSelector struct { - r *xdsResolver - routes []route - clusters map[string]*clusterInfo + r *xdsResolver + virtualHost virtualHost + routes []route + clusters map[string]*clusterInfo + httpFilterConfig []xdsclient.HTTPFilter } var errNoMatchedRouteFound = status.Errorf(codes.Unavailable, "no matched route was found") @@ -127,18 +147,23 @@ func (cs *configSelector) SelectConfig(rpcInfo iresolver.RPCInfo) (*iresolver.RP if rt == nil || rt.clusters == nil { return nil, errNoMatchedRouteFound } - cluster, ok := rt.clusters.Next().(string) + cluster, ok := rt.clusters.Next().(*routeCluster) if !ok { return nil, status.Errorf(codes.Internal, "error retrieving cluster for match: %v (%T)", cluster, cluster) } // Add a ref to the selected cluster, as this RPC needs this cluster until // it is committed. - ref := &cs.clusters[cluster].refCount + ref := &cs.clusters[cluster.name].refCount atomic.AddInt32(ref, 1) + interceptor, err := cs.newInterceptor(rt, cluster) + if err != nil { + return nil, err + } + config := &iresolver.RPCConfig{ // Communicate to the LB policy the chosen cluster. - Context: clustermanager.SetPickedCluster(rpcInfo.Context, cluster), + Context: clustermanager.SetPickedCluster(rpcInfo.Context, cluster.name), OnCommitted: func() { // When the RPC is committed, the cluster is no longer required. // Decrease its ref. @@ -151,6 +176,7 @@ func (cs *configSelector) SelectConfig(rpcInfo iresolver.RPCInfo) (*iresolver.RP } } }, + Interceptor: interceptor, } if env.TimeoutSupport && rt.maxStreamDuration != 0 { @@ -160,6 +186,40 @@ func (cs *configSelector) SelectConfig(rpcInfo iresolver.RPCInfo) (*iresolver.RP return config, nil } +func (cs *configSelector) newInterceptor(rt *route, cluster *routeCluster) (iresolver.ClientInterceptor, error) { + if len(cs.httpFilterConfig) == 0 { + return nil, nil + } + interceptors := make([]iresolver.ClientInterceptor, 0, len(cs.httpFilterConfig)) + for _, filter := range cs.httpFilterConfig { + if router.IsRouterFilter(filter.Filter) { + // Ignore any filters after the router filter. The router itself + // is currently a nop. + return &interceptorList{interceptors: interceptors}, nil + } + override := cluster.httpFilterConfigOverride[filter.Name] // cluster is highest priority + if override == nil { + override = rt.httpFilterConfigOverride[filter.Name] // route is second priority + } + if override == nil { + override = cs.virtualHost.httpFilterConfigOverride[filter.Name] // VH is third & lowest priority + } + ib, ok := filter.Filter.(httpfilter.ClientInterceptorBuilder) + if !ok { + // Should not happen if it passed xdsClient validation. + return nil, fmt.Errorf("filter does not support use in client") + } + i, err := ib.BuildClientInterceptor(filter.Config, override) + if err != nil { + return nil, fmt.Errorf("error constructing filter: %v", err) + } + if i != nil { + interceptors = append(interceptors, i) + } + } + return nil, fmt.Errorf("error in xds config: no router filter present") +} + // stop decrements refs of all clusters referenced by this config selector. func (cs *configSelector) stop() { // The resolver's old configSelector may be nil. Handle that here. @@ -194,15 +254,20 @@ var newWRR = wrr.NewRandom // r.activeClusters for previously-unseen clusters. func (r *xdsResolver) newConfigSelector(su serviceUpdate) (*configSelector, error) { cs := &configSelector{ - r: r, - routes: make([]route, len(su.routes)), - clusters: make(map[string]*clusterInfo), + r: r, + virtualHost: virtualHost{httpFilterConfigOverride: su.virtualHost.HTTPFilterConfigOverride}, + routes: make([]route, len(su.virtualHost.Routes)), + clusters: make(map[string]*clusterInfo), + httpFilterConfig: su.ldsConfig.httpFilterConfig, } - for i, rt := range su.routes { + for i, rt := range su.virtualHost.Routes { clusters := newWRR() - for cluster, weight := range rt.Action { - clusters.Add(cluster, int64(weight)) + for cluster, wc := range rt.WeightedClusters { + clusters.Add(&routeCluster{ + name: cluster, + httpFilterConfigOverride: wc.HTTPFilterConfigOverride, + }, int64(wc.Weight)) // Initialize entries in cs.clusters map, creating entries in // r.activeClusters as necessary. Set to zero as they will be @@ -226,6 +291,8 @@ func (r *xdsResolver) newConfigSelector(su serviceUpdate) (*configSelector, erro } else { cs.routes[i].maxStreamDuration = *rt.MaxStreamDuration } + + cs.routes[i].httpFilterConfigOverride = rt.HTTPFilterConfigOverride } // Account for this config selector's clusters. Do this after no further @@ -242,3 +309,21 @@ type clusterInfo struct { // number of references to this cluster; accessed atomically refCount int32 } + +type interceptorList struct { + interceptors []iresolver.ClientInterceptor +} + +func (il *interceptorList) NewStream(ctx context.Context, ri iresolver.RPCInfo, cs iresolver.ClientStream) (context.Context, iresolver.ClientStream, error) { + for _, i := range il.interceptors { + var err error + newCTX, newCS, err := i.NewStream(ctx, ri, cs) + if err != nil { + cs.Done() + return nil, nil, err + } + cs = newCS + ctx = newCTX + } + return ctx, cs, nil +} diff --git a/xds/internal/resolver/watch_service.go b/xds/internal/resolver/watch_service.go index 79b83e95aa3c..913ac4ced15c 100644 --- a/xds/internal/resolver/watch_service.go +++ b/xds/internal/resolver/watch_service.go @@ -32,8 +32,8 @@ import ( // are of interest to the xds resolver. The RDS request is built by first // making a LDS to get the RouteConfig name. type serviceUpdate struct { - // routes contain matchers+actions to route RPCs. - routes []*xdsclient.Route + // virtualHost contains routes and other configuration to route RPCs. + virtualHost *xdsclient.VirtualHost // ldsConfig contains configuration that applies to all routes. ldsConfig ldsConfig } @@ -44,6 +44,7 @@ type ldsConfig struct { // maxStreamDuration is from the HTTP connection manager's // common_http_protocol_options field. maxStreamDuration time.Duration + httpFilterConfig []xdsclient.HTTPFilter } // watchService uses LDS and RDS to discover information about the provided @@ -104,18 +105,18 @@ func (w *serviceUpdateWatcher) handleLDSResp(update xdsclient.ListenerUpdate, er return } - oldLDSConfig := w.lastUpdate.ldsConfig - w.lastUpdate.ldsConfig = ldsConfig{maxStreamDuration: update.MaxStreamDuration} + w.lastUpdate.ldsConfig = ldsConfig{ + maxStreamDuration: update.MaxStreamDuration, + httpFilterConfig: update.HTTPFilters, + } if w.rdsName == update.RouteConfigName { // If the new RouteConfigName is same as the previous, don't cancel and // restart the RDS watch. - if w.lastUpdate.ldsConfig != oldLDSConfig { - // The route name didn't change but the LDS data did; send it now. - // If the route name did change, then we will wait until the first - // RDS update before reporting this LDS config. - w.serviceCb(w.lastUpdate, nil) - } + // + // If the route name did change, then we must wait until the first RDS + // update before reporting this LDS config. + w.serviceCb(w.lastUpdate, nil) return } w.rdsName = update.RouteConfigName @@ -149,7 +150,7 @@ func (w *serviceUpdateWatcher) handleRDSResp(update xdsclient.RouteConfigUpdate, return } - w.lastUpdate.routes = matchVh.Routes + w.lastUpdate.virtualHost = matchVh w.serviceCb(w.lastUpdate, nil) } diff --git a/xds/internal/resolver/watch_service_test.go b/xds/internal/resolver/watch_service_test.go index 55a20372c018..705a3d35ae1b 100644 --- a/xds/internal/resolver/watch_service_test.go +++ b/xds/internal/resolver/watch_service_test.go @@ -140,7 +140,7 @@ func verifyServiceUpdate(ctx context.Context, updateCh *testutils.Channel, wantU } gotUpdate := u.(serviceUpdateErr) if gotUpdate.err != nil || !cmp.Equal(gotUpdate.u, wantUpdate, cmpopts.EquateEmpty(), cmp.AllowUnexported(serviceUpdate{}, ldsConfig{})) { - return fmt.Errorf("unexpected service update: (%v, %v), want: (%v, nil), diff (-want +got):\n%s", gotUpdate.u, gotUpdate.err, wantUpdate, cmp.Diff(gotUpdate.u, wantUpdate, cmpopts.EquateEmpty())) + return fmt.Errorf("unexpected service update: (%v, %v), want: (%v, nil), diff (-want +got):\n%s", gotUpdate.u, gotUpdate.err, wantUpdate, cmp.Diff(gotUpdate.u, wantUpdate, cmpopts.EquateEmpty(), cmp.AllowUnexported(serviceUpdate{}, ldsConfig{}))) } return nil } @@ -166,12 +166,12 @@ func (s) TestServiceWatch(t *testing.T) { xdsC.InvokeWatchListenerCallback(xdsclient.ListenerUpdate{RouteConfigName: routeStr}, nil) waitForWatchRouteConfig(ctx, t, xdsC, routeStr) - wantUpdate := serviceUpdate{routes: []*xdsclient.Route{{Prefix: newStringP(""), Action: map[string]uint32{cluster: 1}}}} + wantUpdate := serviceUpdate{virtualHost: &xdsclient.VirtualHost{Domains: []string{"target"}, Routes: []*xdsclient.Route{{Prefix: newStringP(""), WeightedClusters: map[string]xdsclient.WeightedCluster{cluster: {Weight: 1}}}}}} xdsC.InvokeWatchRouteConfigCallback(xdsclient.RouteConfigUpdate{ VirtualHosts: []*xdsclient.VirtualHost{ { Domains: []string{targetStr}, - Routes: []*xdsclient.Route{{Prefix: newStringP(""), Action: map[string]uint32{cluster: 1}}}, + Routes: []*xdsclient.Route{{Prefix: newStringP(""), WeightedClusters: map[string]xdsclient.WeightedCluster{cluster: {Weight: 1}}}}, }, }, }, nil) @@ -179,22 +179,22 @@ func (s) TestServiceWatch(t *testing.T) { t.Fatal(err) } - wantUpdate2 := serviceUpdate{ - routes: []*xdsclient.Route{{ - Path: newStringP(""), - Action: map[string]uint32{cluster: 1}, + wantUpdate2 := serviceUpdate{virtualHost: &xdsclient.VirtualHost{Domains: []string{"target"}, + Routes: []*xdsclient.Route{{ + Path: newStringP(""), + WeightedClusters: map[string]xdsclient.WeightedCluster{cluster: {Weight: 1}}, }}, - } + }} xdsC.InvokeWatchRouteConfigCallback(xdsclient.RouteConfigUpdate{ VirtualHosts: []*xdsclient.VirtualHost{ { Domains: []string{targetStr}, - Routes: []*xdsclient.Route{{Path: newStringP(""), Action: map[string]uint32{cluster: 1}}}, + Routes: []*xdsclient.Route{{Path: newStringP(""), WeightedClusters: map[string]xdsclient.WeightedCluster{cluster: {Weight: 1}}}}, }, { // Another virtual host, with different domains. Domains: []string{"random"}, - Routes: []*xdsclient.Route{{Prefix: newStringP(""), Action: map[string]uint32{cluster: 1}}}, + Routes: []*xdsclient.Route{{Prefix: newStringP(""), WeightedClusters: map[string]xdsclient.WeightedCluster{cluster: {Weight: 1}}}}, }, }, }, nil) @@ -220,12 +220,12 @@ func (s) TestServiceWatchLDSUpdate(t *testing.T) { xdsC.InvokeWatchListenerCallback(xdsclient.ListenerUpdate{RouteConfigName: routeStr}, nil) waitForWatchRouteConfig(ctx, t, xdsC, routeStr) - wantUpdate := serviceUpdate{routes: []*xdsclient.Route{{Prefix: newStringP(""), Action: map[string]uint32{cluster: 1}}}} + wantUpdate := serviceUpdate{virtualHost: &xdsclient.VirtualHost{Domains: []string{"target"}, Routes: []*xdsclient.Route{{Prefix: newStringP(""), WeightedClusters: map[string]xdsclient.WeightedCluster{cluster: {Weight: 1}}}}}} xdsC.InvokeWatchRouteConfigCallback(xdsclient.RouteConfigUpdate{ VirtualHosts: []*xdsclient.VirtualHost{ { Domains: []string{targetStr}, - Routes: []*xdsclient.Route{{Prefix: newStringP(""), Action: map[string]uint32{cluster: 1}}}, + Routes: []*xdsclient.Route{{Prefix: newStringP(""), WeightedClusters: map[string]xdsclient.WeightedCluster{cluster: {Weight: 1}}}}, }, }, }, nil) @@ -241,12 +241,12 @@ func (s) TestServiceWatchLDSUpdate(t *testing.T) { waitForWatchRouteConfig(ctx, t, xdsC, routeStr+"2") // RDS update for the new name. - wantUpdate2 := serviceUpdate{routes: []*xdsclient.Route{{Prefix: newStringP(""), Action: map[string]uint32{cluster + "2": 1}}}} + wantUpdate2 := serviceUpdate{virtualHost: &xdsclient.VirtualHost{Domains: []string{"target"}, Routes: []*xdsclient.Route{{Prefix: newStringP(""), WeightedClusters: map[string]xdsclient.WeightedCluster{cluster + "2": {Weight: 1}}}}}} xdsC.InvokeWatchRouteConfigCallback(xdsclient.RouteConfigUpdate{ VirtualHosts: []*xdsclient.VirtualHost{ { Domains: []string{targetStr}, - Routes: []*xdsclient.Route{{Prefix: newStringP(""), Action: map[string]uint32{cluster + "2": 1}}}, + Routes: []*xdsclient.Route{{Prefix: newStringP(""), WeightedClusters: map[string]xdsclient.WeightedCluster{cluster + "2": {Weight: 1}}}}, }, }, }, nil) @@ -272,12 +272,16 @@ func (s) TestServiceWatchLDSUpdateMaxStreamDuration(t *testing.T) { xdsC.InvokeWatchListenerCallback(xdsclient.ListenerUpdate{RouteConfigName: routeStr, MaxStreamDuration: time.Second}, nil) waitForWatchRouteConfig(ctx, t, xdsC, routeStr) - wantUpdate := serviceUpdate{routes: []*xdsclient.Route{{Prefix: newStringP(""), Action: map[string]uint32{cluster: 1}}}, ldsConfig: ldsConfig{maxStreamDuration: time.Second}} + wantUpdate := serviceUpdate{virtualHost: &xdsclient.VirtualHost{Domains: []string{"target"}, Routes: []*xdsclient.Route{{ + Prefix: newStringP(""), + WeightedClusters: map[string]xdsclient.WeightedCluster{cluster: {Weight: 1}}}}}, + ldsConfig: ldsConfig{maxStreamDuration: time.Second}, + } xdsC.InvokeWatchRouteConfigCallback(xdsclient.RouteConfigUpdate{ VirtualHosts: []*xdsclient.VirtualHost{ { Domains: []string{targetStr}, - Routes: []*xdsclient.Route{{Prefix: newStringP(""), Action: map[string]uint32{cluster: 1}}}, + Routes: []*xdsclient.Route{{Prefix: newStringP(""), WeightedClusters: map[string]xdsclient.WeightedCluster{cluster: {Weight: 1}}}}, }, }, }, nil) @@ -286,19 +290,22 @@ func (s) TestServiceWatchLDSUpdateMaxStreamDuration(t *testing.T) { } // Another LDS update with the same RDS_name but different MaxStreamDuration (zero in this case). - wantUpdate2 := serviceUpdate{routes: []*xdsclient.Route{{Prefix: newStringP(""), Action: map[string]uint32{cluster: 1}}}} + wantUpdate2 := serviceUpdate{virtualHost: &xdsclient.VirtualHost{Domains: []string{"target"}, Routes: []*xdsclient.Route{{Prefix: newStringP(""), WeightedClusters: map[string]xdsclient.WeightedCluster{cluster: {Weight: 1}}}}}} xdsC.InvokeWatchListenerCallback(xdsclient.ListenerUpdate{RouteConfigName: routeStr}, nil) if err := verifyServiceUpdate(ctx, serviceUpdateCh, wantUpdate2); err != nil { t.Fatal(err) } // RDS update. - wantUpdate3 := serviceUpdate{routes: []*xdsclient.Route{{Prefix: newStringP(""), Action: map[string]uint32{cluster + "2": 1}}}} + wantUpdate3 := serviceUpdate{virtualHost: &xdsclient.VirtualHost{Domains: []string{"target"}, Routes: []*xdsclient.Route{{ + Prefix: newStringP(""), + WeightedClusters: map[string]xdsclient.WeightedCluster{cluster + "2": {Weight: 1}}}}, + }} xdsC.InvokeWatchRouteConfigCallback(xdsclient.RouteConfigUpdate{ VirtualHosts: []*xdsclient.VirtualHost{ { Domains: []string{targetStr}, - Routes: []*xdsclient.Route{{Prefix: newStringP(""), Action: map[string]uint32{cluster + "2": 1}}}, + Routes: []*xdsclient.Route{{Prefix: newStringP(""), WeightedClusters: map[string]xdsclient.WeightedCluster{cluster + "2": {Weight: 1}}}}, }, }, }, nil) @@ -324,12 +331,15 @@ func (s) TestServiceNotCancelRDSOnSameLDSUpdate(t *testing.T) { xdsC.InvokeWatchListenerCallback(xdsclient.ListenerUpdate{RouteConfigName: routeStr}, nil) waitForWatchRouteConfig(ctx, t, xdsC, routeStr) - wantUpdate := serviceUpdate{routes: []*xdsclient.Route{{Prefix: newStringP(""), Action: map[string]uint32{cluster: 1}}}} + wantUpdate := serviceUpdate{virtualHost: &xdsclient.VirtualHost{Domains: []string{"target"}, Routes: []*xdsclient.Route{{ + Prefix: newStringP(""), + WeightedClusters: map[string]xdsclient.WeightedCluster{cluster: {Weight: 1}}}}, + }} xdsC.InvokeWatchRouteConfigCallback(xdsclient.RouteConfigUpdate{ VirtualHosts: []*xdsclient.VirtualHost{ { Domains: []string{targetStr}, - Routes: []*xdsclient.Route{{Prefix: newStringP(""), Action: map[string]uint32{cluster: 1}}}, + Routes: []*xdsclient.Route{{Prefix: newStringP(""), WeightedClusters: map[string]xdsclient.WeightedCluster{cluster: {Weight: 1}}}}, }, }, }, nil) diff --git a/xds/internal/resolver/xds_resolver.go b/xds/internal/resolver/xds_resolver.go index b40e115299a4..d8c09db69b5a 100644 --- a/xds/internal/resolver/xds_resolver.go +++ b/xds/internal/resolver/xds_resolver.go @@ -247,6 +247,11 @@ func (r *xdsResolver) handleServiceUpdate(su serviceUpdate, err error) { // Do not pass updates to the ClientConn once the resolver is closed. return } + // Remove any existing entry in updateCh and replace with the new one. + select { + case <-r.updateCh: + default: + } r.updateCh <- suWithError{su: su, err: err} } diff --git a/xds/internal/resolver/xds_resolver_test.go b/xds/internal/resolver/xds_resolver_test.go index 218a7a610913..e800d433150f 100644 --- a/xds/internal/resolver/xds_resolver_test.go +++ b/xds/internal/resolver/xds_resolver_test.go @@ -22,6 +22,7 @@ import ( "context" "errors" "reflect" + "strings" "testing" "time" @@ -44,6 +45,8 @@ import ( xdsclient "google.golang.org/grpc/xds/internal/client" "google.golang.org/grpc/xds/internal/client/bootstrap" "google.golang.org/grpc/xds/internal/env" + "google.golang.org/grpc/xds/internal/httpfilter" + "google.golang.org/grpc/xds/internal/httpfilter/router" xdstestutils "google.golang.org/grpc/xds/internal/testutils" "google.golang.org/grpc/xds/internal/testutils/fakeclient" ) @@ -58,6 +61,9 @@ const ( var target = resolver.Target{Endpoint: targetStr} +var routerFilter = xdsclient.HTTPFilter{Name: "rtr", Filter: httpfilter.Get(router.TypeURL)} +var routerFilterList = []xdsclient.HTTPFilter{routerFilter} + type s struct { grpctest.Tester } @@ -253,7 +259,7 @@ func (s) TestXDSResolverWatchCallbackAfterClose(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() waitForWatchListener(ctx, t, xdsC, targetStr) - xdsC.InvokeWatchListenerCallback(xdsclient.ListenerUpdate{RouteConfigName: routeStr}, nil) + xdsC.InvokeWatchListenerCallback(xdsclient.ListenerUpdate{RouteConfigName: routeStr, HTTPFilters: routerFilterList}, nil) waitForWatchRouteConfig(ctx, t, xdsC, routeStr) // Call the watchAPI callback after closing the resolver, and make sure no @@ -263,7 +269,7 @@ func (s) TestXDSResolverWatchCallbackAfterClose(t *testing.T) { VirtualHosts: []*xdsclient.VirtualHost{ { Domains: []string{targetStr}, - Routes: []*client.Route{{Prefix: newStringP(""), Action: map[string]uint32{cluster: 1}}}, + Routes: []*client.Route{{Prefix: newStringP(""), WeightedClusters: map[string]xdsclient.WeightedCluster{cluster: {Weight: 1}}}}, }, }, }, nil) @@ -288,7 +294,7 @@ func (s) TestXDSResolverBadServiceUpdate(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() waitForWatchListener(ctx, t, xdsC, targetStr) - xdsC.InvokeWatchListenerCallback(xdsclient.ListenerUpdate{RouteConfigName: routeStr}, nil) + xdsC.InvokeWatchListenerCallback(xdsclient.ListenerUpdate{RouteConfigName: routeStr, HTTPFilters: routerFilterList}, nil) waitForWatchRouteConfig(ctx, t, xdsC, routeStr) // Invoke the watchAPI callback with a bad service update and wait for the @@ -316,7 +322,7 @@ func (s) TestXDSResolverGoodServiceUpdate(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() waitForWatchListener(ctx, t, xdsC, targetStr) - xdsC.InvokeWatchListenerCallback(xdsclient.ListenerUpdate{RouteConfigName: routeStr}, nil) + xdsC.InvokeWatchListenerCallback(xdsclient.ListenerUpdate{RouteConfigName: routeStr, HTTPFilters: routerFilterList}, nil) waitForWatchRouteConfig(ctx, t, xdsC, routeStr) defer replaceRandNumGenerator(0)() @@ -326,7 +332,7 @@ func (s) TestXDSResolverGoodServiceUpdate(t *testing.T) { wantClusters map[string]bool }{ { - routes: []*client.Route{{Prefix: newStringP(""), Action: map[string]uint32{"test-cluster-1": 1}}}, + routes: []*client.Route{{Prefix: newStringP(""), WeightedClusters: map[string]xdsclient.WeightedCluster{"test-cluster-1": {Weight: 1}}}}, wantJSON: `{"loadBalancingConfig":[{ "xds_cluster_manager_experimental":{ "children":{ @@ -338,9 +344,9 @@ func (s) TestXDSResolverGoodServiceUpdate(t *testing.T) { wantClusters: map[string]bool{"test-cluster-1": true}, }, { - routes: []*client.Route{{Prefix: newStringP(""), Action: map[string]uint32{ - "cluster_1": 75, - "cluster_2": 25, + routes: []*client.Route{{Prefix: newStringP(""), WeightedClusters: map[string]xdsclient.WeightedCluster{ + "cluster_1": {Weight: 75}, + "cluster_2": {Weight: 25}, }}}, // This update contains the cluster from the previous update as // well as this update, as the previous config selector still @@ -362,9 +368,9 @@ func (s) TestXDSResolverGoodServiceUpdate(t *testing.T) { wantClusters: map[string]bool{"cluster_1": true, "cluster_2": true}, }, { - routes: []*client.Route{{Prefix: newStringP(""), Action: map[string]uint32{ - "cluster_1": 75, - "cluster_2": 25, + routes: []*client.Route{{Prefix: newStringP(""), WeightedClusters: map[string]xdsclient.WeightedCluster{ + "cluster_1": {Weight: 75}, + "cluster_2": {Weight: 25}, }}}, // With this redundant update, the old config selector has been // stopped, so there are no more references to the first cluster. @@ -450,7 +456,7 @@ func (s) TestXDSResolverRemovedWithRPCs(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() waitForWatchListener(ctx, t, xdsC, targetStr) - xdsC.InvokeWatchListenerCallback(xdsclient.ListenerUpdate{RouteConfigName: routeStr}, nil) + xdsC.InvokeWatchListenerCallback(xdsclient.ListenerUpdate{RouteConfigName: routeStr, HTTPFilters: routerFilterList}, nil) waitForWatchRouteConfig(ctx, t, xdsC, routeStr) // Invoke the watchAPI callback with a good service update and wait for the @@ -459,7 +465,7 @@ func (s) TestXDSResolverRemovedWithRPCs(t *testing.T) { VirtualHosts: []*xdsclient.VirtualHost{ { Domains: []string{targetStr}, - Routes: []*client.Route{{Prefix: newStringP(""), Action: map[string]uint32{"test-cluster-1": 1}}}, + Routes: []*client.Route{{Prefix: newStringP(""), WeightedClusters: map[string]xdsclient.WeightedCluster{"test-cluster-1": {Weight: 1}}}}, }, }, }, nil) @@ -510,7 +516,7 @@ func (s) TestXDSResolverRemovedResource(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() waitForWatchListener(ctx, t, xdsC, targetStr) - xdsC.InvokeWatchListenerCallback(xdsclient.ListenerUpdate{RouteConfigName: routeStr}, nil) + xdsC.InvokeWatchListenerCallback(xdsclient.ListenerUpdate{RouteConfigName: routeStr, HTTPFilters: routerFilterList}, nil) waitForWatchRouteConfig(ctx, t, xdsC, routeStr) // Invoke the watchAPI callback with a good service update and wait for the @@ -519,7 +525,7 @@ func (s) TestXDSResolverRemovedResource(t *testing.T) { VirtualHosts: []*xdsclient.VirtualHost{ { Domains: []string{targetStr}, - Routes: []*client.Route{{Prefix: newStringP(""), Action: map[string]uint32{"test-cluster-1": 1}}}, + Routes: []*client.Route{{Prefix: newStringP(""), WeightedClusters: map[string]xdsclient.WeightedCluster{"test-cluster-1": {Weight: 1}}}}, }, }, }, nil) @@ -620,7 +626,7 @@ func (s) TestXDSResolverWRR(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() waitForWatchListener(ctx, t, xdsC, targetStr) - xdsC.InvokeWatchListenerCallback(xdsclient.ListenerUpdate{RouteConfigName: routeStr}, nil) + xdsC.InvokeWatchListenerCallback(xdsclient.ListenerUpdate{RouteConfigName: routeStr, HTTPFilters: routerFilterList}, nil) waitForWatchRouteConfig(ctx, t, xdsC, routeStr) defer func(oldNewWRR func() wrr.WRR) { newWRR = oldNewWRR }(newWRR) @@ -632,9 +638,9 @@ func (s) TestXDSResolverWRR(t *testing.T) { VirtualHosts: []*xdsclient.VirtualHost{ { Domains: []string{targetStr}, - Routes: []*client.Route{{Prefix: newStringP(""), Action: map[string]uint32{ - "A": 5, - "B": 10, + Routes: []*client.Route{{Prefix: newStringP(""), WeightedClusters: map[string]xdsclient.WeightedCluster{ + "A": {Weight: 5}, + "B": {Weight: 10}, }}}, }, }, @@ -683,7 +689,7 @@ func (s) TestXDSResolverMaxStreamDuration(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() waitForWatchListener(ctx, t, xdsC, targetStr) - xdsC.InvokeWatchListenerCallback(xdsclient.ListenerUpdate{RouteConfigName: routeStr, MaxStreamDuration: time.Second}, nil) + xdsC.InvokeWatchListenerCallback(xdsclient.ListenerUpdate{RouteConfigName: routeStr, MaxStreamDuration: time.Second, HTTPFilters: routerFilterList}, nil) waitForWatchRouteConfig(ctx, t, xdsC, routeStr) defer func(oldNewWRR func() wrr.WRR) { newWRR = oldNewWRR }(newWRR) @@ -697,15 +703,15 @@ func (s) TestXDSResolverMaxStreamDuration(t *testing.T) { Domains: []string{targetStr}, Routes: []*client.Route{{ Prefix: newStringP("/foo"), - Action: map[string]uint32{"A": 1}, + WeightedClusters: map[string]xdsclient.WeightedCluster{"A": {Weight: 1}}, MaxStreamDuration: newDurationP(5 * time.Second), }, { Prefix: newStringP("/bar"), - Action: map[string]uint32{"B": 1}, + WeightedClusters: map[string]xdsclient.WeightedCluster{"B": {Weight: 1}}, MaxStreamDuration: newDurationP(0), }, { - Prefix: newStringP(""), - Action: map[string]uint32{"C": 1}, + Prefix: newStringP(""), + WeightedClusters: map[string]xdsclient.WeightedCluster{"C": {Weight: 1}}, }}, }, }, @@ -788,7 +794,7 @@ func (s) TestXDSResolverDelayedOnCommitted(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() waitForWatchListener(ctx, t, xdsC, targetStr) - xdsC.InvokeWatchListenerCallback(xdsclient.ListenerUpdate{RouteConfigName: routeStr}, nil) + xdsC.InvokeWatchListenerCallback(xdsclient.ListenerUpdate{RouteConfigName: routeStr, HTTPFilters: routerFilterList}, nil) waitForWatchRouteConfig(ctx, t, xdsC, routeStr) // Invoke the watchAPI callback with a good service update and wait for the @@ -797,7 +803,7 @@ func (s) TestXDSResolverDelayedOnCommitted(t *testing.T) { VirtualHosts: []*xdsclient.VirtualHost{ { Domains: []string{targetStr}, - Routes: []*client.Route{{Prefix: newStringP(""), Action: map[string]uint32{"test-cluster-1": 1}}}, + Routes: []*client.Route{{Prefix: newStringP(""), WeightedClusters: map[string]xdsclient.WeightedCluster{"test-cluster-1": {Weight: 1}}}}, }, }, }, nil) @@ -847,7 +853,7 @@ func (s) TestXDSResolverDelayedOnCommitted(t *testing.T) { VirtualHosts: []*xdsclient.VirtualHost{ { Domains: []string{targetStr}, - Routes: []*client.Route{{Prefix: newStringP(""), Action: map[string]uint32{"NEW": 1}}}, + Routes: []*client.Route{{Prefix: newStringP(""), WeightedClusters: map[string]xdsclient.WeightedCluster{"NEW": {Weight: 1}}}}, }, }, }, nil) @@ -855,7 +861,7 @@ func (s) TestXDSResolverDelayedOnCommitted(t *testing.T) { VirtualHosts: []*xdsclient.VirtualHost{ { Domains: []string{targetStr}, - Routes: []*client.Route{{Prefix: newStringP(""), Action: map[string]uint32{"NEW": 1}}}, + Routes: []*client.Route{{Prefix: newStringP(""), WeightedClusters: map[string]xdsclient.WeightedCluster{"NEW": {Weight: 1}}}}, }, }, }, nil) @@ -895,7 +901,7 @@ func (s) TestXDSResolverDelayedOnCommitted(t *testing.T) { VirtualHosts: []*xdsclient.VirtualHost{ { Domains: []string{targetStr}, - Routes: []*client.Route{{Prefix: newStringP(""), Action: map[string]uint32{"NEW": 1}}}, + Routes: []*client.Route{{Prefix: newStringP(""), WeightedClusters: map[string]xdsclient.WeightedCluster{"NEW": {Weight: 1}}}}, }, }, }, nil) @@ -938,7 +944,7 @@ func (s) TestXDSResolverGoodUpdateAfterError(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() waitForWatchListener(ctx, t, xdsC, targetStr) - xdsC.InvokeWatchListenerCallback(xdsclient.ListenerUpdate{RouteConfigName: routeStr}, nil) + xdsC.InvokeWatchListenerCallback(xdsclient.ListenerUpdate{RouteConfigName: routeStr, HTTPFilters: routerFilterList}, nil) waitForWatchRouteConfig(ctx, t, xdsC, routeStr) // Invoke the watchAPI callback with a bad service update and wait for the @@ -956,7 +962,7 @@ func (s) TestXDSResolverGoodUpdateAfterError(t *testing.T) { VirtualHosts: []*xdsclient.VirtualHost{ { Domains: []string{targetStr}, - Routes: []*client.Route{{Prefix: newStringP(""), Action: map[string]uint32{cluster: 1}}}, + Routes: []*client.Route{{Prefix: newStringP(""), WeightedClusters: map[string]xdsclient.WeightedCluster{cluster: {Weight: 1}}}}, }, }, }, nil) @@ -994,7 +1000,7 @@ func (s) TestXDSResolverResourceNotFoundError(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() waitForWatchListener(ctx, t, xdsC, targetStr) - xdsC.InvokeWatchListenerCallback(xdsclient.ListenerUpdate{RouteConfigName: routeStr}, nil) + xdsC.InvokeWatchListenerCallback(xdsclient.ListenerUpdate{RouteConfigName: routeStr, HTTPFilters: routerFilterList}, nil) waitForWatchRouteConfig(ctx, t, xdsC, routeStr) // Invoke the watchAPI callback with a bad service update and wait for the @@ -1024,6 +1030,262 @@ func (s) TestXDSResolverResourceNotFoundError(t *testing.T) { } } +type filterBuilder struct { + httpfilter.Filter // embedded as we do not need to implement registry / parsing in this test. + path *[]string +} + +var _ httpfilter.ClientInterceptorBuilder = &filterBuilder{} + +func (fb *filterBuilder) BuildClientInterceptor(config, override httpfilter.FilterConfig) (iresolver.ClientInterceptor, error) { + if config == nil { + panic("unexpected missing config") + } + *fb.path = append(*fb.path, "build:"+config.(filterCfg).s) + err := config.(filterCfg).newStreamErr + if override != nil { + *fb.path = append(*fb.path, "override:"+override.(filterCfg).s) + err = override.(filterCfg).newStreamErr + } + + return &filterInterceptor{path: fb.path, s: config.(filterCfg).s, err: err}, nil +} + +type filterInterceptor struct { + path *[]string + s string + err error +} + +func (fi *filterInterceptor) NewStream(ctx context.Context, i iresolver.RPCInfo, cs iresolver.ClientStream) (context.Context, iresolver.ClientStream, error) { + *fi.path = append(*fi.path, "newstream:"+fi.s) + if fi.err != nil { + return nil, nil, fi.err + } + return ctx, &clientStream{cs: cs, path: fi.path, s: fi.s}, nil +} + +type clientStream struct { + cs iresolver.ClientStream + path *[]string + s string +} + +func (cs *clientStream) Done() { + *cs.path = append(*cs.path, "done:"+cs.s) + cs.cs.Done() +} + +type filterCfg struct { + httpfilter.FilterConfig + s string + newStreamErr error +} + +func (s) TestXDSResolverHTTPFilters(t *testing.T) { + var path []string + testCases := []struct { + name string + ldsFilters []xdsclient.HTTPFilter + vhOverrides map[string]httpfilter.FilterConfig + rtOverrides map[string]httpfilter.FilterConfig + clOverrides map[string]httpfilter.FilterConfig + rpcRes map[string][][]string + selectErr string + newStreamErr string + }{ + { + name: "no router filter", + ldsFilters: []xdsclient.HTTPFilter{ + {Name: "foo", Filter: &filterBuilder{path: &path}, Config: filterCfg{s: "foo1"}}, + }, + rpcRes: map[string][][]string{ + "1": { + {"build:foo1", "override:foo2", "build:bar1", "override:bar2", "newstream:foo1", "newstream:bar1", "done:bar1", "done:foo1"}, + }, + }, + selectErr: "no router filter present", + }, + { + name: "ignored after router filter", + ldsFilters: []xdsclient.HTTPFilter{ + {Name: "foo", Filter: &filterBuilder{path: &path}, Config: filterCfg{s: "foo1"}}, + routerFilter, + {Name: "foo2", Filter: &filterBuilder{path: &path}, Config: filterCfg{s: "foo2"}}, + }, + rpcRes: map[string][][]string{ + "1": { + {"build:foo1", "newstream:foo1", "done:foo1"}, + }, + "2": { + {"build:foo1", "newstream:foo1", "done:foo1"}, + {"build:foo1", "newstream:foo1", "done:foo1"}, + {"build:foo1", "newstream:foo1", "done:foo1"}, + }, + }, + }, + { + name: "NewStream error; ensure earlier interceptor Done is still called", + ldsFilters: []xdsclient.HTTPFilter{ + {Name: "foo", Filter: &filterBuilder{path: &path}, Config: filterCfg{s: "foo1"}}, + {Name: "bar", Filter: &filterBuilder{path: &path}, Config: filterCfg{s: "bar1", newStreamErr: errors.New("bar newstream err")}}, + routerFilter, + }, + rpcRes: map[string][][]string{ + "1": { + {"build:foo1", "build:bar1", "newstream:foo1", "newstream:bar1" /* */, "done:foo1"}, + }, + "2": { + {"build:foo1", "build:bar1", "newstream:foo1", "newstream:bar1" /* */, "done:foo1"}, + }, + }, + newStreamErr: "bar newstream err", + }, + { + name: "all overrides", + ldsFilters: []xdsclient.HTTPFilter{ + {Name: "foo", Filter: &filterBuilder{path: &path}, Config: filterCfg{s: "foo1", newStreamErr: errors.New("this is overridden to nil")}}, + {Name: "bar", Filter: &filterBuilder{path: &path}, Config: filterCfg{s: "bar1"}}, + routerFilter, + }, + vhOverrides: map[string]httpfilter.FilterConfig{"foo": filterCfg{s: "foo2"}, "bar": filterCfg{s: "bar2"}}, + rtOverrides: map[string]httpfilter.FilterConfig{"foo": filterCfg{s: "foo3"}, "bar": filterCfg{s: "bar3"}}, + clOverrides: map[string]httpfilter.FilterConfig{"foo": filterCfg{s: "foo4"}, "bar": filterCfg{s: "bar4"}}, + rpcRes: map[string][][]string{ + "1": { + {"build:foo1", "override:foo2", "build:bar1", "override:bar2", "newstream:foo1", "newstream:bar1", "done:bar1", "done:foo1"}, + {"build:foo1", "override:foo2", "build:bar1", "override:bar2", "newstream:foo1", "newstream:bar1", "done:bar1", "done:foo1"}, + }, + "2": { + {"build:foo1", "override:foo3", "build:bar1", "override:bar3", "newstream:foo1", "newstream:bar1", "done:bar1", "done:foo1"}, + {"build:foo1", "override:foo4", "build:bar1", "override:bar4", "newstream:foo1", "newstream:bar1", "done:bar1", "done:foo1"}, + {"build:foo1", "override:foo3", "build:bar1", "override:bar3", "newstream:foo1", "newstream:bar1", "done:bar1", "done:foo1"}, + {"build:foo1", "override:foo4", "build:bar1", "override:bar4", "newstream:foo1", "newstream:bar1", "done:bar1", "done:foo1"}, + }, + }, + }, + } + + for i, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + xdsC := fakeclient.NewClient() + xdsR, tcc, cancel := testSetup(t, setupOpts{ + xdsClientFunc: func() (xdsClientInterface, error) { return xdsC, nil }, + }) + defer func() { + cancel() + xdsR.Close() + }() + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + waitForWatchListener(ctx, t, xdsC, targetStr) + + xdsC.InvokeWatchListenerCallback(xdsclient.ListenerUpdate{ + RouteConfigName: routeStr, + HTTPFilters: tc.ldsFilters, + }, nil) + if i == 0 { + waitForWatchRouteConfig(ctx, t, xdsC, routeStr) + } + + defer func(oldNewWRR func() wrr.WRR) { newWRR = oldNewWRR }(newWRR) + newWRR = xdstestutils.NewTestWRR + + // Invoke the watchAPI callback with a good service update and wait for the + // UpdateState method to be called on the ClientConn. + xdsC.InvokeWatchRouteConfigCallback(xdsclient.RouteConfigUpdate{ + VirtualHosts: []*xdsclient.VirtualHost{ + { + Domains: []string{targetStr}, + Routes: []*client.Route{{ + Prefix: newStringP("1"), WeightedClusters: map[string]xdsclient.WeightedCluster{ + "A": {Weight: 1}, + "B": {Weight: 1}, + }, + }, { + Prefix: newStringP("2"), WeightedClusters: map[string]xdsclient.WeightedCluster{ + "A": {Weight: 1}, + "B": {Weight: 1, HTTPFilterConfigOverride: tc.clOverrides}, + }, + HTTPFilterConfigOverride: tc.rtOverrides, + }}, + HTTPFilterConfigOverride: tc.vhOverrides, + }, + }, + }, nil) + + gotState, err := tcc.stateCh.Receive(ctx) + if err != nil { + t.Fatalf("ClientConn.UpdateState returned error: %v", err) + } + rState := gotState.(resolver.State) + if err := rState.ServiceConfig.Err; err != nil { + t.Fatalf("ClientConn.UpdateState received error in service config: %v", rState.ServiceConfig.Err) + } + + cs := iresolver.GetConfigSelector(rState) + if cs == nil { + t.Fatal("received nil config selector") + } + + for method, wants := range tc.rpcRes { + // Order of wants is non-deterministic. + remainingWant := make([][]string, len(wants)) + copy(remainingWant, wants) + for n := range wants { + path = nil + + res, err := cs.SelectConfig(iresolver.RPCInfo{Method: method, Context: context.Background()}) + if tc.selectErr != "" { + if err == nil || !strings.Contains(err.Error(), tc.selectErr) { + t.Errorf("SelectConfig(_) = _, %v; want _, Contains(%v)", err, tc.selectErr) + } + if err == nil { + res.OnCommitted() + } + continue + } + if err != nil { + t.Fatalf("Unexpected error from cs.SelectConfig(_): %v", err) + } + _, cs, err := res.Interceptor.NewStream(context.Background(), iresolver.RPCInfo{}, iresolver.NOPClientStream{}) + if tc.newStreamErr != "" { + if err == nil || !strings.Contains(err.Error(), tc.newStreamErr) { + t.Errorf("NewStream(...) = _, %v; want _, Contains(%v)", err, tc.newStreamErr) + } + if err == nil { + res.OnCommitted() + cs.Done() + } + continue + } + if err != nil { + t.Fatalf("unexpected error from Interceptor.NewStream: %v", err) + + } + res.OnCommitted() + cs.Done() + + // Confirm the desired path is found in remainingWant, and remove it. + pass := false + for i := range remainingWant { + if reflect.DeepEqual(path, remainingWant[i]) { + remainingWant[i] = remainingWant[len(remainingWant)-1] + remainingWant = remainingWant[:len(remainingWant)-1] + pass = true + break + } + } + if !pass { + t.Errorf("%q:%v - path:\n%v\nwant one of:\n%v", method, n, path, remainingWant) + } + } + } + }) + } +} + func replaceRandNumGenerator(start int64) func() { nextInt := start grpcrandInt63n = func(int64) (ret int64) {