Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(kds): compact subscriptions in insights #7962

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions api/system/v1alpha1/zone_insight_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,17 @@ func (x *ZoneInsight) UpdateSubscription(s generic.Subscription) error {
return nil
}

// CompactFinished removes detailed information about finished subscriptions to trim the object size
// The last subscription always has details.
func (x *ZoneInsight) CompactFinished() {
for i := 0; i < len(x.GetSubscriptions())-1; i++ {
jakubdyszkiewicz marked this conversation as resolved.
Show resolved Hide resolved
x.Subscriptions[i].Config = ""
if status := x.Subscriptions[i].Status; status != nil {
status.Stat = map[string]*KDSServiceStats{}
}
}
}

// If Global CP was killed ungracefully then we can get a subscription without a DisconnectTime.
// Because of the way we process subscriptions the lack of DisconnectTime on old subscription
// will cause wrong status.
Expand Down
58 changes: 56 additions & 2 deletions api/system/v1alpha1/zone_insight_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ import (
)

var _ = Describe("Zone Insights", func() {
Context("UpdateSubscription", func() {
t1, _ := time.Parse(time.RFC3339, "2018-07-17T16:05:36.995+00:00")
t1, _ := time.Parse(time.RFC3339, "2018-07-17T16:05:36.995+00:00")

Context("UpdateSubscription", func() {
It("should leave subscriptions in a valid state", func() {
// given
zoneInsight := &system_proto.ZoneInsight{
Expand Down Expand Up @@ -66,4 +66,58 @@ var _ = Describe("Zone Insights", func() {
Expect(err.Error()).To(Equal("invalid type *v1alpha1.DiscoverySubscription for ZoneInsight"))
})
})

It("should compact finished subscriptions", func() {
// given
zoneInsight := &system_proto.ZoneInsight{
Subscriptions: []*system_proto.KDSSubscription{
{
Id: "1",
ConnectTime: util_proto.MustTimestampProto(t1),
DisconnectTime: util_proto.MustTimestampProto(t1.Add(1 * time.Hour)),
Config: "a",
Status: &system_proto.KDSSubscriptionStatus{
LastUpdateTime: util_proto.MustTimestampProto(t1),
Total: &system_proto.KDSServiceStats{
ResponsesSent: 1,
ResponsesAcknowledged: 1,
},
Stat: map[string]*system_proto.KDSServiceStats{
"TrafficRoute": {
ResponsesSent: 1,
ResponsesAcknowledged: 1,
},
},
},
},
{
Id: "2",
ConnectTime: util_proto.MustTimestampProto(t1.Add(2 * time.Hour)),
Config: "b",
Status: &system_proto.KDSSubscriptionStatus{
LastUpdateTime: util_proto.MustTimestampProto(t1),
Total: &system_proto.KDSServiceStats{
ResponsesSent: 1,
ResponsesAcknowledged: 1,
},
Stat: map[string]*system_proto.KDSServiceStats{
"TrafficRoute": {
ResponsesSent: 1,
ResponsesAcknowledged: 1,
},
},
},
},
},
}

// when
zoneInsight.CompactFinished()

// then
Expect(zoneInsight.Subscriptions[0].Config).To(Equal(""))
Expect(zoneInsight.Subscriptions[0].Status.Stat).To(BeEmpty())
Expect(zoneInsight.Subscriptions[1].Config).To(Equal("b"))
Expect(zoneInsight.Subscriptions[1].Status.Stat).NotTo(BeEmpty())
})
})
2 changes: 2 additions & 0 deletions docs/generated/kuma-cp.md
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,8 @@ metrics:
subscriptionLimit: 10 # ENV: KUMA_METRICS_ZONE_SUBSCRIPTION_LIMIT
# How long zone can stay Online without active KDS connection
idleTimeout: 5m # ENV: KUMA_METRICS_ZONE_IDLE_TIMEOUT
# Compact finished metrics (do not store config and details of KDS exchange).
compactFinishedSubscriptions: false # ENV: KUMA_METRICS_ZONE_COMPACT_FINISHED_SUBSCRIPTIONS
mesh:
# Minimum time between 2 refresh of insights
minResyncInterval: 1s # ENV: KUMA_METRICS_MESH_MIN_RESYNC_INTERVAL
Expand Down
2 changes: 2 additions & 0 deletions docs/generated/raw/kuma-cp.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,8 @@ metrics:
subscriptionLimit: 10 # ENV: KUMA_METRICS_ZONE_SUBSCRIPTION_LIMIT
# How long zone can stay Online without active KDS connection
idleTimeout: 5m # ENV: KUMA_METRICS_ZONE_IDLE_TIMEOUT
# Compact finished metrics (do not store config and details of KDS exchange).
compactFinishedSubscriptions: false # ENV: KUMA_METRICS_ZONE_COMPACT_FINISHED_SUBSCRIPTIONS
mesh:
# Minimum time between 2 refresh of insights
minResyncInterval: 1s # ENV: KUMA_METRICS_MESH_MIN_RESYNC_INTERVAL
Expand Down
2 changes: 2 additions & 0 deletions pkg/config/app/kuma-cp/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ func (d *DataplaneMetrics) Validate() error {
type ZoneMetrics struct {
SubscriptionLimit int `json:"subscriptionLimit" envconfig:"kuma_metrics_zone_subscription_limit"`
IdleTimeout config_types.Duration `json:"idleTimeout" envconfig:"kuma_metrics_zone_idle_timeout"`
// CompactFinishedSubscriptions compacts finished metrics (do not store config and details of KDS exchange).
CompactFinishedSubscriptions bool `json:"compactFinishedSubscriptions" envconfig:"kuma_metrics_zone_compact_finished_subscriptions"`
}

func (d *ZoneMetrics) Sanitize() {
Expand Down
2 changes: 2 additions & 0 deletions pkg/config/app/kuma-cp/kuma-cp.defaults.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,8 @@ metrics:
subscriptionLimit: 10 # ENV: KUMA_METRICS_ZONE_SUBSCRIPTION_LIMIT
# How long zone can stay Online without active KDS connection
idleTimeout: 5m # ENV: KUMA_METRICS_ZONE_IDLE_TIMEOUT
# Compact finished metrics (do not store config and details of KDS exchange).
compactFinishedSubscriptions: false # ENV: KUMA_METRICS_ZONE_COMPACT_FINISHED_SUBSCRIPTIONS
mesh:
# Minimum time between 2 refresh of insights
minResyncInterval: 1s # ENV: KUMA_METRICS_MESH_MIN_RESYNC_INTERVAL
Expand Down
3 changes: 3 additions & 0 deletions pkg/config/loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,7 @@ var _ = Describe("Config loader", func() {

Expect(cfg.Metrics.Zone.SubscriptionLimit).To(Equal(23))
Expect(cfg.Metrics.Zone.IdleTimeout.Duration).To(Equal(2 * time.Minute))
Expect(cfg.Metrics.Zone.CompactFinishedSubscriptions).To(BeTrue())
Expect(cfg.Metrics.Mesh.MinResyncInterval.Duration).To(Equal(27 * time.Second))
Expect(cfg.Metrics.Mesh.FullResyncInterval.Duration).To(Equal(35 * time.Second))
Expect(cfg.Metrics.Mesh.BufferSize).To(Equal(23))
Expand Down Expand Up @@ -598,6 +599,7 @@ metrics:
zone:
subscriptionLimit: 23
idleTimeout: 2m
compactFinishedSubscriptions: true
mesh:
fullResyncInterval: 35s
minResyncInterval: 27s
Expand Down Expand Up @@ -883,6 +885,7 @@ tracing:
"KUMA_XDS_SERVER_NACK_BACKOFF": "10s",
"KUMA_METRICS_ZONE_SUBSCRIPTION_LIMIT": "23",
"KUMA_METRICS_ZONE_IDLE_TIMEOUT": "2m",
"KUMA_METRICS_ZONE_COMPACT_FINISHED_SUBSCRIPTIONS": "true",
"KUMA_METRICS_MESH_MIN_RESYNC_TIMEOUT": "27s",
"KUMA_METRICS_MESH_MAX_RESYNC_TIMEOUT": "35s",
"KUMA_METRICS_MESH_MIN_RESYNC_INTERVAL": "27s",
Expand Down
2 changes: 1 addition & 1 deletion pkg/kds/server/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func DefaultStatusTracker(rt core_runtime.Runtime, log logr.Logger) StatusTracke
return time.NewTicker(rt.Config().Metrics.Zone.IdleTimeout.Duration / 2)
},
rt.Config().Multizone.Global.KDS.ZoneInsightFlushInterval.Duration/10,
NewZonesInsightStore(rt.ResourceManager(), rt.Config().Store.Upsert),
NewZonesInsightStore(rt.ResourceManager(), rt.Config().Store.Upsert, rt.Config().Metrics.Zone.CompactFinishedSubscriptions),
l,
rt.Extensions(),
)
Expand Down
24 changes: 18 additions & 6 deletions pkg/kds/server/status_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,18 +107,24 @@ func (s *zoneInsightSink) Start(ctx context.Context, stop <-chan struct{}) {
}
}

func NewZonesInsightStore(resManager manager.ResourceManager, upsertCfg config_store.UpsertConfig) ZoneInsightStore {
func NewZonesInsightStore(
resManager manager.ResourceManager,
upsertCfg config_store.UpsertConfig,
compactFinished bool,
) ZoneInsightStore {
return &zoneInsightStore{
resManager: resManager,
upsertCfg: upsertCfg,
resManager: resManager,
upsertCfg: upsertCfg,
compactFinished: compactFinished,
}
}

var _ ZoneInsightStore = &zoneInsightStore{}

type zoneInsightStore struct {
resManager manager.ResourceManager
upsertCfg config_store.UpsertConfig
resManager manager.ResourceManager
upsertCfg config_store.UpsertConfig
compactFinished bool
}

func (s *zoneInsightStore) Upsert(ctx context.Context, zone string, subscription *system_proto.KDSSubscription) error {
Expand All @@ -129,6 +135,12 @@ func (s *zoneInsightStore) Upsert(ctx context.Context, zone string, subscription
}
zoneInsight := system.NewZoneInsightResource()
return manager.Upsert(ctx, s.resManager, key, zoneInsight, func(resource core_model.Resource) error {
return zoneInsight.Spec.UpdateSubscription(subscription)
if err := zoneInsight.Spec.UpdateSubscription(subscription); err != nil {
return err
}
if s.compactFinished {
zoneInsight.Spec.CompactFinished()
}
return nil
})
}
2 changes: 1 addition & 1 deletion pkg/kds/v2/server/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func DefaultStatusTracker(rt core_runtime.Runtime, log logr.Logger) StatusTracke
return time.NewTicker(rt.Config().Multizone.Global.KDS.ZoneInsightFlushInterval.Duration)
}, func() *time.Ticker {
return time.NewTicker(rt.Config().Metrics.Zone.IdleTimeout.Duration / 2)
}, rt.Config().Multizone.Global.KDS.ZoneInsightFlushInterval.Duration/10, kds_server.NewZonesInsightStore(rt.ResourceManager(), rt.Config().Store.Upsert), l, rt.Extensions())
}, rt.Config().Multizone.Global.KDS.ZoneInsightFlushInterval.Duration/10, kds_server.NewZonesInsightStore(rt.ResourceManager(), rt.Config().Store.Upsert, rt.Config().Metrics.Zone.CompactFinishedSubscriptions), l, rt.Extensions())
}, log)
}

Expand Down