From 1dbb3a80239044c32b728fcf410f62b91aa12eff Mon Sep 17 00:00:00 2001 From: Mike Beaumont Date: Thu, 5 Oct 2023 13:33:47 +0200 Subject: [PATCH 01/23] feat(kuma-cp): call zone HealthCheck periodically from zone Signed-off-by: Mike Beaumont --- pkg/kds/mux/client.go | 39 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/pkg/kds/mux/client.go b/pkg/kds/mux/client.go index 231cc0e52461..9778417daee9 100644 --- a/pkg/kds/mux/client.go +++ b/pkg/kds/mux/client.go @@ -6,6 +6,7 @@ import ( "crypto/x509" "net/url" "os" + "time" "github.com/go-logr/logr" "github.com/pkg/errors" @@ -104,6 +105,8 @@ func (c *client) Start(stop <-chan struct{}) (errs error) { log := muxClientLog.WithValues("client-id", c.clientID) errorCh := make(chan error) + c.startHealthCheck(withKDSCtx, log, conn, stop, errorCh) + go c.startXDSConfigs(withKDSCtx, log, conn, stop, errorCh) go c.startStats(withKDSCtx, log, conn, stop, errorCh) go c.startClusters(withKDSCtx, log, conn, stop, errorCh) @@ -282,6 +285,42 @@ func (c *client) startClusters( c.handleProcessingErrors(stream, log, stop, processingErrorsCh, errorCh) } +func (c *client) startHealthCheck( + ctx context.Context, + log logr.Logger, + conn *grpc.ClientConn, + stop <-chan struct{}, + errorCh chan error, +) { + client := mesh_proto.NewGlobalKDSServiceClient(conn) + log = log.WithValues("rpc", "healthcheck") + log.Info("starting") + + ticker := time.NewTicker(5 * time.Second) + + go func() { + defer ticker.Stop() + + _, err := client.HealthCheck(ctx, &mesh_proto.ZoneHealthCheckRequest{}) + if err != nil && !errors.Is(err, context.Canceled) { + errorCh <- err + } + + for { + select { + case <-ticker.C: + _, err := client.HealthCheck(ctx, &mesh_proto.ZoneHealthCheckRequest{}) + if err != nil && !errors.Is(err, context.Canceled) { + errorCh <- err + } + case <-stop: + log.Info("stopping") + return + } + } + }() +} + func (c *client) handleProcessingErrors( stream grpc.ClientStream, log logr.Logger, From 9230b77c8dd6dd2afc2ec5754b3480b6994a474d Mon Sep 17 00:00:00 2001 From: Mike Beaumont Date: Thu, 28 Sep 2023 17:06:27 +0200 Subject: [PATCH 02/23] feat(kuma-cp): kill zone connections when health check fails Signed-off-by: Mike Beaumont --- pkg/kds/global/components.go | 12 +++++ pkg/kds/mux/client.go | 19 +++---- pkg/kds/mux/zone_sync.go | 38 ++++++++++++-- pkg/kds/mux/zone_watch.go | 97 ++++++++++++++++++++++++++++++++++++ pkg/kds/service/server.go | 84 ++++++++++++++++++++++++------- 5 files changed, 216 insertions(+), 34 deletions(-) create mode 100644 pkg/kds/mux/zone_watch.go diff --git a/pkg/kds/global/components.go b/pkg/kds/global/components.go index c82a25cfcd1c..45b8215d3fff 100644 --- a/pkg/kds/global/components.go +++ b/pkg/kds/global/components.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "strings" + "time" "github.com/go-logr/logr" "github.com/pkg/errors" @@ -154,6 +155,15 @@ func Setup(rt runtime.Runtime) error { for _, filter := range rt.KDSContext().GlobalServerFiltersV2 { streamInterceptors = append(streamInterceptors, filter) } + zwLog := kdsGlobalLog.WithName("zone-watch") + if err := rt.Add(component.NewResilientComponent(zwLog, mux.NewZoneWatch( + zwLog, + rt.EventBus(), + rt.ReadOnlyResourceManager(), + 5*time.Second, + ))); err != nil { + return err + } return rt.Add(component.NewResilientComponent(kdsGlobalLog.WithName("kds-mux-client"), mux.NewServer( onSessionStarted, rt.KDSContext().GlobalServerFilters, @@ -167,12 +177,14 @@ func Setup(rt runtime.Runtime) error { streamInterceptors, rt.Extensions(), rt.Config().Store.Upsert, + rt.EventBus(), ), mux.NewKDSSyncServiceServer( onGlobalToZoneSyncConnect, onZoneToGlobalSyncConnect, rt.KDSContext().GlobalServerFiltersV2, rt.Extensions(), + rt.EventBus(), ), ))) } diff --git a/pkg/kds/mux/client.go b/pkg/kds/mux/client.go index 9778417daee9..d2f9fa1a4be3 100644 --- a/pkg/kds/mux/client.go +++ b/pkg/kds/mux/client.go @@ -296,23 +296,18 @@ func (c *client) startHealthCheck( log = log.WithValues("rpc", "healthcheck") log.Info("starting") - ticker := time.NewTicker(5 * time.Second) - go func() { + ticker := time.NewTicker(5 * time.Second) defer ticker.Stop() - - _, err := client.HealthCheck(ctx, &mesh_proto.ZoneHealthCheckRequest{}) - if err != nil && !errors.Is(err, context.Canceled) { - errorCh <- err - } - for { + _, err := client.HealthCheck(ctx, &mesh_proto.ZoneHealthCheckRequest{}) + if err != nil && !errors.Is(err, context.Canceled) { + errorCh <- err + } + select { case <-ticker.C: - _, err := client.HealthCheck(ctx, &mesh_proto.ZoneHealthCheckRequest{}) - if err != nil && !errors.Is(err, context.Canceled) { - errorCh <- err - } + continue case <-stop: log.Info("stopping") return diff --git a/pkg/kds/mux/zone_sync.go b/pkg/kds/mux/zone_sync.go index 1fb8cf1f4e9e..d869722dad5e 100644 --- a/pkg/kds/mux/zone_sync.go +++ b/pkg/kds/mux/zone_sync.go @@ -10,8 +10,11 @@ import ( mesh_proto "github.com/kumahq/kuma/api/mesh/v1alpha1" "github.com/kumahq/kuma/pkg/core" + "github.com/kumahq/kuma/pkg/events" + "github.com/kumahq/kuma/pkg/kds/service" "github.com/kumahq/kuma/pkg/kds/util" "github.com/kumahq/kuma/pkg/log" + "github.com/kumahq/kuma/pkg/multitenant" ) type FilterV2 interface { @@ -38,6 +41,7 @@ type KDSSyncServiceServer struct { zoneToGlobalCb OnZoneToGlobalSyncConnectFunc filters []FilterV2 extensions context.Context + eventBus events.EventBus mesh_proto.UnimplementedKDSSyncServiceServer } @@ -46,12 +50,14 @@ func NewKDSSyncServiceServer( zoneToGlobalCb OnZoneToGlobalSyncConnectFunc, filters []FilterV2, extensions context.Context, + eventBus events.EventBus, ) *KDSSyncServiceServer { return &KDSSyncServiceServer{ globalToZoneCb: globalToZoneCb, zoneToGlobalCb: zoneToGlobalCb, filters: filters, extensions: extensions, + eventBus: eventBus, } } @@ -59,19 +65,26 @@ var _ mesh_proto.KDSSyncServiceServer = &KDSSyncServiceServer{} func (g *KDSSyncServiceServer) GlobalToZoneSync(stream mesh_proto.KDSSyncService_GlobalToZoneSyncServer) error { logger := log.AddFieldsFromCtx(clientLog, stream.Context(), g.extensions) - clientID, err := util.ClientIDFromIncomingCtx(stream.Context()) + zone, err := util.ClientIDFromIncomingCtx(stream.Context()) if err != nil { return err } - logger = logger.WithValues("clientID", clientID) + logger = logger.WithValues("clientID", zone) for _, filter := range g.filters { if err := filter.InterceptServerStream(stream); err != nil { return errors.Wrap(err, "closing KDS stream following a callback error") } } + + tenantID, _ := multitenant.TenantFromCtx(stream.Context()) + shouldDisconnectStream := g.watchZoneHealthCheck(tenantID, zone) + defer shouldDisconnectStream.Close() + processingErrorsCh := make(chan error) go g.globalToZoneCb.OnGlobalToZoneSyncConnect(stream, processingErrorsCh) select { + case <-shouldDisconnectStream.Recv(): + return nil case <-stream.Context().Done(): logger.Info("GlobalToZoneSync rpc stream stopped") return nil @@ -86,19 +99,26 @@ func (g *KDSSyncServiceServer) GlobalToZoneSync(stream mesh_proto.KDSSyncService func (g *KDSSyncServiceServer) ZoneToGlobalSync(stream mesh_proto.KDSSyncService_ZoneToGlobalSyncServer) error { logger := log.AddFieldsFromCtx(clientLog, stream.Context(), g.extensions) - clientID, err := util.ClientIDFromIncomingCtx(stream.Context()) + zone, err := util.ClientIDFromIncomingCtx(stream.Context()) if err != nil { return err } - logger = logger.WithValues("clientID", clientID) + logger = logger.WithValues("clientID", zone) for _, filter := range g.filters { if err := filter.InterceptServerStream(stream); err != nil { return errors.Wrap(err, "closing KDS stream following a callback error") } } + + tenantID, _ := multitenant.TenantFromCtx(stream.Context()) + shouldDisconnectStream := g.watchZoneHealthCheck(tenantID, zone) + defer shouldDisconnectStream.Close() + processingErrorsCh := make(chan error) go g.zoneToGlobalCb.OnZoneToGlobalSyncConnect(stream, processingErrorsCh) select { + case <-shouldDisconnectStream.Recv(): + return nil case <-stream.Context().Done(): logger.Info("ZoneToGlobalSync rpc stream stopped") return nil @@ -110,3 +130,13 @@ func (g *KDSSyncServiceServer) ZoneToGlobalSync(stream mesh_proto.KDSSyncService return status.Error(codes.Internal, "stream failed") } } + +func (g *KDSSyncServiceServer) watchZoneHealthCheck(tenantID, zone string) events.Listener { + shouldDisconnectStream := g.eventBus.Subscribe(func(e events.Event) bool { + disconnectEvent, ok := e.(service.ZoneWentOffline) + return ok && disconnectEvent.TenantID == tenantID && disconnectEvent.Zone == zone + }) + g.eventBus.Send(service.ZoneOpenedStream{Zone: zone, TenantID: tenantID}) + + return shouldDisconnectStream +} diff --git a/pkg/kds/mux/zone_watch.go b/pkg/kds/mux/zone_watch.go new file mode 100644 index 000000000000..9d8d98a20ea1 --- /dev/null +++ b/pkg/kds/mux/zone_watch.go @@ -0,0 +1,97 @@ +package mux + +import ( + "context" + "time" + + "github.com/go-logr/logr" + + "github.com/kumahq/kuma/pkg/core" + "github.com/kumahq/kuma/pkg/core/resources/apis/system" + "github.com/kumahq/kuma/pkg/core/resources/manager" + "github.com/kumahq/kuma/pkg/core/resources/model" + "github.com/kumahq/kuma/pkg/core/resources/store" + "github.com/kumahq/kuma/pkg/events" + "github.com/kumahq/kuma/pkg/kds/service" + "github.com/kumahq/kuma/pkg/multitenant" +) + +type zoneTenant struct { + zone string + tenantID string +} + +type ZoneWatch struct { + log logr.Logger + bus events.EventBus + rm manager.ReadOnlyResourceManager + poll time.Duration + zones map[zoneTenant]time.Time +} + +func NewZoneWatch( + log logr.Logger, + bus events.EventBus, + rm manager.ReadOnlyResourceManager, + poll time.Duration, +) *ZoneWatch { + return &ZoneWatch{ + log: log, + bus: bus, + rm: rm, + poll: poll, + zones: map[zoneTenant]time.Time{}, + } +} + +func (zw *ZoneWatch) Start(stop <-chan struct{}) error { + timer := time.NewTicker(zw.poll) + defer timer.Stop() + + connectionWatch := zw.bus.Subscribe(func(e events.Event) bool { + _, ok := e.(service.ZoneOpenedStream) + return ok + }) + defer connectionWatch.Close() + + for { + select { + case <-timer.C: + for zone, firstSeen := range zw.zones { + ctx := multitenant.WithTenant(context.TODO(), zone.tenantID) + zoneInsight := system.NewZoneInsightResource() + if err := zw.rm.Get(ctx, zoneInsight, store.GetByKey(zone.zone, model.NoMesh)); err != nil { + zw.log.Info("error getting ZoneInsight", "zone", zone.zone) + } + lastHealthCheck := zoneInsight.Spec.GetHealthCheck().GetTime().AsTime() + if lastHealthCheck.After(firstSeen) && time.Since(lastHealthCheck) > 20*time.Second { + zw.bus.Send(service.ZoneWentOffline{ + Zone: zone.zone, + TenantID: zone.tenantID, + }) + delete(zw.zones, zone) + } + } + case e := <-connectionWatch.Recv(): + newStream := e.(service.ZoneOpenedStream) + + ctx := multitenant.WithTenant(context.TODO(), newStream.TenantID) + zoneInsight := system.NewZoneInsightResource() + + if err := zw.rm.Get(ctx, zoneInsight, store.GetByKey(newStream.Zone, model.NoMesh)); err != nil { + zw.log.Info("error getting ZoneInsight", "zone", newStream.Zone) + } + + zw.zones[zoneTenant{ + tenantID: newStream.TenantID, + zone: newStream.Zone, + }] = zoneInsight.Spec.GetHealthCheck().GetTime().AsTime() + case <-stop: + return nil + } + } +} + +func (zw *ZoneWatch) NeedLeaderElection() bool { + return false +} diff --git a/pkg/kds/service/server.go b/pkg/kds/service/server.go index 58aa15e3fa0d..3f1f1f241dd7 100644 --- a/pkg/kds/service/server.go +++ b/pkg/kds/service/server.go @@ -22,6 +22,7 @@ import ( "github.com/kumahq/kuma/pkg/core/resources/manager" "github.com/kumahq/kuma/pkg/core/resources/model" core_store "github.com/kumahq/kuma/pkg/core/resources/store" + "github.com/kumahq/kuma/pkg/events" "github.com/kumahq/kuma/pkg/kds/util" kuma_log "github.com/kumahq/kuma/pkg/log" "github.com/kumahq/kuma/pkg/multitenant" @@ -34,6 +35,12 @@ type StreamInterceptor interface { InterceptServerStream(stream grpc.ServerStream) error } +type ActiveStreams struct { + XDSConfig chan struct{} + Stats chan struct{} + Clusters chan struct{} +} + type GlobalKDSServiceServer struct { envoyAdminRPCs EnvoyAdminRPCs resManager manager.ResourceManager @@ -41,6 +48,7 @@ type GlobalKDSServiceServer struct { filters []StreamInterceptor extensions context.Context upsertCfg config_store.UpsertConfig + eventBus events.EventBus mesh_proto.UnimplementedGlobalKDSServiceServer } @@ -51,6 +59,7 @@ func NewGlobalKDSServiceServer( filters []StreamInterceptor, extensions context.Context, upsertCfg config_store.UpsertConfig, + eventBus events.EventBus, ) *GlobalKDSServiceServer { return &GlobalKDSServiceServer{ envoyAdminRPCs: envoyAdminRPCs, @@ -59,6 +68,7 @@ func NewGlobalKDSServiceServer( filters: filters, extensions: extensions, upsertCfg: upsertCfg, + eventBus: eventBus, } } @@ -106,6 +116,15 @@ func (g *GlobalKDSServiceServer) HealthCheck(ctx context.Context, _ *mesh_proto. return &mesh_proto.ZoneHealthCheckResponse{}, nil } +type ZoneWentOffline struct { + TenantID string + Zone string +} +type ZoneOpenedStream struct { + TenantID string + Zone string +} + func (g *GlobalKDSServiceServer) streamEnvoyAdminRPC( rpcName string, rpc util_grpc.ReverseUnaryRPCs, @@ -117,6 +136,15 @@ func (g *GlobalKDSServiceServer) streamEnvoyAdminRPC( return status.Error(codes.InvalidArgument, err.Error()) } clientID := ClientID(stream.Context(), zone) + tenantID, _ := multitenant.TenantFromCtx(stream.Context()) + + shouldDisconnectStream := g.eventBus.Subscribe(func(e events.Event) bool { + disconnectEvent, ok := e.(ZoneWentOffline) + return ok && disconnectEvent.TenantID == tenantID && disconnectEvent.Zone == zone + }) + defer shouldDisconnectStream.Close() + g.eventBus.Send(ZoneOpenedStream{Zone: zone, TenantID: tenantID}) + logger := log.WithValues("rpc", rpcName, "clientID", clientID) logger = kuma_log.AddFieldsFromCtx(logger, stream.Context(), g.extensions) for _, filter := range g.filters { @@ -136,26 +164,46 @@ func (g *GlobalKDSServiceServer) streamEnvoyAdminRPC( return status.Error(codes.Internal, "could not store stream connection") } logger.Info("stored stream connection") - for { - resp, err := recv() - if err == io.EOF { - logger.Info("stream stopped") - return nil + streamResult := make(chan error, 2) + streamReadEnded := make(chan struct{}) + go func() { + select { + case <-shouldDisconnectStream.Recv(): + streamResult <- nil + case <-streamReadEnded: + return } - if status.Code(err) == codes.Canceled { - logger.Info("stream cancelled") - return nil - } - if err != nil { - logger.Error(err, "could not receive a message") - return status.Error(codes.Internal, "could not receive a message") - } - logger.V(1).Info("Envoy Admin RPC response received", "requestId", resp.GetRequestId()) - if err := rpc.ResponseReceived(clientID, resp); err != nil { - logger.Error(err, "could not mark the response as received") - return status.Error(codes.InvalidArgument, "could not mark the response as received") + }() + go func() { + defer func() { + close(streamReadEnded) + }() + for { + resp, err := recv() + if err == io.EOF { + logger.Info("stream stopped") + streamResult <- nil + return + } + if status.Code(err) == codes.Canceled { + logger.Info("stream cancelled") + streamResult <- nil + return + } + if err != nil { + logger.Error(err, "could not receive a message") + streamResult <- status.Error(codes.Internal, "could not receive a message") + return + } + logger.V(1).Info("Envoy Admin RPC response received", "requestId", resp.GetRequestId()) + if err := rpc.ResponseReceived(clientID, resp); err != nil { + logger.Error(err, "could not mark the response as received") + streamResult <- status.Error(codes.InvalidArgument, "could not mark the response as received") + return + } } - } + }() + return <-streamResult } func (g *GlobalKDSServiceServer) storeStreamConnection(ctx context.Context, zone string, rpcName string, instance string) error { From 3c67e80f46cec0665428114b39028a9e290c30a5 Mon Sep 17 00:00:00 2001 From: Mike Beaumont Date: Thu, 5 Oct 2023 16:22:56 +0200 Subject: [PATCH 03/23] feat(kuma-cp): only watch zone health if feature present Signed-off-by: Mike Beaumont --- pkg/events/interfaces.go | 13 +++++++++++++ pkg/kds/features.go | 6 ++++++ pkg/kds/mux/client.go | 2 ++ pkg/kds/service/server.go | 21 ++++++++++++++++----- 4 files changed, 37 insertions(+), 5 deletions(-) diff --git a/pkg/events/interfaces.go b/pkg/events/interfaces.go index fd21603c8056..d4666e91b913 100644 --- a/pkg/events/interfaces.go +++ b/pkg/events/interfaces.go @@ -34,6 +34,19 @@ type Listener interface { Close() } +func NewNeverListener() Listener { + return &neverRecvListener{} +} + +type neverRecvListener struct{} + +func (*neverRecvListener) Recv() <-chan Event { + return nil +} + +func (*neverRecvListener) Close() { +} + type Predicate = func(event Event) bool type Emitter interface { diff --git a/pkg/kds/features.go b/pkg/kds/features.go index 0a7b0e93fd77..4a5d3b53c890 100644 --- a/pkg/kds/features.go +++ b/pkg/kds/features.go @@ -9,5 +9,11 @@ func (f Features) HasFeature(feature string) bool { return f[feature] } +const FeaturesMetadataKey string = "features" + // FeatureZoneToken means that the zone control plane can handle incoming Zone Token from global control plane. const FeatureZoneToken string = "zone-token" + +// FeatureZonePingHealth means that the zone control plane sends pings to the +// global control plane to indicate it's still running. +const FeatureZonePingHealth string = "zone-ping-health" diff --git a/pkg/kds/mux/client.go b/pkg/kds/mux/client.go index d2f9fa1a4be3..cf8495210786 100644 --- a/pkg/kds/mux/client.go +++ b/pkg/kds/mux/client.go @@ -24,6 +24,7 @@ import ( "github.com/kumahq/kuma/pkg/core" "github.com/kumahq/kuma/pkg/core/resources/registry" "github.com/kumahq/kuma/pkg/core/runtime/component" + "github.com/kumahq/kuma/pkg/kds" "github.com/kumahq/kuma/pkg/kds/service" "github.com/kumahq/kuma/pkg/metrics" "github.com/kumahq/kuma/pkg/version" @@ -99,6 +100,7 @@ func (c *client) Start(stop <-chan struct{}) (errs error) { withKDSCtx, cancel := context.WithCancel(metadata.AppendToOutgoingContext(c.ctx, "client-id", c.clientID, KDSVersionHeaderKey, KDSVersionV3, + kds.FeaturesMetadataKey, kds.FeatureZonePingHealth, )) defer cancel() diff --git a/pkg/kds/service/server.go b/pkg/kds/service/server.go index 3f1f1f241dd7..c4536e629551 100644 --- a/pkg/kds/service/server.go +++ b/pkg/kds/service/server.go @@ -5,11 +5,13 @@ import ( "fmt" "io" "math/rand" + "slices" "time" "github.com/sethvargo/go-retry" "google.golang.org/grpc" "google.golang.org/grpc/codes" + "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/timestamppb" @@ -23,6 +25,7 @@ import ( "github.com/kumahq/kuma/pkg/core/resources/model" core_store "github.com/kumahq/kuma/pkg/core/resources/store" "github.com/kumahq/kuma/pkg/events" + "github.com/kumahq/kuma/pkg/kds" "github.com/kumahq/kuma/pkg/kds/util" kuma_log "github.com/kumahq/kuma/pkg/log" "github.com/kumahq/kuma/pkg/multitenant" @@ -138,12 +141,20 @@ func (g *GlobalKDSServiceServer) streamEnvoyAdminRPC( clientID := ClientID(stream.Context(), zone) tenantID, _ := multitenant.TenantFromCtx(stream.Context()) - shouldDisconnectStream := g.eventBus.Subscribe(func(e events.Event) bool { - disconnectEvent, ok := e.(ZoneWentOffline) - return ok && disconnectEvent.TenantID == tenantID && disconnectEvent.Zone == zone - }) + shouldDisconnectStream := events.NewNeverListener() + + md, _ := metadata.FromIncomingContext(stream.Context()) + features := md.Get(kds.FeaturesMetadataKey) + + if slices.Contains(features, kds.FeatureZonePingHealth) { + shouldDisconnectStream = g.eventBus.Subscribe(func(e events.Event) bool { + disconnectEvent, ok := e.(ZoneWentOffline) + return ok && disconnectEvent.TenantID == tenantID && disconnectEvent.Zone == zone + }) + g.eventBus.Send(ZoneOpenedStream{Zone: zone, TenantID: tenantID}) + } + defer shouldDisconnectStream.Close() - g.eventBus.Send(ZoneOpenedStream{Zone: zone, TenantID: tenantID}) logger := log.WithValues("rpc", rpcName, "clientID", clientID) logger = kuma_log.AddFieldsFromCtx(logger, stream.Context(), g.extensions) From 09d2ec0ae0f2e5dbd910330284c2ba7eaf6119fa Mon Sep 17 00:00:00 2001 From: Mike Beaumont Date: Fri, 6 Oct 2023 14:27:56 +0200 Subject: [PATCH 04/23] feat(kuma-cp): add config for global zone health check logic Signed-off-by: Mike Beaumont --- pkg/config/multizone/kds.go | 11 +++++++++++ pkg/kds/global/components.go | 4 ++-- pkg/kds/mux/zone_watch.go | 28 +++++++++++++++------------- 3 files changed, 28 insertions(+), 15 deletions(-) diff --git a/pkg/config/multizone/kds.go b/pkg/config/multizone/kds.go index ad8f2d769c7a..e60fe6c80d36 100644 --- a/pkg/config/multizone/kds.go +++ b/pkg/config/multizone/kds.go @@ -37,6 +37,17 @@ type KdsServerConfig struct { NackBackoff config_types.Duration `json:"nackBackoff" envconfig:"kuma_multizone_global_kds_nack_backoff"` // DisableSOTW if true doesn't expose SOTW version of KDS. Default: false DisableSOTW bool `json:"disableSOTW" envconfig:"kuma_multizone_global_kds_disable_sotw"` + // ZoneHealthCheck holds config for ensuring zones are online + ZoneHealthCheck ZoneHealthCheckConfig `json:"zoneHealthCheck"` +} + +type ZoneHealthCheckConfig struct { + // PollInterval is the interval between the CP checking ZoneInsight for + // health check pings + PollInterval config_types.Duration `json:"pollInterval"` + // Timeout is the time after the last health check that a zone counts as + // no longer online + Timeout config_types.Duration `json:"timeout"` } var _ config.Config = &KdsServerConfig{} diff --git a/pkg/kds/global/components.go b/pkg/kds/global/components.go index 45b8215d3fff..551f332a52ba 100644 --- a/pkg/kds/global/components.go +++ b/pkg/kds/global/components.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "strings" - "time" "github.com/go-logr/logr" "github.com/pkg/errors" @@ -158,9 +157,9 @@ func Setup(rt runtime.Runtime) error { zwLog := kdsGlobalLog.WithName("zone-watch") if err := rt.Add(component.NewResilientComponent(zwLog, mux.NewZoneWatch( zwLog, + *rt.Config().Multizone.Global.KDS, rt.EventBus(), rt.ReadOnlyResourceManager(), - 5*time.Second, ))); err != nil { return err } @@ -178,6 +177,7 @@ func Setup(rt runtime.Runtime) error { rt.Extensions(), rt.Config().Store.Upsert, rt.EventBus(), + rt.Config().Multizone.Global.KDS.RefreshInterval.Duration, ), mux.NewKDSSyncServiceServer( onGlobalToZoneSyncConnect, diff --git a/pkg/kds/mux/zone_watch.go b/pkg/kds/mux/zone_watch.go index 9d8d98a20ea1..826ab4e72074 100644 --- a/pkg/kds/mux/zone_watch.go +++ b/pkg/kds/mux/zone_watch.go @@ -6,7 +6,7 @@ import ( "github.com/go-logr/logr" - "github.com/kumahq/kuma/pkg/core" + "github.com/kumahq/kuma/pkg/config/multizone" "github.com/kumahq/kuma/pkg/core/resources/apis/system" "github.com/kumahq/kuma/pkg/core/resources/manager" "github.com/kumahq/kuma/pkg/core/resources/model" @@ -22,25 +22,27 @@ type zoneTenant struct { } type ZoneWatch struct { - log logr.Logger - bus events.EventBus - rm manager.ReadOnlyResourceManager - poll time.Duration - zones map[zoneTenant]time.Time + log logr.Logger + poll time.Duration + timeout time.Duration + bus events.EventBus + rm manager.ReadOnlyResourceManager + zones map[zoneTenant]time.Time } func NewZoneWatch( log logr.Logger, + cfg multizone.KdsServerConfig, bus events.EventBus, rm manager.ReadOnlyResourceManager, - poll time.Duration, ) *ZoneWatch { return &ZoneWatch{ - log: log, - bus: bus, - rm: rm, - poll: poll, - zones: map[zoneTenant]time.Time{}, + log: log, + poll: cfg.ZoneHealthCheck.PollInterval.Duration, + timeout: cfg.ZoneHealthCheck.Timeout.Duration, + bus: bus, + rm: rm, + zones: map[zoneTenant]time.Time{}, } } @@ -64,7 +66,7 @@ func (zw *ZoneWatch) Start(stop <-chan struct{}) error { zw.log.Info("error getting ZoneInsight", "zone", zone.zone) } lastHealthCheck := zoneInsight.Spec.GetHealthCheck().GetTime().AsTime() - if lastHealthCheck.After(firstSeen) && time.Since(lastHealthCheck) > 20*time.Second { + if lastHealthCheck.After(firstSeen) && time.Since(lastHealthCheck) > zw.timeout { zw.bus.Send(service.ZoneWentOffline{ Zone: zone.zone, TenantID: zone.tenantID, From 81bc0839fa8356d24e3196fb8cf37a6eea69b721 Mon Sep 17 00:00:00 2001 From: Mike Beaumont Date: Fri, 6 Oct 2023 14:21:08 +0200 Subject: [PATCH 05/23] feat(kuma-cp): use health check interval from global Signed-off-by: Mike Beaumont --- api/mesh/v1alpha1/kds.pb.go | 282 ++++++++++-------- api/mesh/v1alpha1/kds.proto | 7 +- .../raw/protos/ZoneHealthCheckResponse.json | 8 + pkg/kds/global/components.go | 2 +- pkg/kds/mux/client.go | 6 +- pkg/kds/service/server.go | 36 ++- 6 files changed, 190 insertions(+), 151 deletions(-) diff --git a/api/mesh/v1alpha1/kds.pb.go b/api/mesh/v1alpha1/kds.pb.go index 3d6ec34f29d5..2f9842b24340 100644 --- a/api/mesh/v1alpha1/kds.pb.go +++ b/api/mesh/v1alpha1/kds.pb.go @@ -11,6 +11,7 @@ import ( protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoimpl "google.golang.org/protobuf/runtime/protoimpl" anypb "google.golang.org/protobuf/types/known/anypb" + durationpb "google.golang.org/protobuf/types/known/durationpb" reflect "reflect" sync "sync" ) @@ -119,6 +120,10 @@ type ZoneHealthCheckResponse struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields + + // The the interval that the global control plane + // expects between health check pings + Interval *durationpb.Duration `protobuf:"bytes,1,opt,name=interval,proto3" json:"interval,omitempty"` } func (x *ZoneHealthCheckResponse) Reset() { @@ -153,6 +158,13 @@ func (*ZoneHealthCheckResponse) Descriptor() ([]byte, []int) { return file_api_mesh_v1alpha1_kds_proto_rawDescGZIP(), []int{2} } +func (x *ZoneHealthCheckResponse) GetInterval() *durationpb.Duration { + if x != nil { + return x.Interval + } + return nil +} + // XDSConfigRequest is a request for XDS Config Dump that is executed on Zone // CP. type XDSConfigRequest struct { @@ -747,7 +759,9 @@ var file_api_mesh_v1alpha1_kds_proto_rawDesc = []byte{ 0x2f, 0x64, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x2f, 0x76, 0x33, 0x2f, 0x64, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x19, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x61, - 0x6e, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xbd, 0x01, 0x0a, 0x0c, 0x4b, 0x75, 0x6d, + 0x6e, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x1e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, + 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x64, 0x75, 0x72, 0x61, 0x74, 0x69, + 0x6f, 0x6e, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xbd, 0x01, 0x0a, 0x0c, 0x4b, 0x75, 0x6d, 0x61, 0x52, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x12, 0x39, 0x0a, 0x04, 0x6d, 0x65, 0x74, 0x61, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x25, 0x2e, 0x6b, 0x75, 0x6d, 0x61, 0x2e, 0x6d, 0x65, 0x73, 0x68, 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e, 0x4b, 0x75, 0x6d, @@ -761,114 +775,118 @@ var file_api_mesh_v1alpha1_kds_proto_rawDesc = []byte{ 0x0a, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x22, 0x18, 0x0a, 0x16, 0x5a, 0x6f, 0x6e, 0x65, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, - 0x73, 0x74, 0x22, 0x19, 0x0a, 0x17, 0x5a, 0x6f, 0x6e, 0x65, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, - 0x43, 0x68, 0x65, 0x63, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0xa0, 0x01, - 0x0a, 0x10, 0x58, 0x44, 0x53, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x52, 0x65, 0x71, 0x75, 0x65, - 0x73, 0x74, 0x12, 0x1d, 0x0a, 0x0a, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x5f, 0x69, 0x64, - 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x49, - 0x64, 0x12, 0x23, 0x0a, 0x0d, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x5f, 0x74, 0x79, - 0x70, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, - 0x63, 0x65, 0x54, 0x79, 0x70, 0x65, 0x12, 0x23, 0x0a, 0x0d, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, - 0x63, 0x65, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x72, - 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x23, 0x0a, 0x0d, 0x72, - 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x5f, 0x6d, 0x65, 0x73, 0x68, 0x18, 0x04, 0x20, 0x01, - 0x28, 0x09, 0x52, 0x0c, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x4d, 0x65, 0x73, 0x68, - 0x22, 0x6e, 0x0a, 0x11, 0x58, 0x44, 0x53, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x52, 0x65, 0x73, - 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1d, 0x0a, 0x0a, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, - 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x72, 0x65, 0x71, 0x75, 0x65, - 0x73, 0x74, 0x49, 0x64, 0x12, 0x16, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x02, 0x20, - 0x01, 0x28, 0x09, 0x48, 0x00, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x12, 0x18, 0x0a, 0x06, - 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x48, 0x00, 0x52, 0x06, - 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x42, 0x08, 0x0a, 0x06, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, - 0x22, 0x9c, 0x01, 0x0a, 0x0c, 0x53, 0x74, 0x61, 0x74, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, - 0x74, 0x12, 0x1d, 0x0a, 0x0a, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x5f, 0x69, 0x64, 0x18, - 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x49, 0x64, - 0x12, 0x23, 0x0a, 0x0d, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x5f, 0x74, 0x79, 0x70, - 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, - 0x65, 0x54, 0x79, 0x70, 0x65, 0x12, 0x23, 0x0a, 0x0d, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, - 0x65, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x72, 0x65, - 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x23, 0x0a, 0x0d, 0x72, 0x65, - 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x5f, 0x6d, 0x65, 0x73, 0x68, 0x18, 0x04, 0x20, 0x01, 0x28, - 0x09, 0x52, 0x0c, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x4d, 0x65, 0x73, 0x68, 0x22, - 0x68, 0x0a, 0x0d, 0x53, 0x74, 0x61, 0x74, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, - 0x12, 0x1d, 0x0a, 0x0a, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x01, - 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x49, 0x64, 0x12, - 0x16, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x48, 0x00, - 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x12, 0x16, 0x0a, 0x05, 0x73, 0x74, 0x61, 0x74, 0x73, - 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x48, 0x00, 0x52, 0x05, 0x73, 0x74, 0x61, 0x74, 0x73, 0x42, - 0x08, 0x0a, 0x06, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x22, 0x9f, 0x01, 0x0a, 0x0f, 0x43, 0x6c, - 0x75, 0x73, 0x74, 0x65, 0x72, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1d, 0x0a, - 0x0a, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, - 0x09, 0x52, 0x09, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x49, 0x64, 0x12, 0x23, 0x0a, 0x0d, - 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x18, 0x02, 0x20, - 0x01, 0x28, 0x09, 0x52, 0x0c, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x54, 0x79, 0x70, - 0x65, 0x12, 0x23, 0x0a, 0x0d, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x5f, 0x6e, 0x61, - 0x6d, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, - 0x63, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x23, 0x0a, 0x0d, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, - 0x63, 0x65, 0x5f, 0x6d, 0x65, 0x73, 0x68, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x72, - 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x4d, 0x65, 0x73, 0x68, 0x22, 0x71, 0x0a, 0x10, 0x43, - 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, - 0x1d, 0x0a, 0x0a, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, - 0x01, 0x28, 0x09, 0x52, 0x09, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x49, 0x64, 0x12, 0x16, - 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x48, 0x00, 0x52, - 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x12, 0x1c, 0x0a, 0x08, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, - 0x72, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x48, 0x00, 0x52, 0x08, 0x63, 0x6c, 0x75, 0x73, - 0x74, 0x65, 0x72, 0x73, 0x42, 0x08, 0x0a, 0x06, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x32, 0x8e, - 0x01, 0x0a, 0x14, 0x4b, 0x75, 0x6d, 0x61, 0x44, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, - 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x76, 0x0a, 0x13, 0x53, 0x74, 0x72, 0x65, 0x61, - 0x6d, 0x4b, 0x75, 0x6d, 0x61, 0x52, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x73, 0x12, 0x2c, - 0x2e, 0x65, 0x6e, 0x76, 0x6f, 0x79, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2e, 0x64, - 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x2e, 0x76, 0x33, 0x2e, 0x44, 0x69, 0x73, 0x63, - 0x6f, 0x76, 0x65, 0x72, 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x2d, 0x2e, 0x65, - 0x6e, 0x76, 0x6f, 0x79, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2e, 0x64, 0x69, 0x73, - 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x2e, 0x76, 0x33, 0x2e, 0x44, 0x69, 0x73, 0x63, 0x6f, 0x76, - 0x65, 0x72, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x28, 0x01, 0x30, 0x01, 0x32, - 0x98, 0x03, 0x0a, 0x10, 0x47, 0x6c, 0x6f, 0x62, 0x61, 0x6c, 0x4b, 0x44, 0x53, 0x53, 0x65, 0x72, - 0x76, 0x69, 0x63, 0x65, 0x12, 0x63, 0x0a, 0x10, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x58, 0x44, - 0x53, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x73, 0x12, 0x25, 0x2e, 0x6b, 0x75, 0x6d, 0x61, 0x2e, - 0x6d, 0x65, 0x73, 0x68, 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e, 0x58, 0x44, - 0x53, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x1a, - 0x24, 0x2e, 0x6b, 0x75, 0x6d, 0x61, 0x2e, 0x6d, 0x65, 0x73, 0x68, 0x2e, 0x76, 0x31, 0x61, 0x6c, + 0x73, 0x74, 0x22, 0x50, 0x0a, 0x17, 0x5a, 0x6f, 0x6e, 0x65, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, + 0x43, 0x68, 0x65, 0x63, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x35, 0x0a, + 0x08, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x76, 0x61, 0x6c, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, + 0x19, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, + 0x66, 0x2e, 0x44, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x08, 0x69, 0x6e, 0x74, 0x65, + 0x72, 0x76, 0x61, 0x6c, 0x22, 0xa0, 0x01, 0x0a, 0x10, 0x58, 0x44, 0x53, 0x43, 0x6f, 0x6e, 0x66, + 0x69, 0x67, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1d, 0x0a, 0x0a, 0x72, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x72, + 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x49, 0x64, 0x12, 0x23, 0x0a, 0x0d, 0x72, 0x65, 0x73, 0x6f, + 0x75, 0x72, 0x63, 0x65, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x0c, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x54, 0x79, 0x70, 0x65, 0x12, 0x23, 0x0a, + 0x0d, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x03, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x4e, 0x61, + 0x6d, 0x65, 0x12, 0x23, 0x0a, 0x0d, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x5f, 0x6d, + 0x65, 0x73, 0x68, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x72, 0x65, 0x73, 0x6f, 0x75, + 0x72, 0x63, 0x65, 0x4d, 0x65, 0x73, 0x68, 0x22, 0x6e, 0x0a, 0x11, 0x58, 0x44, 0x53, 0x43, 0x6f, + 0x6e, 0x66, 0x69, 0x67, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1d, 0x0a, 0x0a, + 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x09, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x49, 0x64, 0x12, 0x16, 0x0a, 0x05, 0x65, + 0x72, 0x72, 0x6f, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x48, 0x00, 0x52, 0x05, 0x65, 0x72, + 0x72, 0x6f, 0x72, 0x12, 0x18, 0x0a, 0x06, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x18, 0x03, 0x20, + 0x01, 0x28, 0x0c, 0x48, 0x00, 0x52, 0x06, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x42, 0x08, 0x0a, + 0x06, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x22, 0x9c, 0x01, 0x0a, 0x0c, 0x53, 0x74, 0x61, 0x74, + 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1d, 0x0a, 0x0a, 0x72, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x72, 0x65, + 0x71, 0x75, 0x65, 0x73, 0x74, 0x49, 0x64, 0x12, 0x23, 0x0a, 0x0d, 0x72, 0x65, 0x73, 0x6f, 0x75, + 0x72, 0x63, 0x65, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, + 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x54, 0x79, 0x70, 0x65, 0x12, 0x23, 0x0a, 0x0d, + 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x03, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x0c, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x4e, 0x61, 0x6d, + 0x65, 0x12, 0x23, 0x0a, 0x0d, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x5f, 0x6d, 0x65, + 0x73, 0x68, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, + 0x63, 0x65, 0x4d, 0x65, 0x73, 0x68, 0x22, 0x68, 0x0a, 0x0d, 0x53, 0x74, 0x61, 0x74, 0x73, 0x52, + 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1d, 0x0a, 0x0a, 0x72, 0x65, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x72, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x49, 0x64, 0x12, 0x16, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, + 0x02, 0x20, 0x01, 0x28, 0x09, 0x48, 0x00, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x12, 0x16, + 0x0a, 0x05, 0x73, 0x74, 0x61, 0x74, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x48, 0x00, 0x52, + 0x05, 0x73, 0x74, 0x61, 0x74, 0x73, 0x42, 0x08, 0x0a, 0x06, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, + 0x22, 0x9f, 0x01, 0x0a, 0x0f, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x73, 0x52, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x12, 0x1d, 0x0a, 0x0a, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x5f, + 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x49, 0x64, 0x12, 0x23, 0x0a, 0x0d, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x5f, + 0x74, 0x79, 0x70, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x72, 0x65, 0x73, 0x6f, + 0x75, 0x72, 0x63, 0x65, 0x54, 0x79, 0x70, 0x65, 0x12, 0x23, 0x0a, 0x0d, 0x72, 0x65, 0x73, 0x6f, + 0x75, 0x72, 0x63, 0x65, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x0c, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x23, 0x0a, + 0x0d, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x5f, 0x6d, 0x65, 0x73, 0x68, 0x18, 0x04, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x4d, 0x65, + 0x73, 0x68, 0x22, 0x71, 0x0a, 0x10, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x73, 0x52, 0x65, + 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1d, 0x0a, 0x0a, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x72, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x49, 0x64, 0x12, 0x16, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x02, + 0x20, 0x01, 0x28, 0x09, 0x48, 0x00, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x12, 0x1c, 0x0a, + 0x08, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x48, + 0x00, 0x52, 0x08, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x73, 0x42, 0x08, 0x0a, 0x06, 0x72, + 0x65, 0x73, 0x75, 0x6c, 0x74, 0x32, 0x8e, 0x01, 0x0a, 0x14, 0x4b, 0x75, 0x6d, 0x61, 0x44, 0x69, + 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x76, + 0x0a, 0x13, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x4b, 0x75, 0x6d, 0x61, 0x52, 0x65, 0x73, 0x6f, + 0x75, 0x72, 0x63, 0x65, 0x73, 0x12, 0x2c, 0x2e, 0x65, 0x6e, 0x76, 0x6f, 0x79, 0x2e, 0x73, 0x65, + 0x72, 0x76, 0x69, 0x63, 0x65, 0x2e, 0x64, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x2e, + 0x76, 0x33, 0x2e, 0x44, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x1a, 0x2d, 0x2e, 0x65, 0x6e, 0x76, 0x6f, 0x79, 0x2e, 0x73, 0x65, 0x72, 0x76, + 0x69, 0x63, 0x65, 0x2e, 0x64, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x2e, 0x76, 0x33, + 0x2e, 0x44, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, + 0x73, 0x65, 0x28, 0x01, 0x30, 0x01, 0x32, 0x98, 0x03, 0x0a, 0x10, 0x47, 0x6c, 0x6f, 0x62, 0x61, + 0x6c, 0x4b, 0x44, 0x53, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x63, 0x0a, 0x10, 0x53, + 0x74, 0x72, 0x65, 0x61, 0x6d, 0x58, 0x44, 0x53, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x73, 0x12, + 0x25, 0x2e, 0x6b, 0x75, 0x6d, 0x61, 0x2e, 0x6d, 0x65, 0x73, 0x68, 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e, 0x58, 0x44, 0x53, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x52, 0x65, - 0x71, 0x75, 0x65, 0x73, 0x74, 0x28, 0x01, 0x30, 0x01, 0x12, 0x56, 0x0a, 0x0b, 0x53, 0x74, 0x72, - 0x65, 0x61, 0x6d, 0x53, 0x74, 0x61, 0x74, 0x73, 0x12, 0x21, 0x2e, 0x6b, 0x75, 0x6d, 0x61, 0x2e, - 0x6d, 0x65, 0x73, 0x68, 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e, 0x53, 0x74, - 0x61, 0x74, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x1a, 0x20, 0x2e, 0x6b, 0x75, - 0x6d, 0x61, 0x2e, 0x6d, 0x65, 0x73, 0x68, 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, - 0x2e, 0x53, 0x74, 0x61, 0x74, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x28, 0x01, 0x30, - 0x01, 0x12, 0x5f, 0x0a, 0x0e, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x43, 0x6c, 0x75, 0x73, 0x74, - 0x65, 0x72, 0x73, 0x12, 0x24, 0x2e, 0x6b, 0x75, 0x6d, 0x61, 0x2e, 0x6d, 0x65, 0x73, 0x68, 0x2e, - 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, - 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x1a, 0x23, 0x2e, 0x6b, 0x75, 0x6d, 0x61, - 0x2e, 0x6d, 0x65, 0x73, 0x68, 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e, 0x43, - 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x28, 0x01, - 0x30, 0x01, 0x12, 0x66, 0x0a, 0x0b, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x43, 0x68, 0x65, 0x63, - 0x6b, 0x12, 0x2a, 0x2e, 0x6b, 0x75, 0x6d, 0x61, 0x2e, 0x6d, 0x65, 0x73, 0x68, 0x2e, 0x76, 0x31, - 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e, 0x5a, 0x6f, 0x6e, 0x65, 0x48, 0x65, 0x61, 0x6c, 0x74, - 0x68, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x2b, 0x2e, - 0x6b, 0x75, 0x6d, 0x61, 0x2e, 0x6d, 0x65, 0x73, 0x68, 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, - 0x61, 0x31, 0x2e, 0x5a, 0x6f, 0x6e, 0x65, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x43, 0x68, 0x65, - 0x63, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x32, 0x8e, 0x02, 0x0a, 0x0e, 0x4b, - 0x44, 0x53, 0x53, 0x79, 0x6e, 0x63, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x7d, 0x0a, - 0x10, 0x47, 0x6c, 0x6f, 0x62, 0x61, 0x6c, 0x54, 0x6f, 0x5a, 0x6f, 0x6e, 0x65, 0x53, 0x79, 0x6e, - 0x63, 0x12, 0x31, 0x2e, 0x65, 0x6e, 0x76, 0x6f, 0x79, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, - 0x65, 0x2e, 0x64, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x2e, 0x76, 0x33, 0x2e, 0x44, - 0x65, 0x6c, 0x74, 0x61, 0x44, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x52, 0x65, 0x71, - 0x75, 0x65, 0x73, 0x74, 0x1a, 0x32, 0x2e, 0x65, 0x6e, 0x76, 0x6f, 0x79, 0x2e, 0x73, 0x65, 0x72, - 0x76, 0x69, 0x63, 0x65, 0x2e, 0x64, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x2e, 0x76, - 0x33, 0x2e, 0x44, 0x65, 0x6c, 0x74, 0x61, 0x44, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, - 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x28, 0x01, 0x30, 0x01, 0x12, 0x7d, 0x0a, 0x10, - 0x5a, 0x6f, 0x6e, 0x65, 0x54, 0x6f, 0x47, 0x6c, 0x6f, 0x62, 0x61, 0x6c, 0x53, 0x79, 0x6e, 0x63, - 0x12, 0x32, 0x2e, 0x65, 0x6e, 0x76, 0x6f, 0x79, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, - 0x2e, 0x64, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x2e, 0x76, 0x33, 0x2e, 0x44, 0x65, - 0x6c, 0x74, 0x61, 0x44, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x52, 0x65, 0x73, 0x70, - 0x6f, 0x6e, 0x73, 0x65, 0x1a, 0x31, 0x2e, 0x65, 0x6e, 0x76, 0x6f, 0x79, 0x2e, 0x73, 0x65, 0x72, - 0x76, 0x69, 0x63, 0x65, 0x2e, 0x64, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x2e, 0x76, - 0x33, 0x2e, 0x44, 0x65, 0x6c, 0x74, 0x61, 0x44, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, - 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x28, 0x01, 0x30, 0x01, 0x42, 0x2a, 0x5a, 0x28, 0x67, - 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6b, 0x75, 0x6d, 0x61, 0x68, 0x71, - 0x2f, 0x6b, 0x75, 0x6d, 0x61, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x6d, 0x65, 0x73, 0x68, 0x2f, 0x76, - 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x1a, 0x24, 0x2e, 0x6b, 0x75, 0x6d, 0x61, 0x2e, 0x6d, 0x65, + 0x73, 0x68, 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e, 0x58, 0x44, 0x53, 0x43, + 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x28, 0x01, 0x30, 0x01, + 0x12, 0x56, 0x0a, 0x0b, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x53, 0x74, 0x61, 0x74, 0x73, 0x12, + 0x21, 0x2e, 0x6b, 0x75, 0x6d, 0x61, 0x2e, 0x6d, 0x65, 0x73, 0x68, 0x2e, 0x76, 0x31, 0x61, 0x6c, + 0x70, 0x68, 0x61, 0x31, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, + 0x73, 0x65, 0x1a, 0x20, 0x2e, 0x6b, 0x75, 0x6d, 0x61, 0x2e, 0x6d, 0x65, 0x73, 0x68, 0x2e, 0x76, + 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x73, 0x52, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x28, 0x01, 0x30, 0x01, 0x12, 0x5f, 0x0a, 0x0e, 0x53, 0x74, 0x72, 0x65, + 0x61, 0x6d, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x73, 0x12, 0x24, 0x2e, 0x6b, 0x75, 0x6d, + 0x61, 0x2e, 0x6d, 0x65, 0x73, 0x68, 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e, + 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, + 0x1a, 0x23, 0x2e, 0x6b, 0x75, 0x6d, 0x61, 0x2e, 0x6d, 0x65, 0x73, 0x68, 0x2e, 0x76, 0x31, 0x61, + 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x73, 0x52, 0x65, + 0x71, 0x75, 0x65, 0x73, 0x74, 0x28, 0x01, 0x30, 0x01, 0x12, 0x66, 0x0a, 0x0b, 0x48, 0x65, 0x61, + 0x6c, 0x74, 0x68, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x12, 0x2a, 0x2e, 0x6b, 0x75, 0x6d, 0x61, 0x2e, + 0x6d, 0x65, 0x73, 0x68, 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e, 0x5a, 0x6f, + 0x6e, 0x65, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x52, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x1a, 0x2b, 0x2e, 0x6b, 0x75, 0x6d, 0x61, 0x2e, 0x6d, 0x65, 0x73, 0x68, + 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e, 0x5a, 0x6f, 0x6e, 0x65, 0x48, 0x65, + 0x61, 0x6c, 0x74, 0x68, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, + 0x65, 0x32, 0x8e, 0x02, 0x0a, 0x0e, 0x4b, 0x44, 0x53, 0x53, 0x79, 0x6e, 0x63, 0x53, 0x65, 0x72, + 0x76, 0x69, 0x63, 0x65, 0x12, 0x7d, 0x0a, 0x10, 0x47, 0x6c, 0x6f, 0x62, 0x61, 0x6c, 0x54, 0x6f, + 0x5a, 0x6f, 0x6e, 0x65, 0x53, 0x79, 0x6e, 0x63, 0x12, 0x31, 0x2e, 0x65, 0x6e, 0x76, 0x6f, 0x79, + 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2e, 0x64, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, + 0x72, 0x79, 0x2e, 0x76, 0x33, 0x2e, 0x44, 0x65, 0x6c, 0x74, 0x61, 0x44, 0x69, 0x73, 0x63, 0x6f, + 0x76, 0x65, 0x72, 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x32, 0x2e, 0x65, 0x6e, + 0x76, 0x6f, 0x79, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2e, 0x64, 0x69, 0x73, 0x63, + 0x6f, 0x76, 0x65, 0x72, 0x79, 0x2e, 0x76, 0x33, 0x2e, 0x44, 0x65, 0x6c, 0x74, 0x61, 0x44, 0x69, + 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x28, + 0x01, 0x30, 0x01, 0x12, 0x7d, 0x0a, 0x10, 0x5a, 0x6f, 0x6e, 0x65, 0x54, 0x6f, 0x47, 0x6c, 0x6f, + 0x62, 0x61, 0x6c, 0x53, 0x79, 0x6e, 0x63, 0x12, 0x32, 0x2e, 0x65, 0x6e, 0x76, 0x6f, 0x79, 0x2e, + 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2e, 0x64, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, + 0x79, 0x2e, 0x76, 0x33, 0x2e, 0x44, 0x65, 0x6c, 0x74, 0x61, 0x44, 0x69, 0x73, 0x63, 0x6f, 0x76, + 0x65, 0x72, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x1a, 0x31, 0x2e, 0x65, 0x6e, + 0x76, 0x6f, 0x79, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2e, 0x64, 0x69, 0x73, 0x63, + 0x6f, 0x76, 0x65, 0x72, 0x79, 0x2e, 0x76, 0x33, 0x2e, 0x44, 0x65, 0x6c, 0x74, 0x61, 0x44, 0x69, + 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x28, 0x01, + 0x30, 0x01, 0x42, 0x2a, 0x5a, 0x28, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, + 0x2f, 0x6b, 0x75, 0x6d, 0x61, 0x68, 0x71, 0x2f, 0x6b, 0x75, 0x6d, 0x61, 0x2f, 0x61, 0x70, 0x69, + 0x2f, 0x6d, 0x65, 0x73, 0x68, 0x2f, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x62, 0x06, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -896,33 +914,35 @@ var file_api_mesh_v1alpha1_kds_proto_goTypes = []interface{}{ (*ClustersResponse)(nil), // 8: kuma.mesh.v1alpha1.ClustersResponse (*KumaResource_Meta)(nil), // 9: kuma.mesh.v1alpha1.KumaResource.Meta (*anypb.Any)(nil), // 10: google.protobuf.Any - (*v3.DiscoveryRequest)(nil), // 11: envoy.service.discovery.v3.DiscoveryRequest - (*v3.DeltaDiscoveryRequest)(nil), // 12: envoy.service.discovery.v3.DeltaDiscoveryRequest - (*v3.DeltaDiscoveryResponse)(nil), // 13: envoy.service.discovery.v3.DeltaDiscoveryResponse - (*v3.DiscoveryResponse)(nil), // 14: envoy.service.discovery.v3.DiscoveryResponse + (*durationpb.Duration)(nil), // 11: google.protobuf.Duration + (*v3.DiscoveryRequest)(nil), // 12: envoy.service.discovery.v3.DiscoveryRequest + (*v3.DeltaDiscoveryRequest)(nil), // 13: envoy.service.discovery.v3.DeltaDiscoveryRequest + (*v3.DeltaDiscoveryResponse)(nil), // 14: envoy.service.discovery.v3.DeltaDiscoveryResponse + (*v3.DiscoveryResponse)(nil), // 15: envoy.service.discovery.v3.DiscoveryResponse } var file_api_mesh_v1alpha1_kds_proto_depIdxs = []int32{ 9, // 0: kuma.mesh.v1alpha1.KumaResource.meta:type_name -> kuma.mesh.v1alpha1.KumaResource.Meta 10, // 1: kuma.mesh.v1alpha1.KumaResource.spec:type_name -> google.protobuf.Any - 11, // 2: kuma.mesh.v1alpha1.KumaDiscoveryService.StreamKumaResources:input_type -> envoy.service.discovery.v3.DiscoveryRequest - 4, // 3: kuma.mesh.v1alpha1.GlobalKDSService.StreamXDSConfigs:input_type -> kuma.mesh.v1alpha1.XDSConfigResponse - 6, // 4: kuma.mesh.v1alpha1.GlobalKDSService.StreamStats:input_type -> kuma.mesh.v1alpha1.StatsResponse - 8, // 5: kuma.mesh.v1alpha1.GlobalKDSService.StreamClusters:input_type -> kuma.mesh.v1alpha1.ClustersResponse - 1, // 6: kuma.mesh.v1alpha1.GlobalKDSService.HealthCheck:input_type -> kuma.mesh.v1alpha1.ZoneHealthCheckRequest - 12, // 7: kuma.mesh.v1alpha1.KDSSyncService.GlobalToZoneSync:input_type -> envoy.service.discovery.v3.DeltaDiscoveryRequest - 13, // 8: kuma.mesh.v1alpha1.KDSSyncService.ZoneToGlobalSync:input_type -> envoy.service.discovery.v3.DeltaDiscoveryResponse - 14, // 9: kuma.mesh.v1alpha1.KumaDiscoveryService.StreamKumaResources:output_type -> envoy.service.discovery.v3.DiscoveryResponse - 3, // 10: kuma.mesh.v1alpha1.GlobalKDSService.StreamXDSConfigs:output_type -> kuma.mesh.v1alpha1.XDSConfigRequest - 5, // 11: kuma.mesh.v1alpha1.GlobalKDSService.StreamStats:output_type -> kuma.mesh.v1alpha1.StatsRequest - 7, // 12: kuma.mesh.v1alpha1.GlobalKDSService.StreamClusters:output_type -> kuma.mesh.v1alpha1.ClustersRequest - 2, // 13: kuma.mesh.v1alpha1.GlobalKDSService.HealthCheck:output_type -> kuma.mesh.v1alpha1.ZoneHealthCheckResponse - 13, // 14: kuma.mesh.v1alpha1.KDSSyncService.GlobalToZoneSync:output_type -> envoy.service.discovery.v3.DeltaDiscoveryResponse - 12, // 15: kuma.mesh.v1alpha1.KDSSyncService.ZoneToGlobalSync:output_type -> envoy.service.discovery.v3.DeltaDiscoveryRequest - 9, // [9:16] is the sub-list for method output_type - 2, // [2:9] is the sub-list for method input_type - 2, // [2:2] is the sub-list for extension type_name - 2, // [2:2] is the sub-list for extension extendee - 0, // [0:2] is the sub-list for field type_name + 11, // 2: kuma.mesh.v1alpha1.ZoneHealthCheckResponse.interval:type_name -> google.protobuf.Duration + 12, // 3: kuma.mesh.v1alpha1.KumaDiscoveryService.StreamKumaResources:input_type -> envoy.service.discovery.v3.DiscoveryRequest + 4, // 4: kuma.mesh.v1alpha1.GlobalKDSService.StreamXDSConfigs:input_type -> kuma.mesh.v1alpha1.XDSConfigResponse + 6, // 5: kuma.mesh.v1alpha1.GlobalKDSService.StreamStats:input_type -> kuma.mesh.v1alpha1.StatsResponse + 8, // 6: kuma.mesh.v1alpha1.GlobalKDSService.StreamClusters:input_type -> kuma.mesh.v1alpha1.ClustersResponse + 1, // 7: kuma.mesh.v1alpha1.GlobalKDSService.HealthCheck:input_type -> kuma.mesh.v1alpha1.ZoneHealthCheckRequest + 13, // 8: kuma.mesh.v1alpha1.KDSSyncService.GlobalToZoneSync:input_type -> envoy.service.discovery.v3.DeltaDiscoveryRequest + 14, // 9: kuma.mesh.v1alpha1.KDSSyncService.ZoneToGlobalSync:input_type -> envoy.service.discovery.v3.DeltaDiscoveryResponse + 15, // 10: kuma.mesh.v1alpha1.KumaDiscoveryService.StreamKumaResources:output_type -> envoy.service.discovery.v3.DiscoveryResponse + 3, // 11: kuma.mesh.v1alpha1.GlobalKDSService.StreamXDSConfigs:output_type -> kuma.mesh.v1alpha1.XDSConfigRequest + 5, // 12: kuma.mesh.v1alpha1.GlobalKDSService.StreamStats:output_type -> kuma.mesh.v1alpha1.StatsRequest + 7, // 13: kuma.mesh.v1alpha1.GlobalKDSService.StreamClusters:output_type -> kuma.mesh.v1alpha1.ClustersRequest + 2, // 14: kuma.mesh.v1alpha1.GlobalKDSService.HealthCheck:output_type -> kuma.mesh.v1alpha1.ZoneHealthCheckResponse + 14, // 15: kuma.mesh.v1alpha1.KDSSyncService.GlobalToZoneSync:output_type -> envoy.service.discovery.v3.DeltaDiscoveryResponse + 13, // 16: kuma.mesh.v1alpha1.KDSSyncService.ZoneToGlobalSync:output_type -> envoy.service.discovery.v3.DeltaDiscoveryRequest + 10, // [10:17] is the sub-list for method output_type + 3, // [3:10] is the sub-list for method input_type + 3, // [3:3] is the sub-list for extension type_name + 3, // [3:3] is the sub-list for extension extendee + 0, // [0:3] is the sub-list for field type_name } func init() { file_api_mesh_v1alpha1_kds_proto_init() } diff --git a/api/mesh/v1alpha1/kds.proto b/api/mesh/v1alpha1/kds.proto index afa93ade3f60..0aceeacbe167 100644 --- a/api/mesh/v1alpha1/kds.proto +++ b/api/mesh/v1alpha1/kds.proto @@ -6,6 +6,7 @@ option go_package = "github.com/kumahq/kuma/api/mesh/v1alpha1"; import "envoy/service/discovery/v3/discovery.proto"; import "google/protobuf/any.proto"; +import "google/protobuf/duration.proto"; service KumaDiscoveryService { rpc StreamKumaResources(stream envoy.service.discovery.v3.DiscoveryRequest) @@ -22,10 +23,12 @@ message KumaResource { google.protobuf.Any spec = 2; } -message ZoneHealthCheckRequest { -} +message ZoneHealthCheckRequest {} message ZoneHealthCheckResponse { + // The the interval that the global control plane + // expects between health check pings + google.protobuf.Duration interval = 1; } service GlobalKDSService { diff --git a/docs/generated/raw/protos/ZoneHealthCheckResponse.json b/docs/generated/raw/protos/ZoneHealthCheckResponse.json index 20ba797786ab..d523b0a8c0fc 100644 --- a/docs/generated/raw/protos/ZoneHealthCheckResponse.json +++ b/docs/generated/raw/protos/ZoneHealthCheckResponse.json @@ -3,6 +3,14 @@ "$ref": "#/definitions/ZoneHealthCheckResponse", "definitions": { "ZoneHealthCheckResponse": { + "properties": { + "interval": { + "pattern": "^([0-9]+\\.?[0-9]*|\\.[0-9]+)s$", + "type": "string", + "description": "The the interval that the global control plane expects between health check pings", + "format": "regex" + } + }, "additionalProperties": true, "type": "object", "title": "Zone Health Check Response" diff --git a/pkg/kds/global/components.go b/pkg/kds/global/components.go index 551f332a52ba..7cce39df61f3 100644 --- a/pkg/kds/global/components.go +++ b/pkg/kds/global/components.go @@ -177,7 +177,7 @@ func Setup(rt runtime.Runtime) error { rt.Extensions(), rt.Config().Store.Upsert, rt.EventBus(), - rt.Config().Multizone.Global.KDS.RefreshInterval.Duration, + rt.Config().Multizone.Global.KDS.ZoneHealthCheck.PollInterval.Duration, ), mux.NewKDSSyncServiceServer( onGlobalToZoneSyncConnect, diff --git a/pkg/kds/mux/client.go b/pkg/kds/mux/client.go index cf8495210786..f7c8c6456cf9 100644 --- a/pkg/kds/mux/client.go +++ b/pkg/kds/mux/client.go @@ -299,12 +299,14 @@ func (c *client) startHealthCheck( log.Info("starting") go func() { - ticker := time.NewTicker(5 * time.Second) + ticker := time.NewTicker(5 * time.Minute) defer ticker.Stop() for { - _, err := client.HealthCheck(ctx, &mesh_proto.ZoneHealthCheckRequest{}) + resp, err := client.HealthCheck(ctx, &mesh_proto.ZoneHealthCheckRequest{}) if err != nil && !errors.Is(err, context.Canceled) { errorCh <- err + } else if resp.Interval.AsDuration() > 0 { + ticker.Reset(resp.Interval.AsDuration()) } select { diff --git a/pkg/kds/service/server.go b/pkg/kds/service/server.go index c4536e629551..f96ed0c4def0 100644 --- a/pkg/kds/service/server.go +++ b/pkg/kds/service/server.go @@ -13,6 +13,7 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" + "google.golang.org/protobuf/types/known/durationpb" "google.golang.org/protobuf/types/known/timestamppb" mesh_proto "github.com/kumahq/kuma/api/mesh/v1alpha1" @@ -45,13 +46,14 @@ type ActiveStreams struct { } type GlobalKDSServiceServer struct { - envoyAdminRPCs EnvoyAdminRPCs - resManager manager.ResourceManager - instanceID string - filters []StreamInterceptor - extensions context.Context - upsertCfg config_store.UpsertConfig - eventBus events.EventBus + envoyAdminRPCs EnvoyAdminRPCs + resManager manager.ResourceManager + instanceID string + filters []StreamInterceptor + extensions context.Context + upsertCfg config_store.UpsertConfig + eventBus events.EventBus + zoneHealthCheckInterval time.Duration mesh_proto.UnimplementedGlobalKDSServiceServer } @@ -63,15 +65,17 @@ func NewGlobalKDSServiceServer( extensions context.Context, upsertCfg config_store.UpsertConfig, eventBus events.EventBus, + zoneHealthCheckInterval time.Duration, ) *GlobalKDSServiceServer { return &GlobalKDSServiceServer{ - envoyAdminRPCs: envoyAdminRPCs, - resManager: resManager, - instanceID: instanceID, - filters: filters, - extensions: extensions, - upsertCfg: upsertCfg, - eventBus: eventBus, + envoyAdminRPCs: envoyAdminRPCs, + resManager: resManager, + instanceID: instanceID, + filters: filters, + extensions: extensions, + upsertCfg: upsertCfg, + eventBus: eventBus, + zoneHealthCheckInterval: zoneHealthCheckInterval, } } @@ -116,7 +120,9 @@ func (g *GlobalKDSServiceServer) HealthCheck(ctx context.Context, _ *mesh_proto. log.Error(err, "couldn't update zone insight", "zone", zone) } - return &mesh_proto.ZoneHealthCheckResponse{}, nil + return &mesh_proto.ZoneHealthCheckResponse{ + Interval: durationpb.New(g.zoneHealthCheckInterval), + }, nil } type ZoneWentOffline struct { From 0e154d4418e76ff3f11aba909f142bb00e3a94ce Mon Sep 17 00:00:00 2001 From: Mike Beaumont Date: Mon, 9 Oct 2023 09:27:28 +0200 Subject: [PATCH 06/23] refactor: remove unnecessary goroutine Signed-off-by: Mike Beaumont --- pkg/kds/service/server.go | 19 ++++++------------- 1 file changed, 6 insertions(+), 13 deletions(-) diff --git a/pkg/kds/service/server.go b/pkg/kds/service/server.go index f96ed0c4def0..fcc6fff7f394 100644 --- a/pkg/kds/service/server.go +++ b/pkg/kds/service/server.go @@ -182,19 +182,7 @@ func (g *GlobalKDSServiceServer) streamEnvoyAdminRPC( } logger.Info("stored stream connection") streamResult := make(chan error, 2) - streamReadEnded := make(chan struct{}) go func() { - select { - case <-shouldDisconnectStream.Recv(): - streamResult <- nil - case <-streamReadEnded: - return - } - }() - go func() { - defer func() { - close(streamReadEnded) - }() for { resp, err := recv() if err == io.EOF { @@ -220,7 +208,12 @@ func (g *GlobalKDSServiceServer) streamEnvoyAdminRPC( } } }() - return <-streamResult + select { + case <-shouldDisconnectStream.Recv(): + return nil + case res := <-streamResult: + return res + } } func (g *GlobalKDSServiceServer) storeStreamConnection(ctx context.Context, zone string, rpcName string, instance string) error { From b3a52d0171f13bce0e3a3a4f299a75e4ebf9e4bb Mon Sep 17 00:00:00 2001 From: Mike Beaumont Date: Mon, 9 Oct 2023 09:42:52 +0200 Subject: [PATCH 07/23] fix: properly validate zone health check config Signed-off-by: Mike Beaumont --- pkg/config/loader_test.go | 7 +++++++ pkg/config/multizone/kds.go | 31 ++++++++++++++++++++++--------- pkg/kds/global/components.go | 20 ++++++++++++-------- 3 files changed, 41 insertions(+), 17 deletions(-) diff --git a/pkg/config/loader_test.go b/pkg/config/loader_test.go index 959da25e8ab6..02a06b1d7de2 100644 --- a/pkg/config/loader_test.go +++ b/pkg/config/loader_test.go @@ -252,6 +252,8 @@ var _ = Describe("Config loader", func() { Expect(cfg.Multizone.Global.KDS.MsgSendTimeout.Duration).To(Equal(10 * time.Second)) Expect(cfg.Multizone.Global.KDS.NackBackoff.Duration).To(Equal(11 * time.Second)) Expect(cfg.Multizone.Global.KDS.DisableSOTW).To(BeTrue()) + Expect(cfg.Multizone.Global.KDS.ZoneHealthCheck.PollInterval.Duration).To(Equal(11 * time.Second)) + Expect(cfg.Multizone.Global.KDS.ZoneHealthCheck.Timeout.Duration).To(Equal(110 * time.Second)) Expect(cfg.Multizone.Zone.GlobalAddress).To(Equal("grpc://1.1.1.1:5685")) Expect(cfg.Multizone.Zone.Name).To(Equal("zone-1")) Expect(cfg.Multizone.Zone.KDS.RootCAFile).To(Equal("/rootCa")) @@ -565,6 +567,9 @@ multizone: msgSendTimeout: 10s nackBackoff: 11s disableSOTW: true + zoneHealthCheck: + pollInterval: 11s + timeout: 110s zone: globalAddress: "grpc://1.1.1.1:5685" name: "zone-1" @@ -862,6 +867,8 @@ tracing: "KUMA_MULTIZONE_GLOBAL_KDS_MSG_SEND_TIMEOUT": "10s", "KUMA_MULTIZONE_GLOBAL_KDS_NACK_BACKOFF": "11s", "KUMA_MULTIZONE_GLOBAL_KDS_DISABLE_SOTW": "true", + "KUMA_MULTIZONE_GLOBAL_KDS_ZONE_HEALTH_CHECK_POLL_INTERVAL": "11s", + "KUMA_MULTIZONE_GLOBAL_KDS_ZONE_HEALTH_CHECK_TIMEOUT": "110s", "KUMA_MULTIZONE_ZONE_GLOBAL_ADDRESS": "grpc://1.1.1.1:5685", "KUMA_MULTIZONE_ZONE_NAME": "zone-1", "KUMA_MULTIZONE_ZONE_KDS_ROOT_CA_FILE": "/rootCa", diff --git a/pkg/config/multizone/kds.go b/pkg/config/multizone/kds.go index e60fe6c80d36..1ef6b286e847 100644 --- a/pkg/config/multizone/kds.go +++ b/pkg/config/multizone/kds.go @@ -41,15 +41,6 @@ type KdsServerConfig struct { ZoneHealthCheck ZoneHealthCheckConfig `json:"zoneHealthCheck"` } -type ZoneHealthCheckConfig struct { - // PollInterval is the interval between the CP checking ZoneInsight for - // health check pings - PollInterval config_types.Duration `json:"pollInterval"` - // Timeout is the time after the last health check that a zone counts as - // no longer online - Timeout config_types.Duration `json:"timeout"` -} - var _ config.Config = &KdsServerConfig{} func (c *KdsServerConfig) Sanitize() { @@ -81,6 +72,9 @@ func (c *KdsServerConfig) Validate() error { if _, err := config_types.TLSCiphers(c.TlsCipherSuites); err != nil { errs = multierr.Append(errs, errors.New(".TlsCipherSuites"+err.Error())) } + if err := c.ZoneHealthCheck.Validate(); err != nil { + errs = multierr.Append(errs, errors.Wrap(err, "invalid zoneHealthCheck config")) + } return errs } @@ -109,3 +103,22 @@ func (k KdsClientConfig) Sanitize() { func (k KdsClientConfig) Validate() error { return nil } + +type ZoneHealthCheckConfig struct { + // PollInterval is the interval between the CP checking ZoneInsight for + // health check pings + PollInterval config_types.Duration `json:"pollInterval" envconfig:"kuma_multizone_global_kds_zone_health_check_poll_interval"` + // Timeout is the time after the last health check that a zone counts as + // no longer online + Timeout config_types.Duration `json:"timeout" envconfig:"kuma_multizone_global_kds_zone_health_check_timeout"` +} + +func (c ZoneHealthCheckConfig) Sanitize() { +} + +func (c ZoneHealthCheckConfig) Validate() error { + if (c.Timeout.Duration > 0) != (c.PollInterval.Duration > 0) { + return errors.New("timeout and pollInterval must both be either set or unset") + } + return nil +} diff --git a/pkg/kds/global/components.go b/pkg/kds/global/components.go index 7cce39df61f3..1b20fb5d9c07 100644 --- a/pkg/kds/global/components.go +++ b/pkg/kds/global/components.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "strings" + "time" "github.com/go-logr/logr" "github.com/pkg/errors" @@ -154,14 +155,17 @@ func Setup(rt runtime.Runtime) error { for _, filter := range rt.KDSContext().GlobalServerFiltersV2 { streamInterceptors = append(streamInterceptors, filter) } - zwLog := kdsGlobalLog.WithName("zone-watch") - if err := rt.Add(component.NewResilientComponent(zwLog, mux.NewZoneWatch( - zwLog, - *rt.Config().Multizone.Global.KDS, - rt.EventBus(), - rt.ReadOnlyResourceManager(), - ))); err != nil { - return err + + if rt.Config().Multizone.Global.KDS.ZoneHealthCheck.Timeout.Duration > time.Duration(0) { + zwLog := kdsGlobalLog.WithName("zone-watch") + if err := rt.Add(component.NewResilientComponent(zwLog, mux.NewZoneWatch( + zwLog, + *rt.Config().Multizone.Global.KDS, + rt.EventBus(), + rt.ReadOnlyResourceManager(), + ))); err != nil { + return err + } } return rt.Add(component.NewResilientComponent(kdsGlobalLog.WithName("kds-mux-client"), mux.NewServer( onSessionStarted, From be2a53b762e22a3c7223455705667efea092067f Mon Sep 17 00:00:00 2001 From: Mike Beaumont Date: Mon, 9 Oct 2023 12:52:19 +0200 Subject: [PATCH 08/23] refactor: add logging on health check failed disconnect Signed-off-by: Mike Beaumont --- pkg/kds/mux/zone_sync.go | 2 ++ pkg/kds/service/server.go | 1 + 2 files changed, 3 insertions(+) diff --git a/pkg/kds/mux/zone_sync.go b/pkg/kds/mux/zone_sync.go index d869722dad5e..f61f9520fa8a 100644 --- a/pkg/kds/mux/zone_sync.go +++ b/pkg/kds/mux/zone_sync.go @@ -84,6 +84,7 @@ func (g *KDSSyncServiceServer) GlobalToZoneSync(stream mesh_proto.KDSSyncService go g.globalToZoneCb.OnGlobalToZoneSyncConnect(stream, processingErrorsCh) select { case <-shouldDisconnectStream.Recv(): + logger.Info("ending stream, zone health check failed") return nil case <-stream.Context().Done(): logger.Info("GlobalToZoneSync rpc stream stopped") @@ -118,6 +119,7 @@ func (g *KDSSyncServiceServer) ZoneToGlobalSync(stream mesh_proto.KDSSyncService go g.zoneToGlobalCb.OnZoneToGlobalSyncConnect(stream, processingErrorsCh) select { case <-shouldDisconnectStream.Recv(): + logger.Info("ending stream, zone health check failed") return nil case <-stream.Context().Done(): logger.Info("ZoneToGlobalSync rpc stream stopped") diff --git a/pkg/kds/service/server.go b/pkg/kds/service/server.go index fcc6fff7f394..e782ad9bceb3 100644 --- a/pkg/kds/service/server.go +++ b/pkg/kds/service/server.go @@ -210,6 +210,7 @@ func (g *GlobalKDSServiceServer) streamEnvoyAdminRPC( }() select { case <-shouldDisconnectStream.Recv(): + logger.Info("ending stream, zone health check failed") return nil case res := <-streamResult: return res From 3b48b852b7b410d089b89a153cb5161e159a733b Mon Sep 17 00:00:00 2001 From: Mike Beaumont Date: Mon, 9 Oct 2023 12:58:36 +0200 Subject: [PATCH 09/23] refactor: add logging with tenant id on error and add continue Signed-off-by: Mike Beaumont --- pkg/kds/global/components.go | 1 + pkg/kds/mux/client.go | 2 +- pkg/kds/mux/zone_watch.go | 38 +++++++++++++++++++++++------------- 3 files changed, 26 insertions(+), 15 deletions(-) diff --git a/pkg/kds/global/components.go b/pkg/kds/global/components.go index 1b20fb5d9c07..6f76455bba43 100644 --- a/pkg/kds/global/components.go +++ b/pkg/kds/global/components.go @@ -163,6 +163,7 @@ func Setup(rt runtime.Runtime) error { *rt.Config().Multizone.Global.KDS, rt.EventBus(), rt.ReadOnlyResourceManager(), + rt.Extensions(), ))); err != nil { return err } diff --git a/pkg/kds/mux/client.go b/pkg/kds/mux/client.go index f7c8c6456cf9..7af1d49b1502 100644 --- a/pkg/kds/mux/client.go +++ b/pkg/kds/mux/client.go @@ -304,7 +304,7 @@ func (c *client) startHealthCheck( for { resp, err := client.HealthCheck(ctx, &mesh_proto.ZoneHealthCheckRequest{}) if err != nil && !errors.Is(err, context.Canceled) { - errorCh <- err + errorCh <- errors.Wrap(err, "zone health check request failed") } else if resp.Interval.AsDuration() > 0 { ticker.Reset(resp.Interval.AsDuration()) } diff --git a/pkg/kds/mux/zone_watch.go b/pkg/kds/mux/zone_watch.go index 826ab4e72074..6168e50a22db 100644 --- a/pkg/kds/mux/zone_watch.go +++ b/pkg/kds/mux/zone_watch.go @@ -13,6 +13,7 @@ import ( "github.com/kumahq/kuma/pkg/core/resources/store" "github.com/kumahq/kuma/pkg/events" "github.com/kumahq/kuma/pkg/kds/service" + kuma_log "github.com/kumahq/kuma/pkg/log" "github.com/kumahq/kuma/pkg/multitenant" ) @@ -22,12 +23,13 @@ type zoneTenant struct { } type ZoneWatch struct { - log logr.Logger - poll time.Duration - timeout time.Duration - bus events.EventBus - rm manager.ReadOnlyResourceManager - zones map[zoneTenant]time.Time + log logr.Logger + poll time.Duration + timeout time.Duration + bus events.EventBus + extensions context.Context + rm manager.ReadOnlyResourceManager + zones map[zoneTenant]time.Time } func NewZoneWatch( @@ -35,14 +37,16 @@ func NewZoneWatch( cfg multizone.KdsServerConfig, bus events.EventBus, rm manager.ReadOnlyResourceManager, + extensions context.Context, ) *ZoneWatch { return &ZoneWatch{ - log: log, - poll: cfg.ZoneHealthCheck.PollInterval.Duration, - timeout: cfg.ZoneHealthCheck.Timeout.Duration, - bus: bus, - rm: rm, - zones: map[zoneTenant]time.Time{}, + log: log, + poll: cfg.ZoneHealthCheck.PollInterval.Duration, + timeout: cfg.ZoneHealthCheck.Timeout.Duration, + bus: bus, + extensions: extensions, + rm: rm, + zones: map[zoneTenant]time.Time{}, } } @@ -62,9 +66,13 @@ func (zw *ZoneWatch) Start(stop <-chan struct{}) error { for zone, firstSeen := range zw.zones { ctx := multitenant.WithTenant(context.TODO(), zone.tenantID) zoneInsight := system.NewZoneInsightResource() + + log := kuma_log.AddFieldsFromCtx(zw.log, ctx, zw.extensions) if err := zw.rm.Get(ctx, zoneInsight, store.GetByKey(zone.zone, model.NoMesh)); err != nil { - zw.log.Info("error getting ZoneInsight", "zone", zone.zone) + log.Info("error getting ZoneInsight", "zone", zone.zone, "error", err) + continue } + lastHealthCheck := zoneInsight.Spec.GetHealthCheck().GetTime().AsTime() if lastHealthCheck.After(firstSeen) && time.Since(lastHealthCheck) > zw.timeout { zw.bus.Send(service.ZoneWentOffline{ @@ -80,8 +88,10 @@ func (zw *ZoneWatch) Start(stop <-chan struct{}) error { ctx := multitenant.WithTenant(context.TODO(), newStream.TenantID) zoneInsight := system.NewZoneInsightResource() + log := kuma_log.AddFieldsFromCtx(zw.log, ctx, zw.extensions) if err := zw.rm.Get(ctx, zoneInsight, store.GetByKey(newStream.Zone, model.NoMesh)); err != nil { - zw.log.Info("error getting ZoneInsight", "zone", newStream.Zone) + log.Info("error getting ZoneInsight", "zone", newStream.Zone) + continue } zw.zones[zoneTenant{ From 11b1cc0d76a23f531a7d3339cd05c423a283364c Mon Sep 17 00:00:00 2001 From: Mike Beaumont Date: Mon, 9 Oct 2023 12:59:37 +0200 Subject: [PATCH 10/23] fix: change size of buffered channel Signed-off-by: Mike Beaumont --- pkg/kds/service/server.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/kds/service/server.go b/pkg/kds/service/server.go index e782ad9bceb3..0d9c4cfb9b8b 100644 --- a/pkg/kds/service/server.go +++ b/pkg/kds/service/server.go @@ -181,7 +181,7 @@ func (g *GlobalKDSServiceServer) streamEnvoyAdminRPC( return status.Error(codes.Internal, "could not store stream connection") } logger.Info("stored stream connection") - streamResult := make(chan error, 2) + streamResult := make(chan error, 1) go func() { for { resp, err := recv() From cd318c4d5ef5bc3371d0e0c16c542e5fac68aa10 Mon Sep 17 00:00:00 2001 From: Mike Beaumont Date: Mon, 9 Oct 2023 13:55:35 +0200 Subject: [PATCH 11/23] chore: add explanation of keeping the first seen health check time Signed-off-by: Mike Beaumont --- pkg/kds/mux/zone_watch.go | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/pkg/kds/mux/zone_watch.go b/pkg/kds/mux/zone_watch.go index 6168e50a22db..b6f15f05c70b 100644 --- a/pkg/kds/mux/zone_watch.go +++ b/pkg/kds/mux/zone_watch.go @@ -94,6 +94,17 @@ func (zw *ZoneWatch) Start(stop <-chan struct{}) error { continue } + // We keep a record of the time we see in the ZoneInsight when the + // stream is opened. + // This is to prevent the zone from timing out on a poll + // where the last health check is still from a previous connect, so: + // a long time ago: zone CP disconnects, no more health checks are sent + // now: + // zone CP opens streams + // global CP gets ZoneOpenedStream + // global CP runs poll and see the last health check from "a long time ago" + // BAD: global CP kills streams + // zone CP health check arrives zw.zones[zoneTenant{ tenantID: newStream.TenantID, zone: newStream.Zone, From 05e25ae32cdcb90e35c265aa5dc369a40501f76b Mon Sep 17 00:00:00 2001 From: Mike Beaumont Date: Mon, 9 Oct 2023 14:19:53 +0200 Subject: [PATCH 12/23] refactor: add logging in zone CP when health check interval changes Signed-off-by: Mike Beaumont --- pkg/kds/mux/client.go | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/pkg/kds/mux/client.go b/pkg/kds/mux/client.go index 7af1d49b1502..2ed0787c64ed 100644 --- a/pkg/kds/mux/client.go +++ b/pkg/kds/mux/client.go @@ -299,14 +299,19 @@ func (c *client) startHealthCheck( log.Info("starting") go func() { - ticker := time.NewTicker(5 * time.Minute) + prevInterval := 5 * time.Minute + ticker := time.NewTicker(prevInterval) defer ticker.Stop() for { resp, err := client.HealthCheck(ctx, &mesh_proto.ZoneHealthCheckRequest{}) if err != nil && !errors.Is(err, context.Canceled) { errorCh <- errors.Wrap(err, "zone health check request failed") - } else if resp.Interval.AsDuration() > 0 { - ticker.Reset(resp.Interval.AsDuration()) + } else if interval := resp.Interval.AsDuration(); interval > 0 { + if prevInterval != interval { + prevInterval = interval + log.Info("Global CP requested new healthcheck interval", "interval", interval) + } + ticker.Reset(interval) } select { From e1c0e51301729b4da5927ee6e441e5968ed2904a Mon Sep 17 00:00:00 2001 From: Mike Beaumont Date: Mon, 9 Oct 2023 14:21:30 +0200 Subject: [PATCH 13/23] docs: add to config comment Signed-off-by: Mike Beaumont --- pkg/config/multizone/kds.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/config/multizone/kds.go b/pkg/config/multizone/kds.go index 1ef6b286e847..0c0b7fcbd322 100644 --- a/pkg/config/multizone/kds.go +++ b/pkg/config/multizone/kds.go @@ -105,8 +105,8 @@ func (k KdsClientConfig) Validate() error { } type ZoneHealthCheckConfig struct { - // PollInterval is the interval between the CP checking ZoneInsight for - // health check pings + // PollInterval is the interval between the global CP checking ZoneInsight for + // health check pings and interval between zone CP sending health check pings PollInterval config_types.Duration `json:"pollInterval" envconfig:"kuma_multizone_global_kds_zone_health_check_poll_interval"` // Timeout is the time after the last health check that a zone counts as // no longer online From efd20fe0af985ef8001393f02a58b138f30d0576 Mon Sep 17 00:00:00 2001 From: Mike Beaumont Date: Mon, 9 Oct 2023 14:30:20 +0200 Subject: [PATCH 14/23] feat: add metric to ZoneWatch Signed-off-by: Mike Beaumont --- pkg/kds/global/components.go | 9 +++++++-- pkg/kds/mux/zone_watch.go | 21 +++++++++++++++++++-- 2 files changed, 26 insertions(+), 4 deletions(-) diff --git a/pkg/kds/global/components.go b/pkg/kds/global/components.go index 6f76455bba43..118c3d72fb89 100644 --- a/pkg/kds/global/components.go +++ b/pkg/kds/global/components.go @@ -158,13 +158,18 @@ func Setup(rt runtime.Runtime) error { if rt.Config().Multizone.Global.KDS.ZoneHealthCheck.Timeout.Duration > time.Duration(0) { zwLog := kdsGlobalLog.WithName("zone-watch") - if err := rt.Add(component.NewResilientComponent(zwLog, mux.NewZoneWatch( + zw, err := mux.NewZoneWatch( zwLog, *rt.Config().Multizone.Global.KDS, + rt.Metrics(), rt.EventBus(), rt.ReadOnlyResourceManager(), rt.Extensions(), - ))); err != nil { + ) + if err != nil { + return errors.Wrap(err, "couldn't create ZoneWatch") + } + if err := rt.Add(component.NewResilientComponent(zwLog, zw)); err != nil { return err } } diff --git a/pkg/kds/mux/zone_watch.go b/pkg/kds/mux/zone_watch.go index b6f15f05c70b..b05633d04c89 100644 --- a/pkg/kds/mux/zone_watch.go +++ b/pkg/kds/mux/zone_watch.go @@ -5,8 +5,10 @@ import ( "time" "github.com/go-logr/logr" + "github.com/prometheus/client_golang/prometheus" "github.com/kumahq/kuma/pkg/config/multizone" + "github.com/kumahq/kuma/pkg/core" "github.com/kumahq/kuma/pkg/core/resources/apis/system" "github.com/kumahq/kuma/pkg/core/resources/manager" "github.com/kumahq/kuma/pkg/core/resources/model" @@ -14,6 +16,7 @@ import ( "github.com/kumahq/kuma/pkg/events" "github.com/kumahq/kuma/pkg/kds/service" kuma_log "github.com/kumahq/kuma/pkg/log" + core_metrics "github.com/kumahq/kuma/pkg/metrics" "github.com/kumahq/kuma/pkg/multitenant" ) @@ -29,16 +32,27 @@ type ZoneWatch struct { bus events.EventBus extensions context.Context rm manager.ReadOnlyResourceManager + summary prometheus.Summary zones map[zoneTenant]time.Time } func NewZoneWatch( log logr.Logger, cfg multizone.KdsServerConfig, + metrics prometheus.Registerer, bus events.EventBus, rm manager.ReadOnlyResourceManager, extensions context.Context, -) *ZoneWatch { +) (*ZoneWatch, error) { + summary := prometheus.NewSummary(prometheus.SummaryOpts{ + Name: "component_zone_watch", + Help: "Summary of ZoneWatch component interval", + Objectives: core_metrics.DefaultObjectives, + }) + if err := metrics.Register(summary); err != nil { + return nil, err + } + return &ZoneWatch{ log: log, poll: cfg.ZoneHealthCheck.PollInterval.Duration, @@ -46,8 +60,9 @@ func NewZoneWatch( bus: bus, extensions: extensions, rm: rm, + summary: summary, zones: map[zoneTenant]time.Time{}, - } + }, nil } func (zw *ZoneWatch) Start(stop <-chan struct{}) error { @@ -63,6 +78,7 @@ func (zw *ZoneWatch) Start(stop <-chan struct{}) error { for { select { case <-timer.C: + start := core.Now() for zone, firstSeen := range zw.zones { ctx := multitenant.WithTenant(context.TODO(), zone.tenantID) zoneInsight := system.NewZoneInsightResource() @@ -82,6 +98,7 @@ func (zw *ZoneWatch) Start(stop <-chan struct{}) error { delete(zw.zones, zone) } } + zw.summary.Observe(float64(core.Now().Sub(start).Milliseconds())) case e := <-connectionWatch.Recv(): newStream := e.(service.ZoneOpenedStream) From 8bd1cc758049c214d6d4328d94ca252bdafb9219 Mon Sep 17 00:00:00 2001 From: Mike Beaumont Date: Mon, 9 Oct 2023 14:48:35 +0200 Subject: [PATCH 15/23] chore: add logging to zone CP on health check send/error Signed-off-by: Mike Beaumont --- pkg/kds/mux/client.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/kds/mux/client.go b/pkg/kds/mux/client.go index 2ed0787c64ed..f9a86b0a3fb9 100644 --- a/pkg/kds/mux/client.go +++ b/pkg/kds/mux/client.go @@ -303,8 +303,10 @@ func (c *client) startHealthCheck( ticker := time.NewTicker(prevInterval) defer ticker.Stop() for { + log.Info("sending health check") resp, err := client.HealthCheck(ctx, &mesh_proto.ZoneHealthCheckRequest{}) if err != nil && !errors.Is(err, context.Canceled) { + log.Error(err, "health check failed") errorCh <- errors.Wrap(err, "zone health check request failed") } else if interval := resp.Interval.AsDuration(); interval > 0 { if prevInterval != interval { From 0cf5af0a8140443942bc2fbbee0400a8f8d07094 Mon Sep 17 00:00:00 2001 From: Mike Beaumont Date: Tue, 10 Oct 2023 13:36:26 +0200 Subject: [PATCH 16/23] chore: remove defunct struct Signed-off-by: Mike Beaumont --- pkg/kds/service/server.go | 6 ------ 1 file changed, 6 deletions(-) diff --git a/pkg/kds/service/server.go b/pkg/kds/service/server.go index 0d9c4cfb9b8b..58270d98bd0e 100644 --- a/pkg/kds/service/server.go +++ b/pkg/kds/service/server.go @@ -39,12 +39,6 @@ type StreamInterceptor interface { InterceptServerStream(stream grpc.ServerStream) error } -type ActiveStreams struct { - XDSConfig chan struct{} - Stats chan struct{} - Clusters chan struct{} -} - type GlobalKDSServiceServer struct { envoyAdminRPCs EnvoyAdminRPCs resManager manager.ResourceManager From 2f333e043b45ebd85a362b19a11ef1067ad42ac0 Mon Sep 17 00:00:00 2001 From: Mike Beaumont Date: Tue, 10 Oct 2023 14:39:38 +0200 Subject: [PATCH 17/23] refactor: only pass necessary part of config Signed-off-by: Mike Beaumont --- pkg/kds/global/components.go | 2 +- pkg/kds/mux/zone_watch.go | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/kds/global/components.go b/pkg/kds/global/components.go index 118c3d72fb89..9e262553cebb 100644 --- a/pkg/kds/global/components.go +++ b/pkg/kds/global/components.go @@ -160,7 +160,7 @@ func Setup(rt runtime.Runtime) error { zwLog := kdsGlobalLog.WithName("zone-watch") zw, err := mux.NewZoneWatch( zwLog, - *rt.Config().Multizone.Global.KDS, + rt.Config().Multizone.Global.KDS.ZoneHealthCheck, rt.Metrics(), rt.EventBus(), rt.ReadOnlyResourceManager(), diff --git a/pkg/kds/mux/zone_watch.go b/pkg/kds/mux/zone_watch.go index b05633d04c89..861a27d2dbc6 100644 --- a/pkg/kds/mux/zone_watch.go +++ b/pkg/kds/mux/zone_watch.go @@ -38,7 +38,7 @@ type ZoneWatch struct { func NewZoneWatch( log logr.Logger, - cfg multizone.KdsServerConfig, + cfg multizone.ZoneHealthCheckConfig, metrics prometheus.Registerer, bus events.EventBus, rm manager.ReadOnlyResourceManager, @@ -55,8 +55,8 @@ func NewZoneWatch( return &ZoneWatch{ log: log, - poll: cfg.ZoneHealthCheck.PollInterval.Duration, - timeout: cfg.ZoneHealthCheck.Timeout.Duration, + poll: cfg.PollInterval.Duration, + timeout: cfg.Timeout.Duration, bus: bus, extensions: extensions, rm: rm, From 0934669130d4ce3ce98297f93a2836d998746a48 Mon Sep 17 00:00:00 2001 From: Mike Beaumont Date: Tue, 10 Oct 2023 14:39:50 +0200 Subject: [PATCH 18/23] test(ZoneWatch): add suite Signed-off-by: Mike Beaumont --- pkg/kds/mux/zone_watch_test.go | 153 +++++++++++++++++++++++++++++++++ 1 file changed, 153 insertions(+) create mode 100644 pkg/kds/mux/zone_watch_test.go diff --git a/pkg/kds/mux/zone_watch_test.go b/pkg/kds/mux/zone_watch_test.go new file mode 100644 index 000000000000..6e40ba5c9088 --- /dev/null +++ b/pkg/kds/mux/zone_watch_test.go @@ -0,0 +1,153 @@ +package mux_test + +import ( + "context" + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "google.golang.org/protobuf/types/known/timestamppb" + + system_proto "github.com/kumahq/kuma/api/system/v1alpha1" + "github.com/kumahq/kuma/pkg/config/multizone" + "github.com/kumahq/kuma/pkg/config/types" + "github.com/kumahq/kuma/pkg/core" + "github.com/kumahq/kuma/pkg/core/resources/apis/system" + "github.com/kumahq/kuma/pkg/core/resources/manager" + core_model "github.com/kumahq/kuma/pkg/core/resources/model" + "github.com/kumahq/kuma/pkg/core/resources/store" + "github.com/kumahq/kuma/pkg/events" + "github.com/kumahq/kuma/pkg/kds/mux" + "github.com/kumahq/kuma/pkg/kds/service" + core_metrics "github.com/kumahq/kuma/pkg/metrics" + "github.com/kumahq/kuma/pkg/plugins/resources/memory" +) + +func sendHealthCheckPing(rm manager.ResourceManager, name string) { + zoneInsight := system.NewZoneInsightResource() + Expect(rm.Get( + context.Background(), + zoneInsight, + store.GetByKey(name, core_model.NoMesh), + )).To(Succeed()) + + zoneInsight.Spec.HealthCheck = &system_proto.HealthCheck{ + Time: timestamppb.New(time.Now()), + } + Expect(rm.Update( + context.Background(), + zoneInsight, + )).To(Succeed()) +} + +const zone = "zone-1" + +var _ = Describe("ZoneWatch", func() { + var errCh chan error + + var rm manager.ResourceManager + var eventBus events.EventBus + var stop chan struct{} + var zoneWatch *mux.ZoneWatch + var timeouts events.Listener + + pollInterval := 100 * time.Millisecond + timeout := 5 * pollInterval + + BeforeEach(func() { + metrics, err := core_metrics.NewMetrics("") + Expect(err).ToNot(HaveOccurred()) + eventBus, err = events.NewEventBus(10, metrics) + Expect(err).NotTo(HaveOccurred()) + + cfg := multizone.ZoneHealthCheckConfig{ + PollInterval: types.Duration{Duration: pollInterval}, + Timeout: types.Duration{Duration: timeout}, + } + + zoneInsight := system.NewZoneInsightResource() + zoneInsight.Spec.HealthCheck = &system_proto.HealthCheck{ + Time: timestamppb.New(time.Now()), + } + rm = manager.NewResourceManager(memory.NewStore()) + Expect(rm.Create( + context.Background(), + zoneInsight, + store.CreateByKey(zone, core_model.NoMesh), + )).To(Succeed()) + + log := core.Log.WithName("test") + zoneWatch, err = mux.NewZoneWatch( + log, + cfg, + metrics, + eventBus, + rm, + context.Background(), + ) + Expect(err).NotTo(HaveOccurred()) + + stop = make(chan struct{}) + + timeouts = eventBus.Subscribe(func(event events.Event) bool { + _, ok := event.(service.ZoneWentOffline) + return ok + }) + + errCh = make(chan error, 1) + + go func() { + errCh <- zoneWatch.Start(stop) + }() + + // wait for ZoneWatch to have subscribed to new zone events + time.Sleep(pollInterval) + }) + + AfterEach(func() { + select { + case <-errCh: + Fail("zone watch should not have stopped") + default: + } + close(stop) + timeouts.Close() + Eventually(errCh).Should(Receive(Succeed())) + }) + + // We know _best case_ the zone will register offline + // in timeout + pollInterval + zoneWentOfflineCheckTimeout := timeout + 2*pollInterval + + Describe("basic", FlakeAttempts(3), func() { + It("should timeout zones that stop sending a health check", func() { + sendHealthCheckPing(rm, zone) + eventBus.Send(service.ZoneOpenedStream{ + TenantID: "", + Zone: zone, + }) + + // wait for opened stream to be registered + // in real conditions the interval will be large enough + // that these events will almost certainly be handled + // by the ZoneWatch loop between polls and before the timeout + time.Sleep(pollInterval) + + sendHealthCheckPing(rm, zone) + + Eventually(timeouts.Recv(), zoneWentOfflineCheckTimeout).Should(Receive(Equal(service.ZoneWentOffline{ + TenantID: "", + Zone: zone, + }))) + }) + It("should not timeout zones that haven't sent a health check yet", func() { + sendHealthCheckPing(rm, zone) + eventBus.Send(service.ZoneOpenedStream{ + TenantID: "", + Zone: zone, + }) + + Consistently(timeouts.Recv(), zoneWentOfflineCheckTimeout).ShouldNot(Receive()) + }) + }) +}) From ff4ad607a9d8fb6144852bcc6c92402c3e95a24e Mon Sep 17 00:00:00 2001 From: Mike Beaumont Date: Tue, 10 Oct 2023 15:38:07 +0200 Subject: [PATCH 19/23] test(ZoneWatch): add additional time.Sleep to ensure conditions Signed-off-by: Mike Beaumont --- pkg/kds/mux/zone_watch_test.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/pkg/kds/mux/zone_watch_test.go b/pkg/kds/mux/zone_watch_test.go index 6e40ba5c9088..c4e967edfde5 100644 --- a/pkg/kds/mux/zone_watch_test.go +++ b/pkg/kds/mux/zone_watch_test.go @@ -147,6 +147,12 @@ var _ = Describe("ZoneWatch", func() { Zone: zone, }) + // wait for opened stream to be registered + // in real conditions the interval will be large enough + // that these events will almost certainly be handled + // by the ZoneWatch loop between polls and before the timeout + time.Sleep(pollInterval) + Consistently(timeouts.Recv(), zoneWentOfflineCheckTimeout).ShouldNot(Receive()) }) }) From 8da0f5a0fa1cb9d7a5c74972c11c01cfcc1500f1 Mon Sep 17 00:00:00 2001 From: Mike Beaumont Date: Tue, 10 Oct 2023 15:59:43 +0200 Subject: [PATCH 20/23] test(ZoneWatch): remove FlakeAttempts Signed-off-by: Mike Beaumont --- pkg/kds/mux/zone_watch_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/kds/mux/zone_watch_test.go b/pkg/kds/mux/zone_watch_test.go index c4e967edfde5..d66d75a2c402 100644 --- a/pkg/kds/mux/zone_watch_test.go +++ b/pkg/kds/mux/zone_watch_test.go @@ -119,7 +119,7 @@ var _ = Describe("ZoneWatch", func() { // in timeout + pollInterval zoneWentOfflineCheckTimeout := timeout + 2*pollInterval - Describe("basic", FlakeAttempts(3), func() { + Describe("basic", func() { It("should timeout zones that stop sending a health check", func() { sendHealthCheckPing(rm, zone) eventBus.Send(service.ZoneOpenedStream{ From 0356948b8a2a4887fd4160338d9c133467378134 Mon Sep 17 00:00:00 2001 From: Mike Beaumont Date: Tue, 10 Oct 2023 17:03:52 +0200 Subject: [PATCH 21/23] feat: treat opening the stream as a healthcheck Signed-off-by: Mike Beaumont --- pkg/kds/mux/zone_watch.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/pkg/kds/mux/zone_watch.go b/pkg/kds/mux/zone_watch.go index 861a27d2dbc6..43ede2a9ce49 100644 --- a/pkg/kds/mux/zone_watch.go +++ b/pkg/kds/mux/zone_watch.go @@ -89,8 +89,14 @@ func (zw *ZoneWatch) Start(stop <-chan struct{}) error { continue } + // It may be that we don't have a health check yet so we use the + // lastSeen time because we know the zone was connected at that + // point at least lastHealthCheck := zoneInsight.Spec.GetHealthCheck().GetTime().AsTime() - if lastHealthCheck.After(firstSeen) && time.Since(lastHealthCheck) > zw.timeout { + if firstSeen.After(lastHealthCheck) { + lastHealthCheck = firstSeen + } + if time.Since(lastHealthCheck) > zw.timeout { zw.bus.Send(service.ZoneWentOffline{ Zone: zone.zone, TenantID: zone.tenantID, From 5749df95610e8aaee3759ef492d68ac7372064b5 Mon Sep 17 00:00:00 2001 From: Mike Beaumont Date: Tue, 10 Oct 2023 17:04:27 +0200 Subject: [PATCH 22/23] test(ZoneWatch): add test that updating ZoneInsight keeps a zone from timing out Signed-off-by: Mike Beaumont --- pkg/kds/mux/zone_watch_test.go | 70 +++++++++++++++++++--------------- 1 file changed, 39 insertions(+), 31 deletions(-) diff --git a/pkg/kds/mux/zone_watch_test.go b/pkg/kds/mux/zone_watch_test.go index d66d75a2c402..56afe64c4f8c 100644 --- a/pkg/kds/mux/zone_watch_test.go +++ b/pkg/kds/mux/zone_watch_test.go @@ -119,41 +119,49 @@ var _ = Describe("ZoneWatch", func() { // in timeout + pollInterval zoneWentOfflineCheckTimeout := timeout + 2*pollInterval - Describe("basic", func() { - It("should timeout zones that stop sending a health check", func() { - sendHealthCheckPing(rm, zone) - eventBus.Send(service.ZoneOpenedStream{ - TenantID: "", - Zone: zone, - }) + It("should timeout zones that stop sending a health check", func() { + eventBus.Send(service.ZoneOpenedStream{ + TenantID: "", + Zone: zone, + }) - // wait for opened stream to be registered - // in real conditions the interval will be large enough - // that these events will almost certainly be handled - // by the ZoneWatch loop between polls and before the timeout - time.Sleep(pollInterval) + // wait for opened stream to be registered + // in real conditions the interval will be large enough + // that these events will almost certainly be handled + // by the ZoneWatch loop between polls and before the timeout + time.Sleep(pollInterval) - sendHealthCheckPing(rm, zone) + Eventually(timeouts.Recv(), zoneWentOfflineCheckTimeout).Should(Receive(Equal(service.ZoneWentOffline{ + TenantID: "", + Zone: zone, + }))) - Eventually(timeouts.Recv(), zoneWentOfflineCheckTimeout).Should(Receive(Equal(service.ZoneWentOffline{ - TenantID: "", - Zone: zone, - }))) + Consistently(timeouts.Recv(), zoneWentOfflineCheckTimeout).ShouldNot(Receive()) + }) + It("shouldn't timeout as long as ZoneInsight is updated", func() { + eventBus.Send(service.ZoneOpenedStream{ + TenantID: "", + Zone: zone, }) - It("should not timeout zones that haven't sent a health check yet", func() { + + // wait for opened stream to be registered + // in real conditions the interval will be large enough + // that these events will almost certainly be handled + // by the ZoneWatch loop between polls and before the timeout + time.Sleep(pollInterval) + + // Send a health check and block for a poll interval and make sure + // nothing has been received + // do this until we know the timeout would have come if weren't sending + // health checks + Consistently(func(g Gomega) { sendHealthCheckPing(rm, zone) - eventBus.Send(service.ZoneOpenedStream{ - TenantID: "", - Zone: zone, - }) - - // wait for opened stream to be registered - // in real conditions the interval will be large enough - // that these events will almost certainly be handled - // by the ZoneWatch loop between polls and before the timeout - time.Sleep(pollInterval) - - Consistently(timeouts.Recv(), zoneWentOfflineCheckTimeout).ShouldNot(Receive()) - }) + g.Consistently(timeouts.Recv(), pollInterval).ShouldNot(Receive()) + }, 3*timeout).Should(Succeed()) + + Eventually(timeouts.Recv(), zoneWentOfflineCheckTimeout).Should(Receive(Equal(service.ZoneWentOffline{ + TenantID: "", + Zone: zone, + }))) }) }) From 61ed009e44e42163deab8e6e06eaefdb66df3b85 Mon Sep 17 00:00:00 2001 From: Mike Beaumont Date: Tue, 10 Oct 2023 17:14:52 +0200 Subject: [PATCH 23/23] fix(kds): check feature before killing because of zone health check Signed-off-by: Mike Beaumont --- pkg/kds/mux/zone_sync.go | 29 +++++++++++++++++++---------- 1 file changed, 19 insertions(+), 10 deletions(-) diff --git a/pkg/kds/mux/zone_sync.go b/pkg/kds/mux/zone_sync.go index f61f9520fa8a..fc25b76f121f 100644 --- a/pkg/kds/mux/zone_sync.go +++ b/pkg/kds/mux/zone_sync.go @@ -2,15 +2,18 @@ package mux import ( "context" + "slices" "github.com/pkg/errors" "google.golang.org/grpc" "google.golang.org/grpc/codes" + "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" mesh_proto "github.com/kumahq/kuma/api/mesh/v1alpha1" "github.com/kumahq/kuma/pkg/core" "github.com/kumahq/kuma/pkg/events" + "github.com/kumahq/kuma/pkg/kds" "github.com/kumahq/kuma/pkg/kds/service" "github.com/kumahq/kuma/pkg/kds/util" "github.com/kumahq/kuma/pkg/log" @@ -76,8 +79,7 @@ func (g *KDSSyncServiceServer) GlobalToZoneSync(stream mesh_proto.KDSSyncService } } - tenantID, _ := multitenant.TenantFromCtx(stream.Context()) - shouldDisconnectStream := g.watchZoneHealthCheck(tenantID, zone) + shouldDisconnectStream := g.watchZoneHealthCheck(stream.Context(), zone) defer shouldDisconnectStream.Close() processingErrorsCh := make(chan error) @@ -111,8 +113,7 @@ func (g *KDSSyncServiceServer) ZoneToGlobalSync(stream mesh_proto.KDSSyncService } } - tenantID, _ := multitenant.TenantFromCtx(stream.Context()) - shouldDisconnectStream := g.watchZoneHealthCheck(tenantID, zone) + shouldDisconnectStream := g.watchZoneHealthCheck(stream.Context(), zone) defer shouldDisconnectStream.Close() processingErrorsCh := make(chan error) @@ -133,12 +134,20 @@ func (g *KDSSyncServiceServer) ZoneToGlobalSync(stream mesh_proto.KDSSyncService } } -func (g *KDSSyncServiceServer) watchZoneHealthCheck(tenantID, zone string) events.Listener { - shouldDisconnectStream := g.eventBus.Subscribe(func(e events.Event) bool { - disconnectEvent, ok := e.(service.ZoneWentOffline) - return ok && disconnectEvent.TenantID == tenantID && disconnectEvent.Zone == zone - }) - g.eventBus.Send(service.ZoneOpenedStream{Zone: zone, TenantID: tenantID}) +func (g *KDSSyncServiceServer) watchZoneHealthCheck(streamContext context.Context, zone string) events.Listener { + tenantID, _ := multitenant.TenantFromCtx(streamContext) + md, _ := metadata.FromIncomingContext(streamContext) + + shouldDisconnectStream := events.NewNeverListener() + + features := md.Get(kds.FeaturesMetadataKey) + if slices.Contains(features, kds.FeatureZonePingHealth) { + shouldDisconnectStream = g.eventBus.Subscribe(func(e events.Event) bool { + disconnectEvent, ok := e.(service.ZoneWentOffline) + return ok && disconnectEvent.TenantID == tenantID && disconnectEvent.Zone == zone + }) + g.eventBus.Send(service.ZoneOpenedStream{Zone: zone, TenantID: tenantID}) + } return shouldDisconnectStream }