Skip to content

Commit

Permalink
perf(insights): refresh only changed (#7737)
Browse files Browse the repository at this point in the history
Signed-off-by: Jakub Dyszkiewicz <[email protected]>
  • Loading branch information
jakubdyszkiewicz authored Sep 12, 2023
1 parent ace9c13 commit 5d10560
Showing 1 changed file with 56 additions and 18 deletions.
74 changes: 56 additions & 18 deletions pkg/insights/resyncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ type resyncer struct {
idleTime prometheus.Summary
timeToProcessItem prometheus.Summary
itemProcessingTime prometheus.Summary

allPolicyTypes []model.ResourceType
}

// NewResyncer creates a new Component that periodically updates insights
Expand Down Expand Up @@ -95,6 +97,12 @@ func NewResyncer(config *Config) component.Component {
Objectives: core_metrics.DefaultObjectives,
})
config.Metrics.MustRegister(idleTime, timeToProcessItem, itemProcessingTime)

var allPolicyTypes []model.ResourceType
for _, desc := range config.Registry.ObjectDescriptors(model.HasScope(model.ScopeMesh), model.IsPolicy()) {
allPolicyTypes = append(allPolicyTypes, desc.Name)
}

r := &resyncer{
rm: config.ResourceManager,
eventFactory: config.EventReaderFactory,
Expand All @@ -109,6 +117,7 @@ func NewResyncer(config *Config) component.Component {
idleTime: idleTime,
timeToProcessItem: timeToProcessItem,
itemProcessingTime: itemProcessingTime,
allPolicyTypes: allPolicyTypes,
}
if config.Now == nil {
r.now = time.Now
Expand All @@ -126,6 +135,7 @@ type resyncEvent struct {
tenantId string
time time.Time
flag actionFlag
types map[model.ResourceType]struct{}
}

type actionFlag uint8
Expand Down Expand Up @@ -156,16 +166,23 @@ func (e *eventBatch) flush(ctx context.Context, resyncEvents chan resyncEvent) e
}

// add adds an item to the batch, if an item with a similar key exists we simply merge the actionFlags.
func (e *eventBatch) add(now time.Time, tenantId string, mesh string, actionFlag actionFlag) {
func (e *eventBatch) add(now time.Time, tenantId string, mesh string, actionFlag actionFlag, types []model.ResourceType) {
if actionFlag == 0x00 { // No action so no need to persist
return
}
key := tenantId + ":" + mesh
if elt := e.events[key]; elt != nil {
// If the item is already present just merge the actionFlag
elt.flag |= actionFlag
for _, typ := range types {
elt.types[typ] = struct{}{}
}
} else {
e.events[key] = &resyncEvent{time: now, tenantId: tenantId, mesh: mesh, flag: actionFlag}
event := &resyncEvent{time: now, tenantId: tenantId, mesh: mesh, flag: actionFlag, types: map[model.ResourceType]struct{}{}}
for _, typ := range types {
event.types[typ] = struct{}{}
}
e.events[key] = event
}
}

Expand Down Expand Up @@ -209,7 +226,7 @@ func (r *resyncer) Start(stop <-chan struct{}) error {
}
}
if event.flag&FlagMesh == FlagMesh {
err := r.createOrUpdateMeshInsight(tenantCtx, event.mesh, dpOverviews)
err := r.createOrUpdateMeshInsight(tenantCtx, event.mesh, dpOverviews, event.types)
if err != nil {
log.Error(err, "unable to resync MeshInsight", "event", event)
}
Expand Down Expand Up @@ -294,7 +311,7 @@ func (r *resyncer) Start(stop <-chan struct{}) error {
f |= FlagService
}
if f != 0 {
batch.add(r.now(), resourceChanged.TenantID, meshName, f)
batch.add(r.now(), resourceChanged.TenantID, meshName, f, []model.ResourceType{resourceChanged.Type})
}
}
case <-stop:
Expand Down Expand Up @@ -326,7 +343,7 @@ func (r *resyncer) addMeshesToBatch(ctx context.Context, batch *eventBatch, tena
return
}
for _, mesh := range meshList.Items {
batch.add(time.Now(), tenantID, mesh.GetMeta().GetName(), FlagMesh|FlagService)
batch.add(time.Now(), tenantID, mesh.GetMeta().GetName(), FlagMesh|FlagService, r.allPolicyTypes)
}
}

Expand Down Expand Up @@ -432,7 +449,7 @@ func (r *resyncer) createOrUpdateServiceInsight(ctx context.Context, mesh string
return nil
}

func (r *resyncer) createOrUpdateMeshInsight(ctx context.Context, mesh string, dpOverviews []*core_mesh.DataplaneOverviewResource) error {
func (r *resyncer) createOrUpdateMeshInsight(ctx context.Context, mesh string, dpOverviews []*core_mesh.DataplaneOverviewResource, types map[model.ResourceType]struct{}) error {
log := kuma_log.AddFieldsFromCtx(log, ctx, context.Background()).WithValues("mesh", mesh) // Add info
insight := &mesh_proto.MeshInsight{
Dataplanes: &mesh_proto.MeshInsight_DataplaneStat{},
Expand Down Expand Up @@ -515,23 +532,44 @@ func (r *resyncer) createOrUpdateMeshInsight(ctx context.Context, mesh string, d
External: uint32(len(externalServices.Items)),
}

for _, resDesc := range r.registry.ObjectDescriptors(model.HasScope(model.ScopeMesh), model.IsPolicy()) {
list := resDesc.NewList()

if err := r.rm.List(ctx, list, store.ListByMesh(mesh)); err != nil {
return err
key := MeshInsightKey(mesh)
err := manager.Upsert(ctx, r.rm, key, core_mesh.NewMeshInsightResource(), func(resource model.Resource) error {
oldInsight := resource.GetSpec().(*mesh_proto.MeshInsight)
for k, v := range oldInsight.Policies {
insight.Policies[k] = proto.Clone(v).(*mesh_proto.MeshInsight_PolicyStat)
}
if proto.Equal(oldInsight, &mesh_proto.MeshInsight{}) {
// insight was not yet computed, need to update all
for _, typ := range r.allPolicyTypes {
types[typ] = struct{}{}
}
}

if len(list.GetItems()) != 0 {
insight.Policies[string(resDesc.Name)] = &mesh_proto.MeshInsight_PolicyStat{
Total: uint32(len(list.GetItems())),
for typ := range types {
desc, err := r.registry.DescriptorFor(typ)
if err != nil {
return err
}
if !desc.IsPolicy {
continue
}

list := desc.NewList()
if err := r.rm.List(ctx, list, store.ListByMesh(mesh)); err != nil {
return err
}

if len(list.GetItems()) != 0 {
insight.Policies[string(typ)] = &mesh_proto.MeshInsight_PolicyStat{
Total: uint32(len(list.GetItems())),
}
}
if len(list.GetItems()) == 0 {
delete(insight.Policies, string(typ))
}
}
}

key := MeshInsightKey(mesh)
err := manager.Upsert(ctx, r.rm, key, core_mesh.NewMeshInsightResource(), func(resource model.Resource) error {
if resource.GetSpec() != nil && proto.Equal(resource.GetSpec().(proto.Message), insight) {
if proto.Equal(resource.GetSpec().(proto.Message), insight) {
log.V(1).Info("no need to update MeshInsight because the resource is the same")
return manager.ErrSkipUpsert
}
Expand Down

0 comments on commit 5d10560

Please sign in to comment.