Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf(xds): put the Gatewaylisteners in the Proxy #8051

Merged
merged 3 commits into from
Oct 24, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 38 additions & 53 deletions pkg/api-server/inspect_endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,24 +33,14 @@ import (
func getMatchedPolicies(
ctx context.Context, cfg *kuma_cp.Config, meshContext xds_context.MeshContext, dataplaneKey core_model.ResourceKey,
) (
*core_xds.MatchedPolicies, []gateway.GatewayListenerInfo, core_xds.Proxy, error,
*core_xds.Proxy, error,
) {
proxyBuilder := sync.DefaultDataplaneProxyBuilder(*cfg, envoy.APIV3)
if proxy, err := proxyBuilder.Build(ctx, dataplaneKey, meshContext); err != nil {
return nil, nil, core_xds.Proxy{}, err
} else {
if proxy.Dataplane.Spec.IsBuiltinGateway() {
entries, err := gateway.GatewayListenerInfoFromProxy(
ctx, meshContext, proxy, proxyBuilder.Zone,
)
if err != nil {
return nil, nil, core_xds.Proxy{}, err
}

return nil, entries, *proxy, nil
}
return &proxy.Policies, nil, *proxy, nil
proxy, err := proxyBuilder.Build(ctx, dataplaneKey, meshContext)
if err != nil {
return nil, err
}
return proxy, nil
}

func addInspectEndpoints(
Expand Down Expand Up @@ -113,7 +103,7 @@ func inspectDataplane(cfg *kuma_cp.Config, builder xds_context.MeshContextBuilde
return
}

matchedPolicies, gatewayEntries, proxy, err := getMatchedPolicies(
proxy, err := getMatchedPolicies(
request.Request.Context(), cfg, meshContext, core_model.ResourceKey{Mesh: meshName, Name: dataplaneName},
)
if err != nil {
Expand All @@ -122,13 +112,13 @@ func inspectDataplane(cfg *kuma_cp.Config, builder xds_context.MeshContextBuilde
}

var result api_server_types.DataplaneInspectResponse
if matchedPolicies != nil {
if !proxy.Dataplane.Spec.IsBuiltinGateway() {
inner := api_server_types.NewDataplaneInspectEntryList()
inner.Items = append(inner.Items, newDataplaneInspectResponse(matchedPolicies, proxy.Dataplane)...)
inner.Items = append(inner.Items, newDataplaneInspectResponse(&proxy.Policies, proxy.Dataplane)...)
inner.Total = uint32(len(inner.Items))
result = api_server_types.NewDataplaneInspectResponse(inner)
} else {
inner := newGatewayDataplaneInspectResponse(proxy, gatewayEntries)
inner := newGatewayDataplaneInspectResponse(proxy)
result = api_server_types.NewDataplaneInspectResponse(&inner)
}
if err := response.WriteAsJson(result); err != nil {
Expand Down Expand Up @@ -212,12 +202,12 @@ func inspectGatewayRouteDataplanes(
continue
}
key := core_model.MetaToResourceKey(dp.GetMeta())
_, listeners, _, err := getMatchedPolicies(request.Request.Context(), cfg, meshContext, key)
proxy, err := getMatchedPolicies(request.Request.Context(), cfg, meshContext, key)
if err != nil {
rest_errors.HandleError(request.Request.Context(), response, err, "Could not generate listener info")
return
}
for _, listener := range listeners {
for _, listener := range gateway.ExtractGatewayListener(proxy) {
for _, host := range listener.HostInfos {
for _, entry := range host.Entries {
if entry.Route != gatewayRoute.GetMeta().GetName() {
Expand Down Expand Up @@ -272,13 +262,19 @@ func inspectPolicies(
Mesh: dpKey.Mesh,
Name: dpKey.Name,
}
matchedPolicies, gatewayEntries, proxy, err := getMatchedPolicies(request.Request.Context(), cfg, meshContext, dpKey)
proxy, err := getMatchedPolicies(request.Request.Context(), cfg, meshContext, dpKey)
if err != nil {
rest_errors.HandleError(request.Request.Context(), response, err, fmt.Sprintf("Could not get MatchedPolicies for %v", dpKey))
return
}
if matchedPolicies != nil {
for policy, attachments := range inspect.GroupByPolicy(matchedPolicies, dp.Spec.Networking) {
if proxy.Dataplane.Spec.IsBuiltinGateway() {
for policy, attachments := range gatewayEntriesByPolicy(proxy) {
if policy.Type == resType && policy.Key.Name == policyName && policy.Key.Mesh == meshName {
result.Items = append(result.Items, attachments...)
}
}
} else {
for policy, attachments := range inspect.GroupByPolicy(&proxy.Policies, dp.Spec.Networking) {
if policy.Type == resType && policy.Key.Name == policyName && policy.Key.Mesh == meshName {
attachmentList := []api_server_types.AttachmentEntry{}
for _, attachment := range attachments {
Expand All @@ -293,12 +289,6 @@ func inspectPolicies(
result.Items = append(result.Items, api_server_types.NewPolicyInspectEntry(&entry))
}
}
} else {
for policy, attachments := range gatewayEntriesByPolicy(proxy, gatewayEntries) {
if policy.Type == resType && policy.Key.Name == policyName && policy.Key.Mesh == meshName {
result.Items = append(result.Items, attachments...)
}
}
}
}

Expand Down Expand Up @@ -356,12 +346,18 @@ func newDataplaneInspectResponse(matchedPolicies *core_xds.MatchedPolicies, dp *
}

func newGatewayDataplaneInspectResponse(
proxy core_xds.Proxy,
listenerInfos []gateway.GatewayListenerInfo,
proxy *core_xds.Proxy,
) api_server_types.GatewayDataplaneInspectResult {
var listeners []api_server_types.GatewayListenerInspectEntry
result := api_server_types.NewGatewayDataplaneInspectResult()
gwListeners := gateway.ExtractGatewayListener(proxy)
if len(gwListeners) > 0 {
result.Gateway = api_server_types.ResourceKeyEntry{
Mesh: gwListeners[0].Gateway.GetMeta().GetMesh(),
Name: gwListeners[0].Gateway.GetMeta().GetName(),
}
}

for _, info := range listenerInfos {
for _, info := range gwListeners {
var hosts []api_server_types.HostInspectEntry
for _, info := range info.HostInfos {
var routes []api_server_types.RouteInspectEntry
Expand Down Expand Up @@ -399,24 +395,13 @@ func newGatewayDataplaneInspectResponse(
sort.SliceStable(hosts, func(i, j int) bool {
return hosts[i].HostName < hosts[j].HostName
})
listeners = append(listeners, api_server_types.GatewayListenerInspectEntry{
result.Listeners = append(result.Listeners, api_server_types.GatewayListenerInspectEntry{
Port: info.Listener.Port,
Protocol: info.Listener.Protocol.String(),
Hosts: hosts,
})
}

result := api_server_types.NewGatewayDataplaneInspectResult()
result.Listeners = listeners

if len(listeners) > 0 {
gatewayKey := core_model.MetaToResourceKey(listenerInfos[0].Gateway.GetMeta())
result.Gateway = api_server_types.ResourceKeyEntry{
Mesh: gatewayKey.Mesh,
Name: gatewayKey.Name,
}
}

gatewayPolicies := api_server_types.PolicyMap{}

// TrafficLog and TrafficeTrace are applied to the entire MeshGateway
Expand Down Expand Up @@ -454,10 +439,10 @@ func routeToPolicyInspect(
}

func gatewayEntriesByPolicy(
proxy core_xds.Proxy,
listenerInfos []gateway.GatewayListenerInfo,
proxy *core_xds.Proxy,
) map[inspect.PolicyKey][]api_server_types.PolicyInspectEntry {
policyMap := map[inspect.PolicyKey][]api_server_types.PolicyInspectEntry{}
gwListeners := gateway.ExtractGatewayListener(proxy)

dpKey := core_model.MetaToResourceKey(proxy.Dataplane.GetMeta())
resourceKey := api_server_types.ResourceKeyEntry{
Expand All @@ -466,7 +451,7 @@ func gatewayEntriesByPolicy(
}

listenersMap := map[inspect.PolicyKey][]api_server_types.PolicyInspectGatewayListenerEntry{}
for _, info := range listenerInfos {
for _, info := range gwListeners {
hostMap := map[inspect.PolicyKey][]api_server_types.PolicyInspectGatewayHostEntry{}
for _, info := range info.HostInfos {
routeMap := map[inspect.PolicyKey][]api_server_types.PolicyInspectGatewayRouteEntry{}
Expand Down Expand Up @@ -513,8 +498,8 @@ func gatewayEntriesByPolicy(
}

var gatewayKey api_server_types.ResourceKeyEntry
if len(listenerInfos) > 0 {
gatewayKey = api_server_types.ResourceKeyEntryFromModelKey(core_model.MetaToResourceKey(listenerInfos[0].Gateway.GetMeta()))
if len(gwListeners) > 0 {
gatewayKey = api_server_types.ResourceKeyEntryFromModelKey(core_model.MetaToResourceKey(gwListeners[0].Gateway.GetMeta()))
}
for policy, listeners := range listenersMap {
result := api_server_types.NewPolicyInspectGatewayEntry(resourceKey, gatewayKey)
Expand Down Expand Up @@ -567,14 +552,14 @@ func inspectRulesAttachment(cfg *kuma_cp.Config, builder xds_context.MeshContext
return
}

matchedPolicies, _, proxy, err := getMatchedPolicies(
proxy, err := getMatchedPolicies(
ctx, cfg, meshContext, core_model.ResourceKey{Mesh: meshName, Name: dataplaneName},
)
if err != nil {
rest_errors.HandleError(request.Request.Context(), response, err, "Could not get MatchedPolicies")
return
}
rulesAttachments := inspect.BuildRulesAttachments(matchedPolicies.Dynamic, proxy.Dataplane.Spec.Networking, meshContext.VIPDomains)
rulesAttachments := inspect.BuildRulesAttachments(proxy.Policies.Dynamic, proxy.Dataplane.Spec.Networking, meshContext.VIPDomains)
resp := api_server_types.RuleInspectResponse{
Items: []api_server_types.RuleInspectEntry{},
}
Expand Down
9 changes: 9 additions & 0 deletions pkg/core/plugins/interfaces.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package plugins

import (
"context"

"github.com/pkg/errors"

"github.com/kumahq/kuma/pkg/api-server/authn"
Expand Down Expand Up @@ -102,3 +104,10 @@ type EgressPolicyPlugin interface {
// should be applied on the zone egress.
EgressMatchedPolicies(*core_mesh.ExternalServiceResource, xds_context.Resources) (core_xds.TypedMatchingPolicies, error)
}

// ProxyPlugin a plugin to modify the proxy. This happens before any `PolicyPlugin` or any envoy generation. and it is applied both for Dataplanes and ZoneProxies
type ProxyPlugin interface {
Plugin
// Apply mutate the proxy as needed.
Apply(ctx context.Context, meshCtx xds_context.MeshContext, proxy *core_xds.Proxy) error
}
14 changes: 14 additions & 0 deletions pkg/core/plugins/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ const (
caPlugin pluginType = "ca"
authnAPIServer pluginType = "authn-api-server"
policyPlugin pluginType = "policy"
proxyPlugin pluginType = "proxy"
)

type PluginName string
Expand All @@ -40,6 +41,7 @@ type Registry interface {
CaPlugins() map[PluginName]CaPlugin
AuthnAPIServer() map[PluginName]AuthnAPIServerPlugin
PolicyPlugins() map[PluginName]PolicyPlugin
ProxyPlugins() map[PluginName]ProxyPlugin
}

type RegistryMutator interface {
Expand All @@ -61,6 +63,7 @@ func NewRegistry() MutableRegistry {
ca: make(map[PluginName]CaPlugin),
authnAPIServer: make(map[PluginName]AuthnAPIServerPlugin),
policy: make(map[PluginName]PolicyPlugin),
proxy: make(map[PluginName]ProxyPlugin),
}
}

Expand All @@ -72,6 +75,7 @@ type registry struct {
secretStore map[PluginName]SecretStorePlugin
configStore map[PluginName]ConfigStorePlugin
runtime map[PluginName]RuntimePlugin
proxy map[PluginName]ProxyPlugin
ca map[PluginName]CaPlugin
authnAPIServer map[PluginName]AuthnAPIServerPlugin
policy map[PluginName]PolicyPlugin
Expand Down Expand Up @@ -109,6 +113,10 @@ func (r *registry) RuntimePlugins() map[PluginName]RuntimePlugin {
return r.runtime
}

func (r *registry) ProxyPlugins() map[PluginName]ProxyPlugin {
return r.proxy
}

func (r *registry) PolicyPlugins() map[PluginName]PolicyPlugin {
return r.policy
}
Expand Down Expand Up @@ -182,6 +190,12 @@ func (r *registry) Register(name PluginName, plugin Plugin) error {
}
r.policy[name] = policy
}
if proxy, ok := plugin.(ProxyPlugin); ok {
if old, exists := r.policy[name]; exists {
return pluginAlreadyRegisteredError(proxyPlugin, name, old, proxy)
}
r.proxy[name] = proxy
}
return nil
}

Expand Down
4 changes: 4 additions & 0 deletions pkg/core/xds/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,10 @@ type Proxy struct {
ZoneEgressProxy *ZoneEgressProxy
// ZoneIngressProxy is available only when XDS is generated for ZoneIngress data plane proxy.
ZoneIngressProxy *ZoneIngressProxy
// RuntimeExtensions a set of extensions to add for custom extensions (.e.g MeshGateway)
RuntimeExtensions map[string]interface{}
// Zone the zone the proxy is in
Zone string
}

type ServerSideMTLSCerts struct {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ var _ = Describe("MeshAccessLog", func() {
resourceSet.Add(&r)
}

context := xds_context.Context{
xdsCtx := xds_context.Context{
Mesh: xds_context.MeshContext{
Resource: &core_mesh.MeshResource{
Meta: &test_model.ResourceMeta{
Expand Down Expand Up @@ -107,7 +107,7 @@ var _ = Describe("MeshAccessLog", func() {
}
plugin := plugin.NewPlugin().(core_plugins.PolicyPlugin)

Expect(plugin.Apply(resourceSet, context, &proxy)).To(Succeed())
Expect(plugin.Apply(resourceSet, xdsCtx, &proxy)).To(Succeed())
policies_xds.ResourceArrayShouldEqual(resourceSet.ListOf(envoy_resource.ListenerType), given.expectedListeners)
policies_xds.ResourceArrayShouldEqual(resourceSet.ListOf(envoy_resource.ClusterType), given.expectedClusters)
},
Expand Down Expand Up @@ -1014,6 +1014,10 @@ var _ = Describe("MeshAccessLog", func() {
},
},
},
RuntimeExtensions: map[string]interface{}{},
}
for n, p := range core_plugins.Plugins().ProxyPlugins() {
Expect(p.Apply(context.Background(), xdsCtx.Mesh, &proxy)).To(Succeed(), n)
}

gatewayGenerator := gateway_plugin.NewGenerator("test-zone")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package v1alpha1

import (
"context"

envoy_cluster "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"

mesh_proto "github.com/kumahq/kuma/api/mesh/v1alpha1"
Expand All @@ -14,7 +12,7 @@ import (
policies_xds "github.com/kumahq/kuma/pkg/plugins/policies/core/xds"
api "github.com/kumahq/kuma/pkg/plugins/policies/meshcircuitbreaker/api/v1alpha1"
plugin_xds "github.com/kumahq/kuma/pkg/plugins/policies/meshcircuitbreaker/plugin/xds"
gateway_plugin "github.com/kumahq/kuma/pkg/plugins/runtime/gateway"
gateway "github.com/kumahq/kuma/pkg/plugins/runtime/gateway"
xds_context "github.com/kumahq/kuma/pkg/xds/context"
envoy_names "github.com/kumahq/kuma/pkg/xds/envoy/names"
)
Expand Down Expand Up @@ -54,7 +52,7 @@ func (p plugin) Apply(
return err
}

if err := applyToGateways(ctx, policies.ToRules, clusters.Gateway, proxy); err != nil {
if err := applyToGateways(policies.ToRules, clusters.Gateway, proxy); err != nil {
return err
}

Expand Down Expand Up @@ -110,22 +108,13 @@ func applyToOutbounds(
}

func applyToGateways(
ctx xds_context.Context,
rules core_rules.ToRules,
gatewayClusters map[string]*envoy_cluster.Cluster,
proxy *core_xds.Proxy,
) error {
if !proxy.Dataplane.Spec.IsBuiltinGateway() {
return nil
}
gatewayListenerInfos, err := gateway_plugin.GatewayListenerInfoFromProxy(context.TODO(), ctx.Mesh, proxy, ctx.ControlPlane.Zone)
if err != nil {
return err
}

for _, listenerInfo := range gatewayListenerInfos {
for _, listenerInfo := range gateway.ExtractGatewayListener(proxy) {
for _, hostInfo := range listenerInfo.HostInfos {
destinations := gateway_plugin.RouteDestinationsMutable(hostInfo.Entries)
destinations := gateway.RouteDestinationsMutable(hostInfo.Entries)
for _, dest := range destinations {
clusterName, err := dest.Destination.DestinationClusterName(hostInfo.Host.Tags)
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,10 @@ var _ = Describe("MeshCircuitBreaker", func() {
},
},
},
RuntimeExtensions: map[string]interface{}{},
}
for n, p := range core_plugins.Plugins().ProxyPlugins() {
Expect(p.Apply(context.Background(), xdsCtx.Mesh, &proxy)).To(Succeed(), n)
}
gatewayGenerator := gateway_plugin.NewGenerator("test-zone")
generatedResources, err := gatewayGenerator.Generate(context.Background(), xdsCtx, &proxy)
Expand Down
Loading