Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(insights): multiple workers #7778

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/generated/kuma-cp.md
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,8 @@ metrics:
fullResyncInterval: 20s # ENV: KUMA_METRICS_MESH_FULL_RESYNC_INTERVAL
# the size of the buffer between event creation and processing
bufferSize: 1000 # ENV: KUMA_METRICS_MESH_BUFFER_SIZE
# the number of workers that process metrics events
eventProcessors: 1 # ENV: KUMA_METRICS_MESH_EVENT_PROCESSORS
controlPlane:
# If true, the number of resources in the system is reported in metrics
reportResourcesCount: true # ENV: KUMA_METRICS_CONTROL_PLANE_REPORT_RESOURCES_COUNT
Expand Down
2 changes: 2 additions & 0 deletions docs/generated/raw/kuma-cp.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,8 @@ metrics:
fullResyncInterval: 20s # ENV: KUMA_METRICS_MESH_FULL_RESYNC_INTERVAL
# the size of the buffer between event creation and processing
bufferSize: 1000 # ENV: KUMA_METRICS_MESH_BUFFER_SIZE
# the number of workers that process metrics events
eventProcessors: 1 # ENV: KUMA_METRICS_MESH_EVENT_PROCESSORS
controlPlane:
# If true, the number of resources in the system is reported in metrics
reportResourcesCount: true # ENV: KUMA_METRICS_CONTROL_PLANE_REPORT_RESOURCES_COUNT
Expand Down
3 changes: 3 additions & 0 deletions pkg/config/app/kuma-cp/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ type MeshMetrics struct {
MinResyncInterval config_types.Duration `json:"minResyncInterval" envconfig:"kuma_metrics_mesh_min_resync_interval"`
// FullResyncInterval time between triggering a full refresh of all the insights
FullResyncInterval config_types.Duration `json:"fullResyncInterval" envconfig:"kuma_metrics_mesh_full_resync_interval"`
// EventProcessors is a number of workers that process metrics events.
EventProcessors int `json:"eventProcessors" envconfig:"kuma_metrics_mesh_event_processors"`
}

