diff --git a/app/kuma-dp/cmd/run.go b/app/kuma-dp/cmd/run.go
index 5956a6239358..4e9cce3aa452 100644
--- a/app/kuma-dp/cmd/run.go
+++ b/app/kuma-dp/cmd/run.go
@@ -1,6 +1,7 @@
 package cmd
 
 import (
+	"context"
 	"os"
 	"path/filepath"
 	"time"
@@ -144,8 +145,10 @@ func newRunCmd(opts kuma_cmd.RunCmdOpts, rootCtx *RootContext) *cobra.Command {
 			}()
 		}
 
-		shouldQuit := make(chan struct{})
+		// gracefulCtx indicates that the process received a signal to shut down
 		gracefulCtx, ctx := opts.SetupSignalHandler()
+		// componentCtx indicates that components should shut down (cancelComponents triggers the shutdown of all components)
+		componentCtx, cancelComponents := context.WithCancel(gracefulCtx)
 		accessLogSocketPath := core_xds.AccessLogSocketName(cfg.DataplaneRuntime.SocketDir, cfg.Dataplane.Name, cfg.Dataplane.Mesh)
 		components := []component.Component{
 			tokenComp,
@@ -160,17 +163,15 @@ func newRunCmd(opts kuma_cmd.RunCmdOpts, rootCtx *RootContext) *cobra.Command {
 			Dataplane: rest.From.Resource(proxyResource),
 			Stdout:    cmd.OutOrStdout(),
 			Stderr:    cmd.OutOrStderr(),
-			Quit:      shouldQuit,
+			OnFinish:  cancelComponents,
 		}
 
-		if cfg.DNS.Enabled &&
-			cfg.Dataplane.ProxyType != string(mesh_proto.IngressProxyType) &&
-			cfg.Dataplane.ProxyType != string(mesh_proto.EgressProxyType) {
+		if cfg.DNS.Enabled && !cfg.Dataplane.IsZoneProxy() {
 			dnsOpts := &dnsserver.Opts{
-				Config: *cfg,
-				Stdout: cmd.OutOrStdout(),
-				Stderr: cmd.OutOrStderr(),
-				Quit:   shouldQuit,
+				Config:   *cfg,
+				Stdout:   cmd.OutOrStdout(),
+				Stderr:   cmd.OutOrStderr(),
+				OnFinish: cancelComponents,
 			}
 
 			dnsServer, err := dnsserver.New(dnsOpts)
@@ -225,43 +226,44 @@ func newRunCmd(opts kuma_cmd.RunCmdOpts, rootCtx *RootContext) *cobra.Command {
 		}
 		opts.AdminPort = bootstrap.GetAdmin().GetAddress().GetSocketAddress().GetPortValue()
 
-		dataplane, err := envoy.New(opts)
+		envoyComponent, err := envoy.New(opts)
 		if err != nil {
 			return err
 		}
+		components = append(components, envoyComponent)
 
-		components = append(components, dataplane)
 		metricsServer := metrics.New(
 			metricsSocketPath,
 			getApplicationsToScrape(kumaSidecarConfiguration, bootstrap.GetAdmin().GetAddress().GetSocketAddress().GetPortValue()),
 			kumaSidecarConfiguration.Networking.IsUsingTransparentProxy,
 		)
 		components = append(components, metricsServer)
-
 		if err := rootCtx.ComponentManager.Add(components...); err != nil {
 			return err
 		}
+		stopComponents := make(chan struct{})
 		go func() {
-			<-gracefulCtx.Done()
-			runLog.Info("Kuma DP caught an exit signal. Draining Envoy connections")
-			if err := dataplane.DrainConnections(); err != nil {
-				runLog.Error(err, "could not drain connections")
-			} else {
-				runLog.Info("waiting for connections to be drained", "waitTime", cfg.Dataplane.DrainTime)
-				select {
-				case <-time.After(cfg.Dataplane.DrainTime.Duration):
-				case <-ctx.Done():
+			select {
+			case <-gracefulCtx.Done():
+				runLog.Info("Kuma DP caught an exit signal. Draining Envoy connections")
+				if err := envoyComponent.DrainConnections(); err != nil {
+					runLog.Error(err, "could not drain connections")
+				} else {
+					runLog.Info("waiting for connections to be drained", "waitTime", cfg.Dataplane.DrainTime)
+					select {
+					case <-time.After(cfg.Dataplane.DrainTime.Duration):
+					case <-ctx.Done():
+					}
 				}
+			case <-componentCtx.Done():
 			}
 			runLog.Info("stopping all Kuma DP components")
-			if shouldQuit != nil {
-				close(shouldQuit)
-			}
+			close(stopComponents)
 		}()
 
 		runLog.Info("starting Kuma DP", "version", kuma_version.Build.Version)
-		if err := rootCtx.ComponentManager.Start(shouldQuit); err != nil {
+		if err := rootCtx.ComponentManager.Start(stopComponents); err != nil {
 			runLog.Error(err, "error while running Kuma DP")
 			return err
 		}
diff --git a/app/kuma-dp/pkg/dataplane/command/base_command.go b/app/kuma-dp/pkg/dataplane/command/base_command.go
new file mode 100644
index 000000000000..761c623deaee
--- /dev/null
+++ b/app/kuma-dp/pkg/dataplane/command/base_command.go
@@ -0,0 +1,27 @@
+package command
+
+import (
+	"context"
+	"io"
+	"os/exec"
+	"syscall"
+	"time"
+)
+
+func baseBuildCommand(
+	ctx context.Context,
+	stdout io.Writer,
+	stderr io.Writer,
+	name string,
+	args ...string,
+) *exec.Cmd {
+	command := exec.CommandContext(ctx, name, args...)
+	command.Stdout = stdout
+	command.Stderr = stderr
+	command.Cancel = func() error {
+		return command.Process.Signal(syscall.SIGTERM)
+	}
+	command.WaitDelay = time.Second * 5
+
+	return command
+}
diff --git a/app/kuma-dp/pkg/dataplane/command/build_command.go b/app/kuma-dp/pkg/dataplane/command/build_command_darwin.go
similarity index 74%
rename from app/kuma-dp/pkg/dataplane/command/build_command.go
rename to app/kuma-dp/pkg/dataplane/command/build_command_darwin.go
index 4161533f8aee..e4e5ead6ef06 100644
--- a/app/kuma-dp/pkg/dataplane/command/build_command.go
+++ b/app/kuma-dp/pkg/dataplane/command/build_command_darwin.go
@@ -1,4 +1,4 @@
-//go:build !linux && !windows
+//go:build darwin
 
 package command
 
@@ -16,14 +16,11 @@ func BuildCommand(
 	name string,
 	args ...string,
 ) *exec.Cmd {
-	command := exec.CommandContext(ctx, name, args...)
-	command.Stdout = stdout
-	command.Stderr = stderr
+	command := baseBuildCommand(ctx, stdout, stderr, name, args...)
 	command.SysProcAttr = &syscall.SysProcAttr{
 		// Set those attributes so the new process won't receive the signals from a parent automatically.
 		Setpgid: true,
 		Pgid:    0,
 	}
-
 	return command
 }
diff --git a/app/kuma-dp/pkg/dataplane/command/build_command_linux.go b/app/kuma-dp/pkg/dataplane/command/build_command_linux.go
index 2858f0593952..dd568b7f1e04 100644
--- a/app/kuma-dp/pkg/dataplane/command/build_command_linux.go
+++ b/app/kuma-dp/pkg/dataplane/command/build_command_linux.go
@@ -16,15 +16,12 @@ func BuildCommand(
 	name string,
 	args ...string,
 ) *exec.Cmd {
-	command := exec.CommandContext(ctx, name, args...)
-	command.Stdout = stdout
-	command.Stderr = stderr
+	command := baseBuildCommand(ctx, stdout, stderr, name, args...)
 	command.SysProcAttr = &syscall.SysProcAttr{
 		Pdeathsig: syscall.SIGKILL,
 		// Set those attributes so the new process won't receive the signals from a parent automatically.
 		Setpgid: true,
 		Pgid:    0,
 	}
-
 	return command
 }
diff --git a/app/kuma-dp/pkg/dataplane/command/build_command_windows.go b/app/kuma-dp/pkg/dataplane/command/build_command_windows.go
index b9c9365e0a67..65b56f3a1b64 100644
--- a/app/kuma-dp/pkg/dataplane/command/build_command_windows.go
+++ b/app/kuma-dp/pkg/dataplane/command/build_command_windows.go
@@ -15,9 +15,7 @@ func BuildCommand(
 	name string,
 	args ...string,
 ) *exec.Cmd {
-	command := exec.CommandContext(ctx, name, args...)
-	command.Stdout = stdout
-	command.Stderr = stderr
+	command := baseBuildCommand(ctx, stdout, stderr, name, args...)
 
 	// todo(jakubdyszkiewicz): do not propagate SIGTERM
 	return command
diff --git a/app/kuma-dp/pkg/dataplane/dnsserver/dnsserver.go b/app/kuma-dp/pkg/dataplane/dnsserver/dnsserver.go
index 381a31a59f31..d7aaca1d55fe 100644
--- a/app/kuma-dp/pkg/dataplane/dnsserver/dnsserver.go
+++ b/app/kuma-dp/pkg/dataplane/dnsserver/dnsserver.go
@@ -30,10 +30,10 @@ type DNSServer struct {
 var _ component.GracefulComponent = &DNSServer{}
 
 type Opts struct {
-	Config kuma_dp.Config
-	Stdout io.Writer
-	Stderr io.Writer
-	Quit   chan struct{}
+	Config   kuma_dp.Config
+	Stdout   io.Writer
+	Stderr   io.Writer
+	OnFinish context.CancelFunc
 }
 
 func lookupDNSServerPath(configuredPath string) (string, error) {
@@ -47,8 +47,10 @@ func lookupDNSServerPath(configuredPath string) (string, error) {
 func New(opts *Opts) (*DNSServer, error) {
 	dnsServerPath, err := lookupDNSServerPath(opts.Config.DNS.CoreDNSBinaryPath)
 	if err != nil {
-		runLog.Error(err, "could not find the DNS Server executable in your path")
-		return nil, err
+		return nil, errors.Wrapf(err, "could not find coreDNS executable")
+	}
+	if opts.OnFinish == nil {
+		opts.OnFinish = func() {}
 	}
 
 	return &DNSServer{opts: opts, path: dnsServerPath}, nil
@@ -59,7 +61,7 @@ func (s *DNSServer) GetVersion() (string, error) {
 	command := exec.Command(path, "--version")
 	output, err := command.Output()
 	if err != nil {
-		return "", err
+		return "", errors.Wrapf(err, "failed to execute coreDNS at path %s", path)
 	}
 
 	match := regexp.MustCompile(`CoreDNS-(.*)`).FindSubmatch(output)
@@ -76,6 +78,12 @@ func (s *DNSServer) NeedLeaderElection() bool {
 
 func (s *DNSServer) Start(stop <-chan struct{}) error {
 	s.wg.Add(1)
+	// Component should only be considered done after CoreDNS exits.
+	// Otherwise, we may not propagate SIGTERM on time.
+	defer func() {
+		s.wg.Done()
+		s.opts.OnFinish()
+	}()
 
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
@@ -129,33 +137,18 @@ func (s *DNSServer) Start(stop <-chan struct{}) error {
 		return err
 	}
 
-	done := make(chan error, 1)
 	go func() {
-		done <- command.Wait()
-		// Component should only be considered done after CoreDNS exists.
-		// Otherwise, we may not propagate SIGTERM on time.
-		s.wg.Done()
-	}()
-
-	select {
-	case <-stop:
+		<-stop
 		runLog.Info("stopping DNS Server")
 		cancel()
-		return nil
-	case err := <-done:
-		if err != nil {
-			runLog.Error(err, "DNS Server terminated with an error")
-		} else {
-			runLog.Info("DNS Server terminated successfully")
-		}
-
-		if s.opts.Quit != nil {
-			close(s.opts.Quit)
-		}
-
+	}()
+	err = command.Wait()
+	if err != nil && !errors.Is(err, context.Canceled) {
+		runLog.Error(err, "DNS Server terminated with an error")
 		return err
 	}
+	runLog.Info("DNS Server terminated successfully")
+	return nil
 }
 
 func (s *DNSServer) WaitForDone() {
diff --git a/app/kuma-dp/pkg/dataplane/envoy/envoy.go b/app/kuma-dp/pkg/dataplane/envoy/envoy.go
index cacf4835a100..5279479d7a65 100644
--- a/app/kuma-dp/pkg/dataplane/envoy/envoy.go
+++ b/app/kuma-dp/pkg/dataplane/envoy/envoy.go
@@ -48,13 +48,15 @@ type Opts struct {
 	Dataplane rest.Resource
 	Stdout    io.Writer
 	Stderr    io.Writer
-	Quit      chan struct{}
+	OnFinish  func()
 }
 
 func New(opts Opts) (*Envoy, error) {
 	if _, err := lookupEnvoyPath(opts.Config.DataplaneRuntime.BinaryPath); err != nil {
-		runLog.Error(err, "could not find the envoy executable in your path")
-		return nil, err
+		return nil, errors.Wrap(err, "could not find envoy executable")
+	}
+	if opts.OnFinish == nil {
+		opts.OnFinish = func() {}
 	}
 	return &Envoy{opts: opts}, nil
 }
@@ -87,6 +89,12 @@ func lookupEnvoyPath(configuredPath string) (string, error) {
 
 func (e *Envoy) Start(stop <-chan struct{}) error {
 	e.wg.Add(1)
+	// Component should only be considered done after Envoy exits.
+	// Otherwise, we may not propagate SIGTERM on time.
+	defer func() {
+		e.wg.Done()
+		e.opts.OnFinish()
+	}()
 
 	configFile, err := GenerateBootstrapFile(e.opts.Config.DataplaneRuntime, e.opts.BootstrapConfig)
 	if err != nil {
@@ -144,31 +152,18 @@ func (e *Envoy) Start(stop <-chan struct{}) error {
 		runLog.Error(err, "envoy executable failed", "path", resolvedPath, "arguments", args)
 		return err
 	}
-	done := make(chan error, 1)
 	go func() {
-		done <- command.Wait()
-		// Component should only be considered done after Envoy exists.
-		// Otherwise, we may not propagate SIGTERM on time.
-		e.wg.Done()
-	}()
-
-	select {
-	case <-stop:
+		<-stop
 		runLog.Info("stopping Envoy")
 		cancel()
-		return nil
-	case err := <-done:
-		if err != nil {
-			runLog.Error(err, "Envoy terminated with an error")
-		} else {
-			runLog.Info("Envoy terminated successfully")
-		}
-		if e.opts.Quit != nil {
-			close(e.opts.Quit)
-		}
-
+	}()
+	err = command.Wait()
+	if err != nil && !errors.Is(err, context.Canceled) {
+		runLog.Error(err, "Envoy terminated with an error")
 		return err
 	}
+	runLog.Info("Envoy terminated successfully")
+	return nil
 }
 
 func (e *Envoy) WaitForDone() {
diff --git a/pkg/config/app/kuma-dp/config.go b/pkg/config/app/kuma-dp/config.go
index 90124638aefa..b515dd3f52db 100644
--- a/pkg/config/app/kuma-dp/config.go
+++ b/pkg/config/app/kuma-dp/config.go
@@ -153,6 +153,11 @@ func (d *Dataplane) PostProcess() error {
 	return nil
 }
 
+func (d *Dataplane) IsZoneProxy() bool {
+	return d.ProxyType == string(mesh_proto.IngressProxyType) ||
+		d.ProxyType == string(mesh_proto.EgressProxyType)
+}
+
 func validateMeshOrName[V ~string](typ string, value V) error {
 	if value == "" {
 		return errors.Errorf("%s must be non-empty", typ)
@@ -345,7 +350,7 @@ type DNS struct {
 	CoreDNSConfigTemplatePath string `json:"coreDnsConfigTemplatePath,omitempty" envconfig:"kuma_dns_core_dns_config_template_path"`
 	// Dir to store auto-generated DNS Server config in.
 	ConfigDir string `json:"configDir,omitempty" envconfig:"kuma_dns_config_dir"`
-	// Port where Prometheus stats will be exposed for the DNS Server
+	// PrometheusPort where Prometheus stats will be exposed for the DNS Server
 	PrometheusPort uint32 `json:"prometheusPort,omitempty" envconfig:"kuma_dns_prometheus_port"`
 }
diff --git a/pkg/core/runtime/component/component.go b/pkg/core/runtime/component/component.go
index a80fc732742f..b86cf353bdcd 100644
--- a/pkg/core/runtime/component/component.go
+++ b/pkg/core/runtime/component/component.go
@@ -108,29 +108,29 @@ func (cm *manager) Add(c ...Component) error {
 	return nil
 }
 
-func (cm *manager) waitForDone() {
-	// limitation: waitForDone does not wait for components added after Start() is called.
-	// This is ok for now, because it's used only in context of Kuma DP where we are not adding components in runtime.
-	for _, c := range cm.components {
-		if gc, ok := c.(GracefulComponent); ok {
-			gc.WaitForDone()
-		}
-	}
-}
-
 func (cm *manager) Start(stop <-chan struct{}) error {
 	errCh := make(chan error)
 
 	cm.Lock()
-	cm.startNonLeaderComponents(stop, errCh)
+	internalDone := make(chan struct{})
+	cm.startNonLeaderComponents(internalDone, errCh)
 	cm.started = true
-	cm.stopCh = stop
+	cm.stopCh = internalDone
 	cm.errCh = errCh
 	cm.Unlock()
 
 	// this has to be called outside of lock because it can be leader at any time, and it locks in leader callbacks.
-	cm.startLeaderComponents(stop, errCh)
-
-	defer cm.waitForDone()
+	cm.startLeaderComponents(internalDone, errCh)
+
+	defer func() {
+		close(internalDone)
+		// limitation: waitForDone does not wait for components added after Start() is called.
+		// This is ok for now, because it's used only in context of Kuma DP where we are not adding components in runtime.
+		for _, c := range cm.components {
+			if gc, ok := c.(GracefulComponent); ok {
+				gc.WaitForDone()
+			}
+		}
+	}()
 
 	select {
 	case <-stop:
 		return nil
diff --git a/pkg/kds/mux/client.go b/pkg/kds/mux/client.go
index 06ae68778572..23d71a2f372b 100644
--- a/pkg/kds/mux/client.go
+++ b/pkg/kds/mux/client.go
@@ -107,16 +107,16 @@ func (c *client) Start(stop <-chan struct{}) (errs error) {
 	log := muxClientLog.WithValues("client-id", c.clientID)
 	errorCh := make(chan error)
 
-	c.startHealthCheck(withKDSCtx, log, conn, stop, errorCh)
+	go c.startHealthCheck(withKDSCtx, log, conn, errorCh)
 
-	go c.startXDSConfigs(withKDSCtx, log, conn, stop, errorCh)
-	go c.startStats(withKDSCtx, log, conn, stop, errorCh)
-	go c.startClusters(withKDSCtx, log, conn, stop, errorCh)
+	go c.startXDSConfigs(withKDSCtx, log, conn, errorCh)
+	go c.startStats(withKDSCtx, log, conn, errorCh)
+	go c.startClusters(withKDSCtx, log, conn, errorCh)
 	if c.experimantalConfig.KDSDeltaEnabled {
-		go c.startGlobalToZoneSync(withKDSCtx, log, conn, stop, errorCh)
-		go c.startZoneToGlobalSync(withKDSCtx, log, conn, stop, errorCh)
+		go c.startGlobalToZoneSync(withKDSCtx, log, conn, errorCh)
+		go c.startZoneToGlobalSync(withKDSCtx, log, conn, errorCh)
 	} else {
-		go c.startKDSMultiplex(withKDSCtx, log, conn, stop, errorCh)
+		go c.startKDSMultiplex(withKDSCtx, log, conn, errorCh)
 	}
 
 	select {
@@ -124,11 +124,12 @@ func (c *client) Start(stop <-chan struct{}) (errs error) {
 		cancel()
 		return errs
 	case err = <-errorCh:
+		cancel()
 		return err
 	}
 }
 
-func (c *client) startKDSMultiplex(ctx context.Context, log logr.Logger, conn *grpc.ClientConn, stop <-chan struct{}, errorCh chan error) {
+func (c *client) startKDSMultiplex(ctx context.Context, log logr.Logger, conn *grpc.ClientConn, errorCh chan error) {
 	muxClient := mesh_proto.NewMultiplexServiceClient(conn)
 	log.Info("initializing Kuma Discovery Service (KDS) stream for global-zone sync of resources")
 	stream, err := muxClient.StreamMessage(ctx)
@@ -144,26 +145,21 @@
 		errorCh <- err
 		return
 	}
-	select {
-	case <-stop:
-		log.Info("KDS stream stopped", "reason", err)
-		if err := stream.CloseSend(); err != nil {
-			log.Error(err, "CloseSend returned an error")
-		}
-		err = <-session.Error()
-		errorCh <- err
-	case err = <-session.Error():
+	err = <-session.Error()
+	if errors.Is(err, context.Canceled) {
+		log.Info("KDS stream shutting down")
+	} else {
 		log.Error(err, "KDS stream failed prematurely, will restart in background")
-		if err := stream.CloseSend(); err != nil {
-			log.Error(err, "CloseSend returned an error")
-		}
-		errorCh <- err
-		return
 	}
+	if err := stream.CloseSend(); err != nil {
+		log.Error(err, "CloseSend returned an error")
+	}
+	errorCh <- err
 }
 
-func (c *client) startGlobalToZoneSync(ctx context.Context, log logr.Logger, conn *grpc.ClientConn, stop <-chan struct{}, errorCh chan error) {
+func (c *client) startGlobalToZoneSync(ctx context.Context, log logr.Logger, conn *grpc.ClientConn, errorCh chan error) {
 	kdsClient := mesh_proto.NewKDSSyncServiceClient(conn)
+	log = log.WithValues("rpc", "global-to-zone")
 	log.Info("initializing Kuma Discovery Service (KDS) stream for global to zone sync of resources with delta xDS")
 	stream, err := kdsClient.GlobalToZoneSync(ctx)
 	if err != nil {
@@ -172,29 +168,12 @@
 	}
 	processingErrorsCh := make(chan error)
 	c.globalToZoneCb.OnGlobalToZoneSyncStarted(stream, processingErrorsCh)
-	select {
-	case <-stop:
-		log.Info("Global to Zone Sync rpc stream stopped")
-		if err := stream.CloseSend(); err != nil {
-			errorCh <- errors.Wrap(err, "CloseSend returned an error")
-		}
-	case err := <-processingErrorsCh:
-		if status.Code(err) == codes.Unimplemented {
-			log.Error(err, "Global to Zone Sync rpc stream failed, because Global CP does not implement this rpc. Upgrade Global CP.")
-			// backwards compatibility. Do not rethrow error, so Admin RPC can still operate.
-			return
-		}
-		log.Error(err, "Global to Zone Sync rpc stream failed, will restart in background")
-		if err := stream.CloseSend(); err != nil {
-			log.Error(err, "CloseSend returned an error")
-		}
-		errorCh <- err
-		return
-	}
+	c.handleProcessingErrors(stream, log, processingErrorsCh, errorCh)
 }
 
-func (c *client) startZoneToGlobalSync(ctx context.Context, log logr.Logger, conn *grpc.ClientConn, stop <-chan struct{}, errorCh chan error) {
+func (c *client) startZoneToGlobalSync(ctx context.Context, log logr.Logger, conn *grpc.ClientConn, errorCh chan error) {
 	kdsClient := mesh_proto.NewKDSSyncServiceClient(conn)
+	log = log.WithValues("rpc", "zone-to-global")
 	log.Info("initializing Kuma Discovery Service (KDS) stream for zone to global sync of resources with delta xDS")
 	stream, err := kdsClient.ZoneToGlobalSync(ctx)
 	if err != nil {
@@ -203,32 +182,13 @@
 	}
 	processingErrorsCh := make(chan error)
 	c.zoneToGlobalCb.OnZoneToGlobalSyncStarted(stream, processingErrorsCh)
-	select {
-	case <-stop:
-		log.Info("Zone to Global Sync rpc stream stopped")
-		if err := stream.CloseSend(); err != nil {
-			errorCh <- errors.Wrap(err, "CloseSend returned an error")
-		}
-	case err := <-processingErrorsCh:
-		if status.Code(err) == codes.Unimplemented {
-			log.Error(err, "Zone to Global Sync rpc stream failed, because Global CP does not implement this rpc. Upgrade Global CP.")
-			// backwards compatibility. Do not rethrow error, so Admin RPC can still operate.
-			return
-		}
-		log.Error(err, "Zone to Global Sync rpc stream failed, will restart in background")
-		if err := stream.CloseSend(); err != nil {
-			log.Error(err, "CloseSend returned an error")
-		}
-		errorCh <- err
-		return
-	}
+	c.handleProcessingErrors(stream, log, processingErrorsCh, errorCh)
 }
 
 func (c *client) startXDSConfigs(
 	ctx context.Context,
 	log logr.Logger,
 	conn *grpc.ClientConn,
-	stop <-chan struct{},
 	errorCh chan error,
 ) {
 	client := mesh_proto.NewGlobalKDSServiceClient(conn)
@@ -242,14 +202,13 @@ func (c *client) startXDSConfigs(
 	processingErrorsCh := make(chan error)
 	go c.envoyAdminProcessor.StartProcessingXDSConfigs(stream, processingErrorsCh)
 
-	c.handleProcessingErrors(stream, log, stop, processingErrorsCh, errorCh)
+	c.handleProcessingErrors(stream, log, processingErrorsCh, errorCh)
 }
 
 func (c *client) startStats(
 	ctx context.Context,
 	log logr.Logger,
 	conn *grpc.ClientConn,
-	stop <-chan struct{},
 	errorCh chan error,
 ) {
 	client := mesh_proto.NewGlobalKDSServiceClient(conn)
@@ -263,14 +222,13 @@ func (c *client) startStats(
 	processingErrorsCh := make(chan error)
 	go c.envoyAdminProcessor.StartProcessingStats(stream, processingErrorsCh)
 
-	c.handleProcessingErrors(stream, log, stop, processingErrorsCh, errorCh)
+	c.handleProcessingErrors(stream, log, processingErrorsCh, errorCh)
 }
 
 func (c *client) startClusters(
 	ctx context.Context,
 	log logr.Logger,
 	conn *grpc.ClientConn,
-	stop <-chan struct{},
 	errorCh chan error,
 ) {
 	client := mesh_proto.NewGlobalKDSServiceClient(conn)
@@ -284,79 +242,75 @@ func (c *client) startClusters(
 	processingErrorsCh := make(chan error)
 	go c.envoyAdminProcessor.StartProcessingClusters(stream, processingErrorsCh)
 
-	c.handleProcessingErrors(stream, log, stop, processingErrorsCh, errorCh)
+	c.handleProcessingErrors(stream, log, processingErrorsCh, errorCh)
 }
 
 func (c *client) startHealthCheck(
 	ctx context.Context,
 	log logr.Logger,
 	conn *grpc.ClientConn,
-	stop <-chan struct{},
 	errorCh chan error,
 ) {
 	client := mesh_proto.NewGlobalKDSServiceClient(conn)
 	log = log.WithValues("rpc", "healthcheck")
 	log.Info("starting")
 
-	go func() {
-		prevInterval := 5 * time.Minute
-		ticker := time.NewTicker(prevInterval)
-		defer ticker.Stop()
-		for {
-			log.Info("sending health check")
-			resp, err := client.HealthCheck(ctx, &mesh_proto.ZoneHealthCheckRequest{})
-			if err != nil && !errors.Is(err, context.Canceled) {
-				if status.Code(err) == codes.Unimplemented {
-					log.Info("health check unimplemented in server, stopping")
-					return
-				}
-				log.Error(err, "health check failed")
-				errorCh <- errors.Wrap(err, "zone health check request failed")
-			} else if interval := resp.Interval.AsDuration(); interval > 0 {
-				if prevInterval != interval {
-					prevInterval = interval
-					log.Info("Global CP requested new healthcheck interval", "interval", interval)
-				}
-				ticker.Reset(interval)
-			}
-
-			select {
-			case <-ticker.C:
-				continue
-			case <-stop:
-				log.Info("stopping")
+	prevInterval := 5 * time.Minute
+	ticker := time.NewTicker(prevInterval)
+	defer ticker.Stop()
+	for {
+		log.Info("sending health check")
+		resp, err := client.HealthCheck(ctx, &mesh_proto.ZoneHealthCheckRequest{})
+		if err != nil && !errors.Is(err, context.Canceled) {
+			if status.Code(err) == codes.Unimplemented {
+				log.Info("health check unimplemented in server, stopping")
 				return
 			}
+			log.Error(err, "health check failed")
+			errorCh <- errors.Wrap(err, "zone health check request failed")
+		} else if interval := resp.Interval.AsDuration(); interval > 0 {
+			if prevInterval != interval {
+				prevInterval = interval
+				log.Info("Global CP requested new healthcheck interval", "interval", interval)
+			}
+			ticker.Reset(interval)
 		}
-	}()
+
+		select {
+		case <-ticker.C:
+			continue
+		case <-ctx.Done():
+			log.Info("stopping")
+			return
+		}
+	}
 }
 
 func (c *client) handleProcessingErrors(
 	stream grpc.ClientStream,
 	log logr.Logger,
-	stop <-chan struct{},
 	processingErrorsCh chan error,
 	errorCh chan error,
 ) {
-	select {
-	case <-stop:
-		log.Info("Envoy Admin rpc stream stopped")
-		if err := stream.CloseSend(); err != nil {
-			log.Error(err, "CloseSend returned an error")
-		}
-	case err := <-processingErrorsCh:
-		if status.Code(err) == codes.Unimplemented {
-			log.Error(err, "Envoy Admin rpc stream failed, because Global CP does not implement this rpc. Upgrade Global CP.")
-			// backwards compatibility. Do not rethrow error, so KDS multiplex can still operate.
-			return
-		}
-		log.Error(err, "Envoy Admin rpc stream failed prematurely, will restart in background")
-		if err := stream.CloseSend(); err != nil {
-			log.Error(err, "CloseSend returned an error")
-		}
-		errorCh <- err
+	err := <-processingErrorsCh
+	if status.Code(err) == codes.Unimplemented {
+		log.Error(err, "rpc stream failed, because global CP does not implement this rpc. Upgrade remote CP.")
+		// backwards compatibility. Do not rethrow error, so KDS multiplex can still operate.
 		return
 	}
+	if errors.Is(err, context.Canceled) {
+		log.Info("rpc stream shutting down")
+		// Let's not propagate this error further as we've already cancelled the context
+		err = nil
+	} else {
+		log.Error(err, "rpc stream failed prematurely, will restart in background")
+	}
+	if err := stream.CloseSend(); err != nil {
+		log.Error(err, "CloseSend returned an error")
+	}
+	if err != nil {
+		errorCh <- err
+	}
 }
 
 func (c *client) NeedLeaderElection() bool {