From 4443e983fe8071b3bce64f246b7bfcb2f1248bab Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Wed, 14 Feb 2024 13:50:24 -0500 Subject: [PATCH 1/4] Add shutdown delay and e2e test Signed-off-by: Joe Elliott --- CHANGELOG.md | 1 + cmd/tempo/app/app.go | 31 +++++++++++++++++++++++++++---- cmd/tempo/app/config.go | 18 ++++++++++-------- cmd/tempo/app/server_service.go | 5 +++++ integration/e2e/e2e_test.go | 33 +++++++++++++++++++++++++++++++++ 5 files changed, 76 insertions(+), 12 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7a683cdc1ed..a43c7b14680 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -52,6 +52,7 @@ * [ENHANCEMENT] Update memcached default image in jsonnet for multiple CVE [#3310](https://github.com/grafana/tempo/pull/3310) (@zalegrala) * [ENHANCEMENT] Add HTML pages /status/overrides and /status/overrides/{tenant} [#3244](https://github.com/grafana/tempo/pull/3244) [#3332](https://github.com/grafana/tempo/pull/3332) (@kvrhdn) * [ENHANCEMENT] Precalculate and reuse the vParquet3 schema before opening blocks [#3367](https://github.com/grafana/tempo/pull/3367) (@stoewer) +* [ENHANCEMENT] Add `--shutdown-delay` to allow Tempo to cleanly drain connections. [#???](https://github.com/grafana/tempo/pull/???) (@joe-elliott) * [BUGFIX] Prevent building parquet iterators that would loop forever. [#3159](https://github.com/grafana/tempo/pull/3159) (@mapno) * [BUGFIX] Sanitize name in mapped dimensions in span-metrics processor [#3171](https://github.com/grafana/tempo/pull/3171) (@mapno) * [BUGFIX] Fixed an issue where cached footers were requested then ignored. 
[#3196](https://github.com/grafana/tempo/pull/3196) (@joe-elliott) diff --git a/cmd/tempo/app/app.go b/cmd/tempo/app/app.go index 3a5f3a47305..9a6f5156a5b 100644 --- a/cmd/tempo/app/app.go +++ b/cmd/tempo/app/app.go @@ -9,6 +9,7 @@ import ( "io" "net/http" "sort" + "time" "github.com/go-kit/log/level" "github.com/gorilla/mux" @@ -23,6 +24,7 @@ import ( "github.com/jedib0t/go-pretty/v6/table" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/version" + "go.uber.org/atomic" "google.golang.org/grpc" "google.golang.org/grpc/health/grpc_health_v1" "gopkg.in/yaml.v3" @@ -42,6 +44,7 @@ import ( "github.com/grafana/tempo/pkg/usagestats" "github.com/grafana/tempo/pkg/util" "github.com/grafana/tempo/pkg/util/log" + util_log "github.com/grafana/tempo/pkg/util/log" ) const ( @@ -189,17 +192,23 @@ func (t *App) Run() error { return fmt.Errorf("failed to start service manager: %w", err) } + // Used to delay shutdown but return "not ready" during this delay. + shutdownRequested := atomic.NewBool(false) // before starting servers, register /ready handler and gRPC health check service. 
if t.cfg.InternalServer.Enable { - t.InternalServer.HTTP.Path("/ready").Methods("GET").Handler(t.readyHandler(sm)) + t.InternalServer.HTTP.Path("/ready").Methods("GET").Handler(t.readyHandler(sm, shutdownRequested)) } t.Server.HTTP().Path(addHTTPAPIPrefix(&t.cfg, api.PathBuildInfo)).Handler(t.buildinfoHandler()).Methods("GET") - t.Server.HTTP().Path("/ready").Handler(t.readyHandler(sm)) + t.Server.HTTP().Path("/ready").Handler(t.readyHandler(sm, shutdownRequested)) t.Server.HTTP().Path("/status").Handler(t.statusHandler()).Methods("GET") t.Server.HTTP().Path("/status/{endpoint}").Handler(t.statusHandler()).Methods("GET") - grpc_health_v1.RegisterHealthServer(t.Server.GRPC(), grpcutil.NewHealthCheck(sm)) + grpc_health_v1.RegisterHealthServer(t.Server.GRPC(), + grpcutil.NewHealthCheckFrom( + grpcutil.WithShutdownRequested(shutdownRequested), + grpcutil.WithManager(sm), + )) // Let's listen for events from this manager, and log them. healthy := func() { level.Info(log.Logger).Log("msg", "Tempo started") } @@ -231,6 +240,14 @@ func (t *App) Run() error { handler := signals.NewHandler(t.Server.Log()) go func() { handler.Loop() + + shutdownRequested.Store(true) + t.Server.SetKeepAlivesEnabled(false) + + if t.cfg.ShutdownDelay > 0 { + time.Sleep(t.cfg.ShutdownDelay) + } + sm.StopAsync() }() @@ -301,8 +318,14 @@ func (t *App) writeStatusConfig(w io.Writer, r *http.Request) error { return nil } -func (t *App) readyHandler(sm *services.Manager) http.HandlerFunc { +func (t *App) readyHandler(sm *services.Manager, shutdownRequested *atomic.Bool) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { + if shutdownRequested.Load() { + level.Debug(util_log.Logger).Log("msg", "application is stopping") + http.Error(w, "Application is stopping", http.StatusServiceUnavailable) + return + } + if !sm.IsHealthy() { msg := bytes.Buffer{} msg.WriteString("Some services are not Running:\n") diff --git a/cmd/tempo/app/config.go b/cmd/tempo/app/config.go index 
9a8490800aa..bfcad225b47 100644 --- a/cmd/tempo/app/config.go +++ b/cmd/tempo/app/config.go @@ -31,14 +31,15 @@ import ( // Config is the root config for App. type Config struct { - Target string `yaml:"target,omitempty"` - AuthEnabled bool `yaml:"auth_enabled,omitempty"` - MultitenancyEnabled bool `yaml:"multitenancy_enabled,omitempty"` - StreamOverHTTPEnabled bool `yaml:"stream_over_http_enabled,omitempty"` - HTTPAPIPrefix string `yaml:"http_api_prefix"` - UseOTelTracer bool `yaml:"use_otel_tracer,omitempty"` - EnableGoRuntimeMetrics bool `yaml:"enable_go_runtime_metrics,omitempty"` - AutocompleteFilteringEnabled bool `yaml:"autocomplete_filtering_enabled,omitempty"` + Target string `yaml:"target,omitempty"` + AuthEnabled bool `yaml:"auth_enabled,omitempty"` + MultitenancyEnabled bool `yaml:"multitenancy_enabled,omitempty"` + ShutdownDelay time.Duration `yaml:"shutdown_delay,omitempty"` + StreamOverHTTPEnabled bool `yaml:"stream_over_http_enabled,omitempty"` + HTTPAPIPrefix string `yaml:"http_api_prefix"` + UseOTelTracer bool `yaml:"use_otel_tracer,omitempty"` + EnableGoRuntimeMetrics bool `yaml:"enable_go_runtime_metrics,omitempty"` + AutocompleteFilteringEnabled bool `yaml:"autocomplete_filtering_enabled,omitempty"` Server server.Config `yaml:"server,omitempty"` InternalServer internalserver.Config `yaml:"internal_server,omitempty"` @@ -76,6 +77,7 @@ func (c *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet) { f.BoolVar(&c.UseOTelTracer, "use-otel-tracer", false, "Set to true to replace the OpenTracing tracer with the OpenTelemetry tracer") f.BoolVar(&c.EnableGoRuntimeMetrics, "enable-go-runtime-metrics", false, "Set to true to enable all Go runtime metrics") f.BoolVar(&c.AutocompleteFilteringEnabled, "autocomplete-filtering.enabled", true, "Set to false to disable autocomplete filtering") + f.DurationVar(&c.ShutdownDelay, "shutdown-delay", 0, "How long to wait between SIGTERM and shutdown. 
After receiving SIGTERM, Tempo will report not-ready status via /ready endpoint.") // Server settings flagext.DefaultValues(&c.Server) diff --git a/cmd/tempo/app/server_service.go b/cmd/tempo/app/server_service.go index 93dd2a7acdd..dd70c30d395 100644 --- a/cmd/tempo/app/server_service.go +++ b/cmd/tempo/app/server_service.go @@ -25,6 +25,7 @@ type TempoServer interface { GRPC() *grpc.Server Log() log.Logger EnableHTTP2() + SetKeepAlivesEnabled(enabled bool) StartAndReturnService(cfg server.Config, supportGRPCOnHTTP bool, servicesToWaitFor func() []services.Service) (services.Service, error) } @@ -62,6 +63,10 @@ func (s *tempoServer) EnableHTTP2() { }) } +func (s *tempoServer) SetKeepAlivesEnabled(enabled bool) { + s.externalServer.HTTPServer.SetKeepAlivesEnabled(enabled) +} + func (s *tempoServer) StartAndReturnService(cfg server.Config, supportGRPCOnHTTP bool, servicesToWaitFor func() []services.Service) (services.Service, error) { var err error diff --git a/integration/e2e/e2e_test.go b/integration/e2e/e2e_test.go index be20269eca1..68ad5e72859 100644 --- a/integration/e2e/e2e_test.go +++ b/integration/e2e/e2e_test.go @@ -312,6 +312,39 @@ func TestMicroservicesWithKVStores(t *testing.T) { } } +func TestShutdownDelay(t *testing.T) { + s, err := e2e.NewScenario("tempo_e2e") + require.NoError(t, err) + defer s.Close() + + // set up the backend + cfg := app.Config{} + buff, err := os.ReadFile(configAllInOneS3) + require.NoError(t, err) + err = yaml.UnmarshalStrict(buff, &cfg) + require.NoError(t, err) + _, err = backend.New(s, cfg) + require.NoError(t, err) + + require.NoError(t, util.CopyFileToSharedDir(s, configAllInOneS3, "config.yaml")) + tempo := util.NewTempoAllInOne("-shutdown-delay=5s") + require.NoError(t, s.StartAndWaitReady(tempo)) + tempo.SetReadinessProbe(nil) + + // if we're here the readiness flag is up. 
now call kill and check the readiness flag is down + go func() { + _ = tempo.Stop() + }() + + time.Sleep(500 * time.Millisecond) + + // confirm the readiness flag is down + res, err := e2e.DoGet("http://" + tempo.Endpoint(3200) + "/ready") + require.NoError(t, err) + require.Equal(t, http.StatusServiceUnavailable, res.StatusCode) + defer res.Body.Close() +} + func TestScalableSingleBinary(t *testing.T) { s, err := e2e.NewScenario("tempo_e2e") require.NoError(t, err) From 812b2842ff51be906d08ecff3984e83579cdf580 Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Wed, 14 Feb 2024 13:53:37 -0500 Subject: [PATCH 2/4] changelog Signed-off-by: Joe Elliott --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a43c7b14680..dd2616b0f47 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -52,7 +52,7 @@ * [ENHANCEMENT] Update memcached default image in jsonnet for multiple CVE [#3310](https://github.com/grafana/tempo/pull/3310) (@zalegrala) * [ENHANCEMENT] Add HTML pages /status/overrides and /status/overrides/{tenant} [#3244](https://github.com/grafana/tempo/pull/3244) [#3332](https://github.com/grafana/tempo/pull/3332) (@kvrhdn) * [ENHANCEMENT] Precalculate and reuse the vParquet3 schema before opening blocks [#3367](https://github.com/grafana/tempo/pull/3367) (@stoewer) -* [ENHANCEMENT] Add `--shutdown-delay` to allow Tempo to cleanly drain connections. [#???](https://github.com/grafana/tempo/pull/???) (@joe-elliott) +* [ENHANCEMENT] Add `--shutdown-delay` to allow Tempo to cleanly drain connections. [#3395](https://github.com/grafana/tempo/pull/3395) (@joe-elliott) * [BUGFIX] Prevent building parquet iterators that would loop forever. [#3159](https://github.com/grafana/tempo/pull/3159) (@mapno) * [BUGFIX] Sanitize name in mapped dimensions in span-metrics processor [#3171](https://github.com/grafana/tempo/pull/3171) (@mapno) * [BUGFIX] Fixed an issue where cached footers were requested then ignored. 
[#3196](https://github.com/grafana/tempo/pull/3196) (@joe-elliott) From b227e2538fa085c43ea284022042a5b234276e67 Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Wed, 14 Feb 2024 16:24:40 -0500 Subject: [PATCH 3/4] determinism + comment Signed-off-by: Joe Elliott --- integration/e2e/e2e_test.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/integration/e2e/e2e_test.go b/integration/e2e/e2e_test.go index 68ad5e72859..feff803f5c5 100644 --- a/integration/e2e/e2e_test.go +++ b/integration/e2e/e2e_test.go @@ -328,15 +328,19 @@ func TestShutdownDelay(t *testing.T) { require.NoError(t, util.CopyFileToSharedDir(s, configAllInOneS3, "config.yaml")) tempo := util.NewTempoAllInOne("-shutdown-delay=5s") + + // this line confirms that the readiness flag is up require.NoError(t, s.StartAndWaitReady(tempo)) - tempo.SetReadinessProbe(nil) // if we're here the readiness flag is up. now call kill and check the readiness flag is down + wg := sync.WaitGroup{} + wg.Add(1) go func() { + defer wg.Done() _ = tempo.Stop() }() - time.Sleep(500 * time.Millisecond) + wg.Wait() // confirm the readiness flag is down res, err := e2e.DoGet("http://" + tempo.Endpoint(3200) + "/ready") From a3e6147fe48f136118f1c97119b2315b24cf3a94 Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Wed, 14 Feb 2024 17:34:20 -0500 Subject: [PATCH 4/4] .Stop in a different goroutine than .Start creates a race Signed-off-by: Joe Elliott --- integration/e2e/e2e_test.go | 24 +++++++++++++++++------- 1 file changed, 17 insertions(+), 7 deletions(-) diff --git a/integration/e2e/e2e_test.go b/integration/e2e/e2e_test.go index feff803f5c5..5fe1bff8349 100644 --- a/integration/e2e/e2e_test.go +++ b/integration/e2e/e2e_test.go @@ -337,16 +337,26 @@ func TestShutdownDelay(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - _ = tempo.Stop() + + for i := 0; i < 10; i++ { + res, err := e2e.DoGet("http://" + tempo.Endpoint(3200) + "/ready") + require.NoError(t, err) + res.Body.Close() + + if 
res.StatusCode == http.StatusServiceUnavailable { + // found it! + return + } + time.Sleep(time.Second) + } + + require.Fail(t, "readiness flag never went down") }() - wg.Wait() + // call stop and allow the code above to test for an unavailable readiness flag + _ = tempo.Stop() - // confirm the readiness flag is down - res, err := e2e.DoGet("http://" + tempo.Endpoint(3200) + "/ready") - require.NoError(t, err) - require.Equal(t, http.StatusServiceUnavailable, res.StatusCode) - defer res.Body.Close() + wg.Wait() } func TestScalableSingleBinary(t *testing.T) {