type ControlPlaneMetrics struct {
Expand Down Expand Up @@ -215,6 +217,7 @@ var DefaultConfig = func() Config {
MinResyncInterval: config_types.Duration{Duration: 1 * time.Second},
FullResyncInterval: config_types.Duration{Duration: 20 * time.Second},
BufferSize: 1000,
EventProcessors: 1,
},
ControlPlane: &ControlPlaneMetrics{
ReportResourcesCount: true,
Expand Down
2 changes: 2 additions & 0 deletions pkg/config/app/kuma-cp/kuma-cp.defaults.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,8 @@ metrics:
fullResyncInterval: 20s # ENV: KUMA_METRICS_MESH_FULL_RESYNC_INTERVAL
# the size of the buffer between event creation and processing
bufferSize: 1000 # ENV: KUMA_METRICS_MESH_BUFFER_SIZE
# the number of workers that process metrics events
eventProcessors: 1 # ENV: KUMA_METRICS_MESH_EVENT_PROCESSORS
controlPlane:
# If true, the number of resources in the system is reported in metrics
reportResourcesCount: true # ENV: KUMA_METRICS_CONTROL_PLANE_REPORT_RESOURCES_COUNT
Expand Down
3 changes: 3 additions & 0 deletions pkg/config/loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,7 @@ var _ = Describe("Config loader", func() {
Expect(cfg.Metrics.Mesh.MinResyncInterval.Duration).To(Equal(27 * time.Second))
Expect(cfg.Metrics.Mesh.FullResyncInterval.Duration).To(Equal(35 * time.Second))
Expect(cfg.Metrics.Mesh.BufferSize).To(Equal(23))
Expect(cfg.Metrics.Mesh.EventProcessors).To(Equal(2))
Expect(cfg.Metrics.Dataplane.SubscriptionLimit).To(Equal(47))
Expect(cfg.Metrics.Dataplane.IdleTimeout.Duration).To(Equal(1 * time.Minute))
Expect(cfg.Metrics.ControlPlane.ReportResourcesCount).To(BeTrue())
Expand Down Expand Up @@ -598,6 +599,7 @@ metrics:
fullResyncInterval: 35s
minResyncInterval: 27s
bufferSize: 23
eventProcessors: 2
dataplane:
subscriptionLimit: 47
idleTimeout: 1m
Expand Down Expand Up @@ -877,6 +879,7 @@ eventBus:
"KUMA_METRICS_MESH_MIN_RESYNC_INTERVAL": "27s",
"KUMA_METRICS_MESH_FULL_RESYNC_INTERVAL": "35s",
"KUMA_METRICS_MESH_BUFFER_SIZE": "23",
"KUMA_METRICS_MESH_EVENT_PROCESSORS": "2",
"KUMA_METRICS_DATAPLANE_SUBSCRIPTION_LIMIT": "47",
"KUMA_METRICS_DATAPLANE_IDLE_TIMEOUT": "1m",
"KUMA_METRICS_CONTROL_PLANE_REPORT_RESOURCES_COUNT": "true",
Expand Down
128 changes: 77 additions & 51 deletions pkg/insights/resyncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"strings"
"sync"
"time"

"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -58,6 +59,7 @@ type Config struct {
Tick func(d time.Duration) <-chan time.Time
TenantFn multitenant.Tenants
EventBufferCapacity int
EventProcessors int
Metrics core_metrics.Metrics
Now func() time.Time
}
Expand All @@ -72,6 +74,7 @@ type resyncer struct {
registry registry.TypeRegistry
tenantFn multitenant.Tenants
eventBufferCapacity int
eventProcessors int
metrics core_metrics.Metrics
now func() time.Time

Expand Down Expand Up @@ -128,6 +131,7 @@ func NewResyncer(config *Config) component.Component {
timeToProcessItem: timeToProcessItem,
itemProcessingTime: itemProcessingTime,
allPolicyTypes: allPolicyTypes,
eventProcessors: config.EventProcessors,
}
if config.Now == nil {
r.now = time.Now
Expand Down Expand Up @@ -159,12 +163,22 @@ const (
// eventBatch keeps all the outstanding changes. The idea is that we linger an entire batch for some amount of time and we flush the batch all at once
type eventBatch struct {
	// events holds the outstanding changes, keyed by "<tenantId>:<mesh>"
	// (add builds the key as tenantId + ":" + mesh).
	events map[string]*resyncEvent
	// Embedded mutex guarding events: add and flush both lock it because
	// the batch is filled and drained from multiple goroutines.
	sync.Mutex
}

// flushAll is the flush predicate that accepts every outstanding event,
// used when the whole batch must be drained rather than a single tenant's
// events.
var flushAll = func(_ *resyncEvent) bool {
	return true
}

// flush sends the current batch to the resyncEvents chan, if the context is cancelled we interrupt the sending but keep the items in the batch.
// if an item is successfully put in the channel we remove it.
func (e *eventBatch) flush(ctx context.Context, resyncEvents chan resyncEvent) error {
func (e *eventBatch) flush(ctx context.Context, resyncEvents chan resyncEvent, predicate func(*resyncEvent) bool) error {
e.Lock()
defer e.Unlock()
for k, event := range e.events {
if !predicate(event) {
continue
}
select {
case <-ctx.Done():
return fmt.Errorf("context done and the batch wasn't complete, update will be delayed, outstanding events: %d", len(e.events))
Expand All @@ -188,6 +202,8 @@ func (e *eventBatch) add(
if actionFlag == 0x00 { // No action so no need to persist
return
}
e.Lock()
defer e.Unlock()
key := tenantId + ":" + mesh
if elt := e.events[key]; elt != nil {
// If the item is already present just merge the actionFlag
Expand Down Expand Up @@ -221,60 +237,62 @@ func (r *resyncer) Start(stop <-chan struct{}) error {
cancel()
close(resyncEvents)
}()
go func() {
// We dequeue from the resyncEvents channel and actually do the insight update we want.
for {
start := r.now()
select {
case <-ctx.Done():
log.Info("stopped resyncEvents loop")
return
case event, more := <-resyncEvents:
if !more {
// Usually this shouldn't close if there's no closed context
continue
}
if event.flag == 0 {
continue
}
startProcessingTime := r.now()
r.idleTime.Observe(float64(startProcessingTime.Sub(start).Milliseconds()))
r.timeToProcessItem.Observe(float64(startProcessingTime.Sub(event.time).Milliseconds()))
tenantCtx := multitenant.WithTenant(ctx, event.tenantId)
dpOverviews, err := r.dpOverviews(tenantCtx, event.mesh)
if err != nil {
log.Error(err, "unable to get DataplaneOverviews to recompute insights", "event", event)
continue
}

anyChanged := false
if event.flag&FlagService == FlagService {
err, changed := r.createOrUpdateServiceInsight(tenantCtx, event.mesh, dpOverviews)
if err != nil {
log.Error(err, "unable to resync ServiceInsight", "event", event)
for i := 0; i < r.eventProcessors; i++ {
go func() {
// We dequeue from the resyncEvents channel and actually do the insight update we want.
for {
start := r.now()
select {
case <-ctx.Done():
log.Info("stopped resyncEvents loop")
return
case event, more := <-resyncEvents:
if !more {
// Usually this shouldn't close if there's no closed context
continue
}
if changed {
anyChanged = true
if event.flag == 0 {
continue
}
}
if event.flag&FlagMesh == FlagMesh {
err, changed := r.createOrUpdateMeshInsight(tenantCtx, event.mesh, dpOverviews, event.types)
startProcessingTime := r.now()
r.idleTime.Observe(float64(startProcessingTime.Sub(start).Milliseconds()))
r.timeToProcessItem.Observe(float64(startProcessingTime.Sub(event.time).Milliseconds()))
tenantCtx := multitenant.WithTenant(ctx, event.tenantId)
dpOverviews, err := r.dpOverviews(tenantCtx, event.mesh)
if err != nil {
log.Error(err, "unable to resync MeshInsight", "event", event)
log.Error(err, "unable to get DataplaneOverviews to recompute insights", "event", event)
continue
}
if changed {
anyChanged = true

anyChanged := false
if event.flag&FlagService == FlagService {
err, changed := r.createOrUpdateServiceInsight(tenantCtx, event.mesh, dpOverviews)
if err != nil {
log.Error(err, "unable to resync ServiceInsight", "event", event)
}
if changed {
anyChanged = true
}
}
if event.flag&FlagMesh == FlagMesh {
err, changed := r.createOrUpdateMeshInsight(tenantCtx, event.mesh, dpOverviews, event.types)
if err != nil {
log.Error(err, "unable to resync MeshInsight", "event", event)
}
if changed {
anyChanged = true
}
}
reason := strings.Join(util_maps.SortedKeys(event.reasons), "_and_")
result := ResultNoChanges
if anyChanged {
result = ResultChanged
}
r.itemProcessingTime.WithLabelValues(reason, result).Observe(float64(time.Since(startProcessingTime).Milliseconds()))
}
reason := strings.Join(util_maps.SortedKeys(event.reasons), "_and_")
result := ResultNoChanges
if anyChanged {
result = ResultChanged
}
r.itemProcessingTime.WithLabelValues(reason, result).Observe(float64(time.Since(startProcessingTime).Milliseconds()))
}
}
}()
}()
}
eventReader := r.eventFactory.Subscribe(func(event events.Event) bool {
if _, ok := event.(events.TriggerInsightsComputationEvent); ok {
return true
Expand Down Expand Up @@ -308,12 +326,18 @@ func (r *resyncer) Start(stop <-chan struct{}) error {
if err != nil {
log.Error(err, "could not get tenants")
}
wg := sync.WaitGroup{}
wg.Add(len(tenantIds))
for _, tenantId := range tenantIds {
r.addMeshesToBatch(tickCtx, batch, tenantId, ReasonResync)
go func(tenantId string) {
r.addMeshesToBatch(tickCtx, batch, tenantId, ReasonResync)
wg.Done()
}(tenantId)
}
wg.Wait()
}
// We flush the batch
if err := batch.flush(tickCtx, resyncEvents); err != nil {
if err := batch.flush(tickCtx, resyncEvents, flushAll); err != nil {
log.Error(err, "Flush of batch didn't complete, some insights won't be refreshed until next tick")
}
cancelTimeout()
Expand All @@ -324,7 +348,9 @@ func (r *resyncer) Start(stop <-chan struct{}) error {
if triggerEvent, ok := event.(events.TriggerInsightsComputationEvent); ok {
ctx := context.Background()
r.addMeshesToBatch(ctx, batch, triggerEvent.TenantID, ReasonForce)
if err := batch.flush(ctx, resyncEvents); err != nil {
if err := batch.flush(ctx, resyncEvents, func(event *resyncEvent) bool {
return event.tenantId == triggerEvent.TenantID
}); err != nil {
log.Error(err, "Flush of batch didn't complete, some insights won't be refreshed until next tick")
}
}
Expand Down
1 change: 1 addition & 0 deletions pkg/insights/resyncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ var _ = Describe("Insight Persistence", func() {
Registry: registry.Global(),
TenantFn: multitenant.SingleTenant,
EventBufferCapacity: 10,
EventProcessors: 10,
Metrics: metric,
})
go func() {
Expand Down
1 change: 1 addition & 0 deletions pkg/metrics/store/counter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ var _ = Describe("Counter", func() {
Registry: registry.Global(),
TenantFn: multitenant.SingleTenant,
EventBufferCapacity: 10,
EventProcessors: 1,
Metrics: metrics,
})

Expand Down