Skip to content

Commit

Permalink
feat(kds): compact subscriptions in insights (#7962)
Browse files Browse the repository at this point in the history
Signed-off-by: Jakub Dyszkiewicz <[email protected]>
  • Loading branch information
jakubdyszkiewicz authored Oct 4, 2023
1 parent d4e70cc commit 4452908
Show file tree
Hide file tree
Showing 10 changed files with 98 additions and 10 deletions.
11 changes: 11 additions & 0 deletions api/system/v1alpha1/zone_insight_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,17 @@ func (x *ZoneInsight) UpdateSubscription(s generic.Subscription) error {
return nil
}

// CompactFinished removes detailed information about finished subscriptions to trim the object size
// The last subscription always has details.
func (x *ZoneInsight) CompactFinished() {
for i := 0; i < len(x.GetSubscriptions())-1; i++ {
x.Subscriptions[i].Config = ""
if status := x.Subscriptions[i].Status; status != nil {
status.Stat = map[string]*KDSServiceStats{}
}
}
}

// If Global CP was killed ungracefully then we can get a subscription without a DisconnectTime.
// Because of the way we process subscriptions the lack of DisconnectTime on old subscription
// will cause wrong status.
Expand Down
58 changes: 56 additions & 2 deletions api/system/v1alpha1/zone_insight_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ import (
)

var _ = Describe("Zone Insights", func() {
Context("UpdateSubscription", func() {
t1, _ := time.Parse(time.RFC3339, "2018-07-17T16:05:36.995+00:00")
t1, _ := time.Parse(time.RFC3339, "2018-07-17T16:05:36.995+00:00")

Context("UpdateSubscription", func() {
It("should leave subscriptions in a valid state", func() {
// given
zoneInsight := &system_proto.ZoneInsight{
Expand Down Expand Up @@ -66,4 +66,58 @@ var _ = Describe("Zone Insights", func() {
Expect(err.Error()).To(Equal("invalid type *v1alpha1.DiscoverySubscription for ZoneInsight"))
})
})

It("should compact finished subscriptions", func() {
// given
zoneInsight := &system_proto.ZoneInsight{
Subscriptions: []*system_proto.KDSSubscription{
{
Id: "1",
ConnectTime: util_proto.MustTimestampProto(t1),
DisconnectTime: util_proto.MustTimestampProto(t1.Add(1 * time.Hour)),
Config: "a",
Status: &system_proto.KDSSubscriptionStatus{
LastUpdateTime: util_proto.MustTimestampProto(t1),
Total: &system_proto.KDSServiceStats{
ResponsesSent: 1,
ResponsesAcknowledged: 1,
},
Stat: map[string]*system_proto.KDSServiceStats{
"TrafficRoute": {
ResponsesSent: 1,
ResponsesAcknowledged: 1,
},
},
},
},
{
Id: "2",
ConnectTime: util_proto.MustTimestampProto(t1.Add(2 * time.Hour)),
Config: "b",
Status: &system_proto.KDSSubscriptionStatus{
LastUpdateTime: util_proto.MustTimestampProto(t1),
Total: &system_proto.KDSServiceStats{
ResponsesSent: 1,
ResponsesAcknowledged: 1,
},
Stat: map[string]*system_proto.KDSServiceStats{
"TrafficRoute": {
ResponsesSent: 1,
ResponsesAcknowledged: 1,
},
},
},
},
},
}

// when
zoneInsight.CompactFinished()

// then
Expect(zoneInsight.Subscriptions[0].Config).To(Equal(""))
Expect(zoneInsight.Subscriptions[0].Status.Stat).To(BeEmpty())
Expect(zoneInsight.Subscriptions[1].Config).To(Equal("b"))
Expect(zoneInsight.Subscriptions[1].Status.Stat).NotTo(BeEmpty())
})
})
2 changes: 2 additions & 0 deletions docs/generated/kuma-cp.md
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,8 @@ metrics:
subscriptionLimit: 10 # ENV: KUMA_METRICS_ZONE_SUBSCRIPTION_LIMIT
# How long zone can stay Online without active KDS connection
idleTimeout: 5m # ENV: KUMA_METRICS_ZONE_IDLE_TIMEOUT
# Compact finished metrics (do not store config and details of KDS exchange).
compactFinishedSubscriptions: false # ENV: KUMA_METRICS_ZONE_COMPACT_FINISHED_SUBSCRIPTIONS
mesh:
# Minimum time between 2 refresh of insights
minResyncInterval: 1s # ENV: KUMA_METRICS_MESH_MIN_RESYNC_INTERVAL
Expand Down
2 changes: 2 additions & 0 deletions docs/generated/raw/kuma-cp.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,8 @@ metrics:
subscriptionLimit: 10 # ENV: KUMA_METRICS_ZONE_SUBSCRIPTION_LIMIT
# How long zone can stay Online without active KDS connection
idleTimeout: 5m # ENV: KUMA_METRICS_ZONE_IDLE_TIMEOUT
# Compact finished metrics (do not store config and details of KDS exchange).
compactFinishedSubscriptions: false # ENV: KUMA_METRICS_ZONE_COMPACT_FINISHED_SUBSCRIPTIONS
mesh:
# Minimum time between 2 refresh of insights
minResyncInterval: 1s # ENV: KUMA_METRICS_MESH_MIN_RESYNC_INTERVAL
Expand Down
2 changes: 2 additions & 0 deletions pkg/config/app/kuma-cp/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ func (d *DataplaneMetrics) Validate() error {
type ZoneMetrics struct {
SubscriptionLimit int `json:"subscriptionLimit" envconfig:"kuma_metrics_zone_subscription_limit"`
IdleTimeout config_types.Duration `json:"idleTimeout" envconfig:"kuma_metrics_zone_idle_timeout"`
// CompactFinishedSubscriptions compacts finished metrics (do not store config and details of KDS exchange).
CompactFinishedSubscriptions bool `json:"compactFinishedSubscriptions" envconfig:"kuma_metrics_zone_compact_finished_subscriptions"`
}

func (d *ZoneMetrics) Sanitize() {
Expand Down
2 changes: 2 additions & 0 deletions pkg/config/app/kuma-cp/kuma-cp.defaults.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,8 @@ metrics:
subscriptionLimit: 10 # ENV: KUMA_METRICS_ZONE_SUBSCRIPTION_LIMIT
# How long zone can stay Online without active KDS connection
idleTimeout: 5m # ENV: KUMA_METRICS_ZONE_IDLE_TIMEOUT
# Compact finished metrics (do not store config and details of KDS exchange).
compactFinishedSubscriptions: false # ENV: KUMA_METRICS_ZONE_COMPACT_FINISHED_SUBSCRIPTIONS
mesh:
# Minimum time between 2 refresh of insights
minResyncInterval: 1s # ENV: KUMA_METRICS_MESH_MIN_RESYNC_INTERVAL
Expand Down
3 changes: 3 additions & 0 deletions pkg/config/loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,7 @@ var _ = Describe("Config loader", func() {

Expect(cfg.Metrics.Zone.SubscriptionLimit).To(Equal(23))
Expect(cfg.Metrics.Zone.IdleTimeout.Duration).To(Equal(2 * time.Minute))
Expect(cfg.Metrics.Zone.CompactFinishedSubscriptions).To(BeTrue())
Expect(cfg.Metrics.Mesh.MinResyncInterval.Duration).To(Equal(27 * time.Second))
Expect(cfg.Metrics.Mesh.FullResyncInterval.Duration).To(Equal(35 * time.Second))
Expect(cfg.Metrics.Mesh.BufferSize).To(Equal(23))
Expand Down Expand Up @@ -598,6 +599,7 @@ metrics:
zone:
subscriptionLimit: 23
idleTimeout: 2m
compactFinishedSubscriptions: true
mesh:
fullResyncInterval: 35s
minResyncInterval: 27s
Expand Down Expand Up @@ -883,6 +885,7 @@ tracing:
"KUMA_XDS_SERVER_NACK_BACKOFF": "10s",
"KUMA_METRICS_ZONE_SUBSCRIPTION_LIMIT": "23",
"KUMA_METRICS_ZONE_IDLE_TIMEOUT": "2m",
"KUMA_METRICS_ZONE_COMPACT_FINISHED_SUBSCRIPTIONS": "true",
"KUMA_METRICS_MESH_MIN_RESYNC_TIMEOUT": "27s",
"KUMA_METRICS_MESH_MAX_RESYNC_TIMEOUT": "35s",
"KUMA_METRICS_MESH_MIN_RESYNC_INTERVAL": "27s",
Expand Down
2 changes: 1 addition & 1 deletion pkg/kds/server/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func DefaultStatusTracker(rt core_runtime.Runtime, log logr.Logger) StatusTracke
return time.NewTicker(rt.Config().Metrics.Zone.IdleTimeout.Duration / 2)
},
rt.Config().Multizone.Global.KDS.ZoneInsightFlushInterval.Duration/10,
NewZonesInsightStore(rt.ResourceManager(), rt.Config().Store.Upsert),
NewZonesInsightStore(rt.ResourceManager(), rt.Config().Store.Upsert, rt.Config().Metrics.Zone.CompactFinishedSubscriptions),
l,
rt.Extensions(),
)
Expand Down
24 changes: 18 additions & 6 deletions pkg/kds/server/status_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,18 +107,24 @@ func (s *zoneInsightSink) Start(ctx context.Context, stop <-chan struct{}) {
}
}

func NewZonesInsightStore(resManager manager.ResourceManager, upsertCfg config_store.UpsertConfig) ZoneInsightStore {
func NewZonesInsightStore(
resManager manager.ResourceManager,
upsertCfg config_store.UpsertConfig,
compactFinished bool,
) ZoneInsightStore {
return &zoneInsightStore{
resManager: resManager,
upsertCfg: upsertCfg,
resManager: resManager,
upsertCfg: upsertCfg,
compactFinished: compactFinished,
}
}

var _ ZoneInsightStore = &zoneInsightStore{}

type zoneInsightStore struct {
resManager manager.ResourceManager
upsertCfg config_store.UpsertConfig
resManager manager.ResourceManager
upsertCfg config_store.UpsertConfig
compactFinished bool
}

func (s *zoneInsightStore) Upsert(ctx context.Context, zone string, subscription *system_proto.KDSSubscription) error {
Expand All @@ -129,6 +135,12 @@ func (s *zoneInsightStore) Upsert(ctx context.Context, zone string, subscription
}
zoneInsight := system.NewZoneInsightResource()
return manager.Upsert(ctx, s.resManager, key, zoneInsight, func(resource core_model.Resource) error {
return zoneInsight.Spec.UpdateSubscription(subscription)
if err := zoneInsight.Spec.UpdateSubscription(subscription); err != nil {
return err
}
if s.compactFinished {
zoneInsight.Spec.CompactFinished()
}
return nil
})
}
2 changes: 1 addition & 1 deletion pkg/kds/v2/server/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func DefaultStatusTracker(rt core_runtime.Runtime, log logr.Logger) StatusTracke
return time.NewTicker(rt.Config().Multizone.Global.KDS.ZoneInsightFlushInterval.Duration)
}, func() *time.Ticker {
return time.NewTicker(rt.Config().Metrics.Zone.IdleTimeout.Duration / 2)
}, rt.Config().Multizone.Global.KDS.ZoneInsightFlushInterval.Duration/10, kds_server.NewZonesInsightStore(rt.ResourceManager(), rt.Config().Store.Upsert), l, rt.Extensions())
}, rt.Config().Multizone.Global.KDS.ZoneInsightFlushInterval.Duration/10, kds_server.NewZonesInsightStore(rt.ResourceManager(), rt.Config().Store.Upsert, rt.Config().Metrics.Zone.CompactFinishedSubscriptions), l, rt.Extensions())
}, log)
}

Expand Down

0 comments on commit 4452908

Please sign in to comment.