From 3b7cfdd392cc525d9c49cb4f3be951b9843441f2 Mon Sep 17 00:00:00 2001 From: Jakub Dyszkiewicz Date: Mon, 18 Sep 2023 12:30:47 +0200 Subject: [PATCH 1/2] feat(insights): multiple workers Signed-off-by: Jakub Dyszkiewicz --- docs/generated/kuma-cp.md | 2 + docs/generated/raw/kuma-cp.yaml | 2 + pkg/config/app/kuma-cp/config.go | 3 + pkg/config/app/kuma-cp/kuma-cp.defaults.yaml | 2 + pkg/config/loader_test.go | 3 + pkg/insights/resyncer.go | 128 +++++++++++-------- pkg/insights/resyncer_test.go | 1 + pkg/metrics/store/counter_test.go | 1 + 8 files changed, 91 insertions(+), 51 deletions(-) diff --git a/docs/generated/kuma-cp.md b/docs/generated/kuma-cp.md index 7b290e7a8c50..5fb0b5db4bc2 100644 --- a/docs/generated/kuma-cp.md +++ b/docs/generated/kuma-cp.md @@ -419,6 +419,8 @@ metrics: fullResyncInterval: 20s # ENV: KUMA_METRICS_MESH_FULL_RESYNC_INTERVAL # the size of the buffer between event creation and processing bufferSize: 1000 # ENV: KUMA_METRICS_MESH_BUFFER_SIZE + # the number of workers that process metrics events + eventProcessors: 1 # ENV: KUMA_METRICS_MESH_EVENT_PROCESSORS controlPlane: # If true metrics show number of resources in the system should be reported reportResourcesCount: true # ENV: KUMA_METRICS_CONTROL_PLANE_REPORT_RESOURCES_COUNT diff --git a/docs/generated/raw/kuma-cp.yaml b/docs/generated/raw/kuma-cp.yaml index de38434b10f3..99694020ac48 100644 --- a/docs/generated/raw/kuma-cp.yaml +++ b/docs/generated/raw/kuma-cp.yaml @@ -416,6 +416,8 @@ metrics: fullResyncInterval: 20s # ENV: KUMA_METRICS_MESH_FULL_RESYNC_INTERVAL # the size of the buffer between event creation and processing bufferSize: 1000 # ENV: KUMA_METRICS_MESH_BUFFER_SIZE + # the number of workers that process metrics events + eventProcessors: 1 # ENV: KUMA_METRICS_MESH_EVENT_PROCESSORS controlPlane: # If true metrics show number of resources in the system should be reported reportResourcesCount: true # ENV: KUMA_METRICS_CONTROL_PLANE_REPORT_RESOURCES_COUNT diff --git a/pkg/config/app/kuma-cp/config.go b/pkg/config/app/kuma-cp/config.go index d68590e108c8..f92501c3fc2e 100644 --- a/pkg/config/app/kuma-cp/config.go +++ b/pkg/config/app/kuma-cp/config.go @@ -101,6 +101,8 @@ type MeshMetrics struct { MinResyncInterval config_types.Duration `json:"minResyncInterval" envconfig:"kuma_metrics_mesh_min_resync_interval"` // FullResyncInterval time between triggering a full refresh of all the insights FullResyncInterval config_types.Duration `json:"fullResyncInterval" envconfig:"kuma_metrics_mesh_full_resync_interval"` + // EventProcessors is a number of workers that process metrics events. 
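+	// A value of 1 (the default) keeps the previous single-worker behavior; higher values let insights for different meshes and tenants be recomputed in parallel.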
+ EventProcessors int `json:"eventProcessors" envconfig:"kuma_metrics_mesh_event_processors"` } type ControlPlaneMetrics struct { @@ -215,6 +217,7 @@ var DefaultConfig = func() Config { MinResyncInterval: config_types.Duration{Duration: 1 * time.Second}, FullResyncInterval: config_types.Duration{Duration: 20 * time.Second}, BufferSize: 1000, + EventProcessors: 1, }, ControlPlane: &ControlPlaneMetrics{ ReportResourcesCount: true, diff --git a/pkg/config/app/kuma-cp/kuma-cp.defaults.yaml b/pkg/config/app/kuma-cp/kuma-cp.defaults.yaml index de38434b10f3..99694020ac48 100644 --- a/pkg/config/app/kuma-cp/kuma-cp.defaults.yaml +++ b/pkg/config/app/kuma-cp/kuma-cp.defaults.yaml @@ -416,6 +416,8 @@ metrics: fullResyncInterval: 20s # ENV: KUMA_METRICS_MESH_FULL_RESYNC_INTERVAL # the size of the buffer between event creation and processing bufferSize: 1000 # ENV: KUMA_METRICS_MESH_BUFFER_SIZE + # the number of workers that process metrics events + eventProcessors: 1 # ENV: KUMA_METRICS_MESH_EVENT_PROCESSORS controlPlane: # If true metrics show number of resources in the system should be reported reportResourcesCount: true # ENV: KUMA_METRICS_CONTROL_PLANE_REPORT_RESOURCES_COUNT diff --git a/pkg/config/loader_test.go b/pkg/config/loader_test.go index 3d2b859bddb6..86f04a80aee1 100644 --- a/pkg/config/loader_test.go +++ b/pkg/config/loader_test.go @@ -286,6 +286,7 @@ var _ = Describe("Config loader", func() { Expect(cfg.Metrics.Mesh.MinResyncInterval.Duration).To(Equal(27 * time.Second)) Expect(cfg.Metrics.Mesh.FullResyncInterval.Duration).To(Equal(35 * time.Second)) Expect(cfg.Metrics.Mesh.BufferSize).To(Equal(23)) + Expect(cfg.Metrics.Mesh.EventProcessors).To(Equal(2)) Expect(cfg.Metrics.Dataplane.SubscriptionLimit).To(Equal(47)) Expect(cfg.Metrics.Dataplane.IdleTimeout.Duration).To(Equal(1 * time.Minute)) Expect(cfg.Metrics.ControlPlane.ReportResourcesCount).To(BeTrue()) @@ -598,6 +599,7 @@ metrics: fullResyncInterval: 35s minResyncInterval: 27s bufferSize: 23 + eventProcessors: 2 dataplane: subscriptionLimit: 47 idleTimeout: 1m @@ -877,6 +879,7 @@ eventBus: "KUMA_METRICS_MESH_MIN_RESYNC_INTERVAL": "27s", "KUMA_METRICS_MESH_FULL_RESYNC_INTERVAL": "35s", "KUMA_METRICS_MESH_BUFFER_SIZE": "23", + "KUMA_METRICS_MESH_EVENT_PROCESSORS": "2", "KUMA_METRICS_DATAPLANE_SUBSCRIPTION_LIMIT": "47", "KUMA_METRICS_DATAPLANE_IDLE_TIMEOUT": "1m", "KUMA_METRICS_CONTROL_PLANE_REPORT_RESOURCES_COUNT": "true", diff --git a/pkg/insights/resyncer.go b/pkg/insights/resyncer.go index ff0b46e4d80f..64b5d2d32461 100644 --- a/pkg/insights/resyncer.go +++ b/pkg/insights/resyncer.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "strings" + "sync" "time" "github.com/prometheus/client_golang/prometheus" @@ -58,6 +59,7 @@ type Config struct { Tick func(d time.Duration) <-chan time.Time TenantFn multitenant.Tenants EventBufferCapacity int + EventProcessors int Metrics core_metrics.Metrics Now func() time.Time } @@ -72,6 +74,7 @@ type resyncer struct { registry registry.TypeRegistry tenantFn multitenant.Tenants eventBufferCapacity int + eventProcessors int metrics core_metrics.Metrics now func() time.Time @@ -128,6 +131,7 @@ func NewResyncer(config *Config) component.Component { timeToProcessItem: timeToProcessItem, itemProcessingTime: itemProcessingTime, allPolicyTypes: allPolicyTypes, + eventProcessors: config.EventProcessors, } if config.Now == nil { r.now = time.Now @@ -159,12 +163,22 @@ const ( // eventBatch keeps all the outstanding changes. 
The idea is that we linger an entire batch for some amount of time and we flush the batch all at once type eventBatch struct { events map[string]*resyncEvent + sync.Mutex +} + +var flushAll = func(*resyncEvent) bool { + return true } // flush sends the current batch to the resyncEvents chan, if the context is cancelled we interrupt the sending but keep the items in the batch. // if an item is successfully put in the chanel we remove it. -func (e *eventBatch) flush(ctx context.Context, resyncEvents chan resyncEvent) error { +func (e *eventBatch) flush(ctx context.Context, resyncEvents chan resyncEvent, predicate func(*resyncEvent) bool) error { + e.Lock() + defer e.Unlock() for k, event := range e.events { + if !predicate(event) { + continue + } select { case <-ctx.Done(): return fmt.Errorf("context done and the batch wasn't complete, update will be delayed, outstanding events: %d", len(e.events)) @@ -188,6 +202,8 @@ func (e *eventBatch) add( if actionFlag == 0x00 { // No action so no need to persist return } + e.Lock() + defer e.Unlock() key := tenantId + ":" + mesh if elt := e.events[key]; elt != nil { // If the item is already present just merge the actionFlag @@ -221,60 +237,62 @@ func (r *resyncer) Start(stop <-chan struct{}) error { cancel() close(resyncEvents) }() - go func() { - // We dequeue from the resyncEvents channel and actually do the insight update we want. - for { - start := r.now() - select { - case <-ctx.Done(): - log.Info("stopped resyncEvents loop") - return - case event, more := <-resyncEvents: - if !more { - // Usually this shouldn't close if there's no closed context - continue - } - if event.flag == 0 { - continue - } - startProcessingTime := r.now() - r.idleTime.Observe(float64(startProcessingTime.Sub(start).Milliseconds())) - r.timeToProcessItem.Observe(float64(startProcessingTime.Sub(event.time).Milliseconds())) - tenantCtx := multitenant.WithTenant(ctx, event.tenantId) - dpOverviews, err := r.dpOverviews(tenantCtx, event.mesh) - if err != nil { - log.Error(err, "unable to get DataplaneOverviews to recompute insights", "event", event) - continue - } - - anyChanged := false - if event.flag&FlagService == FlagService { - err, changed := r.createOrUpdateServiceInsight(tenantCtx, event.mesh, dpOverviews) - if err != nil { - log.Error(err, "unable to resync ServiceInsight", "event", event) + for i := 0; i < r.eventProcessors; i++ { + go func() { + // We dequeue from the resyncEvents channel and actually do the insight update we want. 
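+			// Each of the r.eventProcessors workers runs this loop concurrently; the channel delivers every event to exactly one worker.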
+ for { + start := r.now() + select { + case <-ctx.Done(): + log.Info("stopped resyncEvents loop") + return + case event, more := <-resyncEvents: + if !more { + // Usually this shouldn't close if there's no closed context + continue } - if changed { - anyChanged = true + if event.flag == 0 { + continue } - } - if event.flag&FlagMesh == FlagMesh { - err, changed := r.createOrUpdateMeshInsight(tenantCtx, event.mesh, dpOverviews, event.types) + startProcessingTime := r.now() + r.idleTime.Observe(float64(startProcessingTime.Sub(start).Milliseconds())) + r.timeToProcessItem.Observe(float64(startProcessingTime.Sub(event.time).Milliseconds())) + tenantCtx := multitenant.WithTenant(ctx, event.tenantId) + dpOverviews, err := r.dpOverviews(tenantCtx, event.mesh) if err != nil { - log.Error(err, "unable to resync MeshInsight", "event", event) + log.Error(err, "unable to get DataplaneOverviews to recompute insights", "event", event) + continue } - if changed { - anyChanged = true + + anyChanged := false + if event.flag&FlagService == FlagService { + err, changed := r.createOrUpdateServiceInsight(tenantCtx, event.mesh, dpOverviews) + if err != nil { + log.Error(err, "unable to resync ServiceInsight", "event", event) + } + if changed { + anyChanged = true + } } + if event.flag&FlagMesh == FlagMesh { + err, changed := r.createOrUpdateMeshInsight(tenantCtx, event.mesh, dpOverviews, event.types) + if err != nil { + log.Error(err, "unable to resync MeshInsight", "event", event) + } + if changed { + anyChanged = true + } + } + reason := strings.Join(util_maps.SortedKeys(event.reasons), "_and_") + result := ResultNoChanges + if anyChanged { + result = ResultChanged + } + r.itemProcessingTime.WithLabelValues(reason, result).Observe(float64(time.Since(startProcessingTime).Milliseconds())) } - reason := strings.Join(util_maps.SortedKeys(event.reasons), "_and_") - result := ResultNoChanges - if anyChanged { - result = ResultChanged - } - r.itemProcessingTime.WithLabelValues(reason, result).Observe(float64(time.Since(startProcessingTime).Milliseconds())) } - } - }() + }() + } eventReader := r.eventFactory.Subscribe(func(event events.Event) bool { if _, ok := event.(events.TriggerInsightsComputationEvent); ok { return true @@ -308,12 +326,18 @@ func (r *resyncer) Start(stop <-chan struct{}) error { if err != nil { log.Error(err, "could not get tenants") } + wg := sync.WaitGroup{} + wg.Add(len(tenantIds)) for _, tenantId := range tenantIds { - r.addMeshesToBatch(tickCtx, batch, tenantId, ReasonResync) + go func(tenantId string) { + r.addMeshesToBatch(tickCtx, batch, tenantId, ReasonResync) + wg.Done() + }(tenantId) } + wg.Wait() } // We flush the batch - if err := batch.flush(tickCtx, resyncEvents); err != nil { + if err := batch.flush(tickCtx, resyncEvents, flushAll); err != nil { log.Error(err, "Flush of batch didn't complete, some insights won't be refreshed until next tick") } cancelTimeout() @@ -324,7 +348,9 @@ func (r *resyncer) Start(stop <-chan struct{}) error { if triggerEvent, ok := event.(events.TriggerInsightsComputationEvent); ok { ctx := context.Background() r.addMeshesToBatch(ctx, batch, triggerEvent.TenantID, ReasonForce) - if err := batch.flush(ctx, resyncEvents); err != nil { + if err := batch.flush(ctx, resyncEvents, func(event *resyncEvent) bool { + return event.tenantId == triggerEvent.TenantID + }); err != nil { log.Error(err, "Flush of batch didn't complete, some insights won't be refreshed until next tick") } } diff --git a/pkg/insights/resyncer_test.go b/pkg/insights/resyncer_test.go index 
74c7f151b92e..7d22a6c25424 100644 --- a/pkg/insights/resyncer_test.go +++ b/pkg/insights/resyncer_test.go @@ -71,6 +71,7 @@ var _ = Describe("Insight Persistence", func() { Registry: registry.Global(), TenantFn: multitenant.SingleTenant, EventBufferCapacity: 10, + EventProcessors: 10, Metrics: metric, }) go func() { diff --git a/pkg/metrics/store/counter_test.go b/pkg/metrics/store/counter_test.go index 2850afcc1b38..29d32ea585e5 100644 --- a/pkg/metrics/store/counter_test.go +++ b/pkg/metrics/store/counter_test.go @@ -67,6 +67,7 @@ var _ = Describe("Counter", func() { Registry: registry.Global(), TenantFn: multitenant.SingleTenant, EventBufferCapacity: 10, + EventProcessors: 1, Metrics: metrics, }) From 1c23dcd0fad1ba14f1e45b609a03a1d84f1cce2a Mon Sep 17 00:00:00 2001 From: Jakub Dyszkiewicz Date: Mon, 18 Sep 2023 13:10:50 +0200 Subject: [PATCH 2/2] wire config Signed-off-by: Jakub Dyszkiewicz --- pkg/insights/components.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/insights/components.go b/pkg/insights/components.go index a297fa37f52c..e9d5aba7bc18 100644 --- a/pkg/insights/components.go +++ b/pkg/insights/components.go @@ -27,6 +27,7 @@ func Setup(rt runtime.Runtime) error { Registry: registry.Global(), TenantFn: rt.Tenants(), EventBufferCapacity: rt.Config().Metrics.Mesh.BufferSize, + EventProcessors: rt.Config().Metrics.Mesh.EventProcessors, Metrics: rt.Metrics(), }) return rt.Add(component.NewResilientComponent(log, resyncer))
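
Illustration (not part of the patch): the change fans the single buffered resyncEvents channel out to a fixed pool of eventProcessors goroutines, and the periodic full resync now builds the per-tenant batch concurrently behind a sync.WaitGroup before flushing. Below is a minimal standalone Go sketch of that shape; the names event, workers, and tenants are hypothetical and do not exist in Kuma.

	package main

	import (
		"fmt"
		"sync"
	)

	// event is a simplified stand-in for resyncEvent.
	type event struct{ tenantID, mesh string }

	func main() {
		events := make(chan event, 16) // plays the role of the EventBufferCapacity-sized channel

		// Fixed worker pool: each event is received by exactly one worker,
		// mirroring the eventProcessors loop in resyncer.Start.
		const workers = 4
		var done sync.WaitGroup
		done.Add(workers)
		for i := 0; i < workers; i++ {
			go func(id int) {
				defer done.Done()
				for ev := range events {
					fmt.Printf("worker %d: recomputing insights for %s/%s\n", id, ev.tenantID, ev.mesh)
				}
			}(i)
		}

		// Build the batch for every tenant in parallel, mirroring the
		// sync.WaitGroup added around addMeshesToBatch.
		tenants := []string{"tenant-1", "tenant-2"}
		var batch sync.WaitGroup
		batch.Add(len(tenants))
		for _, t := range tenants {
			go func(t string) {
				defer batch.Done()
				events <- event{tenantID: t, mesh: "default"}
			}(t)
		}
		batch.Wait()

		close(events)
		done.Wait()
	}

Operators opt in by raising metrics.mesh.eventProcessors (or KUMA_METRICS_MESH_EVENT_PROCESSORS); the default of 1 keeps the previous single-worker behaviour.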