Skip to content

Commit

Permalink
perf(insights): refresh only changed
Browse files Browse the repository at this point in the history
Signed-off-by: Jakub Dyszkiewicz <[email protected]>
  • Loading branch information
jakubdyszkiewicz committed Sep 12, 2023
1 parent 66ec984 commit a2ff9ef
Showing 1 changed file with 51 additions and 18 deletions.
69 changes: 51 additions & 18 deletions pkg/insights/resyncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ type resyncEvent struct {
tenantId string
time time.Time
flag actionFlag
types map[model.ResourceType]struct{}
}

type actionFlag uint8
Expand Down Expand Up @@ -156,16 +157,23 @@ func (e *eventBatch) flush(ctx context.Context, resyncEvents chan resyncEvent) e
}

// add adds an item to the batch, if an item with a similar key exists we simply merge the actionFlags.
func (e *eventBatch) add(now time.Time, tenantId string, mesh string, actionFlag actionFlag) {
func (e *eventBatch) add(now time.Time, tenantId string, mesh string, actionFlag actionFlag, types []model.ResourceType) {
if actionFlag == 0x00 { // No action so no need to persist
return
}
key := tenantId + ":" + mesh
if elt := e.events[key]; elt != nil {
// If the item is already present just merge the actionFlag
elt.flag |= actionFlag
for _, typ := range types {
elt.types[typ] = struct{}{}
}
} else {
e.events[key] = &resyncEvent{time: now, tenantId: tenantId, mesh: mesh, flag: actionFlag}
event := &resyncEvent{time: now, tenantId: tenantId, mesh: mesh, flag: actionFlag, types: map[model.ResourceType]struct{}{}}
for _, typ := range types {
event.types[typ] = struct{}{}
}
e.events[key] = event
}
}

Expand Down Expand Up @@ -209,7 +217,7 @@ func (r *resyncer) Start(stop <-chan struct{}) error {
}
}
if event.flag&FlagMesh == FlagMesh {
err := r.createOrUpdateMeshInsight(tenantCtx, event.mesh, dpOverviews)
err := r.createOrUpdateMeshInsight(tenantCtx, event.mesh, dpOverviews, event.types)
if err != nil {
log.Error(err, "unable to resync MeshInsight", "event", event)
}
Expand Down Expand Up @@ -294,7 +302,7 @@ func (r *resyncer) Start(stop <-chan struct{}) error {
f |= FlagService
}
if f != 0 {
batch.add(r.now(), resourceChanged.TenantID, meshName, f)
batch.add(r.now(), resourceChanged.TenantID, meshName, f, []model.ResourceType{resourceChanged.Type})
}
}
case <-stop:
Expand Down Expand Up @@ -326,7 +334,7 @@ func (r *resyncer) addMeshesToBatch(ctx context.Context, batch *eventBatch, tena
return
}
for _, mesh := range meshList.Items {
batch.add(time.Now(), tenantID, mesh.GetMeta().GetName(), FlagMesh|FlagService)
batch.add(time.Now(), tenantID, mesh.GetMeta().GetName(), FlagMesh|FlagService, r.allPolicyTypes())
}
}

Expand Down Expand Up @@ -432,7 +440,15 @@ func (r *resyncer) createOrUpdateServiceInsight(ctx context.Context, mesh string
return nil
}

func (r *resyncer) createOrUpdateMeshInsight(ctx context.Context, mesh string, dpOverviews []*core_mesh.DataplaneOverviewResource) error {
func (r *resyncer) allPolicyTypes() []model.ResourceType {
var types []model.ResourceType
for _, desc := range r.registry.ObjectDescriptors(model.HasScope(model.ScopeMesh), model.IsPolicy()) {
types = append(types, desc.Name)
}
return types
}

func (r *resyncer) createOrUpdateMeshInsight(ctx context.Context, mesh string, dpOverviews []*core_mesh.DataplaneOverviewResource, types map[model.ResourceType]struct{}) error {
log := kuma_log.AddFieldsFromCtx(log, ctx, context.Background()).WithValues("mesh", mesh) // Add info
insight := &mesh_proto.MeshInsight{
Dataplanes: &mesh_proto.MeshInsight_DataplaneStat{},
Expand Down Expand Up @@ -515,23 +531,40 @@ func (r *resyncer) createOrUpdateMeshInsight(ctx context.Context, mesh string, d
External: uint32(len(externalServices.Items)),
}

for _, resDesc := range r.registry.ObjectDescriptors(model.HasScope(model.ScopeMesh), model.IsPolicy()) {
list := resDesc.NewList()

if err := r.rm.List(ctx, list, store.ListByMesh(mesh)); err != nil {
return err
key := MeshInsightKey(mesh)
err := manager.Upsert(ctx, r.rm, key, core_mesh.NewMeshInsightResource(), func(resource model.Resource) error {
oldInsight := resource.GetSpec().(*mesh_proto.MeshInsight)
for k, v := range oldInsight.Policies {
insight.Policies[k] = proto.Clone(v).(*mesh_proto.MeshInsight_PolicyStat)
}
if len(oldInsight.Policies) == 0 {
for _, typ := range r.allPolicyTypes() {
types[typ] = struct{}{}
}
}

if len(list.GetItems()) != 0 {
insight.Policies[string(resDesc.Name)] = &mesh_proto.MeshInsight_PolicyStat{
Total: uint32(len(list.GetItems())),
for typ := range types {
desc, err := r.registry.DescriptorFor(typ)
if err != nil {
return err
}
if !desc.IsPolicy {
continue
}

list := desc.NewList()
if err := r.rm.List(ctx, list, store.ListByMesh(mesh)); err != nil {
return err
}

if len(list.GetItems()) != 0 {
insight.Policies[string(typ)] = &mesh_proto.MeshInsight_PolicyStat{
Total: uint32(len(list.GetItems())),
}
}
}
}

key := MeshInsightKey(mesh)
err := manager.Upsert(ctx, r.rm, key, core_mesh.NewMeshInsightResource(), func(resource model.Resource) error {
if resource.GetSpec() != nil && proto.Equal(resource.GetSpec().(proto.Message), insight) {
if proto.Equal(resource.GetSpec().(proto.Message), insight) {
log.V(1).Info("no need to update MeshInsight because the resource is the same")
return manager.ErrSkipUpsert
}
Expand Down

0 comments on commit a2ff9ef

Please sign in to comment.