From 859cf9be928e4b3cfc1b6b38845bb2b5bfba2ce5 Mon Sep 17 00:00:00 2001 From: Charly Molter Date: Mon, 16 Oct 2023 17:14:40 +0200 Subject: [PATCH 1/2] fix(perf): put the Gatewaylisteners in the Proxy We were constantly recomputing these. This was making code hard to read and proxy had some memory impact. We also introduce a plugin API for proxy modification which helps avoiding circular dependency between core_xds and plugins Signed-off-by: Charly Molter --- pkg/api-server/inspect_endpoints.go | 91 ++++++++----------- pkg/core/plugins/interfaces.go | 9 ++ pkg/core/plugins/registry.go | 14 +++ pkg/core/xds/types.go | 4 + .../plugin/v1alpha1/plugin_test.go | 8 +- .../plugin/v1alpha1/plugin.go | 19 +--- .../plugin/v1alpha1/plugin_test.go | 4 + .../plugin/v1alpha1/plugin.go | 11 +-- .../plugin/v1alpha1/plugin_test.go | 4 + .../meshhealthcheck/plugin/v1alpha1/plugin.go | 15 +-- .../plugin/v1alpha1/plugin_test.go | 4 + .../plugin/v1alpha1/plugin.go | 13 +-- .../plugin/v1alpha1/plugin_test.go | 4 + .../meshratelimit/plugin/v1alpha1/plugin.go | 14 +-- .../plugin/v1alpha1/plugin_test.go | 22 +---- .../meshretry/plugin/v1alpha1/plugin.go | 16 +--- .../meshretry/plugin/v1alpha1/plugin_test.go | 4 + .../meshtimeout/plugin/v1alpha1/plugin.go | 11 +-- .../plugin/v1alpha1/plugin_test.go | 4 + pkg/plugins/runtime/gateway/generator.go | 18 ++-- .../runtime/gateway/metadata/metadata.go | 8 +- pkg/plugins/runtime/gateway/plugin.go | 26 ++++++ pkg/plugins/runtime/gateway/suite_test.go | 14 ++- pkg/xds/sync/dataplane_proxy_builder.go | 26 ++++-- pkg/xds/sync/egress_proxy_builder.go | 13 ++- pkg/xds/sync/ingress_proxy_builder.go | 10 ++ 26 files changed, 200 insertions(+), 186 deletions(-) diff --git a/pkg/api-server/inspect_endpoints.go b/pkg/api-server/inspect_endpoints.go index c3220804cc80..8925cf0289b7 100644 --- a/pkg/api-server/inspect_endpoints.go +++ b/pkg/api-server/inspect_endpoints.go @@ -33,24 +33,14 @@ import ( func getMatchedPolicies( ctx context.Context, cfg *kuma_cp.Config, meshContext xds_context.MeshContext, dataplaneKey core_model.ResourceKey, ) ( - *core_xds.MatchedPolicies, []gateway.GatewayListenerInfo, core_xds.Proxy, error, + *core_xds.Proxy, error, ) { proxyBuilder := sync.DefaultDataplaneProxyBuilder(*cfg, envoy.APIV3) - if proxy, err := proxyBuilder.Build(ctx, dataplaneKey, meshContext); err != nil { - return nil, nil, core_xds.Proxy{}, err - } else { - if proxy.Dataplane.Spec.IsBuiltinGateway() { - entries, err := gateway.GatewayListenerInfoFromProxy( - ctx, meshContext, proxy, proxyBuilder.Zone, - ) - if err != nil { - return nil, nil, core_xds.Proxy{}, err - } - - return nil, entries, *proxy, nil - } - return &proxy.Policies, nil, *proxy, nil + proxy, err := proxyBuilder.Build(ctx, dataplaneKey, meshContext) + if err != nil { + return nil, err } + return proxy, nil } func addInspectEndpoints( @@ -113,7 +103,7 @@ func inspectDataplane(cfg *kuma_cp.Config, builder xds_context.MeshContextBuilde return } - matchedPolicies, gatewayEntries, proxy, err := getMatchedPolicies( + proxy, err := getMatchedPolicies( request.Request.Context(), cfg, meshContext, core_model.ResourceKey{Mesh: meshName, Name: dataplaneName}, ) if err != nil { @@ -122,13 +112,13 @@ func inspectDataplane(cfg *kuma_cp.Config, builder xds_context.MeshContextBuilde } var result api_server_types.DataplaneInspectResponse - if matchedPolicies != nil { + if !proxy.Dataplane.Spec.IsBuiltinGateway() { inner := api_server_types.NewDataplaneInspectEntryList() - inner.Items = append(inner.Items, newDataplaneInspectResponse(matchedPolicies, proxy.Dataplane)...) + inner.Items = append(inner.Items, newDataplaneInspectResponse(&proxy.Policies, proxy.Dataplane)...) inner.Total = uint32(len(inner.Items)) result = api_server_types.NewDataplaneInspectResponse(inner) } else { - inner := newGatewayDataplaneInspectResponse(proxy, gatewayEntries) + inner := newGatewayDataplaneInspectResponse(proxy) result = api_server_types.NewDataplaneInspectResponse(&inner) } if err := response.WriteAsJson(result); err != nil { @@ -212,12 +202,12 @@ func inspectGatewayRouteDataplanes( continue } key := core_model.MetaToResourceKey(dp.GetMeta()) - _, listeners, _, err := getMatchedPolicies(request.Request.Context(), cfg, meshContext, key) + proxy, err := getMatchedPolicies(request.Request.Context(), cfg, meshContext, key) if err != nil { rest_errors.HandleError(request.Request.Context(), response, err, "Could not generate listener info") return } - for _, listener := range listeners { + for _, listener := range gateway.ExtractGatewayListener(proxy) { for _, host := range listener.HostInfos { for _, entry := range host.Entries { if entry.Route != gatewayRoute.GetMeta().GetName() { @@ -272,13 +262,19 @@ func inspectPolicies( Mesh: dpKey.Mesh, Name: dpKey.Name, } - matchedPolicies, gatewayEntries, proxy, err := getMatchedPolicies(request.Request.Context(), cfg, meshContext, dpKey) + proxy, err := getMatchedPolicies(request.Request.Context(), cfg, meshContext, dpKey) if err != nil { rest_errors.HandleError(request.Request.Context(), response, err, fmt.Sprintf("Could not get MatchedPolicies for %v", dpKey)) return } - if matchedPolicies != nil { - for policy, attachments := range inspect.GroupByPolicy(matchedPolicies, dp.Spec.Networking) { + if proxy.Dataplane.Spec.IsBuiltinGateway() { + for policy, attachments := range gatewayEntriesByPolicy(proxy) { + if policy.Type == resType && policy.Key.Name == policyName && policy.Key.Mesh == meshName { + result.Items = append(result.Items, attachments...) + } + } + } else { + for policy, attachments := range inspect.GroupByPolicy(&proxy.Policies, dp.Spec.Networking) { if policy.Type == resType && policy.Key.Name == policyName && policy.Key.Mesh == meshName { attachmentList := []api_server_types.AttachmentEntry{} for _, attachment := range attachments { @@ -293,12 +289,6 @@ func inspectPolicies( result.Items = append(result.Items, api_server_types.NewPolicyInspectEntry(&entry)) } } - } else { - for policy, attachments := range gatewayEntriesByPolicy(proxy, gatewayEntries) { - if policy.Type == resType && policy.Key.Name == policyName && policy.Key.Mesh == meshName { - result.Items = append(result.Items, attachments...) - } - } } } @@ -356,12 +346,18 @@ func newDataplaneInspectResponse(matchedPolicies *core_xds.MatchedPolicies, dp * } func newGatewayDataplaneInspectResponse( - proxy core_xds.Proxy, - listenerInfos []gateway.GatewayListenerInfo, + proxy *core_xds.Proxy, ) api_server_types.GatewayDataplaneInspectResult { - var listeners []api_server_types.GatewayListenerInspectEntry + result := api_server_types.NewGatewayDataplaneInspectResult() + gwListeners := gateway.ExtractGatewayListener(proxy) + if len(gwListeners) > 0 { + result.Gateway = api_server_types.ResourceKeyEntry{ + Mesh: gwListeners[0].Gateway.GetMeta().GetMesh(), + Name: gwListeners[0].Gateway.GetMeta().GetName(), + } + } - for _, info := range listenerInfos { + for _, info := range gwListeners { var hosts []api_server_types.HostInspectEntry for _, info := range info.HostInfos { var routes []api_server_types.RouteInspectEntry @@ -399,24 +395,13 @@ func newGatewayDataplaneInspectResponse( sort.SliceStable(hosts, func(i, j int) bool { return hosts[i].HostName < hosts[j].HostName }) - listeners = append(listeners, api_server_types.GatewayListenerInspectEntry{ + result.Listeners = append(result.Listeners, api_server_types.GatewayListenerInspectEntry{ Port: info.Listener.Port, Protocol: info.Listener.Protocol.String(), Hosts: hosts, }) } - result := api_server_types.NewGatewayDataplaneInspectResult() - result.Listeners = listeners - - if len(listeners) > 0 { - gatewayKey := core_model.MetaToResourceKey(listenerInfos[0].Gateway.GetMeta()) - result.Gateway = api_server_types.ResourceKeyEntry{ - Mesh: gatewayKey.Mesh, - Name: gatewayKey.Name, - } - } - gatewayPolicies := api_server_types.PolicyMap{} // TrafficLog and TrafficeTrace are applied to the entire MeshGateway @@ -454,10 +439,10 @@ func routeToPolicyInspect( } func gatewayEntriesByPolicy( - proxy core_xds.Proxy, - listenerInfos []gateway.GatewayListenerInfo, + proxy *core_xds.Proxy, ) map[inspect.PolicyKey][]api_server_types.PolicyInspectEntry { policyMap := map[inspect.PolicyKey][]api_server_types.PolicyInspectEntry{} + gwListeners := gateway.ExtractGatewayListener(proxy) dpKey := core_model.MetaToResourceKey(proxy.Dataplane.GetMeta()) resourceKey := api_server_types.ResourceKeyEntry{ @@ -466,7 +451,7 @@ func gatewayEntriesByPolicy( } listenersMap := map[inspect.PolicyKey][]api_server_types.PolicyInspectGatewayListenerEntry{} - for _, info := range listenerInfos { + for _, info := range gwListeners { hostMap := map[inspect.PolicyKey][]api_server_types.PolicyInspectGatewayHostEntry{} for _, info := range info.HostInfos { routeMap := map[inspect.PolicyKey][]api_server_types.PolicyInspectGatewayRouteEntry{} @@ -513,8 +498,8 @@ func gatewayEntriesByPolicy( } var gatewayKey api_server_types.ResourceKeyEntry - if len(listenerInfos) > 0 { - gatewayKey = api_server_types.ResourceKeyEntryFromModelKey(core_model.MetaToResourceKey(listenerInfos[0].Gateway.GetMeta())) + if len(gwListeners) > 0 { + gatewayKey = api_server_types.ResourceKeyEntryFromModelKey(core_model.MetaToResourceKey(gwListeners[0].Gateway.GetMeta())) } for policy, listeners := range listenersMap { result := api_server_types.NewPolicyInspectGatewayEntry(resourceKey, gatewayKey) @@ -567,14 +552,14 @@ func inspectRulesAttachment(cfg *kuma_cp.Config, builder xds_context.MeshContext return } - matchedPolicies, _, proxy, err := getMatchedPolicies( + proxy, err := getMatchedPolicies( ctx, cfg, meshContext, core_model.ResourceKey{Mesh: meshName, Name: dataplaneName}, ) if err != nil { rest_errors.HandleError(request.Request.Context(), response, err, "Could not get MatchedPolicies") return } - rulesAttachments := inspect.BuildRulesAttachments(matchedPolicies.Dynamic, proxy.Dataplane.Spec.Networking, meshContext.VIPDomains) + rulesAttachments := inspect.BuildRulesAttachments(proxy.Policies.Dynamic, proxy.Dataplane.Spec.Networking, meshContext.VIPDomains) resp := api_server_types.RuleInspectResponse{ Items: []api_server_types.RuleInspectEntry{}, } diff --git a/pkg/core/plugins/interfaces.go b/pkg/core/plugins/interfaces.go index 432d0f32b537..b0d2939bbc0b 100644 --- a/pkg/core/plugins/interfaces.go +++ b/pkg/core/plugins/interfaces.go @@ -1,6 +1,8 @@ package plugins import ( + "context" + "github.com/pkg/errors" "github.com/kumahq/kuma/pkg/api-server/authn" @@ -102,3 +104,10 @@ type EgressPolicyPlugin interface { // should be applied on the zone egress. EgressMatchedPolicies(*core_mesh.ExternalServiceResource, xds_context.Resources) (core_xds.TypedMatchingPolicies, error) } + +// ProxyPlugin a plugin to modify the proxy. This happens before any `PolicyPlugin` or any envoy generation. and it is applied both for Dataplanes and ZoneProxies +type ProxyPlugin interface { + Plugin + // Apply mutate the proxy as needed. + Apply(ctx context.Context, meshCtx xds_context.MeshContext, proxy *core_xds.Proxy) error +} diff --git a/pkg/core/plugins/registry.go b/pkg/core/plugins/registry.go index 360a99c399ef..ed70fe80f5e9 100644 --- a/pkg/core/plugins/registry.go +++ b/pkg/core/plugins/registry.go @@ -17,6 +17,7 @@ const ( caPlugin pluginType = "ca" authnAPIServer pluginType = "authn-api-server" policyPlugin pluginType = "policy" + proxyPlugin pluginType = "proxy" ) type PluginName string @@ -40,6 +41,7 @@ type Registry interface { CaPlugins() map[PluginName]CaPlugin AuthnAPIServer() map[PluginName]AuthnAPIServerPlugin PolicyPlugins() map[PluginName]PolicyPlugin + ProxyPlugins() map[PluginName]ProxyPlugin } type RegistryMutator interface { @@ -61,6 +63,7 @@ func NewRegistry() MutableRegistry { ca: make(map[PluginName]CaPlugin), authnAPIServer: make(map[PluginName]AuthnAPIServerPlugin), policy: make(map[PluginName]PolicyPlugin), + proxy: make(map[PluginName]ProxyPlugin), } } @@ -72,6 +75,7 @@ type registry struct { secretStore map[PluginName]SecretStorePlugin configStore map[PluginName]ConfigStorePlugin runtime map[PluginName]RuntimePlugin + proxy map[PluginName]ProxyPlugin ca map[PluginName]CaPlugin authnAPIServer map[PluginName]AuthnAPIServerPlugin policy map[PluginName]PolicyPlugin @@ -109,6 +113,10 @@ func (r *registry) RuntimePlugins() map[PluginName]RuntimePlugin { return r.runtime } +func (r *registry) ProxyPlugins() map[PluginName]ProxyPlugin { + return r.proxy +} + func (r *registry) PolicyPlugins() map[PluginName]PolicyPlugin { return r.policy } @@ -182,6 +190,12 @@ func (r *registry) Register(name PluginName, plugin Plugin) error { } r.policy[name] = policy } + if proxy, ok := plugin.(ProxyPlugin); ok { + if old, exists := r.policy[name]; exists { + return pluginAlreadyRegisteredError(proxyPlugin, name, old, proxy) + } + r.proxy[name] = proxy + } return nil } diff --git a/pkg/core/xds/types.go b/pkg/core/xds/types.go index d7bb265f7967..be119f3f529f 100644 --- a/pkg/core/xds/types.go +++ b/pkg/core/xds/types.go @@ -150,6 +150,10 @@ type Proxy struct { ZoneEgressProxy *ZoneEgressProxy // ZoneIngressProxy is available only when XDS is generated for ZoneIngress data plane proxy. ZoneIngressProxy *ZoneIngressProxy + // RuntimeExtensions a set of extensions to add for custom extensions (.e.g MeshGateway) + RuntimeExtensions map[string]interface{} + // Zone the zone the proxy is in + Zone string } type ServerSideMTLSCerts struct { diff --git a/pkg/plugins/policies/meshaccesslog/plugin/v1alpha1/plugin_test.go b/pkg/plugins/policies/meshaccesslog/plugin/v1alpha1/plugin_test.go index 2f5c57117931..55d65fe4a975 100644 --- a/pkg/plugins/policies/meshaccesslog/plugin/v1alpha1/plugin_test.go +++ b/pkg/plugins/policies/meshaccesslog/plugin/v1alpha1/plugin_test.go @@ -51,7 +51,7 @@ var _ = Describe("MeshAccessLog", func() { resourceSet.Add(&r) } - context := xds_context.Context{ + xdsCtx := xds_context.Context{ Mesh: xds_context.MeshContext{ Resource: &core_mesh.MeshResource{ Meta: &test_model.ResourceMeta{ @@ -107,7 +107,7 @@ var _ = Describe("MeshAccessLog", func() { } plugin := plugin.NewPlugin().(core_plugins.PolicyPlugin) - Expect(plugin.Apply(resourceSet, context, &proxy)).To(Succeed()) + Expect(plugin.Apply(resourceSet, xdsCtx, &proxy)).To(Succeed()) policies_xds.ResourceArrayShouldEqual(resourceSet.ListOf(envoy_resource.ListenerType), given.expectedListeners) policies_xds.ResourceArrayShouldEqual(resourceSet.ListOf(envoy_resource.ClusterType), given.expectedClusters) }, @@ -1014,6 +1014,10 @@ var _ = Describe("MeshAccessLog", func() { }, }, }, + RuntimeExtensions: map[string]interface{}{}, + } + for n, p := range core_plugins.Plugins().ProxyPlugins() { + Expect(p.Apply(context.Background(), xdsCtx.Mesh, &proxy)).To(Succeed(), n) } gatewayGenerator := gateway_plugin.NewGenerator("test-zone") diff --git a/pkg/plugins/policies/meshcircuitbreaker/plugin/v1alpha1/plugin.go b/pkg/plugins/policies/meshcircuitbreaker/plugin/v1alpha1/plugin.go index 424cc73ecd2c..4ac7a27c5069 100644 --- a/pkg/plugins/policies/meshcircuitbreaker/plugin/v1alpha1/plugin.go +++ b/pkg/plugins/policies/meshcircuitbreaker/plugin/v1alpha1/plugin.go @@ -1,8 +1,6 @@ package v1alpha1 import ( - "context" - envoy_cluster "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" mesh_proto "github.com/kumahq/kuma/api/mesh/v1alpha1" @@ -14,7 +12,7 @@ import ( policies_xds "github.com/kumahq/kuma/pkg/plugins/policies/core/xds" api "github.com/kumahq/kuma/pkg/plugins/policies/meshcircuitbreaker/api/v1alpha1" plugin_xds "github.com/kumahq/kuma/pkg/plugins/policies/meshcircuitbreaker/plugin/xds" - gateway_plugin "github.com/kumahq/kuma/pkg/plugins/runtime/gateway" + gateway "github.com/kumahq/kuma/pkg/plugins/runtime/gateway" xds_context "github.com/kumahq/kuma/pkg/xds/context" envoy_names "github.com/kumahq/kuma/pkg/xds/envoy/names" ) @@ -54,7 +52,7 @@ func (p plugin) Apply( return err } - if err := applyToGateways(ctx, policies.ToRules, clusters.Gateway, proxy); err != nil { + if err := applyToGateways(policies.ToRules, clusters.Gateway, proxy); err != nil { return err } @@ -110,22 +108,13 @@ func applyToOutbounds( } func applyToGateways( - ctx xds_context.Context, rules core_rules.ToRules, gatewayClusters map[string]*envoy_cluster.Cluster, proxy *core_xds.Proxy, ) error { - if !proxy.Dataplane.Spec.IsBuiltinGateway() { - return nil - } - gatewayListenerInfos, err := gateway_plugin.GatewayListenerInfoFromProxy(context.TODO(), ctx.Mesh, proxy, ctx.ControlPlane.Zone) - if err != nil { - return err - } - - for _, listenerInfo := range gatewayListenerInfos { + for _, listenerInfo := range gateway.ExtractGatewayListener(proxy) { for _, hostInfo := range listenerInfo.HostInfos { - destinations := gateway_plugin.RouteDestinationsMutable(hostInfo.Entries) + destinations := gateway.RouteDestinationsMutable(hostInfo.Entries) for _, dest := range destinations { clusterName, err := dest.Destination.DestinationClusterName(hostInfo.Host.Tags) if err != nil { diff --git a/pkg/plugins/policies/meshcircuitbreaker/plugin/v1alpha1/plugin_test.go b/pkg/plugins/policies/meshcircuitbreaker/plugin/v1alpha1/plugin_test.go index b26e6a7fbaf6..152b366287f0 100644 --- a/pkg/plugins/policies/meshcircuitbreaker/plugin/v1alpha1/plugin_test.go +++ b/pkg/plugins/policies/meshcircuitbreaker/plugin/v1alpha1/plugin_test.go @@ -431,6 +431,10 @@ var _ = Describe("MeshCircuitBreaker", func() { }, }, }, + RuntimeExtensions: map[string]interface{}{}, + } + for n, p := range core_plugins.Plugins().ProxyPlugins() { + Expect(p.Apply(context.Background(), xdsCtx.Mesh, &proxy)).To(Succeed(), n) } gatewayGenerator := gateway_plugin.NewGenerator("test-zone") generatedResources, err := gatewayGenerator.Generate(context.Background(), xdsCtx, &proxy) diff --git a/pkg/plugins/policies/meshfaultinjection/plugin/v1alpha1/plugin.go b/pkg/plugins/policies/meshfaultinjection/plugin/v1alpha1/plugin.go index e041fe789e30..12245576f64c 100644 --- a/pkg/plugins/policies/meshfaultinjection/plugin/v1alpha1/plugin.go +++ b/pkg/plugins/policies/meshfaultinjection/plugin/v1alpha1/plugin.go @@ -1,8 +1,6 @@ package v1alpha1 import ( - "context" - envoy_listener "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" mesh_proto "github.com/kumahq/kuma/api/mesh/v1alpha1" @@ -47,7 +45,7 @@ func (p plugin) Apply(rs *core_xds.ResourceSet, ctx xds_context.Context, proxy * return err } - if err := applyToGateways(ctx, policies.FromRules, listeners.Gateway, proxy); err != nil { + if err := applyToGateways(policies.FromRules, listeners.Gateway, proxy); err != nil { return err } return nil @@ -86,7 +84,6 @@ func applyToInbounds( } func applyToGateways( - ctx xds_context.Context, fromRules core_rules.FromRules, gatewayListeners map[core_rules.InboundListener]*envoy_listener.Listener, proxy *core_xds.Proxy, @@ -94,11 +91,7 @@ func applyToGateways( if !proxy.Dataplane.Spec.IsBuiltinGateway() { return nil } - gatewayListerInfos, err := gateway_plugin.GatewayListenerInfoFromProxy(context.TODO(), ctx.Mesh, proxy, ctx.ControlPlane.Zone) - if err != nil { - return err - } - for _, listenerInfo := range gatewayListerInfos { + for _, listenerInfo := range gateway_plugin.ExtractGatewayListener(proxy) { address := proxy.Dataplane.Spec.GetNetworking().Address port := listenerInfo.Listener.Port protocol := core_mesh.ParseProtocol(mesh_proto.MeshGateway_Listener_Protocol_name[int32(listenerInfo.Listener.Protocol)]) diff --git a/pkg/plugins/policies/meshfaultinjection/plugin/v1alpha1/plugin_test.go b/pkg/plugins/policies/meshfaultinjection/plugin/v1alpha1/plugin_test.go index 213c4930e4c2..644796de5fcd 100644 --- a/pkg/plugins/policies/meshfaultinjection/plugin/v1alpha1/plugin_test.go +++ b/pkg/plugins/policies/meshfaultinjection/plugin/v1alpha1/plugin_test.go @@ -246,6 +246,10 @@ var _ = Describe("MeshFaultInjection", func() { }, }, }, + RuntimeExtensions: map[string]interface{}{}, + } + for n, p := range core_plugins.Plugins().ProxyPlugins() { + Expect(p.Apply(context.Background(), xdsCtx.Mesh, &proxy)).To(Succeed(), n) } gatewayGenerator := gatewayGenerator() generatedResources, err := gatewayGenerator.Generate(context.Background(), xdsCtx, &proxy) diff --git a/pkg/plugins/policies/meshhealthcheck/plugin/v1alpha1/plugin.go b/pkg/plugins/policies/meshhealthcheck/plugin/v1alpha1/plugin.go index 993bd07e6b8f..0e4c7dde8903 100644 --- a/pkg/plugins/policies/meshhealthcheck/plugin/v1alpha1/plugin.go +++ b/pkg/plugins/policies/meshhealthcheck/plugin/v1alpha1/plugin.go @@ -1,8 +1,6 @@ package v1alpha1 import ( - "context" - envoy_cluster "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" mesh_proto "github.com/kumahq/kuma/api/mesh/v1alpha1" @@ -42,7 +40,7 @@ func (p plugin) Apply(rs *core_xds.ResourceSet, ctx xds_context.Context, proxy * return err } - if err := applyToGateways(ctx, policies.ToRules, clusters.Gateway, proxy); err != nil { + if err := applyToGateways(policies.ToRules, clusters.Gateway, proxy); err != nil { return err } @@ -64,20 +62,11 @@ func applyToOutbounds(rules core_rules.ToRules, outboundClusters map[string]*env } func applyToGateways( - ctx xds_context.Context, rules core_rules.ToRules, gatewayClusters map[string]*envoy_cluster.Cluster, proxy *core_xds.Proxy, ) error { - if !proxy.Dataplane.Spec.IsBuiltinGateway() { - return nil - } - gatewayListenerInfos, err := gateway_plugin.GatewayListenerInfoFromProxy(context.TODO(), ctx.Mesh, proxy, ctx.ControlPlane.Zone) - if err != nil { - return err - } - - for _, listenerInfo := range gatewayListenerInfos { + for _, listenerInfo := range gateway_plugin.ExtractGatewayListener(proxy) { for _, hostInfo := range listenerInfo.HostInfos { destinations := gateway_plugin.RouteDestinationsMutable(hostInfo.Entries) for _, dest := range destinations { diff --git a/pkg/plugins/policies/meshhealthcheck/plugin/v1alpha1/plugin_test.go b/pkg/plugins/policies/meshhealthcheck/plugin/v1alpha1/plugin_test.go index 620996f4b4d8..ad8c41e0a642 100644 --- a/pkg/plugins/policies/meshhealthcheck/plugin/v1alpha1/plugin_test.go +++ b/pkg/plugins/policies/meshhealthcheck/plugin/v1alpha1/plugin_test.go @@ -280,6 +280,10 @@ var _ = Describe("MeshHealthCheck", func() { }, }, }, + RuntimeExtensions: map[string]interface{}{}, + } + for n, p := range core_plugins.Plugins().ProxyPlugins() { + Expect(p.Apply(context.Background(), xdsCtx.Mesh, &proxy)).To(Succeed(), n) } gatewayGenerator := gateway_plugin.NewGenerator("test-zone") generatedResources, err := gatewayGenerator.Generate(context.Background(), xdsCtx, &proxy) diff --git a/pkg/plugins/policies/meshloadbalancingstrategy/plugin/v1alpha1/plugin.go b/pkg/plugins/policies/meshloadbalancingstrategy/plugin/v1alpha1/plugin.go index 549853007181..b10f22f46771 100644 --- a/pkg/plugins/policies/meshloadbalancingstrategy/plugin/v1alpha1/plugin.go +++ b/pkg/plugins/policies/meshloadbalancingstrategy/plugin/v1alpha1/plugin.go @@ -1,8 +1,6 @@ package v1alpha1 import ( - "context" - envoy_cluster "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" envoy_listener "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" envoy_route "github.com/envoyproxy/go-control-plane/envoy/config/route/v3" @@ -56,7 +54,7 @@ func (p plugin) Apply(rs *core_xds.ResourceSet, ctx xds_context.Context, proxy * endpoints := policies_xds.GatherEndpoints(rs) routes := policies_xds.GatherRoutes(rs) - if err := p.configureGateway(ctx, proxy, policies.ToRules, listeners.Gateway, clusters.Gateway, routes.Gateway, endpoints); err != nil { + if err := p.configureGateway(proxy, policies.ToRules, listeners.Gateway, clusters.Gateway, routes.Gateway, endpoints); err != nil { return err } @@ -133,7 +131,6 @@ func configureEndpoints( } func (p plugin) configureGateway( - ctx xds_context.Context, proxy *core_xds.Proxy, rules core_rules.ToRules, gatewayListeners map[core_rules.InboundListener]*envoy_listener.Listener, @@ -141,15 +138,11 @@ func (p plugin) configureGateway( gatewayRoutes map[string]*envoy_route.RouteConfiguration, endpoints policies_xds.EndpointMap, ) error { - if !proxy.Dataplane.Spec.IsBuiltinGateway() { + gatewayListenerInfos := gateway_plugin.ExtractGatewayListener(proxy) + if len(gatewayListenerInfos) == 0 { return nil } - gatewayListenerInfos, err := gateway_plugin.GatewayListenerInfoFromProxy(context.TODO(), ctx.Mesh, proxy, ctx.ControlPlane.Zone) - if err != nil { - return err - } - conf := core_rules.ComputeConf[api.Conf](rules.Rules, core_rules.MeshSubset()) if conf == nil { return nil diff --git a/pkg/plugins/policies/meshloadbalancingstrategy/plugin/v1alpha1/plugin_test.go b/pkg/plugins/policies/meshloadbalancingstrategy/plugin/v1alpha1/plugin_test.go index 74ac81702cb2..9a87b5d302c3 100644 --- a/pkg/plugins/policies/meshloadbalancingstrategy/plugin/v1alpha1/plugin_test.go +++ b/pkg/plugins/policies/meshloadbalancingstrategy/plugin/v1alpha1/plugin_test.go @@ -391,6 +391,10 @@ var _ = Describe("MeshLoadBalancingStrategy", func() { }, }, }, + RuntimeExtensions: map[string]interface{}{}, + } + for n, p := range core_plugins.Plugins().ProxyPlugins() { + Expect(p.Apply(context.Background(), xdsCtx.Mesh, &proxy)).To(Succeed(), n) } gatewayGenerator := gateway_plugin.NewGenerator("test-zone") generatedResources, err := gatewayGenerator.Generate(context.Background(), xdsCtx, &proxy) diff --git a/pkg/plugins/policies/meshratelimit/plugin/v1alpha1/plugin.go b/pkg/plugins/policies/meshratelimit/plugin/v1alpha1/plugin.go index 2a34ccafd71f..6970c5fae807 100644 --- a/pkg/plugins/policies/meshratelimit/plugin/v1alpha1/plugin.go +++ b/pkg/plugins/policies/meshratelimit/plugin/v1alpha1/plugin.go @@ -1,8 +1,6 @@ package v1alpha1 import ( - "context" - envoy_listener "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" envoy_route "github.com/envoyproxy/go-control-plane/envoy/config/route/v3" @@ -46,27 +44,19 @@ func (p plugin) Apply(rs *core_xds.ResourceSet, ctx xds_context.Context, proxy * if err := applyToInbounds(policies.FromRules, listeners.Inbound, proxy); err != nil { return err } - if err := applyToGateways(ctx, policies.FromRules, listeners.Gateway, routes.Gateway, proxy); err != nil { + if err := applyToGateways(policies.FromRules, listeners.Gateway, routes.Gateway, proxy); err != nil { return err } return nil } func applyToGateways( - ctx xds_context.Context, fromRules core_rules.FromRules, gatewayListeners map[core_rules.InboundListener]*envoy_listener.Listener, gatewayRoutes map[string]*envoy_route.RouteConfiguration, proxy *core_xds.Proxy, ) error { - if !proxy.Dataplane.Spec.IsBuiltinGateway() { - return nil - } - gatewayListerInfos, err := gateway_plugin.GatewayListenerInfoFromProxy(context.TODO(), ctx.Mesh, proxy, ctx.ControlPlane.Zone) - if err != nil { - return err - } - for _, listenerInfo := range gatewayListerInfos { + for _, listenerInfo := range gateway_plugin.ExtractGatewayListener(proxy) { address := proxy.Dataplane.Spec.GetNetworking().Address port := listenerInfo.Listener.Port listenerKey := core_rules.InboundListener{ diff --git a/pkg/plugins/policies/meshratelimit/plugin/v1alpha1/plugin_test.go b/pkg/plugins/policies/meshratelimit/plugin/v1alpha1/plugin_test.go index 3780c688ba54..4b5ab28dc9b1 100644 --- a/pkg/plugins/policies/meshratelimit/plugin/v1alpha1/plugin_test.go +++ b/pkg/plugins/policies/meshratelimit/plugin/v1alpha1/plugin_test.go @@ -466,8 +466,12 @@ var _ = Describe("MeshRateLimit", func() { }, }, }, + RuntimeExtensions: map[string]interface{}{}, } - gatewayGenerator := gatewayGenerator() + for n, p := range core_plugins.Plugins().ProxyPlugins() { + Expect(p.Apply(context.Background(), xdsCtx.Mesh, &proxy)).To(Succeed(), n) + } + gatewayGenerator := gateway_plugin.NewGenerator("test-zone") generatedResources, err := gatewayGenerator.Generate(context.Background(), xdsCtx, &proxy) Expect(err).NotTo(HaveOccurred()) @@ -480,19 +484,3 @@ var _ = Describe("MeshRateLimit", func() { Expect(util_proto.ToYAML(generatedResources.ListOf(envoy_resource.ListenerType)[0].Resource)).To(test_matchers.MatchGoldenYAML(filepath.Join("testdata", "gateway_basic_listener.golden.yaml"))) }) }) - -func gatewayGenerator() gateway_plugin.Generator { - return gateway_plugin.Generator{ - FilterChainGenerators: gateway_plugin.FilterChainGenerators{ - FilterChainGenerators: map[mesh_proto.MeshGateway_Listener_Protocol]gateway_plugin.FilterChainGenerator{ - mesh_proto.MeshGateway_Listener_HTTP: &gateway_plugin.HTTPFilterChainGenerator{}, - mesh_proto.MeshGateway_Listener_HTTPS: &gateway_plugin.HTTPSFilterChainGenerator{}, - mesh_proto.MeshGateway_Listener_TCP: &gateway_plugin.TCPFilterChainGenerator{}, - }, - }, - ClusterGenerator: gateway_plugin.ClusterGenerator{ - Zone: "test-zone", - }, - Zone: "test-zone", - } -} diff --git a/pkg/plugins/policies/meshretry/plugin/v1alpha1/plugin.go b/pkg/plugins/policies/meshretry/plugin/v1alpha1/plugin.go index 63e271977edb..bc5d814ba228 100644 --- a/pkg/plugins/policies/meshretry/plugin/v1alpha1/plugin.go +++ b/pkg/plugins/policies/meshretry/plugin/v1alpha1/plugin.go @@ -1,8 +1,6 @@ package v1alpha1 import ( - "context" - envoy_listener "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" envoy_route "github.com/envoyproxy/go-control-plane/envoy/config/route/v3" @@ -44,7 +42,7 @@ func (p plugin) Apply(rs *core_xds.ResourceSet, ctx xds_context.Context, proxy * return err } - if err := applyToGateway(ctx, policies.ToRules, routes.Gateway, listeners.Gateway, proxy); err != nil { + if err := applyToGateway(policies.ToRules, routes.Gateway, listeners.Gateway, proxy); err != nil { return err } @@ -80,22 +78,12 @@ func applyToOutbounds( } func applyToGateway( - ctx xds_context.Context, rules core_rules.ToRules, gatewayRoutes map[string]*envoy_route.RouteConfiguration, gatewayListeners map[core_rules.InboundListener]*envoy_listener.Listener, proxy *core_xds.Proxy, ) error { - if !proxy.Dataplane.Spec.IsBuiltinGateway() { - return nil - } - - gatewayListerInfos, err := gateway_plugin.GatewayListenerInfoFromProxy(context.TODO(), ctx.Mesh, proxy, ctx.ControlPlane.Zone) - if err != nil { - return err - } - - for _, listenerInfo := range gatewayListerInfos { + for _, listenerInfo := range gateway_plugin.ExtractGatewayListener(proxy) { configurer := plugin_xds.Configurer{ Retry: core_rules.ComputeConf[api.Conf](rules.Rules, core_rules.MeshSubset()), Protocol: core_mesh.ParseProtocol(listenerInfo.Listener.Protocol.String()), diff --git a/pkg/plugins/policies/meshretry/plugin/v1alpha1/plugin_test.go b/pkg/plugins/policies/meshretry/plugin/v1alpha1/plugin_test.go index c2f5848f99fa..8271489b5459 100644 --- a/pkg/plugins/policies/meshretry/plugin/v1alpha1/plugin_test.go +++ b/pkg/plugins/policies/meshretry/plugin/v1alpha1/plugin_test.go @@ -330,6 +330,10 @@ var _ = Describe("MeshRetry", func() { }, }, }, + RuntimeExtensions: map[string]interface{}{}, + } + for n, p := range core_plugins.Plugins().ProxyPlugins() { + Expect(p.Apply(context.Background(), xdsCtx.Mesh, &proxy)).To(Succeed(), n) } gatewayGenerator := gateway_plugin.NewGenerator("test-zone") generatedResources, err := gatewayGenerator.Generate(context.Background(), xdsCtx, &proxy) diff --git a/pkg/plugins/policies/meshtimeout/plugin/v1alpha1/plugin.go b/pkg/plugins/policies/meshtimeout/plugin/v1alpha1/plugin.go index b35098b1c359..77d9a3fe9560 100644 --- a/pkg/plugins/policies/meshtimeout/plugin/v1alpha1/plugin.go +++ b/pkg/plugins/policies/meshtimeout/plugin/v1alpha1/plugin.go @@ -1,8 +1,6 @@ package v1alpha1 import ( - "context" - envoy_cluster "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" envoy_listener "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" envoy_route "github.com/envoyproxy/go-control-plane/envoy/config/route/v3" @@ -53,7 +51,7 @@ func (p plugin) Apply(rs *core_xds.ResourceSet, ctx xds_context.Context, proxy * if err := applyToOutbounds(policies.ToRules, listeners.Outbound, proxy.Dataplane, proxy.Routing); err != nil { return err } - if err := applyToGateway(policies.ToRules, clusters.Gateway, routes.Gateway, ctx, proxy); err != nil { + if err := applyToGateway(policies.ToRules, clusters.Gateway, routes.Gateway, proxy); err != nil { return err } @@ -168,14 +166,9 @@ func applyToGateway( toRules core_rules.ToRules, gatewayClusters map[string]*envoy_cluster.Cluster, gatewayRoutes map[string]*envoy_route.RouteConfiguration, - ctx xds_context.Context, proxy *core_xds.Proxy, ) error { - gatewayListerInfos, err := gateway_plugin.GatewayListenerInfoFromProxy(context.TODO(), ctx.Mesh, proxy, ctx.ControlPlane.Zone) - if err != nil { - return err - } - for _, listenerInfo := range gatewayListerInfos { + for _, listenerInfo := range gateway_plugin.ExtractGatewayListener(proxy) { conf := getConf(toRules.Rules, core_rules.MeshSubset()) route, ok := gatewayRoutes[listenerInfo.Listener.ResourceName] diff --git a/pkg/plugins/policies/meshtimeout/plugin/v1alpha1/plugin_test.go b/pkg/plugins/policies/meshtimeout/plugin/v1alpha1/plugin_test.go index 30f2ae9ae723..069e7a936370 100644 --- a/pkg/plugins/policies/meshtimeout/plugin/v1alpha1/plugin_test.go +++ b/pkg/plugins/policies/meshtimeout/plugin/v1alpha1/plugin_test.go @@ -460,6 +460,10 @@ var _ = Describe("MeshTimeout", func() { }}, }, }, + RuntimeExtensions: map[string]interface{}{}, + } + for n, p := range core_plugins.Plugins().ProxyPlugins() { + Expect(p.Apply(context.Background(), xdsCtx.Mesh, &proxy)).To(Succeed(), n) } gatewayGenerator := gateway_plugin.NewGenerator("test-zone") generatedResources, err := gatewayGenerator.Generate(context.Background(), xdsCtx, &proxy) diff --git a/pkg/plugins/runtime/gateway/generator.go b/pkg/plugins/runtime/gateway/generator.go index c0af93cd5656..26f492b75a44 100644 --- a/pkg/plugins/runtime/gateway/generator.go +++ b/pkg/plugins/runtime/gateway/generator.go @@ -105,8 +105,8 @@ func (g *FilterChainGenerators) For(ctx xds_context.Context, info GatewayListene // GatewayListenerInfoFromProxy processes a Dataplane and the corresponding // Gateway and returns information about the listeners, routes and applied // policies. -func GatewayListenerInfoFromProxy( - ctx context.Context, meshCtx xds_context.MeshContext, proxy *core_xds.Proxy, zone string, +func gatewayListenerInfoFromProxy( + ctx context.Context, meshCtx *xds_context.MeshContext, proxy *core_xds.Proxy, ) ( []GatewayListenerInfo, error, ) { @@ -164,7 +164,7 @@ func GatewayListenerInfoFromProxy( meshCtx.Resource, matchedExternalServices, meshCtx.DataSourceLoader, - zone, + proxy.Zone, ) for k, v := range esEndpoints { outboundEndpoints[k] = v @@ -172,7 +172,7 @@ func GatewayListenerInfoFromProxy( // We already validate that listeners are collapsible for _, listeners := range collapsed { - listener, hosts := MakeGatewayListener(meshCtx, gateway, proxy.Dataplane, listeners) + listener, hosts := MakeGatewayListener(meshCtx, gateway, listeners) var hostInfos []GatewayHostInfo for _, host := range hosts { @@ -203,14 +203,9 @@ func GatewayListenerInfoFromProxy( func (g Generator) Generate(ctx context.Context, xdsCtx xds_context.Context, proxy *core_xds.Proxy) (*core_xds.ResourceSet, error) { resources := core_xds.NewResourceSet() - listenerInfos, err := GatewayListenerInfoFromProxy(ctx, xdsCtx.Mesh, proxy, g.Zone) - if err != nil { - return nil, errors.Wrap(err, "error generating listener info from Proxy") - } - var limits []RuntimeResoureLimitListener - for _, info := range listenerInfos { + for _, info := range ExtractGatewayListener(proxy) { cdsResources, err := g.generateCDS(ctx, xdsCtx, info, info.HostInfos) if err != nil { return nil, err @@ -345,9 +340,8 @@ func (g Generator) generateRDS(ctx xds_context.Context, info GatewayListenerInfo // Listeners must be validated for collapsibility in terms of hostnames and // protocols. func MakeGatewayListener( - meshContext xds_context.MeshContext, + meshContext *xds_context.MeshContext, gateway *core_mesh.MeshGatewayResource, - dataplane *core_mesh.DataplaneResource, listeners []*mesh_proto.MeshGateway_Listener, ) (GatewayListener, []GatewayHost) { hostsByName := map[string]GatewayHost{} diff --git a/pkg/plugins/runtime/gateway/metadata/metadata.go b/pkg/plugins/runtime/gateway/metadata/metadata.go index 7448eabf757e..189fdea14924 100644 --- a/pkg/plugins/runtime/gateway/metadata/metadata.go +++ b/pkg/plugins/runtime/gateway/metadata/metadata.go @@ -1,10 +1,8 @@ package metadata -import core_plugins "github.com/kumahq/kuma/pkg/core/plugins" - // OriginGateway marks xDS resources generated by this plugin. const ( - OriginGateway = "gateway" - PluginName core_plugins.PluginName = "gateway" - ProfileGatewayProxy = "gateway-proxy" + OriginGateway = "gateway" + PluginName = "gateway" + ProfileGatewayProxy = "gateway-proxy" ) diff --git a/pkg/plugins/runtime/gateway/plugin.go b/pkg/plugins/runtime/gateway/plugin.go index 7d9f94552044..f4d433e4b8a1 100644 --- a/pkg/plugins/runtime/gateway/plugin.go +++ b/pkg/plugins/runtime/gateway/plugin.go @@ -1,12 +1,16 @@ package gateway import ( + "context" + mesh_proto "github.com/kumahq/kuma/api/mesh/v1alpha1" config_core "github.com/kumahq/kuma/pkg/config/core" "github.com/kumahq/kuma/pkg/core" core_plugins "github.com/kumahq/kuma/pkg/core/plugins" + core_xds "github.com/kumahq/kuma/pkg/core/xds" mesh_k8s "github.com/kumahq/kuma/pkg/plugins/resources/k8s/native/api/v1alpha1" "github.com/kumahq/kuma/pkg/plugins/runtime/gateway/metadata" + xds_context "github.com/kumahq/kuma/pkg/xds/context" "github.com/kumahq/kuma/pkg/xds/generator" generator_core "github.com/kumahq/kuma/pkg/xds/generator/core" generator_secrets "github.com/kumahq/kuma/pkg/xds/generator/secrets" @@ -34,6 +38,28 @@ func (p *plugin) BeforeBootstrap(context *core_plugins.MutablePluginContext, con return nil } +func (p *plugin) Apply(ctx context.Context, meshContext xds_context.MeshContext, proxy *core_xds.Proxy) error { + if proxy.Dataplane == nil || !proxy.Dataplane.Spec.IsBuiltinGateway() { + return nil + } + l, err := gatewayListenerInfoFromProxy( + ctx, &meshContext, proxy, + ) + if err != nil { + return err + } + proxy.RuntimeExtensions[metadata.PluginName] = l + return nil +} + +func ExtractGatewayListener(proxy *core_xds.Proxy) []GatewayListenerInfo { + ext := proxy.RuntimeExtensions[metadata.PluginName] + if ext == nil { + return nil + } + return ext.([]GatewayListenerInfo) +} + func (p *plugin) AfterBootstrap(context *core_plugins.MutablePluginContext, config core_plugins.PluginConfig) error { // Insert our resolver before the default so that we can intercept // builtin gateway dataplanes. diff --git a/pkg/plugins/runtime/gateway/suite_test.go b/pkg/plugins/runtime/gateway/suite_test.go index 4749bf69c7a1..bfd458f24212 100644 --- a/pkg/plugins/runtime/gateway/suite_test.go +++ b/pkg/plugins/runtime/gateway/suite_test.go @@ -331,10 +331,18 @@ var _ = BeforeSuite(func() { // Ensure that the plugin is registered so that tests at least // have a chance of working. Expect(plugins.Plugins().BootstrapPlugins()).To( - WithTransform(func(in []plugins.BootstrapPlugin) []plugins.PluginName { - var out []plugins.PluginName + WithTransform(func(in []plugins.BootstrapPlugin) []string { + var out []string for _, p := range in { - out = append(out, p.Name()) + out = append(out, string(p.Name())) + } + return out + }, ContainElement(metadata.PluginName))) + Expect(plugins.Plugins().ProxyPlugins()).To( + WithTransform(func(in map[plugins.PluginName]plugins.ProxyPlugin) []string { + var out []string + for k := range in { + out = append(out, string(k)) } return out }, ContainElement(metadata.PluginName))) diff --git a/pkg/xds/sync/dataplane_proxy_builder.go b/pkg/xds/sync/dataplane_proxy_builder.go index 6ff6c75c2e5f..ff8e13d98936 100644 --- a/pkg/xds/sync/dataplane_proxy_builder.go +++ b/pkg/xds/sync/dataplane_proxy_builder.go @@ -11,7 +11,7 @@ import ( "github.com/kumahq/kuma/pkg/core/logs" manager_dataplane "github.com/kumahq/kuma/pkg/core/managers/apis/dataplane" "github.com/kumahq/kuma/pkg/core/permissions" - "github.com/kumahq/kuma/pkg/core/plugins" + core_plugins "github.com/kumahq/kuma/pkg/core/plugins" "github.com/kumahq/kuma/pkg/core/ratelimits" core_mesh "github.com/kumahq/kuma/pkg/core/resources/apis/mesh" core_model "github.com/kumahq/kuma/pkg/core/resources/model" @@ -56,13 +56,21 @@ func (p *DataplaneProxyBuilder) Build(ctx context.Context, key core_model.Resour secretsTracker := envoy.NewSecretsTracker(meshName, allMeshNames) proxy := &core_xds.Proxy{ - Id: core_xds.FromResourceKey(key), - APIVersion: p.APIVersion, - Dataplane: dp, - Routing: *routing, - Policies: *matchedPolicies, - SecretsTracker: secretsTracker, - Metadata: &core_xds.DataplaneMetadata{}, + Id: core_xds.FromResourceKey(key), + APIVersion: p.APIVersion, + Dataplane: dp, + Routing: *routing, + Policies: *matchedPolicies, + SecretsTracker: secretsTracker, + Metadata: &core_xds.DataplaneMetadata{}, + Zone: p.Zone, + RuntimeExtensions: map[string]interface{}{}, + } + for k, pl := range core_plugins.Plugins().ProxyPlugins() { + err := pl.Apply(ctx, meshContext, proxy) + if err != nil { + return nil, errors.Wrapf(err, "Failed applying proxy plugin: %s", k) + } } return proxy, nil } @@ -159,7 +167,7 @@ func (p *DataplaneProxyBuilder) matchPolicies(meshContext xds_context.MeshContex ProxyTemplate: template.SelectProxyTemplate(dataplane, resources.ProxyTemplates().Items), Dynamic: core_xds.PluginOriginatedPolicies{}, } - for name, p := range plugins.Plugins().PolicyPlugins() { + for name, p := range core_plugins.Plugins().PolicyPlugins() { res, err := p.MatchedPolicies(dataplane, resources) if err != nil { return nil, errors.Wrapf(err, "could not apply policy plugin %s", name) diff --git a/pkg/xds/sync/egress_proxy_builder.go b/pkg/xds/sync/egress_proxy_builder.go index 67bd20892230..e773f758fb6d 100644 --- a/pkg/xds/sync/egress_proxy_builder.go +++ b/pkg/xds/sync/egress_proxy_builder.go @@ -8,7 +8,7 @@ import ( "github.com/kumahq/kuma/pkg/core/faultinjections" "github.com/kumahq/kuma/pkg/core/permissions" - "github.com/kumahq/kuma/pkg/core/plugins" + core_plugins "github.com/kumahq/kuma/pkg/core/plugins" "github.com/kumahq/kuma/pkg/core/ratelimits" core_mesh "github.com/kumahq/kuma/pkg/core/resources/apis/mesh" core_model "github.com/kumahq/kuma/pkg/core/resources/model" @@ -95,8 +95,8 @@ func (p *EgressProxyBuilder) Build( for _, es := range externalServices { policies := core_xds.PluginOriginatedPolicies{} - for name, plugin := range plugins.Plugins().PolicyPlugins() { - egressPlugin, ok := plugin.(plugins.EgressPolicyPlugin) + for name, plugin := range core_plugins.Plugins().PolicyPlugins() { + egressPlugin, ok := plugin.(core_plugins.EgressPolicyPlugin) if !ok { continue } @@ -118,12 +118,19 @@ func (p *EgressProxyBuilder) Build( proxy := &core_xds.Proxy{ Id: core_xds.FromResourceKey(key), APIVersion: p.apiVersion, + Zone: p.zone, ZoneEgressProxy: &core_xds.ZoneEgressProxy{ ZoneEgressResource: zoneEgress, ZoneIngresses: zoneIngresses, MeshResourcesList: meshResourcesList, }, } + for k, pl := range core_plugins.Plugins().ProxyPlugins() { + err := pl.Apply(ctx, xds_context.MeshContext{}, proxy) // No mesh context for zone proxies + if err != nil { + return nil, errors.Wrapf(err, "Failed applying proxy plugin: %s", k) + } + } return proxy, nil } diff --git a/pkg/xds/sync/ingress_proxy_builder.go b/pkg/xds/sync/ingress_proxy_builder.go index 67c45f3043b9..30a3082e7229 100644 --- a/pkg/xds/sync/ingress_proxy_builder.go +++ b/pkg/xds/sync/ingress_proxy_builder.go @@ -3,8 +3,11 @@ package sync import ( "context" + "github.com/pkg/errors" + mesh_proto "github.com/kumahq/kuma/api/mesh/v1alpha1" "github.com/kumahq/kuma/pkg/core/dns/lookup" + core_plugins "github.com/kumahq/kuma/pkg/core/plugins" core_mesh "github.com/kumahq/kuma/pkg/core/resources/apis/mesh" "github.com/kumahq/kuma/pkg/core/resources/manager" core_model "github.com/kumahq/kuma/pkg/core/resources/model" @@ -42,8 +45,15 @@ func (p *IngressProxyBuilder) Build( proxy := &core_xds.Proxy{ Id: core_xds.FromResourceKey(key), APIVersion: p.apiVersion, + Zone: p.zone, ZoneIngressProxy: p.buildZoneIngressProxy(zoneIngress, aggregatedMeshCtxs), } + for k, pl := range core_plugins.Plugins().ProxyPlugins() { + err := pl.Apply(ctx, xds_context.MeshContext{}, proxy) // No mesh context for zone proxies + if err != nil { + return nil, errors.Wrapf(err, "Failed applying proxy plugin: %s", k) + } + } return proxy, nil } From d9bc7bc9b5ba220fd30df01c8a14055c922ebefa Mon Sep 17 00:00:00 2001 From: Charly Molter Date: Fri, 20 Oct 2023 16:22:39 +0200 Subject: [PATCH 2/2] review comments Signed-off-by: Charly Molter --- pkg/api-server/inspect_endpoints.go | 6 +-- .../apis/mesh/zz_generated.resources.go | 46 ++++++++++++++----- .../apis/system/zz_generated.resources.go | 16 +++++-- .../plugin/v1alpha1/plugin.go | 2 +- .../plugin/v1alpha1/plugin.go | 2 +- .../meshhealthcheck/plugin/v1alpha1/plugin.go | 2 +- .../plugin/v1alpha1/plugin.go | 2 +- .../meshratelimit/plugin/v1alpha1/plugin.go | 2 +- .../meshretry/plugin/v1alpha1/plugin.go | 2 +- .../meshtimeout/plugin/v1alpha1/plugin.go | 2 +- .../plugin/v1alpha1/plugin_test.go | 4 +- pkg/plugins/runtime/gateway/generator.go | 2 +- pkg/plugins/runtime/gateway/plugin.go | 13 ++++-- 13 files changed, 68 insertions(+), 33 deletions(-) diff --git a/pkg/api-server/inspect_endpoints.go b/pkg/api-server/inspect_endpoints.go index 8925cf0289b7..c36867a638d8 100644 --- a/pkg/api-server/inspect_endpoints.go +++ b/pkg/api-server/inspect_endpoints.go @@ -207,7 +207,7 @@ func inspectGatewayRouteDataplanes( rest_errors.HandleError(request.Request.Context(), response, err, "Could not generate listener info") return } - for _, listener := range gateway.ExtractGatewayListener(proxy) { + for _, listener := range gateway.ExtractGatewayListeners(proxy) { for _, host := range listener.HostInfos { for _, entry := range host.Entries { if entry.Route != gatewayRoute.GetMeta().GetName() { @@ -349,7 +349,7 @@ func newGatewayDataplaneInspectResponse( proxy *core_xds.Proxy, ) api_server_types.GatewayDataplaneInspectResult { result := api_server_types.NewGatewayDataplaneInspectResult() - gwListeners := gateway.ExtractGatewayListener(proxy) + gwListeners := gateway.ExtractGatewayListeners(proxy) if len(gwListeners) > 0 { result.Gateway = api_server_types.ResourceKeyEntry{ Mesh: gwListeners[0].Gateway.GetMeta().GetMesh(), @@ -442,7 +442,7 @@ func gatewayEntriesByPolicy( proxy *core_xds.Proxy, ) map[inspect.PolicyKey][]api_server_types.PolicyInspectEntry { policyMap := map[inspect.PolicyKey][]api_server_types.PolicyInspectEntry{} - gwListeners := gateway.ExtractGatewayListener(proxy) + gwListeners := gateway.ExtractGatewayListeners(proxy) dpKey := core_model.MetaToResourceKey(proxy.Dataplane.GetMeta()) resourceKey := api_server_types.ResourceKeyEntry{ diff --git a/pkg/core/resources/apis/mesh/zz_generated.resources.go b/pkg/core/resources/apis/mesh/zz_generated.resources.go index d5cc3dfda006..480320524ad1 100644 --- a/pkg/core/resources/apis/mesh/zz_generated.resources.go +++ b/pkg/core/resources/apis/mesh/zz_generated.resources.go @@ -5,6 +5,7 @@ package mesh import ( + "errors" "fmt" mesh_proto "github.com/kumahq/kuma/api/mesh/v1alpha1" @@ -398,10 +399,17 @@ func (t *DataplaneOverviewResource) Descriptor() model.ResourceTypeDescriptor { func (t *DataplaneOverviewResource) SetOverviewSpec(resource model.Resource, insight model.Resource) error { t.SetMeta(resource.GetMeta()) - return t.SetSpec(&mesh_proto.DataplaneOverview{ - Dataplane: resource.GetSpec().(*mesh_proto.Dataplane), - DataplaneInsight: insight.GetSpec().(*mesh_proto.DataplaneInsight), - }) + overview := &mesh_proto.DataplaneOverview{ + Dataplane: resource.GetSpec().(*mesh_proto.Dataplane), + } + if insight != nil { + ins, ok := insight.GetSpec().(*mesh_proto.DataplaneInsight) + if !ok { + return errors.New("failed to convert to insight type 'DataplaneInsight'") + } + overview.DataplaneInsight = ins + } + return t.SetSpec(overview) } var _ model.ResourceList = &DataplaneOverviewResourceList{} @@ -2770,10 +2778,17 @@ func (t *ZoneEgressOverviewResource) Descriptor() model.ResourceTypeDescriptor { func (t *ZoneEgressOverviewResource) SetOverviewSpec(resource model.Resource, insight model.Resource) error { t.SetMeta(resource.GetMeta()) - return t.SetSpec(&mesh_proto.ZoneEgressOverview{ - ZoneEgress: resource.GetSpec().(*mesh_proto.ZoneEgress), - ZoneEgressInsight: insight.GetSpec().(*mesh_proto.ZoneEgressInsight), - }) + overview := &mesh_proto.ZoneEgressOverview{ + ZoneEgress: resource.GetSpec().(*mesh_proto.ZoneEgress), + } + if insight != nil { + ins, ok := insight.GetSpec().(*mesh_proto.ZoneEgressInsight) + if !ok { + return errors.New("failed to convert to insight type 'ZoneEgressInsight'") + } + overview.ZoneEgressInsight = ins + } + return t.SetSpec(overview) } var _ model.ResourceList = &ZoneEgressOverviewResourceList{} @@ -3102,10 +3117,17 @@ func (t *ZoneIngressOverviewResource) Descriptor() model.ResourceTypeDescriptor func (t *ZoneIngressOverviewResource) SetOverviewSpec(resource model.Resource, insight model.Resource) error { t.SetMeta(resource.GetMeta()) - return t.SetSpec(&mesh_proto.ZoneIngressOverview{ - ZoneIngress: resource.GetSpec().(*mesh_proto.ZoneIngress), - ZoneIngressInsight: insight.GetSpec().(*mesh_proto.ZoneIngressInsight), - }) + overview := &mesh_proto.ZoneIngressOverview{ + ZoneIngress: resource.GetSpec().(*mesh_proto.ZoneIngress), + } + if insight != nil { + ins, ok := insight.GetSpec().(*mesh_proto.ZoneIngressInsight) + if !ok { + return errors.New("failed to convert to insight type 'ZoneIngressInsight'") + } + overview.ZoneIngressInsight = ins + } + return t.SetSpec(overview) } var _ model.ResourceList = &ZoneIngressOverviewResourceList{} diff --git a/pkg/core/resources/apis/system/zz_generated.resources.go b/pkg/core/resources/apis/system/zz_generated.resources.go index ee16e0d2db70..438eaffae9d2 100644 --- a/pkg/core/resources/apis/system/zz_generated.resources.go +++ b/pkg/core/resources/apis/system/zz_generated.resources.go @@ -5,6 +5,7 @@ package system import ( + "errors" "fmt" system_proto "github.com/kumahq/kuma/api/system/v1alpha1" @@ -497,10 +498,17 @@ func (t *ZoneOverviewResource) Descriptor() model.ResourceTypeDescriptor { func (t *ZoneOverviewResource) SetOverviewSpec(resource model.Resource, insight model.Resource) error { t.SetMeta(resource.GetMeta()) - return t.SetSpec(&system_proto.ZoneOverview{ - Zone: resource.GetSpec().(*system_proto.Zone), - ZoneInsight: insight.GetSpec().(*system_proto.ZoneInsight), - }) + overview := &system_proto.ZoneOverview{ + Zone: resource.GetSpec().(*system_proto.Zone), + } + if insight != nil { + ins, ok := insight.GetSpec().(*system_proto.ZoneInsight) + if !ok { + return errors.New("failed to convert to insight type 'ZoneInsight'") + } + overview.ZoneInsight = ins + } + return t.SetSpec(overview) } var _ model.ResourceList = &ZoneOverviewResourceList{} diff --git a/pkg/plugins/policies/meshcircuitbreaker/plugin/v1alpha1/plugin.go b/pkg/plugins/policies/meshcircuitbreaker/plugin/v1alpha1/plugin.go index 4ac7a27c5069..e90c81296ea4 100644 --- a/pkg/plugins/policies/meshcircuitbreaker/plugin/v1alpha1/plugin.go +++ b/pkg/plugins/policies/meshcircuitbreaker/plugin/v1alpha1/plugin.go @@ -112,7 +112,7 @@ func applyToGateways( gatewayClusters map[string]*envoy_cluster.Cluster, proxy *core_xds.Proxy, ) error { - for _, listenerInfo := range gateway.ExtractGatewayListener(proxy) { + for _, listenerInfo := range gateway.ExtractGatewayListeners(proxy) { for _, hostInfo := range listenerInfo.HostInfos { destinations := gateway.RouteDestinationsMutable(hostInfo.Entries) for _, dest := range destinations { diff --git a/pkg/plugins/policies/meshfaultinjection/plugin/v1alpha1/plugin.go b/pkg/plugins/policies/meshfaultinjection/plugin/v1alpha1/plugin.go index 12245576f64c..9b4b5ebbf352 100644 --- a/pkg/plugins/policies/meshfaultinjection/plugin/v1alpha1/plugin.go +++ b/pkg/plugins/policies/meshfaultinjection/plugin/v1alpha1/plugin.go @@ -91,7 +91,7 @@ func applyToGateways( if !proxy.Dataplane.Spec.IsBuiltinGateway() { return nil } - for _, listenerInfo := range gateway_plugin.ExtractGatewayListener(proxy) { + for _, listenerInfo := range gateway_plugin.ExtractGatewayListeners(proxy) { address := proxy.Dataplane.Spec.GetNetworking().Address port := listenerInfo.Listener.Port protocol := core_mesh.ParseProtocol(mesh_proto.MeshGateway_Listener_Protocol_name[int32(listenerInfo.Listener.Protocol)]) diff --git a/pkg/plugins/policies/meshhealthcheck/plugin/v1alpha1/plugin.go b/pkg/plugins/policies/meshhealthcheck/plugin/v1alpha1/plugin.go index 0e4c7dde8903..8231e4a01f2f 100644 --- a/pkg/plugins/policies/meshhealthcheck/plugin/v1alpha1/plugin.go +++ b/pkg/plugins/policies/meshhealthcheck/plugin/v1alpha1/plugin.go @@ -66,7 +66,7 @@ func applyToGateways( gatewayClusters map[string]*envoy_cluster.Cluster, proxy *core_xds.Proxy, ) error { - for _, listenerInfo := range gateway_plugin.ExtractGatewayListener(proxy) { + for _, listenerInfo := range gateway_plugin.ExtractGatewayListeners(proxy) { for _, hostInfo := range listenerInfo.HostInfos { destinations := gateway_plugin.RouteDestinationsMutable(hostInfo.Entries) for _, dest := range destinations { diff --git a/pkg/plugins/policies/meshloadbalancingstrategy/plugin/v1alpha1/plugin.go b/pkg/plugins/policies/meshloadbalancingstrategy/plugin/v1alpha1/plugin.go index b10f22f46771..dfb11734f10c 100644 --- a/pkg/plugins/policies/meshloadbalancingstrategy/plugin/v1alpha1/plugin.go +++ b/pkg/plugins/policies/meshloadbalancingstrategy/plugin/v1alpha1/plugin.go @@ -138,7 +138,7 @@ func (p plugin) configureGateway( gatewayRoutes map[string]*envoy_route.RouteConfiguration, endpoints policies_xds.EndpointMap, ) error { - gatewayListenerInfos := gateway_plugin.ExtractGatewayListener(proxy) + gatewayListenerInfos := gateway_plugin.ExtractGatewayListeners(proxy) if len(gatewayListenerInfos) == 0 { return nil } diff --git a/pkg/plugins/policies/meshratelimit/plugin/v1alpha1/plugin.go b/pkg/plugins/policies/meshratelimit/plugin/v1alpha1/plugin.go index 6970c5fae807..04644c7bb323 100644 --- a/pkg/plugins/policies/meshratelimit/plugin/v1alpha1/plugin.go +++ b/pkg/plugins/policies/meshratelimit/plugin/v1alpha1/plugin.go @@ -56,7 +56,7 @@ func applyToGateways( gatewayRoutes map[string]*envoy_route.RouteConfiguration, proxy *core_xds.Proxy, ) error { - for _, listenerInfo := range gateway_plugin.ExtractGatewayListener(proxy) { + for _, listenerInfo := range gateway_plugin.ExtractGatewayListeners(proxy) { address := proxy.Dataplane.Spec.GetNetworking().Address port := listenerInfo.Listener.Port listenerKey := core_rules.InboundListener{ diff --git a/pkg/plugins/policies/meshretry/plugin/v1alpha1/plugin.go b/pkg/plugins/policies/meshretry/plugin/v1alpha1/plugin.go index bc5d814ba228..df178e482835 100644 --- a/pkg/plugins/policies/meshretry/plugin/v1alpha1/plugin.go +++ b/pkg/plugins/policies/meshretry/plugin/v1alpha1/plugin.go @@ -83,7 +83,7 @@ func applyToGateway( gatewayListeners map[core_rules.InboundListener]*envoy_listener.Listener, proxy *core_xds.Proxy, ) error { - for _, listenerInfo := range gateway_plugin.ExtractGatewayListener(proxy) { + for _, listenerInfo := range gateway_plugin.ExtractGatewayListeners(proxy) { configurer := plugin_xds.Configurer{ Retry: core_rules.ComputeConf[api.Conf](rules.Rules, core_rules.MeshSubset()), Protocol: core_mesh.ParseProtocol(listenerInfo.Listener.Protocol.String()), diff --git a/pkg/plugins/policies/meshtimeout/plugin/v1alpha1/plugin.go b/pkg/plugins/policies/meshtimeout/plugin/v1alpha1/plugin.go index 77d9a3fe9560..1d0b142e7282 100644 --- a/pkg/plugins/policies/meshtimeout/plugin/v1alpha1/plugin.go +++ b/pkg/plugins/policies/meshtimeout/plugin/v1alpha1/plugin.go @@ -168,7 +168,7 @@ func applyToGateway( gatewayRoutes map[string]*envoy_route.RouteConfiguration, proxy *core_xds.Proxy, ) error { - for _, listenerInfo := range gateway_plugin.ExtractGatewayListener(proxy) { + for _, listenerInfo := range gateway_plugin.ExtractGatewayListeners(proxy) { conf := getConf(toRules.Rules, core_rules.MeshSubset()) route, ok := gatewayRoutes[listenerInfo.Listener.ResourceName] diff --git a/pkg/plugins/policies/meshtimeout/plugin/v1alpha1/plugin_test.go b/pkg/plugins/policies/meshtimeout/plugin/v1alpha1/plugin_test.go index 069e7a936370..e855561af7b1 100644 --- a/pkg/plugins/policies/meshtimeout/plugin/v1alpha1/plugin_test.go +++ b/pkg/plugins/policies/meshtimeout/plugin/v1alpha1/plugin_test.go @@ -462,9 +462,7 @@ var _ = Describe("MeshTimeout", func() { }, RuntimeExtensions: map[string]interface{}{}, } - for n, p := range core_plugins.Plugins().ProxyPlugins() { - Expect(p.Apply(context.Background(), xdsCtx.Mesh, &proxy)).To(Succeed(), n) - } + Expect(gateway_plugin.NewPlugin().(core_plugins.ProxyPlugin).Apply(context.Background(), xdsCtx.Mesh, &proxy)).To(Succeed()) gatewayGenerator := gateway_plugin.NewGenerator("test-zone") generatedResources, err := gatewayGenerator.Generate(context.Background(), xdsCtx, &proxy) Expect(err).NotTo(HaveOccurred()) diff --git a/pkg/plugins/runtime/gateway/generator.go b/pkg/plugins/runtime/gateway/generator.go index 26f492b75a44..a7a334db2bf0 100644 --- a/pkg/plugins/runtime/gateway/generator.go +++ b/pkg/plugins/runtime/gateway/generator.go @@ -205,7 +205,7 @@ func (g Generator) Generate(ctx context.Context, xdsCtx xds_context.Context, pro var limits []RuntimeResoureLimitListener - for _, info := range ExtractGatewayListener(proxy) { + for _, info := range ExtractGatewayListeners(proxy) { cdsResources, err := g.generateCDS(ctx, xdsCtx, info, info.HostInfos) if err != nil { return nil, err diff --git a/pkg/plugins/runtime/gateway/plugin.go b/pkg/plugins/runtime/gateway/plugin.go index f4d433e4b8a1..dfb7f3883dc8 100644 --- a/pkg/plugins/runtime/gateway/plugin.go +++ b/pkg/plugins/runtime/gateway/plugin.go @@ -18,14 +18,21 @@ import ( ) func init() { - core_plugins.Register(metadata.PluginName, &plugin{}) + core_plugins.Register(metadata.PluginName, NewPlugin()) } var log = core.Log.WithName("plugin").WithName("runtime").WithName("gateway") type plugin struct{} -var _ core_plugins.BootstrapPlugin = &plugin{} +var ( + _ core_plugins.BootstrapPlugin = &plugin{} + _ core_plugins.ProxyPlugin = &plugin{} +) + +func NewPlugin() core_plugins.Plugin { + return &plugin{} +} func (p *plugin) BeforeBootstrap(context *core_plugins.MutablePluginContext, config core_plugins.PluginConfig) error { if context.Config().Environment == config_core.KubernetesEnvironment { @@ -52,7 +59,7 @@ func (p *plugin) Apply(ctx context.Context, meshContext xds_context.MeshContext, return nil } -func ExtractGatewayListener(proxy *core_xds.Proxy) []GatewayListenerInfo { +func ExtractGatewayListeners(proxy *core_xds.Proxy) []GatewayListenerInfo { ext := proxy.RuntimeExtensions[metadata.PluginName] if ext == nil { return nil