Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(insights): improve ZoneInsight subscription management #8153

Merged
merged 12 commits into from
Nov 1, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion api/generic/insights.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,15 @@ import (
type Insight interface {
proto.Message
IsOnline() bool
GetLastSubscription() Subscription
GetSubscription(id string) Subscription
AllSubscriptions() []Subscription
UpdateSubscription(Subscription) error
}

type Subscription interface {
proto.Message
GetId() string
GetGeneration() uint32
IsOnline() bool
SetDisconnectTime(time time.Time)
}
33 changes: 23 additions & 10 deletions api/mesh/v1alpha1/dataplane_insight_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,21 @@ func (x *DataplaneInsight) IsOnline() bool {
return false
}

func (x *DataplaneInsight) GetSubscription(id string) (int, *DiscoverySubscription) {
for i, s := range x.GetSubscriptions() {
func (x *DataplaneInsight) AllSubscriptions() []generic.Subscription {
var subs []generic.Subscription
for _, s := range x.GetSubscriptions() {
subs = append(subs, s)
}
return subs
}

func (x *DataplaneInsight) GetSubscription(id string) generic.Subscription {
for _, s := range x.GetSubscriptions() {
lobkovilya marked this conversation as resolved.
Show resolved Hide resolved
if s.Id == id {
return i, s
return s
}
}
return -1, nil
return nil
}

func (x *DataplaneInsight) UpdateCert(generation time.Time, expiration time.Time, issuedBackend string, supportedBackends []string) error {
Expand Down Expand Up @@ -93,13 +101,14 @@ func (x *DataplaneInsight) UpdateSubscription(s generic.Subscription) error {
if !ok {
return errors.Errorf("invalid type %T for DataplaneInsight", s)
}
i, old := x.GetSubscription(discoverySubscription.Id)
if old != nil {
x.Subscriptions[i] = discoverySubscription
} else {
x.finalizeSubscriptions()
x.Subscriptions = append(x.Subscriptions, discoverySubscription)
for i, sub := range x.GetSubscriptions() {
if sub.GetId() == discoverySubscription.Id {
x.Subscriptions[i] = discoverySubscription
return nil
}
}
x.finalizeSubscriptions()
x.Subscriptions = append(x.Subscriptions, discoverySubscription)
return nil
}

Expand All @@ -126,6 +135,10 @@ func (x *DiscoverySubscription) SetDisconnectTime(t time.Time) {
x.DisconnectTime = util_proto.MustTimestampProto(t)
}

func (x *DiscoverySubscription) IsOnline() bool {
return x.GetConnectTime() != nil && x.GetDisconnectTime() == nil
}

func (x *DataplaneInsight) Sum(v func(*DiscoverySubscription) uint64) uint64 {
var result uint64 = 0
for _, s := range x.GetSubscriptions() {
Expand Down
4 changes: 2 additions & 2 deletions api/mesh/v1alpha1/dataplane_insight_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,8 @@ var _ = Describe("DataplaneHelpers", func() {
})).To(Succeed())

// then
_, subscription := dataplaneInsight.GetSubscription("2")
Expect(subscription.DisconnectTime).ToNot(BeNil())
subscription := dataplaneInsight.GetSubscription("2")
Expect(subscription.(*DiscoverySubscription).DisconnectTime).ToNot(BeNil())
})

It("should return error for wrong subscription type", func() {
Expand Down
29 changes: 19 additions & 10 deletions api/mesh/v1alpha1/zone_ingress_insight_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ import (

var _ generic.Insight = &ZoneIngressInsight{}

func (x *ZoneIngressInsight) GetSubscription(id string) (int, *DiscoverySubscription) {
for i, s := range x.GetSubscriptions() {
func (x *ZoneIngressInsight) GetSubscription(id string) generic.Subscription {
for _, s := range x.GetSubscriptions() {
if s.Id == id {
return i, s
return s
}
}
return -1, nil
return nil
}

func (x *ZoneIngressInsight) UpdateSubscription(s generic.Subscription) error {
Expand All @@ -26,13 +26,14 @@ func (x *ZoneIngressInsight) UpdateSubscription(s generic.Subscription) error {
if !ok {
return errors.Errorf("invalid type %T for ZoneIngressInsight", s)
}
i, old := x.GetSubscription(discoverySubscription.Id)
if old != nil {
x.Subscriptions[i] = discoverySubscription
} else {
x.finalizeSubscriptions()
x.Subscriptions = append(x.Subscriptions, discoverySubscription)
for i, sub := range x.GetSubscriptions() {
if sub.GetId() == discoverySubscription.Id {
x.Subscriptions[i] = discoverySubscription
return nil
}
}
x.finalizeSubscriptions()
x.Subscriptions = append(x.Subscriptions, discoverySubscription)
return nil
}

Expand All @@ -57,6 +58,14 @@ func (x *ZoneIngressInsight) IsOnline() bool {
return false
}

func (x *ZoneIngressInsight) AllSubscriptions() []generic.Subscription {
var subs []generic.Subscription
for _, s := range x.GetSubscriptions() {
subs = append(subs, s)
}
return subs
}

func (x *ZoneIngressInsight) GetLastSubscription() generic.Subscription {
if len(x.GetSubscriptions()) == 0 {
return (*DiscoverySubscription)(nil)
Expand Down
4 changes: 2 additions & 2 deletions api/mesh/v1alpha1/zone_ingress_insight_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ var _ = Describe("Zone Ingress Insights", func() {
})).To(Succeed())

// then
_, subscription := zoneInsight.GetSubscription("2")
Expect(subscription.DisconnectTime).ToNot(BeNil())
subscription := zoneInsight.GetSubscription("2")
Expect(subscription.(*mesh_proto.DiscoverySubscription).DisconnectTime).ToNot(BeNil())
})

It("should return error for wrong subscription type", func() {
Expand Down
31 changes: 21 additions & 10 deletions api/mesh/v1alpha1/zoneegressinsight_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ import (

var _ generic.Insight = &ZoneEgressInsight{}

func (x *ZoneEgressInsight) GetSubscription(id string) (int, *DiscoverySubscription) {
for i, s := range x.GetSubscriptions() {
func (x *ZoneEgressInsight) GetSubscription(id string) generic.Subscription {
for _, s := range x.GetSubscriptions() {
if s.Id == id {
return i, s
return s
}
}
return -1, nil
return nil
}

func (x *ZoneEgressInsight) UpdateSubscription(s generic.Subscription) error {
Expand All @@ -26,13 +26,14 @@ func (x *ZoneEgressInsight) UpdateSubscription(s generic.Subscription) error {
if !ok {
return errors.Errorf("invalid type %T for ZoneEgressInsight", s)
}
i, old := x.GetSubscription(discoverySubscription.Id)
if old != nil {
x.Subscriptions[i] = discoverySubscription
} else {
x.finalizeSubscriptions()
x.Subscriptions = append(x.Subscriptions, discoverySubscription)
for i, sub := range x.GetSubscriptions() {
if sub.GetId() == discoverySubscription.Id {
x.Subscriptions[i] = discoverySubscription
return nil
}
}
x.finalizeSubscriptions()
x.Subscriptions = append(x.Subscriptions, discoverySubscription)
return nil
}

Expand All @@ -57,6 +58,16 @@ func (x *ZoneEgressInsight) IsOnline() bool {
return false
}

func (x *ZoneEgressInsight) AllSubscriptions() []generic.Subscription {
var subs []generic.Subscription
for _, s := range x.GetSubscriptions() {
if s.ConnectTime != nil && s.DisconnectTime == nil {
subs = append(subs, s)
}
}
return subs
}

func (x *ZoneEgressInsight) GetLastSubscription() generic.Subscription {
if len(x.GetSubscriptions()) == 0 {
return (*DiscoverySubscription)(nil)
Expand Down
4 changes: 2 additions & 2 deletions api/mesh/v1alpha1/zoneegressinsight_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ var _ = Describe("Zone Egress Insights", func() {
})).To(Succeed())

// then
_, subscription := zoneInsight.GetSubscription("2")
Expect(subscription.DisconnectTime).ToNot(BeNil())
subscription := zoneInsight.GetSubscription("2")
Expect(subscription.(*mesh_proto.DiscoverySubscription)).ToNot(BeNil())
})

It("should return error for wrong subscription type", func() {
Expand Down
33 changes: 23 additions & 10 deletions api/system/v1alpha1/zone_insight_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@ func NewSubscriptionStatus() *KDSSubscriptionStatus {
}
}

func (x *ZoneInsight) GetSubscription(id string) (int, *KDSSubscription) {
for i, s := range x.GetSubscriptions() {
func (x *ZoneInsight) GetSubscription(id string) generic.Subscription {
for _, s := range x.GetSubscriptions() {
if s.Id == id {
return i, s
return s
}
}
return -1, nil
return nil
}

func (x *ZoneInsight) GetLastSubscription() generic.Subscription {
Expand All @@ -44,10 +44,22 @@ func (x *ZoneInsight) IsOnline() bool {
return false
}

func (x *ZoneInsight) AllSubscriptions() []generic.Subscription {
var subs []generic.Subscription
for _, s := range x.GetSubscriptions() {
subs = append(subs, s)
}
return subs
}

func (x *KDSSubscription) SetDisconnectTime(time time.Time) {
x.DisconnectTime = timestamppb.New(time)
}

func (x *KDSSubscription) IsOnline() bool {
return x.GetConnectTime() != nil && x.GetDisconnectTime() == nil
}

func (x *ZoneInsight) Sum(v func(*KDSSubscription) uint64) uint64 {
var result uint64 = 0
for _, s := range x.GetSubscriptions() {
Expand All @@ -64,13 +76,14 @@ func (x *ZoneInsight) UpdateSubscription(s generic.Subscription) error {
if !ok {
return errors.Errorf("invalid type %T for ZoneInsight", s)
}
i, old := x.GetSubscription(kdsSubscription.Id)
if old != nil {
x.Subscriptions[i] = kdsSubscription
} else {
x.finalizeSubscriptions()
x.Subscriptions = append(x.Subscriptions, kdsSubscription)
for i, sub := range x.GetSubscriptions() {
if sub.GetId() == kdsSubscription.Id {
x.Subscriptions[i] = kdsSubscription
return nil
}
}
x.finalizeSubscriptions()
michaelbeaumont marked this conversation as resolved.
Show resolved Hide resolved
x.Subscriptions = append(x.Subscriptions, kdsSubscription)
return nil
}

Expand Down
4 changes: 2 additions & 2 deletions api/system/v1alpha1/zone_insight_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ var _ = Describe("Zone Insights", func() {
})).To(Succeed())

// then
_, subscription := zoneInsight.GetSubscription("2")
Expect(subscription.DisconnectTime).ToNot(BeNil())
subscription := zoneInsight.GetSubscription("2")
Expect(subscription.(*system_proto.KDSSubscription).DisconnectTime).ToNot(BeNil())
})

It("should return error for wrong subscription type", func() {
Expand Down
55 changes: 32 additions & 23 deletions pkg/gc/finalizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,11 @@ var finalizerLog = core.Log.WithName("finalizer")
// 3. Kuma CP is up
// 4. DPP status is Online whereas it should be Offline

type descriptor struct {
id string
generation uint32
}

type tenantID string

type insightMap map[core_model.ResourceKey]*descriptor
type subscriptionGenerationMap map[string]uint32

type insightMap map[core_model.ResourceKey]subscriptionGenerationMap

type insightsByType map[core_model.ResourceType]insightMap

Expand Down Expand Up @@ -149,29 +146,41 @@ func (f *subscriptionFinalizer) checkGeneration(ctx context.Context, typ core_mo
continue
}

old, ok := f.insights[tenantID(tenantId)][typ][key]
if !ok || old.id != insight.GetLastSubscription().GetId() || old.generation != insight.GetLastSubscription().GetGeneration() {
// something changed since the last check, either subscriptionId or generation were updated
// don't finalize the subscription, update map with fresh data
f.insights[tenantID(tenantId)][typ][key] = &descriptor{
id: insight.GetLastSubscription().GetId(),
generation: insight.GetLastSubscription().GetGeneration(),
lastGens := f.insights[tenantID(tenantId)][typ][key]
michaelbeaumont marked this conversation as resolved.
Show resolved Hide resolved

subsToFinalize := map[string]struct{}{}
newWatchedSubs := subscriptionGenerationMap{}

for _, sub := range insight.AllSubscriptions() {
if !sub.IsOnline() {
continue
}
id := sub.GetId()
if gen, ok := lastGens[id]; !ok || gen != sub.GetGeneration() {
newWatchedSubs[id] = sub.GetGeneration()
} else {
subsToFinalize[id] = struct{}{}
}
continue
}

log.V(1).Info("mark subscription as disconnected")
insight.GetLastSubscription().SetDisconnectTime(now)

upsertInsight, _ := registry.Global().NewObject(typ)
err := manager.Upsert(ctx, f.rm, key, upsertInsight, func(r core_model.Resource) error {
return upsertInsight.GetSpec().(generic.Insight).UpdateSubscription(insight.GetLastSubscription())
})
if err != nil {
log.Error(err, "unable to finalize subscription")
if err := manager.Upsert(ctx, f.rm, key, upsertInsight, func(r core_model.Resource) error {
insight := upsertInsight.GetSpec().(generic.Insight)
for id := range subsToFinalize {
log.V(1).Info("mark subscription as disconnected", "id", id)
sub := insight.GetSubscription(id)
sub.SetDisconnectTime(now)
if err := insight.UpdateSubscription(sub); err != nil {
return errors.Wrapf(err, "unable to update subscription %s", id)
}
}
return nil
}); err != nil {
log.Error(err, "unable to upsert insight")
michaelbeaumont marked this conversation as resolved.
Show resolved Hide resolved
return err
}
delete(f.insights[tenantID(tenantId)][typ], key)

f.insights[tenantID(tenantId)][typ][key] = newWatchedSubs
michaelbeaumont marked this conversation as resolved.
Show resolved Hide resolved
}
return nil
}
Expand Down
Loading