Refactor fx service Start/Stop (#4584)
dnr authored Jul 6, 2023
1 parent bd241c0 commit 92c8418
Showing 10 changed files with 51 additions and 224 deletions.
service/frontend/fx.go (25 changes: 2 additions & 23 deletions)

@@ -25,7 +25,6 @@
 package frontend
 
 import (
-    "context"
     "fmt"
     "net"
 
@@ -601,26 +600,6 @@ func HandlerProvider(
     return wfHandler
 }
 
-func ServiceLifetimeHooks(
-    lc fx.Lifecycle,
-    svcStoppedCh chan struct{},
-    svc *Service,
-) {
-    lc.Append(
-        fx.Hook{
-            OnStart: func(context.Context) error {
-                go func(svc *Service, svcStoppedCh chan<- struct{}) {
-                    // Start is blocked until Stop() is called.
-                    svc.Start()
-                    close(svcStoppedCh)
-                }(svc, svcStoppedCh)
-
-                return nil
-            },
-            OnStop: func(ctx context.Context) error {
-                svc.Stop()
-                return nil
-            },
-        },
-    )
+func ServiceLifetimeHooks(lc fx.Lifecycle, svc *Service) {
+    lc.Append(fx.StartStopHook(svc.Start, svc.Stop))
 }
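The one-liner relies on fx.StartStopHook, a helper that was a relatively recent addition to go.uber.org/fx at the time of this commit; it wraps plain start/stop functions into an fx.Hook and calls them synchronously. The old hook ran svc.Start() in a goroutine (with svcStoppedCh signaling its return) because Start blocked inside Serve; as the service.go changes below show, Start now returns promptly, so a synchronous hook suffices and the channel disappears. The atomic status guard is likewise dropped, presumably because fx already guarantees OnStart and OnStop each run at most once. A hand-expanded sketch of roughly what the new one-liner registers (the function name is hypothetical, and the real fx.StartStopHook is reflection-based, also accepting context-aware and error-returning signatures):

// Sketch only: roughly what lc.Append(fx.StartStopHook(svc.Start, svc.Stop))
// registers, written out by hand. Assumes "context" and "go.uber.org/fx" imports.
func serviceLifetimeHooksExpanded(lc fx.Lifecycle, svc *Service) {
    lc.Append(fx.Hook{
        OnStart: func(ctx context.Context) error {
            svc.Start() // must return promptly; Serve now runs in its own goroutine
            return nil
        },
        OnStop: func(ctx context.Context) error {
            svc.Stop()
            return nil
        },
    })
}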
service/frontend/service.go (40 changes: 14 additions & 26 deletions)

@@ -28,7 +27,6 @@ import (
     "math/rand"
     "net"
     "os"
-    "sync/atomic"
     "time"
 
     "go.temporal.io/api/operatorservice/v1"
@@ -274,7 +273,6 @@ func NewConfig(
 
 // Service represents the frontend service
 type Service struct {
-    status int32
     config *Config
 
     healthServer *health.Server
@@ -308,7 +306,6 @@ func NewService(
     membershipMonitor membership.Monitor,
 ) *Service {
     return &Service{
-        status:       common.DaemonStatusInitialized,
         config:       serviceConfig,
         server:       server,
         healthServer: healthServer,
@@ -327,12 +324,7 @@
 
 // Start starts the service
 func (s *Service) Start() {
-    if !atomic.CompareAndSwapInt32(&s.status, common.DaemonStatusInitialized, common.DaemonStatusStarted) {
-        return
-    }
-
-    logger := s.logger
-    logger.Info("frontend starting")
+    s.logger.Info("frontend starting")
 
     healthpb.RegisterHealthServer(s.server, s.healthServer)
     workflowservice.RegisterWorkflowServiceServer(s.server, s.handler)
@@ -350,22 +342,18 @@ func (s *Service) Start() {
     s.operatorHandler.Start()
     s.handler.Start()
 
-    go s.membershipMonitor.Start()
+    go func() {
+        s.logger.Info("Starting to serve on frontend listener")
+        if err := s.server.Serve(s.grpcListener); err != nil {
+            s.logger.Fatal("Failed to serve on frontend listener", tag.Error(err))
+        }
+    }()
 
-    logger.Info("Starting to serve on frontend listener")
-    if err := s.server.Serve(s.grpcListener); err != nil {
-        logger.Fatal("Failed to serve on frontend listener", tag.Error(err))
-    }
+    go s.membershipMonitor.Start()
 }
 
 // Stop stops the service
 func (s *Service) Stop() {
-    logger := s.logger
-
-    if !atomic.CompareAndSwapInt32(&s.status, common.DaemonStatusStarted, common.DaemonStatusStopped) {
-        return
-    }
-
     // initiate graceful shutdown:
     // 1. Fail rpc health check, this will cause client side load balancer to stop forwarding requests to this node
     // 2. wait for failure detection time
@@ -376,10 +364,10 @@ func (s *Service) Stop() {
     requestDrainTime := util.Max(time.Second, s.config.ShutdownDrainDuration())
     failureDetectionTime := util.Max(0, s.config.ShutdownFailHealthCheckDuration())
 
-    logger.Info("ShutdownHandler: Updating gRPC health status to ShuttingDown")
+    s.logger.Info("ShutdownHandler: Updating gRPC health status to ShuttingDown")
     s.healthServer.Shutdown()
 
-    logger.Info("ShutdownHandler: Waiting for others to discover I am unhealthy")
+    s.logger.Info("ShutdownHandler: Waiting for others to discover I am unhealthy")
     time.Sleep(failureDetectionTime)
 
     s.handler.Stop()
@@ -388,19 +376,19 @@ func (s *Service) Stop() {
     s.versionChecker.Stop()
     s.visibilityManager.Close()
 
-    logger.Info("ShutdownHandler: Draining traffic")
+    s.logger.Info("ShutdownHandler: Draining traffic")
     t := time.AfterFunc(requestDrainTime, func() {
-        logger.Info("ShutdownHandler: Drain time expired, stopping all traffic")
+        s.logger.Info("ShutdownHandler: Drain time expired, stopping all traffic")
         s.server.Stop()
     })
     s.server.GracefulStop()
     t.Stop()
 
     if s.metricsHandler != nil {
-        s.metricsHandler.Stop(logger)
+        s.metricsHandler.Stop(s.logger)
     }
 
-    logger.Info("frontend stopped")
+    s.logger.Info("frontend stopped")
 }
 
 func namespaceRPS(
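One detail in the Stop sequence above is worth spelling out: s.server.GracefulStop() blocks until in-flight RPCs drain, the time.AfterFunc timer forces a hard s.server.Stop() if draining exceeds requestDrainTime, and t.Stop() cancels that fallback once the graceful drain completes first. A minimal standalone sketch of the same pattern (the package and helper names are illustrative, not from this codebase):

package grpcutil

import (
    "time"

    "google.golang.org/grpc"
)

// gracefulStopWithBudget drains in-flight RPCs, but forces a hard stop if
// draining takes longer than the given budget.
func gracefulStopWithBudget(srv *grpc.Server, budget time.Duration) {
    t := time.AfterFunc(budget, srv.Stop) // hard stop once the budget expires
    defer t.Stop()                        // cancel the fallback if the drain finishes first
    srv.GracefulStop()                    // blocks until all in-flight RPCs complete
}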
service/history/fx.go (25 changes: 2 additions & 23 deletions)

@@ -25,7 +25,6 @@
 package history
 
 import (
-    "context"
     "net"
 
     "go.uber.org/fx"
@@ -289,26 +288,6 @@ func ArchivalClientProvider(
     )
 }
 
-func ServiceLifetimeHooks(
-    lc fx.Lifecycle,
-    svcStoppedCh chan struct{},
-    svc *Service,
-) {
-    lc.Append(
-        fx.Hook{
-            OnStart: func(context.Context) error {
-                go func(svc *Service, svcStoppedCh chan<- struct{}) {
-                    // Start is blocked until Stop() is called.
-                    svc.Start()
-                    close(svcStoppedCh)
-                }(svc, svcStoppedCh)
-
-                return nil
-            },
-            OnStop: func(ctx context.Context) error {
-                svc.Stop()
-                return nil
-            },
-        },
-    )
+func ServiceLifetimeHooks(lc fx.Lifecycle, svc *Service) {
+    lc.Append(fx.StartStopHook(svc.Start, svc.Stop))
 }
service/history/service.go (48 changes: 17 additions & 31 deletions)

@@ -27,15 +27,13 @@ package history
 import (
     "math/rand"
     "net"
-    "sync/atomic"
     "time"
 
     "google.golang.org/grpc"
     "google.golang.org/grpc/health"
     healthpb "google.golang.org/grpc/health/grpc_health_v1"
 
     "go.temporal.io/server/api/historyservice/v1"
-    "go.temporal.io/server/common"
     "go.temporal.io/server/common/log"
     "go.temporal.io/server/common/log/tag"
     "go.temporal.io/server/common/membership"
@@ -49,7 +47,6 @@ import (
 // Service represents the history service
 type (
     Service struct {
-        status            int32
         handler           *Handler
         visibilityManager manager.VisibilityManager
         config            *configs.Config
@@ -77,7 +74,6 @@ func NewService(
     healthServer *health.Server,
 ) *Service {
     return &Service{
-        status:            common.DaemonStatusInitialized,
         server:            grpc.NewServer(grpcServerOptions...),
         handler:           handler,
         visibilityManager: visibilityMgr,
@@ -93,12 +89,7 @@
 
 // Start starts the service
 func (s *Service) Start() {
-    if !atomic.CompareAndSwapInt32(&s.status, common.DaemonStatusInitialized, common.DaemonStatusStarted) {
-        return
-    }
-
-    logger := s.logger
-    logger.Info("history starting")
+    s.logger.Info("history starting")
 
     s.metricsHandler.Counter(metrics.RestartCount).Record(1)
     rand.Seed(time.Now().UnixNano())
@@ -109,35 +100,30 @@ func (s *Service) Start() {
     healthpb.RegisterHealthServer(s.server, s.healthServer)
     s.healthServer.SetServingStatus(serviceName, healthpb.HealthCheckResponse_SERVING)
 
-    // As soon as we join membership, other hosts will send requests for shards
-    // that we own. Ideally, then, we would start the GRPC server, and only then
-    // join membership. That's not possible with the GRPC interface, though, hence
-    // we start membership in a goroutine.
+    go func() {
+        s.logger.Info("Starting to serve on history listener")
+        if err := s.server.Serve(s.grpcListener); err != nil {
+            s.logger.Fatal("Failed to serve on history listener", tag.Error(err))
+        }
+    }()
+
+    // As soon as we join membership, other hosts will send requests for shards that we own,
+    // so we should try to start this after starting the gRPC server.
     go func() {
         if delay := s.config.StartupMembershipJoinDelay(); delay > 0 {
             // In some situations, like rolling upgrades of the history service,
             // pausing before joining membership can help separate the shard movement
             // caused by another history instance terminating with this instance starting.
-            logger.Info("history start: delaying before membership start",
+            s.logger.Info("history start: delaying before membership start",
                 tag.NewDurationTag("startupMembershipJoinDelay", delay))
             time.Sleep(delay)
         }
        s.membershipMonitor.Start()
     }()
-
-    logger.Info("Starting to serve on history listener")
-    if err := s.server.Serve(s.grpcListener); err != nil {
-        logger.Fatal("Failed to serve on history listener", tag.Error(err))
-    }
 }
 
 // Stop stops the service
 func (s *Service) Stop() {
-    logger := s.logger
-    if !atomic.CompareAndSwapInt32(&s.status, common.DaemonStatusStarted, common.DaemonStatusStopped) {
-        return
-    }
-
     // initiate graceful shutdown :
     // 1. remove self from the membership ring
     // 2. wait for other members to discover we are going down
@@ -155,19 +141,19 @@ func (s *Service) Stop() {
 
     remainingTime := s.config.ShutdownDrainDuration()
 
-    logger.Info("ShutdownHandler: Evicting self from membership ring")
+    s.logger.Info("ShutdownHandler: Evicting self from membership ring")
     _ = s.membershipMonitor.EvictSelf()
     s.healthServer.SetServingStatus(serviceName, healthpb.HealthCheckResponse_NOT_SERVING)
 
-    logger.Info("ShutdownHandler: Waiting for others to discover I am unhealthy")
+    s.logger.Info("ShutdownHandler: Waiting for others to discover I am unhealthy")
     remainingTime = s.sleep(gossipPropagationDelay, remainingTime)
 
-    logger.Info("ShutdownHandler: Initiating shardController shutdown")
+    s.logger.Info("ShutdownHandler: Initiating shardController shutdown")
     s.handler.controller.Stop()
-    logger.Info("ShutdownHandler: Waiting for traffic to drain")
+    s.logger.Info("ShutdownHandler: Waiting for traffic to drain")
     remainingTime = s.sleep(shardOwnershipTransferDelay, remainingTime)
 
-    logger.Info("ShutdownHandler: No longer taking rpc requests")
+    s.logger.Info("ShutdownHandler: No longer taking rpc requests")
     _ = s.sleep(gracePeriod, remainingTime)
 
     // TODO: Change this to GracefulStop when integration tests are refactored.
@@ -176,7 +162,7 @@ func (s *Service) Stop() {
     s.handler.Stop()
     s.visibilityManager.Close()
 
-    logger.Info("history stopped")
+    s.logger.Info("history stopped")
 }
 
 // sleep sleeps for the minimum of desired and available duration
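The sleep helper whose doc comment closes this file (its body is hidden behind the fold) threads a shrinking time budget through the shutdown steps above: each call sleeps for at most the remaining budget and returns what is left. A plausible implementation, assuming only the documented behavior:

// sleep sleeps for the minimum of the desired and available durations, and
// returns how much of the available budget remains afterwards.
// (Assumed implementation; the actual body is not shown in this diff.)
func (s *Service) sleep(desired, available time.Duration) time.Duration {
    d := desired
    if d > available {
        d = available
    }
    if d > 0 {
        time.Sleep(d)
    }
    return available - d
}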
service/matching/fx.go (27 changes: 2 additions & 25 deletions)

@@ -25,8 +25,6 @@
 package matching
 
 import (
-    "context"
-
     "go.uber.org/fx"
 
     "go.temporal.io/server/api/historyservice/v1"
@@ -204,27 +202,6 @@ func HandlerProvider(
     )
 }
 
-func ServiceLifetimeHooks(
-    lc fx.Lifecycle,
-    svcStoppedCh chan struct{},
-    svc *Service,
-) {
-    lc.Append(
-        fx.Hook{
-            OnStart: func(context.Context) error {
-                go func(svc *Service, svcStoppedCh chan<- struct{}) {
-                    // Start is blocked until Stop() is called.
-                    svc.Start()
-                    close(svcStoppedCh)
-                }(svc, svcStoppedCh)
-
-                return nil
-            },
-            OnStop: func(ctx context.Context) error {
-                svc.Stop()
-                return nil
-            },
-        },
-    )
-
+func ServiceLifetimeHooks(lc fx.Lifecycle, svc *Service) {
+    lc.Append(fx.StartStopHook(svc.Start, svc.Stop))
 }
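For context, here is a hedged sketch of how such a ServiceLifetimeHooks function plugs into an fx application; the constructor and wiring are illustrative rather than taken from the Temporal codebase:

package main

import "go.uber.org/fx"

type Service struct{ /* ... */ }

func NewService() *Service { return &Service{} }

func (s *Service) Start() { /* start listeners and background loops; returns promptly */ }
func (s *Service) Stop()  { /* drain traffic and shut down */ }

func ServiceLifetimeHooks(lc fx.Lifecycle, svc *Service) {
    // fx calls svc.Start during app startup and svc.Stop during shutdown.
    lc.Append(fx.StartStopHook(svc.Start, svc.Stop))
}

func main() {
    fx.New(
        fx.Provide(NewService),
        fx.Invoke(ServiceLifetimeHooks),
    ).Run() // blocks until shutdown is signaled, then runs OnStop hooks
}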
… (diff for the remaining 5 changed files not shown)