Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(core): better lifecycle when context is getting cancelled #8268

Merged
merged 2 commits into from
Nov 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 27 additions & 25 deletions app/kuma-dp/cmd/run.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cmd

import (
"context"
"os"
"path/filepath"
"time"
Expand Down Expand Up @@ -144,8 +145,10 @@ func newRunCmd(opts kuma_cmd.RunCmdOpts, rootCtx *RootContext) *cobra.Command {
}()
}

shouldQuit := make(chan struct{})
// gracefulCtx indicates that the process received a signal to shut down
gracefulCtx, ctx := opts.SetupSignalHandler()
// componentCtx indicates that components should shutdown (you can use cancel to trigger the shutdown of all components)
componentCtx, cancelComponents := context.WithCancel(gracefulCtx)
accessLogSocketPath := core_xds.AccessLogSocketName(cfg.DataplaneRuntime.SocketDir, cfg.Dataplane.Name, cfg.Dataplane.Mesh)
components := []component.Component{
tokenComp,
Expand All @@ -160,17 +163,15 @@ func newRunCmd(opts kuma_cmd.RunCmdOpts, rootCtx *RootContext) *cobra.Command {
Dataplane: rest.From.Resource(proxyResource),
Stdout: cmd.OutOrStdout(),
Stderr: cmd.OutOrStderr(),
Quit: shouldQuit,
OnFinish: cancelComponents,
}

if cfg.DNS.Enabled &&
cfg.Dataplane.ProxyType != string(mesh_proto.IngressProxyType) &&
cfg.Dataplane.ProxyType != string(mesh_proto.EgressProxyType) {
if cfg.DNS.Enabled && !cfg.Dataplane.IsZoneProxy() {
dnsOpts := &dnsserver.Opts{
Config: *cfg,
Stdout: cmd.OutOrStdout(),
Stderr: cmd.OutOrStderr(),
Quit: shouldQuit,
Config: *cfg,
Stdout: cmd.OutOrStdout(),
Stderr: cmd.OutOrStderr(),
OnFinish: cancelComponents,
}

dnsServer, err := dnsserver.New(dnsOpts)
Expand Down Expand Up @@ -225,43 +226,44 @@ func newRunCmd(opts kuma_cmd.RunCmdOpts, rootCtx *RootContext) *cobra.Command {
}
opts.AdminPort = bootstrap.GetAdmin().GetAddress().GetSocketAddress().GetPortValue()

dataplane, err := envoy.New(opts)
envoyComponent, err := envoy.New(opts)
if err != nil {
return err
}
components = append(components, envoyComponent)

components = append(components, dataplane)
metricsServer := metrics.New(
metricsSocketPath,
getApplicationsToScrape(kumaSidecarConfiguration, bootstrap.GetAdmin().GetAddress().GetSocketAddress().GetPortValue()),
kumaSidecarConfiguration.Networking.IsUsingTransparentProxy,
)
components = append(components, metricsServer)

if err := rootCtx.ComponentManager.Add(components...); err != nil {
return err
}

stopComponents := make(chan struct{})
go func() {
<-gracefulCtx.Done()
runLog.Info("Kuma DP caught an exit signal. Draining Envoy connections")
if err := dataplane.DrainConnections(); err != nil {
runLog.Error(err, "could not drain connections")
} else {
runLog.Info("waiting for connections to be drained", "waitTime", cfg.Dataplane.DrainTime)
select {
case <-time.After(cfg.Dataplane.DrainTime.Duration):
case <-ctx.Done():
select {
case <-gracefulCtx.Done():
runLog.Info("Kuma DP caught an exit signal. Draining Envoy connections")
if err := envoyComponent.DrainConnections(); err != nil {
runLog.Error(err, "could not drain connections")
} else {
runLog.Info("waiting for connections to be drained", "waitTime", cfg.Dataplane.DrainTime)
select {
case <-time.After(cfg.Dataplane.DrainTime.Duration):
case <-ctx.Done():
}
}
case <-componentCtx.Done():
}
runLog.Info("stopping all Kuma DP components")
if shouldQuit != nil {
close(shouldQuit)
}
close(stopComponents)
}()

runLog.Info("starting Kuma DP", "version", kuma_version.Build.Version)
if err := rootCtx.ComponentManager.Start(shouldQuit); err != nil {
if err := rootCtx.ComponentManager.Start(stopComponents); err != nil {
runLog.Error(err, "error while running Kuma DP")
return err
}
Expand Down
27 changes: 27 additions & 0 deletions app/kuma-dp/pkg/dataplane/command/base_command.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package command

import (
"context"
"io"
"os/exec"
"syscall"
"time"
)

// baseBuildCommand creates the exec.Cmd shared by the platform-specific
// BuildCommand variants.
//
// The returned command runs name with args, is bound to ctx, and writes its
// standard output and standard error to stdout and stderr respectively.
// When ctx is done the child process is sent SIGTERM instead of being killed
// outright, giving it a chance to shut down cleanly; if it has not exited
// once the grace period elapses, os/exec falls back to killing it.
//
// The command is only configured here, it is not started.
func baseBuildCommand(
	ctx context.Context,
	stdout io.Writer,
	stderr io.Writer,
	name string,
	args ...string,
) *exec.Cmd {
	// How long the child gets to exit after SIGTERM before os/exec kills it.
	const terminationGracePeriod = 5 * time.Second

	command := exec.CommandContext(ctx, name, args...)
	command.Stdout = stdout
	command.Stderr = stderr
	// Override the default cancel behaviour (Process.Kill) with SIGTERM.
	// os/exec only invokes Cancel after a successful Start, so
	// command.Process is set by the time this closure runs.
	command.Cancel = func() error {
		return command.Process.Signal(syscall.SIGTERM)
	}
	command.WaitDelay = terminationGracePeriod

	return command
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
//go:build !linux && !windows
//go:build darwin

package command

Expand All @@ -16,14 +16,11 @@ func BuildCommand(
name string,
args ...string,
) *exec.Cmd {
command := exec.CommandContext(ctx, name, args...)
command.Stdout = stdout
command.Stderr = stderr
command := baseBuildCommand(ctx, stdout, stderr, name, args...)
command.SysProcAttr = &syscall.SysProcAttr{
// Set those attributes so the new process won't receive the signals from a parent automatically.
Setpgid: true,
Pgid: 0,
}

return command
}
5 changes: 1 addition & 4 deletions app/kuma-dp/pkg/dataplane/command/build_command_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,12 @@ func BuildCommand(
name string,
args ...string,
) *exec.Cmd {
command := exec.CommandContext(ctx, name, args...)
command.Stdout = stdout
command.Stderr = stderr
command := baseBuildCommand(ctx, stdout, stderr, name, args...)
command.SysProcAttr = &syscall.SysProcAttr{
Pdeathsig: syscall.SIGKILL,
// Set those attributes so the new process won't receive the signals from a parent automatically.
Setpgid: true,
Pgid: 0,
}

return command
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,7 @@ func BuildCommand(
name string,
args ...string,
) *exec.Cmd {
command := exec.CommandContext(ctx, name, args...)
command.Stdout = stdout
command.Stderr = stderr
command := baseBuildCommand(ctx, stdout, stderr, name, args)
// todo(jakubdyszkiewicz): do not propagate SIGTERM

return command
Expand Down
51 changes: 22 additions & 29 deletions app/kuma-dp/pkg/dataplane/dnsserver/dnsserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ type DNSServer struct {
var _ component.GracefulComponent = &DNSServer{}

type Opts struct {
Config kuma_dp.Config
Stdout io.Writer
Stderr io.Writer
Quit chan struct{}
Config kuma_dp.Config
Stdout io.Writer
Stderr io.Writer
OnFinish context.CancelFunc
}

func lookupDNSServerPath(configuredPath string) (string, error) {
Expand All @@ -47,8 +47,10 @@ func lookupDNSServerPath(configuredPath string) (string, error) {
func New(opts *Opts) (*DNSServer, error) {
dnsServerPath, err := lookupDNSServerPath(opts.Config.DNS.CoreDNSBinaryPath)
if err != nil {
runLog.Error(err, "could not find the DNS Server executable in your path")
return nil, err
return nil, errors.Wrapf(err, "could not find coreDNS executable")
}
if opts.OnFinish == nil {
opts.OnFinish = func() {}
}

return &DNSServer{opts: opts, path: dnsServerPath}, nil
Expand All @@ -59,7 +61,7 @@ func (s *DNSServer) GetVersion() (string, error) {
command := exec.Command(path, "--version")
output, err := command.Output()
if err != nil {
return "", err
return "", errors.Wrapf(err, "failed to execute coreDNS at path %s", path)
}

match := regexp.MustCompile(`CoreDNS-(.*)`).FindSubmatch(output)
Expand All @@ -76,6 +78,12 @@ func (s *DNSServer) NeedLeaderElection() bool {

func (s *DNSServer) Start(stop <-chan struct{}) error {
s.wg.Add(1)
// Component should only be considered done after CoreDNS exits.
// Otherwise, we may not propagate SIGTERM on time.
defer func() {
s.wg.Done()
s.opts.OnFinish()
}()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

Expand Down Expand Up @@ -129,33 +137,18 @@ func (s *DNSServer) Start(stop <-chan struct{}) error {
return err
}

done := make(chan error, 1)

go func() {
done <- command.Wait()
// Component should only be considered done after CoreDNS exists.
// Otherwise, we may not propagate SIGTERM on time.
s.wg.Done()
}()

select {
case <-stop:
<-stop
runLog.Info("stopping DNS Server")
cancel()
return nil
case err := <-done:
if err != nil {
runLog.Error(err, "DNS Server terminated with an error")
} else {
runLog.Info("DNS Server terminated successfully")
}

if s.opts.Quit != nil {
close(s.opts.Quit)
}

}()
err = command.Wait()
if err != nil && !errors.Is(err, context.Canceled) {
runLog.Error(err, "DNS Server terminated with an error")
return err
}
runLog.Info("DNS Server terminated successfully")
return nil
}

func (s *DNSServer) WaitForDone() {
Expand Down
41 changes: 18 additions & 23 deletions app/kuma-dp/pkg/dataplane/envoy/envoy.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,15 @@ type Opts struct {
Dataplane rest.Resource
Stdout io.Writer
Stderr io.Writer
Quit chan struct{}
OnFinish func()
}

func New(opts Opts) (*Envoy, error) {
if _, err := lookupEnvoyPath(opts.Config.DataplaneRuntime.BinaryPath); err != nil {
runLog.Error(err, "could not find the envoy executable in your path")
return nil, err
return nil, errors.Wrap(err, "could not find envoy executable")
}
if opts.OnFinish == nil {
opts.OnFinish = func() {}
}
return &Envoy{opts: opts}, nil
}
Expand Down Expand Up @@ -87,6 +89,12 @@ func lookupEnvoyPath(configuredPath string) (string, error) {

func (e *Envoy) Start(stop <-chan struct{}) error {
e.wg.Add(1)
// Component should only be considered done after Envoy exits.
// Otherwise, we may not propagate SIGTERM on time.
defer func() {
e.wg.Done()
e.opts.OnFinish()
}()

configFile, err := GenerateBootstrapFile(e.opts.Config.DataplaneRuntime, e.opts.BootstrapConfig)
if err != nil {
Expand Down Expand Up @@ -144,31 +152,18 @@ func (e *Envoy) Start(stop <-chan struct{}) error {
runLog.Error(err, "envoy executable failed", "path", resolvedPath, "arguments", args)
return err
}
done := make(chan error, 1)
go func() {
done <- command.Wait()
// Component should only be considered done after Envoy exists.
// Otherwise, we may not propagate SIGTERM on time.
e.wg.Done()
}()

select {
case <-stop:
<-stop
runLog.Info("stopping Envoy")
cancel()
return nil
case err := <-done:
if err != nil {
runLog.Error(err, "Envoy terminated with an error")
} else {
runLog.Info("Envoy terminated successfully")
}
if e.opts.Quit != nil {
close(e.opts.Quit)
}

}()
err = command.Wait()
if err != nil && !errors.Is(err, context.Canceled) {
runLog.Error(err, "Envoy terminated with an error")
return err
}
runLog.Info("Envoy terminated successfully")
return nil
}

func (e *Envoy) WaitForDone() {
Expand Down
7 changes: 6 additions & 1 deletion pkg/config/app/kuma-dp/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,11 @@ func (d *Dataplane) PostProcess() error {
return nil
}

// IsZoneProxy reports whether the dataplane's ProxyType is either the
// ingress or the egress proxy type.
func (d *Dataplane) IsZoneProxy() bool {
	switch d.ProxyType {
	case string(mesh_proto.IngressProxyType), string(mesh_proto.EgressProxyType):
		return true
	default:
		return false
	}
}

func validateMeshOrName[V ~string](typ string, value V) error {
if value == "" {
return errors.Errorf("%s must be non-empty", typ)
Expand Down Expand Up @@ -345,7 +350,7 @@ type DNS struct {
CoreDNSConfigTemplatePath string `json:"coreDnsConfigTemplatePath,omitempty" envconfig:"kuma_dns_core_dns_config_template_path"`
// Dir to store auto-generated DNS Server config in.
ConfigDir string `json:"configDir,omitempty" envconfig:"kuma_dns_config_dir"`
// Port where Prometheus stats will be exposed for the DNS Server
// PrometheusPort where Prometheus stats will be exposed for the DNS Server
PrometheusPort uint32 `json:"prometheusPort,omitempty" envconfig:"kuma_dns_prometheus_port"`
}

Expand Down
Loading