Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf(insights): refresh only changed #7737

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 56 additions & 18 deletions pkg/insights/resyncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ type resyncer struct {
idleTime prometheus.Summary
timeToProcessItem prometheus.Summary
itemProcessingTime prometheus.Summary

allPolicyTypes []model.ResourceType
}

// NewResyncer creates a new Component that periodically updates insights
Expand Down Expand Up @@ -95,6 +97,12 @@ func NewResyncer(config *Config) component.Component {
Objectives: core_metrics.DefaultObjectives,
})
config.Metrics.MustRegister(idleTime, timeToProcessItem, itemProcessingTime)

var allPolicyTypes []model.ResourceType
for _, desc := range config.Registry.ObjectDescriptors(model.HasScope(model.ScopeMesh), model.IsPolicy()) {
allPolicyTypes = append(allPolicyTypes, desc.Name)
}

r := &resyncer{
rm: config.ResourceManager,
eventFactory: config.EventReaderFactory,
Expand All @@ -109,6 +117,7 @@ func NewResyncer(config *Config) component.Component {
idleTime: idleTime,
timeToProcessItem: timeToProcessItem,
itemProcessingTime: itemProcessingTime,
allPolicyTypes: allPolicyTypes,
}
if config.Now == nil {
r.now = time.Now
Expand All @@ -126,6 +135,7 @@ type resyncEvent struct {
tenantId string
time time.Time
flag actionFlag
types map[model.ResourceType]struct{}
}

type actionFlag uint8
Expand Down Expand Up @@ -156,16 +166,23 @@ func (e *eventBatch) flush(ctx context.Context, resyncEvents chan resyncEvent) e
}

// add adds an item to the batch, if an item with a similar key exists we simply merge the actionFlags.
func (e *eventBatch) add(now time.Time, tenantId string, mesh string, actionFlag actionFlag) {
func (e *eventBatch) add(now time.Time, tenantId string, mesh string, actionFlag actionFlag, types []model.ResourceType) {
if actionFlag == 0x00 { // No action so no need to persist
return
}
key := tenantId + ":" + mesh
if elt := e.events[key]; elt != nil {
// If the item is already present just merge the actionFlag
elt.flag |= actionFlag
for _, typ := range types {
elt.types[typ] = struct{}{}
}
} else {
e.events[key] = &resyncEvent{time: now, tenantId: tenantId, mesh: mesh, flag: actionFlag}
event := &resyncEvent{time: now, tenantId: tenantId, mesh: mesh, flag: actionFlag, types: map[model.ResourceType]struct{}{}}
for _, typ := range types {
event.types[typ] = struct{}{}
}
e.events[key] = event
}
}

Expand Down Expand Up @@ -209,7 +226,7 @@ func (r *resyncer) Start(stop <-chan struct{}) error {
}
}
if event.flag&FlagMesh == FlagMesh {
err := r.createOrUpdateMeshInsight(tenantCtx, event.mesh, dpOverviews)
err := r.createOrUpdateMeshInsight(tenantCtx, event.mesh, dpOverviews, event.types)
if err != nil {
log.Error(err, "unable to resync MeshInsight", "event", event)
}
Expand Down Expand Up @@ -294,7 +311,7 @@ func (r *resyncer) Start(stop <-chan struct{}) error {
f |= FlagService
}
if f != 0 {
batch.add(r.now(), resourceChanged.TenantID, meshName, f)
batch.add(r.now(), resourceChanged.TenantID, meshName, f, []model.ResourceType{resourceChanged.Type})
}
}
case <-stop:
Expand Down Expand Up @@ -326,7 +343,7 @@ func (r *resyncer) addMeshesToBatch(ctx context.Context, batch *eventBatch, tena
return
}
for _, mesh := range meshList.Items {
batch.add(time.Now(), tenantID, mesh.GetMeta().GetName(), FlagMesh|FlagService)
batch.add(time.Now(), tenantID, mesh.GetMeta().GetName(), FlagMesh|FlagService, r.allPolicyTypes)
}
}

Expand Down Expand Up @@ -432,7 +449,7 @@ func (r *resyncer) createOrUpdateServiceInsight(ctx context.Context, mesh string
return nil
}

func (r *resyncer) createOrUpdateMeshInsight(ctx context.Context, mesh string, dpOverviews []*core_mesh.DataplaneOverviewResource) error {
func (r *resyncer) createOrUpdateMeshInsight(ctx context.Context, mesh string, dpOverviews []*core_mesh.DataplaneOverviewResource, types map[model.ResourceType]struct{}) error {
log := kuma_log.AddFieldsFromCtx(log, ctx, context.Background()).WithValues("mesh", mesh) // Add info
insight := &mesh_proto.MeshInsight{
Dataplanes: &mesh_proto.MeshInsight_DataplaneStat{},
Expand Down Expand Up @@ -515,23 +532,44 @@ func (r *resyncer) createOrUpdateMeshInsight(ctx context.Context, mesh string, d
External: uint32(len(externalServices.Items)),
}

for _, resDesc := range r.registry.ObjectDescriptors(model.HasScope(model.ScopeMesh), model.IsPolicy()) {
list := resDesc.NewList()

if err := r.rm.List(ctx, list, store.ListByMesh(mesh)); err != nil {
return err
key := MeshInsightKey(mesh)
err := manager.Upsert(ctx, r.rm, key, core_mesh.NewMeshInsightResource(), func(resource model.Resource) error {
oldInsight := resource.GetSpec().(*mesh_proto.MeshInsight)
for k, v := range oldInsight.Policies {
insight.Policies[k] = proto.Clone(v).(*mesh_proto.MeshInsight_PolicyStat)
}
if proto.Equal(oldInsight, &mesh_proto.MeshInsight{}) {
// insight was not yet computed, need to update all
for _, typ := range r.allPolicyTypes {
types[typ] = struct{}{}
}
}

if len(list.GetItems()) != 0 {
insight.Policies[string(resDesc.Name)] = &mesh_proto.MeshInsight_PolicyStat{
Total: uint32(len(list.GetItems())),
for typ := range types {
desc, err := r.registry.DescriptorFor(typ)
if err != nil {
return err
}
if !desc.IsPolicy {
continue
}

list := desc.NewList()
if err := r.rm.List(ctx, list, store.ListByMesh(mesh)); err != nil {
return err
}

if len(list.GetItems()) != 0 {
insight.Policies[string(typ)] = &mesh_proto.MeshInsight_PolicyStat{
Total: uint32(len(list.GetItems())),
}
}
if len(list.GetItems()) == 0 {
delete(insight.Policies, string(typ))
}
}
}

key := MeshInsightKey(mesh)
err := manager.Upsert(ctx, r.rm, key, core_mesh.NewMeshInsightResource(), func(resource model.Resource) error {
if resource.GetSpec() != nil && proto.Equal(resource.GetSpec().(proto.Message), insight) {
if proto.Equal(resource.GetSpec().(proto.Message), insight) {
log.V(1).Info("no need to update MeshInsight because the resource is the same")
return manager.ErrSkipUpsert
}
Expand Down