feat(kds): introduce zone health checks #7821

Merged
Changes from 15 commits
Commits
24 commits
1dbb3a8
feat(kuma-cp): call zone HealthCheck periodically from zone
michaelbeaumont Oct 5, 2023
9230b77
feat(kuma-cp): kill zone connections when health check fails
michaelbeaumont Sep 28, 2023
3c67e80
feat(kuma-cp): only watch zone health if feature present
michaelbeaumont Oct 5, 2023
09d2ec0
feat(kuma-cp): add config for global zone health check logic
michaelbeaumont Oct 6, 2023
81bc083
feat(kuma-cp): use health check interval from global
michaelbeaumont Oct 6, 2023
0e154d4
refactor: remove unnecessary goroutine
michaelbeaumont Oct 9, 2023
b3a52d0
fix: properly validate zone health check config
michaelbeaumont Oct 9, 2023
be2a53b
refactor: add logging on health check failed disconnect
michaelbeaumont Oct 9, 2023
3b48b85
refactor: add logging with tenant id on error and add continue
michaelbeaumont Oct 9, 2023
11b1cc0
fix: change size of buffered channel
michaelbeaumont Oct 9, 2023
cd318c4
chore: add explanation of keeping the first seen health check time
michaelbeaumont Oct 9, 2023
05e25ae
refactor: add logging in zone CP when health check interval changes
michaelbeaumont Oct 9, 2023
e1c0e51
docs: add to config comment
michaelbeaumont Oct 9, 2023
efd20fe
feat: add metric to ZoneWatch
michaelbeaumont Oct 9, 2023
8bd1cc7
chore: add logging to zone CP on health check send/error
michaelbeaumont Oct 9, 2023
0cf5af0
chore: remove defunct struct
michaelbeaumont Oct 10, 2023
2f333e0
refactor: only pass necessary part of config
michaelbeaumont Oct 10, 2023
0934669
test(ZoneWatch): add suite
michaelbeaumont Oct 10, 2023
fd543c8
Merge remote-tracking branch 'upstream/master' into feat/do-zone-heal…
michaelbeaumont Oct 10, 2023
ff4ad60
test(ZoneWatch): add additional time.Sleep to ensure conditions
michaelbeaumont Oct 10, 2023
8da0f5a
test(ZoneWatch): remove FlakeAttempts
michaelbeaumont Oct 10, 2023
0356948
feat: treat opening the stream as a healthcheck
michaelbeaumont Oct 10, 2023
5749df9
test(ZoneWatch): add test that updating ZoneInsight keeps a zone from…
michaelbeaumont Oct 10, 2023
61ed009
fix(kds): check feature before killing because of zone health check
michaelbeaumont Oct 10, 2023
282 changes: 151 additions & 131 deletions api/mesh/v1alpha1/kds.pb.go

Large diffs are not rendered by default.

7 changes: 5 additions & 2 deletions api/mesh/v1alpha1/kds.proto
@@ -6,6 +6,7 @@ option go_package = "github.com/kumahq/kuma/api/mesh/v1alpha1";

import "envoy/service/discovery/v3/discovery.proto";
import "google/protobuf/any.proto";
import "google/protobuf/duration.proto";

service KumaDiscoveryService {
rpc StreamKumaResources(stream envoy.service.discovery.v3.DiscoveryRequest)
@@ -22,10 +23,12 @@ message KumaResource {
google.protobuf.Any spec = 2;
}

message ZoneHealthCheckRequest {
}
message ZoneHealthCheckRequest {}

message ZoneHealthCheckResponse {
// The interval that the global control plane
// expects between health check pings
google.protobuf.Duration interval = 1;
}

service GlobalKDSService {
8 changes: 8 additions & 0 deletions docs/generated/raw/protos/ZoneHealthCheckResponse.json
@@ -3,6 +3,14 @@
"$ref": "#/definitions/ZoneHealthCheckResponse",
"definitions": {
"ZoneHealthCheckResponse": {
"properties": {
"interval": {
"pattern": "^([0-9]+\\.?[0-9]*|\\.[0-9]+)s$",
"type": "string",
"description": "The the interval that the global control plane expects between health check pings",
"format": "regex"
}
},
"additionalProperties": true,
"type": "object",
"title": "Zone Health Check Response"
7 changes: 7 additions & 0 deletions pkg/config/loader_test.go
@@ -252,6 +252,8 @@ var _ = Describe("Config loader", func() {
Expect(cfg.Multizone.Global.KDS.MsgSendTimeout.Duration).To(Equal(10 * time.Second))
Expect(cfg.Multizone.Global.KDS.NackBackoff.Duration).To(Equal(11 * time.Second))
Expect(cfg.Multizone.Global.KDS.DisableSOTW).To(BeTrue())
Expect(cfg.Multizone.Global.KDS.ZoneHealthCheck.PollInterval.Duration).To(Equal(11 * time.Second))
Expect(cfg.Multizone.Global.KDS.ZoneHealthCheck.Timeout.Duration).To(Equal(110 * time.Second))
Expect(cfg.Multizone.Zone.GlobalAddress).To(Equal("grpc://1.1.1.1:5685"))
Expect(cfg.Multizone.Zone.Name).To(Equal("zone-1"))
Expect(cfg.Multizone.Zone.KDS.RootCAFile).To(Equal("/rootCa"))
@@ -565,6 +567,9 @@ multizone:
msgSendTimeout: 10s
nackBackoff: 11s
disableSOTW: true
zoneHealthCheck:
pollInterval: 11s
timeout: 110s
zone:
globalAddress: "grpc://1.1.1.1:5685"
name: "zone-1"
@@ -862,6 +867,8 @@ tracing:
"KUMA_MULTIZONE_GLOBAL_KDS_MSG_SEND_TIMEOUT": "10s",
"KUMA_MULTIZONE_GLOBAL_KDS_NACK_BACKOFF": "11s",
"KUMA_MULTIZONE_GLOBAL_KDS_DISABLE_SOTW": "true",
"KUMA_MULTIZONE_GLOBAL_KDS_ZONE_HEALTH_CHECK_POLL_INTERVAL": "11s",
"KUMA_MULTIZONE_GLOBAL_KDS_ZONE_HEALTH_CHECK_TIMEOUT": "110s",
"KUMA_MULTIZONE_ZONE_GLOBAL_ADDRESS": "grpc://1.1.1.1:5685",
"KUMA_MULTIZONE_ZONE_NAME": "zone-1",
"KUMA_MULTIZONE_ZONE_KDS_ROOT_CA_FILE": "/rootCa",
24 changes: 24 additions & 0 deletions pkg/config/multizone/kds.go
@@ -37,6 +37,8 @@ type KdsServerConfig struct {
NackBackoff config_types.Duration `json:"nackBackoff" envconfig:"kuma_multizone_global_kds_nack_backoff"`
// DisableSOTW if true doesn't expose SOTW version of KDS. Default: false
DisableSOTW bool `json:"disableSOTW" envconfig:"kuma_multizone_global_kds_disable_sotw"`
// ZoneHealthCheck holds config for ensuring zones are online
ZoneHealthCheck ZoneHealthCheckConfig `json:"zoneHealthCheck"`
}

var _ config.Config = &KdsServerConfig{}
@@ -70,6 +72,9 @@ func (c *KdsServerConfig) Validate() error {
if _, err := config_types.TLSCiphers(c.TlsCipherSuites); err != nil {
errs = multierr.Append(errs, errors.New(".TlsCipherSuites"+err.Error()))
}
if err := c.ZoneHealthCheck.Validate(); err != nil {
errs = multierr.Append(errs, errors.Wrap(err, "invalid zoneHealthCheck config"))
}
return errs
}

@@ -98,3 +103,22 @@ func (k KdsClientConfig) Sanitize() {
func (k KdsClientConfig) Validate() error {
return nil
}

type ZoneHealthCheckConfig struct {
// PollInterval is both the interval at which the global CP checks ZoneInsight for
// health check pings and the interval at which the zone CP sends those pings
PollInterval config_types.Duration `json:"pollInterval" envconfig:"kuma_multizone_global_kds_zone_health_check_poll_interval"`
// Timeout is the time after the last health check that a zone counts as
// no longer online
Timeout config_types.Duration `json:"timeout" envconfig:"kuma_multizone_global_kds_zone_health_check_timeout"`
}

func (c ZoneHealthCheckConfig) Sanitize() {
}

func (c ZoneHealthCheckConfig) Validate() error {
if (c.Timeout.Duration > 0) != (c.PollInterval.Duration > 0) {
return errors.New("timeout and pollInterval must both be either set or unset")
}
return nil
}
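
For reference, the validation above means the two durations have to be enabled together: setting only one of them is rejected. A minimal sketch of that behaviour, assuming the usual Kuma import path for config_types (values are illustrative only):

package main

import (
	"fmt"
	"time"

	"github.com/kumahq/kuma/pkg/config/multizone"
	config_types "github.com/kumahq/kuma/pkg/config/types"
)

func main() {
	// Valid: both fields are set.
	ok := multizone.ZoneHealthCheckConfig{
		PollInterval: config_types.Duration{Duration: 10 * time.Second},
		Timeout:      config_types.Duration{Duration: 5 * time.Minute},
	}
	fmt.Println(ok.Validate()) // <nil>

	// Invalid: a timeout without a poll interval.
	bad := multizone.ZoneHealthCheckConfig{
		Timeout: config_types.Duration{Duration: 5 * time.Minute},
	}
	fmt.Println(bad.Validate()) // timeout and pollInterval must both be either set or unset
}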
13 changes: 13 additions & 0 deletions pkg/events/interfaces.go
@@ -34,6 +34,19 @@ type Listener interface {
Close()
}

func NewNeverListener() Listener {
return &neverRecvListener{}
}

type neverRecvListener struct{}

func (*neverRecvListener) Recv() <-chan Event {
return nil
}

func (*neverRecvListener) Close() {
}

type Predicate = func(event Event) bool

type Emitter interface {
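
NewNeverListener gives callers a Listener whose Recv channel is nil, so a select over it blocks forever. Presumably this is how the sync server keeps a stream open unconditionally when a zone doesn't advertise the ping-health feature (see commit 61ed009); the sketch below only demonstrates the nil-channel property this relies on:

package main

import (
	"fmt"
	"time"
)

func main() {
	var never chan struct{} // a nil channel: receiving from it blocks forever

	select {
	case <-never:
		fmt.Println("unreachable")
	case <-time.After(100 * time.Millisecond):
		// Only this branch can fire, which is exactly the behaviour the
		// neverRecvListener gives the KDS stream select loops: they never
		// see a "zone went offline" signal.
		fmt.Println("nil channel never delivered a value")
	}
}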
6 changes: 6 additions & 0 deletions pkg/kds/features.go
@@ -9,5 +9,11 @@ func (f Features) HasFeature(feature string) bool {
return f[feature]
}

const FeaturesMetadataKey string = "features"

// FeatureZoneToken means that the zone control plane can handle incoming Zone Token from global control plane.
const FeatureZoneToken string = "zone-token"

// FeatureZonePingHealth means that the zone control plane sends pings to the
// global control plane to indicate it's still running.
const FeatureZonePingHealth string = "zone-ping-health"
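
The zone advertises this feature as ordinary gRPC metadata (the AppendToOutgoingContext call in pkg/kds/mux/client.go further down). On the global side a check along these lines presumably gates whether a stream participates in health checking; the helper below is an illustrative sketch, not necessarily the one the PR uses:

package main

import (
	"context"
	"fmt"

	"google.golang.org/grpc/metadata"

	"github.com/kumahq/kuma/pkg/kds"
)

// featuresFromCtx rebuilds a kds.Features set from the metadata a zone
// control plane attaches to its outgoing KDS context.
func featuresFromCtx(ctx context.Context) kds.Features {
	features := kds.Features{}
	if md, ok := metadata.FromIncomingContext(ctx); ok {
		for _, value := range md.Get(kds.FeaturesMetadataKey) {
			features[value] = true
		}
	}
	return features
}

func main() {
	ctx := metadata.NewIncomingContext(context.Background(),
		metadata.Pairs(kds.FeaturesMetadataKey, kds.FeatureZonePingHealth))
	fmt.Println(featuresFromCtx(ctx).HasFeature(kds.FeatureZonePingHealth)) // true
}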
22 changes: 22 additions & 0 deletions pkg/kds/global/components.go
@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"strings"
"time"

"github.com/go-logr/logr"
"github.com/pkg/errors"
@@ -154,6 +155,24 @@ func Setup(rt runtime.Runtime) error {
for _, filter := range rt.KDSContext().GlobalServerFiltersV2 {
streamInterceptors = append(streamInterceptors, filter)
}

if rt.Config().Multizone.Global.KDS.ZoneHealthCheck.Timeout.Duration > time.Duration(0) {
zwLog := kdsGlobalLog.WithName("zone-watch")
zw, err := mux.NewZoneWatch(
zwLog,
*rt.Config().Multizone.Global.KDS,
rt.Metrics(),
rt.EventBus(),
rt.ReadOnlyResourceManager(),
rt.Extensions(),
)
if err != nil {
return errors.Wrap(err, "couldn't create ZoneWatch")
}
if err := rt.Add(component.NewResilientComponent(zwLog, zw)); err != nil {
return err
}
}
return rt.Add(component.NewResilientComponent(kdsGlobalLog.WithName("kds-mux-client"), mux.NewServer(
onSessionStarted,
rt.KDSContext().GlobalServerFilters,
@@ -167,12 +186,15 @@ func Setup(rt runtime.Runtime) error {
streamInterceptors,
rt.Extensions(),
rt.Config().Store.Upsert,
rt.EventBus(),
rt.Config().Multizone.Global.KDS.ZoneHealthCheck.PollInterval.Duration,
),
mux.NewKDSSyncServiceServer(
onGlobalToZoneSyncConnect,
onZoneToGlobalSyncConnect,
rt.KDSContext().GlobalServerFiltersV2,
rt.Extensions(),
rt.EventBus(),
),
)))
}
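
ZoneWatch itself isn't part of this hunk, but from the wiring above and the config comments its job is roughly: record when each zone opened a stream, poll ZoneInsight every PollInterval, and publish a ZoneWentOffline event once the last recorded health check is older than Timeout. A heavily condensed, conceptual sketch of that decision (not the PR's actual implementation; the real component keys zones by tenant, reads ZoneInsight through the resource manager, and exports a metric):

package main

import (
	"fmt"
	"time"
)

type watchedZone struct {
	tenantID  string
	zone      string
	firstSeen time.Time // stream-open time, treated as the first health check
}

// zonesGoneOffline mimics one poll iteration: any zone whose last ping is
// older than the timeout would trigger a ZoneWentOffline event.
func zonesGoneOffline(zones []watchedZone, lastPing func(tenant, zone string) time.Time,
	timeout time.Duration, now time.Time,
) []string {
	var offline []string
	for _, z := range zones {
		last := lastPing(z.tenantID, z.zone)
		if z.firstSeen.After(last) {
			// Opening the stream counts as a health check, so a zone that has
			// not yet written a ping into ZoneInsight is not disconnected right away.
			last = z.firstSeen
		}
		if now.Sub(last) > timeout {
			offline = append(offline, z.zone)
		}
	}
	return offline
}

func main() {
	now := time.Now()
	zones := []watchedZone{{zone: "zone-1", firstSeen: now.Add(-10 * time.Minute)}}
	stalePing := func(string, string) time.Time { return now.Add(-6 * time.Minute) }
	fmt.Println(zonesGoneOffline(zones, stalePing, 5*time.Minute, now)) // [zone-1]
}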
45 changes: 45 additions & 0 deletions pkg/kds/mux/client.go
@@ -6,6 +6,7 @@ import (
"crypto/x509"
"net/url"
"os"
"time"

"github.com/go-logr/logr"
"github.com/pkg/errors"
@@ -23,6 +24,7 @@ import (
"github.com/kumahq/kuma/pkg/core"
"github.com/kumahq/kuma/pkg/core/resources/registry"
"github.com/kumahq/kuma/pkg/core/runtime/component"
"github.com/kumahq/kuma/pkg/kds"
"github.com/kumahq/kuma/pkg/kds/service"
"github.com/kumahq/kuma/pkg/metrics"
"github.com/kumahq/kuma/pkg/version"
@@ -98,12 +100,15 @@ func (c *client) Start(stop <-chan struct{}) (errs error) {
withKDSCtx, cancel := context.WithCancel(metadata.AppendToOutgoingContext(c.ctx,
"client-id", c.clientID,
KDSVersionHeaderKey, KDSVersionV3,
kds.FeaturesMetadataKey, kds.FeatureZonePingHealth,
))
defer cancel()

log := muxClientLog.WithValues("client-id", c.clientID)
errorCh := make(chan error)

c.startHealthCheck(withKDSCtx, log, conn, stop, errorCh)

go c.startXDSConfigs(withKDSCtx, log, conn, stop, errorCh)
go c.startStats(withKDSCtx, log, conn, stop, errorCh)
go c.startClusters(withKDSCtx, log, conn, stop, errorCh)
@@ -282,6 +287,46 @@ func (c *client) startClusters(
c.handleProcessingErrors(stream, log, stop, processingErrorsCh, errorCh)
}

func (c *client) startHealthCheck(
ctx context.Context,
log logr.Logger,
conn *grpc.ClientConn,
stop <-chan struct{},
errorCh chan error,
) {
client := mesh_proto.NewGlobalKDSServiceClient(conn)
log = log.WithValues("rpc", "healthcheck")
log.Info("starting")

go func() {
prevInterval := 5 * time.Minute
ticker := time.NewTicker(prevInterval)
defer ticker.Stop()
for {
log.Info("sending health check")
resp, err := client.HealthCheck(ctx, &mesh_proto.ZoneHealthCheckRequest{})
if err != nil && !errors.Is(err, context.Canceled) {
log.Error(err, "health check failed")
errorCh <- errors.Wrap(err, "zone health check request failed")
} else if interval := resp.Interval.AsDuration(); interval > 0 {
if prevInterval != interval {
prevInterval = interval
log.Info("Global CP requested new healthcheck interval", "interval", interval)
}
ticker.Reset(interval)
}

select {
case <-ticker.C:
continue
case <-stop:
log.Info("stopping")
return
}
}
}()
}

func (c *client) handleProcessingErrors(
stream grpc.ClientStream,
log logr.Logger,
40 changes: 36 additions & 4 deletions pkg/kds/mux/zone_sync.go
@@ -10,8 +10,11 @@ import (

mesh_proto "github.com/kumahq/kuma/api/mesh/v1alpha1"
"github.com/kumahq/kuma/pkg/core"
"github.com/kumahq/kuma/pkg/events"
"github.com/kumahq/kuma/pkg/kds/service"
"github.com/kumahq/kuma/pkg/kds/util"
"github.com/kumahq/kuma/pkg/log"
"github.com/kumahq/kuma/pkg/multitenant"
)

type FilterV2 interface {
Expand All @@ -38,6 +41,7 @@ type KDSSyncServiceServer struct {
zoneToGlobalCb OnZoneToGlobalSyncConnectFunc
filters []FilterV2
extensions context.Context
eventBus events.EventBus
mesh_proto.UnimplementedKDSSyncServiceServer
}

@@ -46,32 +50,42 @@ func NewKDSSyncServiceServer(
zoneToGlobalCb OnZoneToGlobalSyncConnectFunc,
filters []FilterV2,
extensions context.Context,
eventBus events.EventBus,
) *KDSSyncServiceServer {
return &KDSSyncServiceServer{
globalToZoneCb: globalToZoneCb,
zoneToGlobalCb: zoneToGlobalCb,
filters: filters,
extensions: extensions,
eventBus: eventBus,
}
}

var _ mesh_proto.KDSSyncServiceServer = &KDSSyncServiceServer{}

func (g *KDSSyncServiceServer) GlobalToZoneSync(stream mesh_proto.KDSSyncService_GlobalToZoneSyncServer) error {
logger := log.AddFieldsFromCtx(clientLog, stream.Context(), g.extensions)
clientID, err := util.ClientIDFromIncomingCtx(stream.Context())
zone, err := util.ClientIDFromIncomingCtx(stream.Context())
if err != nil {
return err
}
logger = logger.WithValues("clientID", clientID)
logger = logger.WithValues("clientID", zone)
for _, filter := range g.filters {
if err := filter.InterceptServerStream(stream); err != nil {
return errors.Wrap(err, "closing KDS stream following a callback error")
}
}

tenantID, _ := multitenant.TenantFromCtx(stream.Context())
shouldDisconnectStream := g.watchZoneHealthCheck(tenantID, zone)
defer shouldDisconnectStream.Close()

processingErrorsCh := make(chan error)
go g.globalToZoneCb.OnGlobalToZoneSyncConnect(stream, processingErrorsCh)
select {
case <-shouldDisconnectStream.Recv():
logger.Info("ending stream, zone health check failed")
return nil
case <-stream.Context().Done():
logger.Info("GlobalToZoneSync rpc stream stopped")
return nil
@@ -86,19 +100,27 @@

func (g *KDSSyncServiceServer) ZoneToGlobalSync(stream mesh_proto.KDSSyncService_ZoneToGlobalSyncServer) error {
logger := log.AddFieldsFromCtx(clientLog, stream.Context(), g.extensions)
clientID, err := util.ClientIDFromIncomingCtx(stream.Context())
zone, err := util.ClientIDFromIncomingCtx(stream.Context())
if err != nil {
return err
}
logger = logger.WithValues("clientID", clientID)
logger = logger.WithValues("clientID", zone)
for _, filter := range g.filters {
if err := filter.InterceptServerStream(stream); err != nil {
return errors.Wrap(err, "closing KDS stream following a callback error")
}
}

tenantID, _ := multitenant.TenantFromCtx(stream.Context())
shouldDisconnectStream := g.watchZoneHealthCheck(tenantID, zone)
defer shouldDisconnectStream.Close()

processingErrorsCh := make(chan error)
go g.zoneToGlobalCb.OnZoneToGlobalSyncConnect(stream, processingErrorsCh)
select {
case <-shouldDisconnectStream.Recv():
logger.Info("ending stream, zone health check failed")
return nil
case <-stream.Context().Done():
logger.Info("ZoneToGlobalSync rpc stream stopped")
return nil
@@ -110,3 +132,13 @@ func (g *KDSSyncServiceServer) ZoneToGlobalSync(stream mesh_proto.KDSSyncService
return status.Error(codes.Internal, "stream failed")
}
}

func (g *KDSSyncServiceServer) watchZoneHealthCheck(tenantID, zone string) events.Listener {
shouldDisconnectStream := g.eventBus.Subscribe(func(e events.Event) bool {
disconnectEvent, ok := e.(service.ZoneWentOffline)
return ok && disconnectEvent.TenantID == tenantID && disconnectEvent.Zone == zone
})
g.eventBus.Send(service.ZoneOpenedStream{Zone: zone, TenantID: tenantID})

return shouldDisconnectStream
}
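
The two event types exchanged over the bus are defined in pkg/kds/service and aren't part of this diff; judging by how they're constructed and matched here, their shape is essentially the tenant/zone pair (assumed, the actual definitions may carry more):

// Assumed shape of the pkg/kds/service events used above.
package service

// ZoneOpenedStream is published by the sync server when a zone opens a KDS
// stream, so that ZoneWatch starts tracking that zone's health-check pings.
type ZoneOpenedStream struct {
	TenantID string
	Zone     string
}

// ZoneWentOffline is published by ZoneWatch when a zone's last health check is
// older than the configured timeout; the sync server then ends the stream.
type ZoneWentOffline struct {
	TenantID string
	Zone     string
}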