Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor fx service Start/Stop #4584

Merged
merged 5 commits into from
Jul 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 2 additions & 23 deletions service/frontend/fx.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
package frontend

import (
"context"
"fmt"
"net"

Expand Down Expand Up @@ -601,26 +600,6 @@ func HandlerProvider(
return wfHandler
}

func ServiceLifetimeHooks(
lc fx.Lifecycle,
svcStoppedCh chan struct{},
svc *Service,
) {
lc.Append(
fx.Hook{
OnStart: func(context.Context) error {
go func(svc *Service, svcStoppedCh chan<- struct{}) {
// Start is blocked until Stop() is called.
svc.Start()
close(svcStoppedCh)
}(svc, svcStoppedCh)

return nil
},
OnStop: func(ctx context.Context) error {
svc.Stop()
return nil
},
},
)
// ServiceLifetimeHooks ties the frontend Service to the fx application
// lifecycle: svc.Start is invoked when the fx app starts and svc.Stop when
// it shuts down, via the hook built by fx.StartStopHook.
func ServiceLifetimeHooks(lc fx.Lifecycle, svc *Service) {
	lc.Append(fx.StartStopHook(svc.Start, svc.Stop))
}
40 changes: 14 additions & 26 deletions service/frontend/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"math/rand"
"net"
"os"
"sync/atomic"
"time"

"go.temporal.io/api/operatorservice/v1"
Expand Down Expand Up @@ -274,7 +273,6 @@ func NewConfig(

// Service represents the frontend service
type Service struct {
status int32
config *Config

healthServer *health.Server
Expand Down Expand Up @@ -308,7 +306,6 @@ func NewService(
membershipMonitor membership.Monitor,
) *Service {
return &Service{
status: common.DaemonStatusInitialized,
config: serviceConfig,
server: server,
healthServer: healthServer,
Expand All @@ -327,12 +324,7 @@ func NewService(

// Start starts the service
func (s *Service) Start() {
if !atomic.CompareAndSwapInt32(&s.status, common.DaemonStatusInitialized, common.DaemonStatusStarted) {
return
}

logger := s.logger
logger.Info("frontend starting")
s.logger.Info("frontend starting")

healthpb.RegisterHealthServer(s.server, s.healthServer)
workflowservice.RegisterWorkflowServiceServer(s.server, s.handler)
Expand All @@ -350,22 +342,18 @@ func (s *Service) Start() {
s.operatorHandler.Start()
s.handler.Start()

go s.membershipMonitor.Start()
go func() {
s.logger.Info("Starting to serve on frontend listener")
if err := s.server.Serve(s.grpcListener); err != nil {
s.logger.Fatal("Failed to serve on frontend listener", tag.Error(err))
}
}()

logger.Info("Starting to serve on frontend listener")
if err := s.server.Serve(s.grpcListener); err != nil {
logger.Fatal("Failed to serve on frontend listener", tag.Error(err))
}
go s.membershipMonitor.Start()
}

// Stop stops the service
func (s *Service) Stop() {
logger := s.logger

if !atomic.CompareAndSwapInt32(&s.status, common.DaemonStatusStarted, common.DaemonStatusStopped) {
return
}

// initiate graceful shutdown:
// 1. Fail rpc health check, this will cause client side load balancer to stop forwarding requests to this node
// 2. wait for failure detection time
Expand All @@ -376,10 +364,10 @@ func (s *Service) Stop() {
requestDrainTime := util.Max(time.Second, s.config.ShutdownDrainDuration())
failureDetectionTime := util.Max(0, s.config.ShutdownFailHealthCheckDuration())

logger.Info("ShutdownHandler: Updating gRPC health status to ShuttingDown")
s.logger.Info("ShutdownHandler: Updating gRPC health status to ShuttingDown")
s.healthServer.Shutdown()

logger.Info("ShutdownHandler: Waiting for others to discover I am unhealthy")
s.logger.Info("ShutdownHandler: Waiting for others to discover I am unhealthy")
time.Sleep(failureDetectionTime)

s.handler.Stop()
Expand All @@ -388,19 +376,19 @@ func (s *Service) Stop() {
s.versionChecker.Stop()
s.visibilityManager.Close()

logger.Info("ShutdownHandler: Draining traffic")
s.logger.Info("ShutdownHandler: Draining traffic")
t := time.AfterFunc(requestDrainTime, func() {
logger.Info("ShutdownHandler: Drain time expired, stopping all traffic")
s.logger.Info("ShutdownHandler: Drain time expired, stopping all traffic")
s.server.Stop()
})
s.server.GracefulStop()
t.Stop()

if s.metricsHandler != nil {
s.metricsHandler.Stop(logger)
s.metricsHandler.Stop(s.logger)
}

logger.Info("frontend stopped")
s.logger.Info("frontend stopped")
}

func namespaceRPS(
Expand Down
25 changes: 2 additions & 23 deletions service/history/fx.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
package history

import (
"context"
"net"

"go.uber.org/fx"
Expand Down Expand Up @@ -289,26 +288,6 @@ func ArchivalClientProvider(
)
}

func ServiceLifetimeHooks(
lc fx.Lifecycle,
svcStoppedCh chan struct{},
svc *Service,
) {
lc.Append(
fx.Hook{
OnStart: func(context.Context) error {
go func(svc *Service, svcStoppedCh chan<- struct{}) {
// Start is blocked until Stop() is called.
svc.Start()
close(svcStoppedCh)
}(svc, svcStoppedCh)

return nil
},
OnStop: func(ctx context.Context) error {
svc.Stop()
return nil
},
},
)
// ServiceLifetimeHooks ties the history Service to the fx application
// lifecycle: svc.Start is invoked when the fx app starts and svc.Stop when
// it shuts down, via the hook built by fx.StartStopHook.
func ServiceLifetimeHooks(lc fx.Lifecycle, svc *Service) {
	lc.Append(fx.StartStopHook(svc.Start, svc.Stop))
}
48 changes: 17 additions & 31 deletions service/history/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,13 @@ package history
import (
"math/rand"
"net"
"sync/atomic"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/health"
healthpb "google.golang.org/grpc/health/grpc_health_v1"

"go.temporal.io/server/api/historyservice/v1"
"go.temporal.io/server/common"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/membership"
Expand All @@ -49,7 +47,6 @@ import (
// Service represents the history service
type (
Service struct {
status int32
handler *Handler
visibilityManager manager.VisibilityManager
config *configs.Config
Expand Down Expand Up @@ -77,7 +74,6 @@ func NewService(
healthServer *health.Server,
) *Service {
return &Service{
status: common.DaemonStatusInitialized,
server: grpc.NewServer(grpcServerOptions...),
handler: handler,
visibilityManager: visibilityMgr,
Expand All @@ -93,12 +89,7 @@ func NewService(

// Start starts the service
func (s *Service) Start() {
if !atomic.CompareAndSwapInt32(&s.status, common.DaemonStatusInitialized, common.DaemonStatusStarted) {
return
}

logger := s.logger
logger.Info("history starting")
s.logger.Info("history starting")

s.metricsHandler.Counter(metrics.RestartCount).Record(1)
rand.Seed(time.Now().UnixNano())
Expand All @@ -109,35 +100,30 @@ func (s *Service) Start() {
healthpb.RegisterHealthServer(s.server, s.healthServer)
s.healthServer.SetServingStatus(serviceName, healthpb.HealthCheckResponse_SERVING)

// As soon as we join membership, other hosts will send requests for shards
// that we own. Ideally, then, we would start the GRPC server, and only then
// join membership. That's not possible with the GRPC interface, though, hence
// we start membership in a goroutine.
go func() {
s.logger.Info("Starting to serve on history listener")
if err := s.server.Serve(s.grpcListener); err != nil {
s.logger.Fatal("Failed to serve on history listener", tag.Error(err))
}
}()

// As soon as we join membership, other hosts will send requests for shards that we own,
// so we should try to start this after starting the gRPC server.
go func() {
if delay := s.config.StartupMembershipJoinDelay(); delay > 0 {
// In some situations, like rolling upgrades of the history service,
// pausing before joining membership can help separate the shard movement
// caused by another history instance terminating with this instance starting.
logger.Info("history start: delaying before membership start",
s.logger.Info("history start: delaying before membership start",
tag.NewDurationTag("startupMembershipJoinDelay", delay))
time.Sleep(delay)
}
s.membershipMonitor.Start()
}()

logger.Info("Starting to serve on history listener")
if err := s.server.Serve(s.grpcListener); err != nil {
logger.Fatal("Failed to serve on history listener", tag.Error(err))
}
}

// Stop stops the service
func (s *Service) Stop() {
logger := s.logger
if !atomic.CompareAndSwapInt32(&s.status, common.DaemonStatusStarted, common.DaemonStatusStopped) {
return
}

// initiate graceful shutdown :
// 1. remove self from the membership ring
// 2. wait for other members to discover we are going down
Expand All @@ -155,19 +141,19 @@ func (s *Service) Stop() {

remainingTime := s.config.ShutdownDrainDuration()

logger.Info("ShutdownHandler: Evicting self from membership ring")
s.logger.Info("ShutdownHandler: Evicting self from membership ring")
_ = s.membershipMonitor.EvictSelf()
s.healthServer.SetServingStatus(serviceName, healthpb.HealthCheckResponse_NOT_SERVING)

logger.Info("ShutdownHandler: Waiting for others to discover I am unhealthy")
s.logger.Info("ShutdownHandler: Waiting for others to discover I am unhealthy")
remainingTime = s.sleep(gossipPropagationDelay, remainingTime)

logger.Info("ShutdownHandler: Initiating shardController shutdown")
s.logger.Info("ShutdownHandler: Initiating shardController shutdown")
s.handler.controller.Stop()
logger.Info("ShutdownHandler: Waiting for traffic to drain")
s.logger.Info("ShutdownHandler: Waiting for traffic to drain")
remainingTime = s.sleep(shardOwnershipTransferDelay, remainingTime)

logger.Info("ShutdownHandler: No longer taking rpc requests")
s.logger.Info("ShutdownHandler: No longer taking rpc requests")
_ = s.sleep(gracePeriod, remainingTime)

// TODO: Change this to GracefulStop when integration tests are refactored.
Expand All @@ -176,7 +162,7 @@ func (s *Service) Stop() {
s.handler.Stop()
s.visibilityManager.Close()

logger.Info("history stopped")
s.logger.Info("history stopped")
}

// sleep sleeps for the minimum of desired and available duration
Expand Down
27 changes: 2 additions & 25 deletions service/matching/fx.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@
package matching

import (
"context"

"go.uber.org/fx"

"go.temporal.io/server/api/historyservice/v1"
Expand Down Expand Up @@ -204,27 +202,6 @@ func HandlerProvider(
)
}

func ServiceLifetimeHooks(
lc fx.Lifecycle,
svcStoppedCh chan struct{},
svc *Service,
) {
lc.Append(
fx.Hook{
OnStart: func(context.Context) error {
go func(svc *Service, svcStoppedCh chan<- struct{}) {
// Start is blocked until Stop() is called.
svc.Start()
close(svcStoppedCh)
}(svc, svcStoppedCh)

return nil
},
OnStop: func(ctx context.Context) error {
svc.Stop()
return nil
},
},
)

// ServiceLifetimeHooks ties the matching Service to the fx application
// lifecycle: svc.Start is invoked when the fx app starts and svc.Stop when
// it shuts down, via the hook built by fx.StartStopHook.
func ServiceLifetimeHooks(lc fx.Lifecycle, svc *Service) {
	lc.Append(fx.StartStopHook(svc.Start, svc.Stop))
}
Loading