Skip to content

Commit

Permalink
more review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
dfawley committed Feb 24, 2021
1 parent f4de499 commit e1a09d1
Showing 1 changed file with 25 additions and 36 deletions.
61 changes: 25 additions & 36 deletions xds/internal/resolver/serviceconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,14 +103,19 @@ type virtualHost struct {
httpFilterConfigOverride map[string]httpfilter.FilterConfig
}

// routeCluster holds information about a cluster as referenced by a route.
type routeCluster struct {
name string
// map from filter name to its config
httpFilterConfigOverride map[string]httpfilter.FilterConfig
}

type route struct {
m *compositeMatcher // converted from route matchers
clusters wrr.WRR
clusters wrr.WRR // holds *routeCluster entries
maxStreamDuration time.Duration
// map from filter name to its config
httpFilterConfigOverride map[string]httpfilter.FilterConfig
// map from cluster to its httpFilterConfigOverride
clusterHFCO map[string]map[string]httpfilter.FilterConfig
}

func (r route) String() string {
Expand Down Expand Up @@ -142,13 +147,13 @@ func (cs *configSelector) SelectConfig(rpcInfo iresolver.RPCInfo) (*iresolver.RP
if rt == nil || rt.clusters == nil {
return nil, errNoMatchedRouteFound
}
cluster, ok := rt.clusters.Next().(string)
cluster, ok := rt.clusters.Next().(*routeCluster)
if !ok {
return nil, status.Errorf(codes.Internal, "error retrieving cluster for match: %v (%T)", cluster, cluster)
}
// Add a ref to the selected cluster, as this RPC needs this cluster until
// it is committed.
ref := &cs.clusters[cluster].refCount
ref := &cs.clusters[cluster.name].refCount
atomic.AddInt32(ref, 1)

interceptor, err := cs.newInterceptor(rt, cluster)
Expand All @@ -158,7 +163,7 @@ func (cs *configSelector) SelectConfig(rpcInfo iresolver.RPCInfo) (*iresolver.RP

config := &iresolver.RPCConfig{
// Communicate to the LB policy the chosen cluster.
Context: clustermanager.SetPickedCluster(rpcInfo.Context, cluster),
Context: clustermanager.SetPickedCluster(rpcInfo.Context, cluster.name),
OnCommitted: func() {
// When the RPC is committed, the cluster is no longer required.
// Decrease its ref.
Expand All @@ -181,7 +186,7 @@ func (cs *configSelector) SelectConfig(rpcInfo iresolver.RPCInfo) (*iresolver.RP
return config, nil
}

func (cs *configSelector) newInterceptor(rt *route, cluster string) (iresolver.ClientInterceptor, error) {
func (cs *configSelector) newInterceptor(rt *route, cluster *routeCluster) (iresolver.ClientInterceptor, error) {
if len(cs.httpFilterConfig) == 0 {
return nil, nil
}
Expand All @@ -190,16 +195,14 @@ func (cs *configSelector) newInterceptor(rt *route, cluster string) (iresolver.C
if router.IsRouterFilter(filter.Filter) {
// Ignore any filters after the router filter. The router itself
// is currently a nop.
break
return &interceptorList{interceptors: interceptors}, nil
}
override := cs.virtualHost.httpFilterConfigOverride[filter.Name] // VH overrides listener
if o := rt.httpFilterConfigOverride[filter.Name]; o != nil {
override = o // route overrides VH
if m := rt.clusterHFCO[cluster]; m != nil {
if o := m[filter.Name]; o != nil {
override = o // cluster overrides route
}
}
override := cluster.httpFilterConfigOverride[filter.Name] // cluster is highest priority
if override == nil {
override = rt.httpFilterConfigOverride[filter.Name] // route is second priority
}
if override == nil {
override = cs.virtualHost.httpFilterConfigOverride[filter.Name] // VH is third & lowest priority
}
ib, ok := filter.Filter.(httpfilter.ClientInterceptorBuilder)
if !ok {
Expand All @@ -214,7 +217,7 @@ func (cs *configSelector) newInterceptor(rt *route, cluster string) (iresolver.C
interceptors = append(interceptors, i)
}
}
return &interceptorList{interceptors: interceptors}, nil
return nil, fmt.Errorf("error in xds config: no router filter present")
}

// stop decrements refs of all clusters referenced by this config selector.
Expand Down Expand Up @@ -255,14 +258,16 @@ func (r *xdsResolver) newConfigSelector(su serviceUpdate) (*configSelector, erro
virtualHost: virtualHost{httpFilterConfigOverride: su.virtualHost.HTTPFilterConfigOverride},
routes: make([]route, len(su.virtualHost.Routes)),
clusters: make(map[string]*clusterInfo),
httpFilterConfig: append(su.ldsConfig.httpFilterConfig, xdsclient.HTTPFilter{Filter: failingFilter{}}),
httpFilterConfig: su.ldsConfig.httpFilterConfig,
}

for i, rt := range su.virtualHost.Routes {
cs.routes[i].clusterHFCO = make(map[string]map[string]httpfilter.FilterConfig, len(rt.WeightedClusters))
clusters := newWRR()
for cluster, wc := range rt.WeightedClusters {
clusters.Add(cluster, int64(wc.Weight))
clusters.Add(&routeCluster{
name: cluster,
httpFilterConfigOverride: wc.HTTPFilterConfigOverride,
}, int64(wc.Weight))

// Initialize entries in cs.clusters map, creating entries in
// r.activeClusters as necessary. Set to zero as they will be
Expand All @@ -273,7 +278,6 @@ func (r *xdsResolver) newConfigSelector(su serviceUpdate) (*configSelector, erro
r.activeClusters[cluster] = ci
}
cs.clusters[cluster] = ci
cs.routes[i].clusterHFCO[cluster] = wc.HTTPFilterConfigOverride
}
cs.routes[i].clusters = clusters

Expand Down Expand Up @@ -308,7 +312,6 @@ type clusterInfo struct {

type interceptorList struct {
interceptors []iresolver.ClientInterceptor
done func()
}

func (il *interceptorList) NewStream(ctx context.Context, ri iresolver.RPCInfo, cs iresolver.ClientStream) (context.Context, iresolver.ClientStream, error) {
Expand All @@ -322,19 +325,5 @@ func (il *interceptorList) NewStream(ctx context.Context, ri iresolver.RPCInfo,
cs = newCS
ctx = newCTX
}
il.done = cs.Done
return ctx, cs, nil
}

func (il *interceptorList) Done() {
il.done()
}

// failingFilter always fails at construction.
type failingFilter struct {
httpfilter.Filter
}

func (failingFilter) BuildClientInterceptor(config, override httpfilter.FilterConfig) (iresolver.ClientInterceptor, error) {
return nil, fmt.Errorf("error in xds config: no router filter present")
}

0 comments on commit e1a09d1

Please sign in to comment.