Skip to content

Commit

Permalink
Add shutdown delay and e2e test (grafana#3395)
Browse files Browse the repository at this point in the history
* Add shutdown delay and e2e test

Signed-off-by: Joe Elliott <[email protected]>

* changelog

Signed-off-by: Joe Elliott <[email protected]>

* determinism + comment

Signed-off-by: Joe Elliott <[email protected]>

* .Stop in a different goroutine then .Start creates a race

Signed-off-by: Joe Elliott <[email protected]>

---------

Signed-off-by: Joe Elliott <[email protected]>
  • Loading branch information
joe-elliott authored and Koenraad Verheyden committed Feb 26, 2024
1 parent 2ee50b3 commit b9b44af
Show file tree
Hide file tree
Showing 5 changed files with 90 additions and 12 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
* [ENHANCEMENT] Update memcached default image in jsonnet for multiple CVE [#3310](https://github.com/grafana/tempo/pull/3310) (@zalegrala)
* [ENHANCEMENT] Add HTML pages /status/overrides and /status/overrides/{tenant} [#3244](https://github.com/grafana/tempo/pull/3244) [#3332](https://github.com/grafana/tempo/pull/3332) (@kvrhdn)
* [ENHANCEMENT] Precalculate and reuse the vParquet3 schema before opening blocks [#3367](https://github.com/grafana/tempo/pull/3367) (@stoewer)
* [ENHANCEMENT] Add `--shutdown-delay` to allow Tempo to cleanly drain connections. [#3395](https://github.com/grafana/tempo/pull/3395) (@joe-elliott)
* [BUGFIX] Prevent building parquet iterators that would loop forever. [#3159](https://github.com/grafana/tempo/pull/3159) (@mapno)
* [BUGFIX] Sanitize name in mapped dimensions in span-metrics processor [#3171](https://github.com/grafana/tempo/pull/3171) (@mapno)
* [BUGFIX] Fixed an issue where cached footers were requested then ignored. [#3196](https://github.com/grafana/tempo/pull/3196) (@joe-elliott)
Expand Down
31 changes: 27 additions & 4 deletions cmd/tempo/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"io"
"net/http"
"sort"
"time"

"github.com/go-kit/log/level"
"github.com/gorilla/mux"
Expand All @@ -23,6 +24,7 @@ import (
"github.com/jedib0t/go-pretty/v6/table"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/version"
"go.uber.org/atomic"
"google.golang.org/grpc"
"google.golang.org/grpc/health/grpc_health_v1"
"gopkg.in/yaml.v3"
Expand All @@ -42,6 +44,7 @@ import (
"github.com/grafana/tempo/pkg/usagestats"
"github.com/grafana/tempo/pkg/util"
"github.com/grafana/tempo/pkg/util/log"
util_log "github.com/grafana/tempo/pkg/util/log"
)

const (
Expand Down Expand Up @@ -189,17 +192,23 @@ func (t *App) Run() error {
return fmt.Errorf("failed to start service manager: %w", err)
}

// Used to delay shutdown but return "not ready" during this delay.
shutdownRequested := atomic.NewBool(false)
// before starting servers, register /ready handler and gRPC health check service.
if t.cfg.InternalServer.Enable {
t.InternalServer.HTTP.Path("/ready").Methods("GET").Handler(t.readyHandler(sm))
t.InternalServer.HTTP.Path("/ready").Methods("GET").Handler(t.readyHandler(sm, shutdownRequested))
}

t.Server.HTTP().Path(addHTTPAPIPrefix(&t.cfg, api.PathBuildInfo)).Handler(t.buildinfoHandler()).Methods("GET")

t.Server.HTTP().Path("/ready").Handler(t.readyHandler(sm))
t.Server.HTTP().Path("/ready").Handler(t.readyHandler(sm, shutdownRequested))
t.Server.HTTP().Path("/status").Handler(t.statusHandler()).Methods("GET")
t.Server.HTTP().Path("/status/{endpoint}").Handler(t.statusHandler()).Methods("GET")
grpc_health_v1.RegisterHealthServer(t.Server.GRPC(), grpcutil.NewHealthCheck(sm))
grpc_health_v1.RegisterHealthServer(t.Server.GRPC(),
grpcutil.NewHealthCheckFrom(
grpcutil.WithShutdownRequested(shutdownRequested),
grpcutil.WithManager(sm),
))

// Let's listen for events from this manager, and log them.
healthy := func() { level.Info(log.Logger).Log("msg", "Tempo started") }
Expand Down Expand Up @@ -231,6 +240,14 @@ func (t *App) Run() error {
handler := signals.NewHandler(t.Server.Log())
go func() {
handler.Loop()

shutdownRequested.Store(true)
t.Server.SetKeepAlivesEnabled(false)

if t.cfg.ShutdownDelay > 0 {
time.Sleep(t.cfg.ShutdownDelay)
}

sm.StopAsync()
}()

Expand Down Expand Up @@ -301,8 +318,14 @@ func (t *App) writeStatusConfig(w io.Writer, r *http.Request) error {
return nil
}

func (t *App) readyHandler(sm *services.Manager) http.HandlerFunc {
func (t *App) readyHandler(sm *services.Manager, shutdownRequested *atomic.Bool) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if shutdownRequested.Load() {
level.Debug(util_log.Logger).Log("msg", "application is stopping")
http.Error(w, "Application is stopping", http.StatusServiceUnavailable)
return
}

if !sm.IsHealthy() {
msg := bytes.Buffer{}
msg.WriteString("Some services are not Running:\n")
Expand Down
18 changes: 10 additions & 8 deletions cmd/tempo/app/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,15 @@ import (

// Config is the root config for App.
type Config struct {
Target string `yaml:"target,omitempty"`
AuthEnabled bool `yaml:"auth_enabled,omitempty"`
MultitenancyEnabled bool `yaml:"multitenancy_enabled,omitempty"`
StreamOverHTTPEnabled bool `yaml:"stream_over_http_enabled,omitempty"`
HTTPAPIPrefix string `yaml:"http_api_prefix"`
UseOTelTracer bool `yaml:"use_otel_tracer,omitempty"`
EnableGoRuntimeMetrics bool `yaml:"enable_go_runtime_metrics,omitempty"`
AutocompleteFilteringEnabled bool `yaml:"autocomplete_filtering_enabled,omitempty"`
Target string `yaml:"target,omitempty"`
AuthEnabled bool `yaml:"auth_enabled,omitempty"`
MultitenancyEnabled bool `yaml:"multitenancy_enabled,omitempty"`
ShutdownDelay time.Duration `yaml:"shutdown_delay,omitempty"`
StreamOverHTTPEnabled bool `yaml:"stream_over_http_enabled,omitempty"`
HTTPAPIPrefix string `yaml:"http_api_prefix"`
UseOTelTracer bool `yaml:"use_otel_tracer,omitempty"`
EnableGoRuntimeMetrics bool `yaml:"enable_go_runtime_metrics,omitempty"`
AutocompleteFilteringEnabled bool `yaml:"autocomplete_filtering_enabled,omitempty"`

Server server.Config `yaml:"server,omitempty"`
InternalServer internalserver.Config `yaml:"internal_server,omitempty"`
Expand Down Expand Up @@ -76,6 +77,7 @@ func (c *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet) {
f.BoolVar(&c.UseOTelTracer, "use-otel-tracer", false, "Set to true to replace the OpenTracing tracer with the OpenTelemetry tracer")
f.BoolVar(&c.EnableGoRuntimeMetrics, "enable-go-runtime-metrics", false, "Set to true to enable all Go runtime metrics")
f.BoolVar(&c.AutocompleteFilteringEnabled, "autocomplete-filtering.enabled", true, "Set to false to disable autocomplete filtering")
f.DurationVar(&c.ShutdownDelay, "shutdown-delay", 0, "How long to wait between SIGTERM and shutdown. After receiving SIGTERM, Tempo will report not-ready status via /ready endpoint.")

// Server settings
flagext.DefaultValues(&c.Server)
Expand Down
5 changes: 5 additions & 0 deletions cmd/tempo/app/server_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type TempoServer interface {
GRPC() *grpc.Server
Log() log.Logger
EnableHTTP2()
SetKeepAlivesEnabled(enabled bool)

StartAndReturnService(cfg server.Config, supportGRPCOnHTTP bool, servicesToWaitFor func() []services.Service) (services.Service, error)
}
Expand Down Expand Up @@ -62,6 +63,10 @@ func (s *tempoServer) EnableHTTP2() {
})
}

func (s *tempoServer) SetKeepAlivesEnabled(enabled bool) {
s.externalServer.HTTPServer.SetKeepAlivesEnabled(enabled)
}

func (s *tempoServer) StartAndReturnService(cfg server.Config, supportGRPCOnHTTP bool, servicesToWaitFor func() []services.Service) (services.Service, error) {
var err error

Expand Down
47 changes: 47 additions & 0 deletions integration/e2e/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,53 @@ func TestMicroservicesWithKVStores(t *testing.T) {
}
}

func TestShutdownDelay(t *testing.T) {
s, err := e2e.NewScenario("tempo_e2e")
require.NoError(t, err)
defer s.Close()

// set up the backend
cfg := app.Config{}
buff, err := os.ReadFile(configAllInOneS3)
require.NoError(t, err)
err = yaml.UnmarshalStrict(buff, &cfg)
require.NoError(t, err)
_, err = backend.New(s, cfg)
require.NoError(t, err)

require.NoError(t, util.CopyFileToSharedDir(s, configAllInOneS3, "config.yaml"))
tempo := util.NewTempoAllInOne("-shutdown-delay=5s")

// this line tests confirms that the readiness flag is up
require.NoError(t, s.StartAndWaitReady(tempo))

// if we're here the readiness flag is up. now call kill and check the readiness flag is down
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()

for i := 0; i < 10; i++ {
res, err := e2e.DoGet("http://" + tempo.Endpoint(3200) + "/ready")
require.NoError(t, err)
res.Body.Close()

if res.StatusCode == http.StatusServiceUnavailable {
// found it!
return
}
time.Sleep(time.Second)
}

require.Fail(t, "readiness flag never went down")
}()

// call stop and allow the code above to test for a unavailable readiness flag
_ = tempo.Stop()

wg.Wait()
}

func TestScalableSingleBinary(t *testing.T) {
s, err := e2e.NewScenario("tempo_e2e")
require.NoError(t, err)
Expand Down

0 comments on commit b9b44af

Please sign in to comment.