Skip to content

Commit

Permalink
test(finalizer): add tests with multiple online subs
Browse files Browse the repository at this point in the history
Signed-off-by: Mike Beaumont <[email protected]>
  • Loading branch information
michaelbeaumont committed Oct 26, 2023
1 parent 8f0cd2d commit 6d36600
Showing 1 changed file with 85 additions and 7 deletions.
92 changes: 85 additions & 7 deletions pkg/gc/finalizer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,34 @@ var _ = Describe("Subscription Finalizer", func() {
}()
}

alreadyOfflineSub := "stream-id-1"
onlineSub := "stream-id-2"
createZoneInsight := func() {
Expect(rm.Create(context.Background(), &system.ZoneInsightResource{
Spec: &system_proto.ZoneInsight{
Subscriptions: []*system_proto.KDSSubscription{
{
Id: alreadyOfflineSub,
GlobalInstanceId: "cp-1",
ConnectTime: proto.MustTimestampProto(sampleTime),
DisconnectTime: proto.MustTimestampProto(sampleTime.Add(1 * time.Hour)),
Status: system_proto.NewSubscriptionStatus(),
},
{
Id: onlineSub,
GlobalInstanceId: "cp-1",
ConnectTime: proto.MustTimestampProto(sampleTime.Add(1 * time.Hour)),
Status: system_proto.NewSubscriptionStatus(),
Generation: 0,
},
},
},
}, store.CreateByKey("zone-1", core_model.NoMesh))).To(Succeed())
}

onlineSub1 := "stream-id-2"
onlineSub2 := "stream-id-3"
createZoneInsightWithMultipleOnlineSubs := func() {
Expect(rm.Create(context.Background(), &system.ZoneInsightResource{
Spec: &system_proto.ZoneInsight{
Subscriptions: []*system_proto.KDSSubscription{
Expand All @@ -53,7 +80,14 @@ var _ = Describe("Subscription Finalizer", func() {
Status: system_proto.NewSubscriptionStatus(),
},
{
Id: "stream-id-2",
Id: onlineSub1,
GlobalInstanceId: "cp-1",
ConnectTime: proto.MustTimestampProto(sampleTime.Add(1 * time.Hour)),
Status: system_proto.NewSubscriptionStatus(),
Generation: 0,
},
{
Id: onlineSub2,
GlobalInstanceId: "cp-1",
ConnectTime: proto.MustTimestampProto(sampleTime.Add(1 * time.Hour)),
Status: system_proto.NewSubscriptionStatus(),
Expand All @@ -72,21 +106,21 @@ var _ = Describe("Subscription Finalizer", func() {
return zoneInsight.Spec.IsOnline()
}

incGeneration := func() {
incGeneration := func(id string) {
zoneInsight := system.NewZoneInsightResource()
Expect(
rm.Get(context.Background(), zoneInsight, store.GetByKey("zone-1", core_model.NoMesh)),
).To(Succeed())
zoneInsight.Spec.GetLastSubscription().(*system_proto.KDSSubscription).Generation++
zoneInsight.Spec.GetSubscription(id).(*system_proto.KDSSubscription).Generation++
Expect(rm.Update(context.Background(), zoneInsight)).To(Succeed())
}

addNewSubscription := func() {
disconnectAndAddNewSubscription := func() {
zoneInsight := system.NewZoneInsightResource()
Expect(
rm.Get(context.Background(), zoneInsight, store.GetByKey("zone-1", core_model.NoMesh)),
).To(Succeed())
zoneInsight.Spec.GetLastSubscription().SetDisconnectTime(sampleTime.Add(2 * time.Hour))
zoneInsight.Spec.GetSubscription(onlineSub).SetDisconnectTime(sampleTime.Add(2 * time.Hour))
zoneInsight.Spec.Subscriptions = append(zoneInsight.Spec.Subscriptions, &system_proto.KDSSubscription{
Id: "stream-id-3",
GlobalInstanceId: "cp-1",
Expand Down Expand Up @@ -119,7 +153,7 @@ var _ = Describe("Subscription Finalizer", func() {
Eventually(listCalled(1), "5s", "100ms").Should(BeTrue())
Eventually(isOnline, "5s", "100ms").Should(BeTrue())

incGeneration()
incGeneration(onlineSub)
// finalizer should memorize the new generation = 1
ticks <- time.Time{}
Eventually(listCalled(2), "5s", "100ms").Should(BeTrue())
Expand All @@ -145,13 +179,57 @@ var _ = Describe("Subscription Finalizer", func() {
Eventually(listCalled(1), "5s", "100ms").Should(BeTrue())
Eventually(isOnline, "5s", "100ms").Should(BeTrue())

addNewSubscription()
disconnectAndAddNewSubscription()

// generation is the same, but last subscriptionId was changed
ticks <- time.Time{}
Eventually(listCalled(2), "5s", "100ms").Should(BeTrue())
Eventually(isOnline, "5s", "100ms").Should(BeTrue())
})

It("should finalize multiple subscriptions if generations haven't changed after timeout", func() {
stop := make(chan struct{})
defer close(stop)
ticks := make(chan time.Time)
defer close(ticks)
startSubscriptionFinalizer(ticks, stop)

createZoneInsightWithMultipleOnlineSubs()

// finalizer should memorize the current generation = 0
ticks <- time.Time{}
Eventually(listCalled(1), "5s", "100ms").Should(BeTrue())
Eventually(isOnline, "5s", "100ms").Should(BeTrue())

// finalizer should observe the generation didn't change and set DisconnectTime
ticks <- time.Time{}
Eventually(listCalled(2), "5s", "100ms").Should(BeTrue())
Eventually(isOnline, "5s", "100ms").Should(BeFalse())
})

It("should finalize only one of multiple online subscriptions if only its generation changes", func() {
stop := make(chan struct{})
defer close(stop)
ticks := make(chan time.Time)
defer close(ticks)
startSubscriptionFinalizer(ticks, stop)

createZoneInsightWithMultipleOnlineSubs()

ticks <- time.Time{}
Eventually(listCalled(1), "5s", "100ms").Should(BeTrue())
Eventually(isOnline, "5s", "100ms").Should(BeTrue())

incGeneration(onlineSub2)

ticks <- time.Time{}
Eventually(listCalled(2), "5s", "100ms").Should(BeTrue())
Eventually(isOnline, "5s", "100ms").Should(BeTrue())

ticks <- time.Time{}
Eventually(listCalled(3), "5s", "100ms").Should(BeTrue())
Eventually(isOnline, "5s", "100ms").Should(BeFalse())
})
})

type countingManager struct {
Expand Down

0 comments on commit 6d36600

Please sign in to comment.