From 4452908658758769a7e84db19b2e40b2c584fc28 Mon Sep 17 00:00:00 2001 From: Jakub Dyszkiewicz Date: Wed, 4 Oct 2023 17:50:07 +0200 Subject: [PATCH] feat(kds): compact subscriptions in insights (#7962) Signed-off-by: Jakub Dyszkiewicz --- api/system/v1alpha1/zone_insight_helpers.go | 11 ++++ .../v1alpha1/zone_insight_helpers_test.go | 58 ++++++++++++++++++- docs/generated/kuma-cp.md | 2 + docs/generated/raw/kuma-cp.yaml | 2 + pkg/config/app/kuma-cp/config.go | 2 + pkg/config/app/kuma-cp/kuma-cp.defaults.yaml | 2 + pkg/config/loader_test.go | 3 + pkg/kds/server/components.go | 2 +- pkg/kds/server/status_sink.go | 24 ++++++-- pkg/kds/v2/server/components.go | 2 +- 10 files changed, 98 insertions(+), 10 deletions(-) diff --git a/api/system/v1alpha1/zone_insight_helpers.go b/api/system/v1alpha1/zone_insight_helpers.go index e7f39c1b7479..58103926a83d 100644 --- a/api/system/v1alpha1/zone_insight_helpers.go +++ b/api/system/v1alpha1/zone_insight_helpers.go @@ -74,6 +74,17 @@ func (x *ZoneInsight) UpdateSubscription(s generic.Subscription) error { return nil } +// CompactFinished removes detailed information (Config and Status.Stat) from every subscription except the last one to trim the object size. +// The last subscription always has details. +func (x *ZoneInsight) CompactFinished() { + for i := 0; i < len(x.GetSubscriptions())-1; i++ { + x.Subscriptions[i].Config = "" + if status := x.Subscriptions[i].Status; status != nil { + status.Stat = map[string]*KDSServiceStats{} + } + } +} + // If Global CP was killed ungracefully then we can get a subscription without a DisconnectTime. // Because of the way we process subscriptions the lack of DisconnectTime on old subscription // will cause wrong status.
diff --git a/api/system/v1alpha1/zone_insight_helpers_test.go b/api/system/v1alpha1/zone_insight_helpers_test.go index 89cba72e2e41..c91cd89097c0 100644 --- a/api/system/v1alpha1/zone_insight_helpers_test.go +++ b/api/system/v1alpha1/zone_insight_helpers_test.go @@ -12,9 +12,9 @@ import ( ) var _ = Describe("Zone Insights", func() { - Context("UpdateSubscription", func() { - t1, _ := time.Parse(time.RFC3339, "2018-07-17T16:05:36.995+00:00") + t1, _ := time.Parse(time.RFC3339, "2018-07-17T16:05:36.995+00:00") + Context("UpdateSubscription", func() { It("should leave subscriptions in a valid state", func() { // given zoneInsight := &system_proto.ZoneInsight{ @@ -66,4 +66,58 @@ var _ = Describe("Zone Insights", func() { Expect(err.Error()).To(Equal("invalid type *v1alpha1.DiscoverySubscription for ZoneInsight")) }) }) + + It("should compact finished subscriptions", func() { + // given + zoneInsight := &system_proto.ZoneInsight{ + Subscriptions: []*system_proto.KDSSubscription{ + { + Id: "1", + ConnectTime: util_proto.MustTimestampProto(t1), + DisconnectTime: util_proto.MustTimestampProto(t1.Add(1 * time.Hour)), + Config: "a", + Status: &system_proto.KDSSubscriptionStatus{ + LastUpdateTime: util_proto.MustTimestampProto(t1), + Total: &system_proto.KDSServiceStats{ + ResponsesSent: 1, + ResponsesAcknowledged: 1, + }, + Stat: map[string]*system_proto.KDSServiceStats{ + "TrafficRoute": { + ResponsesSent: 1, + ResponsesAcknowledged: 1, + }, + }, + }, + }, + { + Id: "2", + ConnectTime: util_proto.MustTimestampProto(t1.Add(2 * time.Hour)), + Config: "b", + Status: &system_proto.KDSSubscriptionStatus{ + LastUpdateTime: util_proto.MustTimestampProto(t1), + Total: &system_proto.KDSServiceStats{ + ResponsesSent: 1, + ResponsesAcknowledged: 1, + }, + Stat: map[string]*system_proto.KDSServiceStats{ + "TrafficRoute": { + ResponsesSent: 1, + ResponsesAcknowledged: 1, + }, + }, + }, + }, + }, + } + + // when + zoneInsight.CompactFinished() + + // then + 
Expect(zoneInsight.Subscriptions[0].Config).To(Equal("")) + Expect(zoneInsight.Subscriptions[0].Status.Stat).To(BeEmpty()) + Expect(zoneInsight.Subscriptions[1].Config).To(Equal("b")) + Expect(zoneInsight.Subscriptions[1].Status.Stat).NotTo(BeEmpty()) + }) }) diff --git a/docs/generated/kuma-cp.md b/docs/generated/kuma-cp.md index b6ba26f2bb74..d6bc2d459bc0 100644 --- a/docs/generated/kuma-cp.md +++ b/docs/generated/kuma-cp.md @@ -414,6 +414,8 @@ metrics: subscriptionLimit: 10 # ENV: KUMA_METRICS_ZONE_SUBSCRIPTION_LIMIT # How long zone can stay Online without active KDS connection idleTimeout: 5m # ENV: KUMA_METRICS_ZONE_IDLE_TIMEOUT + # Compact finished subscriptions (do not store config and details of KDS exchange). + compactFinishedSubscriptions: false # ENV: KUMA_METRICS_ZONE_COMPACT_FINISHED_SUBSCRIPTIONS mesh: # Minimum time between 2 refresh of insights minResyncInterval: 1s # ENV: KUMA_METRICS_MESH_MIN_RESYNC_INTERVAL diff --git a/docs/generated/raw/kuma-cp.yaml b/docs/generated/raw/kuma-cp.yaml index b1a3d6e7abe3..54b7bd539358 100644 --- a/docs/generated/raw/kuma-cp.yaml +++ b/docs/generated/raw/kuma-cp.yaml @@ -411,6 +411,8 @@ metrics: subscriptionLimit: 10 # ENV: KUMA_METRICS_ZONE_SUBSCRIPTION_LIMIT # How long zone can stay Online without active KDS connection idleTimeout: 5m # ENV: KUMA_METRICS_ZONE_IDLE_TIMEOUT + # Compact finished subscriptions (do not store config and details of KDS exchange).
+ compactFinishedSubscriptions: false # ENV: KUMA_METRICS_ZONE_COMPACT_FINISHED_SUBSCRIPTIONS mesh: # Minimum time between 2 refresh of insights minResyncInterval: 1s # ENV: KUMA_METRICS_MESH_MIN_RESYNC_INTERVAL diff --git a/pkg/config/app/kuma-cp/config.go b/pkg/config/app/kuma-cp/config.go index 4250ea859f6b..4b7d736138fc 100644 --- a/pkg/config/app/kuma-cp/config.go +++ b/pkg/config/app/kuma-cp/config.go @@ -78,6 +78,8 @@ func (d *DataplaneMetrics) Validate() error { type ZoneMetrics struct { SubscriptionLimit int `json:"subscriptionLimit" envconfig:"kuma_metrics_zone_subscription_limit"` IdleTimeout config_types.Duration `json:"idleTimeout" envconfig:"kuma_metrics_zone_idle_timeout"` + // CompactFinishedSubscriptions compacts finished subscriptions (do not store config and details of KDS exchange). + CompactFinishedSubscriptions bool `json:"compactFinishedSubscriptions" envconfig:"kuma_metrics_zone_compact_finished_subscriptions"` } func (d *ZoneMetrics) Sanitize() { diff --git a/pkg/config/app/kuma-cp/kuma-cp.defaults.yaml b/pkg/config/app/kuma-cp/kuma-cp.defaults.yaml index b1a3d6e7abe3..54b7bd539358 100644 --- a/pkg/config/app/kuma-cp/kuma-cp.defaults.yaml +++ b/pkg/config/app/kuma-cp/kuma-cp.defaults.yaml @@ -411,6 +411,8 @@ metrics: subscriptionLimit: 10 # ENV: KUMA_METRICS_ZONE_SUBSCRIPTION_LIMIT # How long zone can stay Online without active KDS connection idleTimeout: 5m # ENV: KUMA_METRICS_ZONE_IDLE_TIMEOUT + # Compact finished subscriptions (do not store config and details of KDS exchange).
+ compactFinishedSubscriptions: false # ENV: KUMA_METRICS_ZONE_COMPACT_FINISHED_SUBSCRIPTIONS mesh: # Minimum time between 2 refresh of insights minResyncInterval: 1s # ENV: KUMA_METRICS_MESH_MIN_RESYNC_INTERVAL diff --git a/pkg/config/loader_test.go b/pkg/config/loader_test.go index 4eae04d78672..87b418ecd525 100644 --- a/pkg/config/loader_test.go +++ b/pkg/config/loader_test.go @@ -284,6 +284,7 @@ var _ = Describe("Config loader", func() { Expect(cfg.Metrics.Zone.SubscriptionLimit).To(Equal(23)) Expect(cfg.Metrics.Zone.IdleTimeout.Duration).To(Equal(2 * time.Minute)) + Expect(cfg.Metrics.Zone.CompactFinishedSubscriptions).To(BeTrue()) Expect(cfg.Metrics.Mesh.MinResyncInterval.Duration).To(Equal(27 * time.Second)) Expect(cfg.Metrics.Mesh.FullResyncInterval.Duration).To(Equal(35 * time.Second)) Expect(cfg.Metrics.Mesh.BufferSize).To(Equal(23)) @@ -598,6 +599,7 @@ metrics: zone: subscriptionLimit: 23 idleTimeout: 2m + compactFinishedSubscriptions: true mesh: fullResyncInterval: 35s minResyncInterval: 27s @@ -883,6 +885,7 @@ tracing: "KUMA_XDS_SERVER_NACK_BACKOFF": "10s", "KUMA_METRICS_ZONE_SUBSCRIPTION_LIMIT": "23", "KUMA_METRICS_ZONE_IDLE_TIMEOUT": "2m", + "KUMA_METRICS_ZONE_COMPACT_FINISHED_SUBSCRIPTIONS": "true", "KUMA_METRICS_MESH_MIN_RESYNC_TIMEOUT": "27s", "KUMA_METRICS_MESH_MAX_RESYNC_TIMEOUT": "35s", "KUMA_METRICS_MESH_MIN_RESYNC_INTERVAL": "27s", diff --git a/pkg/kds/server/components.go b/pkg/kds/server/components.go index d81931c82813..7426ec62889f 100644 --- a/pkg/kds/server/components.go +++ b/pkg/kds/server/components.go @@ -68,7 +68,7 @@ func DefaultStatusTracker(rt core_runtime.Runtime, log logr.Logger) StatusTracke return time.NewTicker(rt.Config().Metrics.Zone.IdleTimeout.Duration / 2) }, rt.Config().Multizone.Global.KDS.ZoneInsightFlushInterval.Duration/10, - NewZonesInsightStore(rt.ResourceManager(), rt.Config().Store.Upsert), + NewZonesInsightStore(rt.ResourceManager(), rt.Config().Store.Upsert, 
rt.Config().Metrics.Zone.CompactFinishedSubscriptions), l, rt.Extensions(), ) diff --git a/pkg/kds/server/status_sink.go b/pkg/kds/server/status_sink.go index e3afc382e789..cfa196754671 100644 --- a/pkg/kds/server/status_sink.go +++ b/pkg/kds/server/status_sink.go @@ -107,18 +107,24 @@ func (s *zoneInsightSink) Start(ctx context.Context, stop <-chan struct{}) { } } -func NewZonesInsightStore(resManager manager.ResourceManager, upsertCfg config_store.UpsertConfig) ZoneInsightStore { +func NewZonesInsightStore( + resManager manager.ResourceManager, + upsertCfg config_store.UpsertConfig, + compactFinished bool, +) ZoneInsightStore { return &zoneInsightStore{ - resManager: resManager, - upsertCfg: upsertCfg, + resManager: resManager, + upsertCfg: upsertCfg, + compactFinished: compactFinished, } } var _ ZoneInsightStore = &zoneInsightStore{} type zoneInsightStore struct { - resManager manager.ResourceManager - upsertCfg config_store.UpsertConfig + resManager manager.ResourceManager + upsertCfg config_store.UpsertConfig + compactFinished bool } func (s *zoneInsightStore) Upsert(ctx context.Context, zone string, subscription *system_proto.KDSSubscription) error { @@ -129,6 +135,12 @@ func (s *zoneInsightStore) Upsert(ctx context.Context, zone string, subscription } zoneInsight := system.NewZoneInsightResource() return manager.Upsert(ctx, s.resManager, key, zoneInsight, func(resource core_model.Resource) error { - return zoneInsight.Spec.UpdateSubscription(subscription) + if err := zoneInsight.Spec.UpdateSubscription(subscription); err != nil { + return err + } + if s.compactFinished { + zoneInsight.Spec.CompactFinished() + } + return nil }) } diff --git a/pkg/kds/v2/server/components.go b/pkg/kds/v2/server/components.go index 7e83373298c3..30976cb02ac5 100644 --- a/pkg/kds/v2/server/components.go +++ b/pkg/kds/v2/server/components.go @@ -79,7 +79,7 @@ func DefaultStatusTracker(rt core_runtime.Runtime, log logr.Logger) StatusTracke return 
time.NewTicker(rt.Config().Multizone.Global.KDS.ZoneInsightFlushInterval.Duration) }, func() *time.Ticker { return time.NewTicker(rt.Config().Metrics.Zone.IdleTimeout.Duration / 2) - }, rt.Config().Multizone.Global.KDS.ZoneInsightFlushInterval.Duration/10, kds_server.NewZonesInsightStore(rt.ResourceManager(), rt.Config().Store.Upsert), l, rt.Extensions()) + }, rt.Config().Multizone.Global.KDS.ZoneInsightFlushInterval.Duration/10, kds_server.NewZonesInsightStore(rt.ResourceManager(), rt.Config().Store.Upsert, rt.Config().Metrics.Zone.CompactFinishedSubscriptions), l, rt.Extensions()) }, log) }