Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor fx service Start/Stop #4584

Merged
merged 5 commits into from
Jul 6, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 2 additions & 23 deletions service/frontend/fx.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
package frontend

import (
"context"
"fmt"
"net"

Expand Down Expand Up @@ -599,26 +598,6 @@ func HandlerProvider(
return wfHandler
}

func ServiceLifetimeHooks(
lc fx.Lifecycle,
svcStoppedCh chan struct{},
svc *Service,
) {
lc.Append(
fx.Hook{
OnStart: func(context.Context) error {
go func(svc *Service, svcStoppedCh chan<- struct{}) {
// Start is blocked until Stop() is called.
svc.Start()
close(svcStoppedCh)
}(svc, svcStoppedCh)

return nil
},
OnStop: func(ctx context.Context) error {
svc.Stop()
return nil
},
},
)
func ServiceLifetimeHooks(lc fx.Lifecycle, svc *Service) {
lc.Append(fx.StartStopHook(svc.Start, svc.Stop))
}
39 changes: 13 additions & 26 deletions service/frontend/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"math/rand"
"net"
"os"
"sync/atomic"
"time"

"go.temporal.io/api/operatorservice/v1"
Expand Down Expand Up @@ -274,7 +273,6 @@ func NewConfig(

// Service represents the frontend service
type Service struct {
status int32
config *Config

healthServer *health.Server
Expand Down Expand Up @@ -306,7 +304,6 @@ func NewService(
faultInjectionDataStoreFactory *client.FaultInjectionDataStoreFactory,
) *Service {
return &Service{
status: common.DaemonStatusInitialized,
config: serviceConfig,
server: server,
healthServer: healthServer,
Expand All @@ -324,12 +321,7 @@ func NewService(

// Start starts the service
func (s *Service) Start() {
if !atomic.CompareAndSwapInt32(&s.status, common.DaemonStatusInitialized, common.DaemonStatusStarted) {
return
}

logger := s.logger
logger.Info("frontend starting")
s.logger.Info("frontend starting")

healthpb.RegisterHealthServer(s.server, s.healthServer)
workflowservice.RegisterWorkflowServiceServer(s.server, s.handler)
Expand All @@ -347,21 +339,16 @@ func (s *Service) Start() {
s.operatorHandler.Start()
s.handler.Start()

listener := s.grpcListener
logger.Info("Starting to serve on frontend listener")
if err := s.server.Serve(listener); err != nil {
logger.Fatal("Failed to serve on frontend listener", tag.Error(err))
}
go func() {
s.logger.Info("Starting to serve on frontend listener")
if err := s.server.Serve(s.grpcListener); err != nil {
s.logger.Fatal("Failed to serve on frontend listener", tag.Error(err))
}
}()
}

// Stop stops the service
func (s *Service) Stop() {
logger := s.logger

if !atomic.CompareAndSwapInt32(&s.status, common.DaemonStatusStarted, common.DaemonStatusStopped) {
return
}

// initiate graceful shutdown:
// 1. Fail rpc health check, this will cause client side load balancer to stop forwarding requests to this node
// 2. wait for failure detection time
Expand All @@ -372,10 +359,10 @@ func (s *Service) Stop() {
requestDrainTime := util.Max(time.Second, s.config.ShutdownDrainDuration())
failureDetectionTime := util.Max(0, s.config.ShutdownFailHealthCheckDuration())

logger.Info("ShutdownHandler: Updating gRPC health status to ShuttingDown")
s.logger.Info("ShutdownHandler: Updating gRPC health status to ShuttingDown")
s.healthServer.Shutdown()

logger.Info("ShutdownHandler: Waiting for others to discover I am unhealthy")
s.logger.Info("ShutdownHandler: Waiting for others to discover I am unhealthy")
time.Sleep(failureDetectionTime)

s.handler.Stop()
Expand All @@ -384,19 +371,19 @@ func (s *Service) Stop() {
s.versionChecker.Stop()
s.visibilityManager.Close()

logger.Info("ShutdownHandler: Draining traffic")
s.logger.Info("ShutdownHandler: Draining traffic")
t := time.AfterFunc(requestDrainTime, func() {
logger.Info("ShutdownHandler: Drain time expired, stopping all traffic")
s.logger.Info("ShutdownHandler: Drain time expired, stopping all traffic")
s.server.Stop()
})
s.server.GracefulStop()
t.Stop()

if s.metricsHandler != nil {
s.metricsHandler.Stop(logger)
s.metricsHandler.Stop(s.logger)
}

logger.Info("frontend stopped")
s.logger.Info("frontend stopped")
}

func namespaceRPS(
Expand Down
25 changes: 2 additions & 23 deletions service/history/fx.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
package history

import (
"context"
"net"

"go.uber.org/fx"
Expand Down Expand Up @@ -289,26 +288,6 @@ func ArchivalClientProvider(
)
}

func ServiceLifetimeHooks(
lc fx.Lifecycle,
svcStoppedCh chan struct{},
svc *Service,
) {
lc.Append(
fx.Hook{
OnStart: func(context.Context) error {
go func(svc *Service, svcStoppedCh chan<- struct{}) {
// Start is blocked until Stop() is called.
svc.Start()
close(svcStoppedCh)
}(svc, svcStoppedCh)

return nil
},
OnStop: func(ctx context.Context) error {
svc.Stop()
return nil
},
},
)
func ServiceLifetimeHooks(lc fx.Lifecycle, svc *Service) {
lc.Append(fx.StartStopHook(svc.Start, svc.Stop))
}
39 changes: 13 additions & 26 deletions service/history/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,13 @@ package history
import (
"math/rand"
"net"
"sync/atomic"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/health"
healthpb "google.golang.org/grpc/health/grpc_health_v1"

"go.temporal.io/server/api/historyservice/v1"
"go.temporal.io/server/common"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/membership"
Expand All @@ -49,7 +47,6 @@ import (
// Service represents the history service
type (
Service struct {
status int32
handler *Handler
visibilityManager manager.VisibilityManager
config *configs.Config
Expand Down Expand Up @@ -77,7 +74,6 @@ func NewService(
healthServer *health.Server,
) *Service {
return &Service{
status: common.DaemonStatusInitialized,
server: grpc.NewServer(grpcServerOptions...),
handler: handler,
visibilityManager: visibilityMgr,
Expand All @@ -93,12 +89,7 @@ func NewService(

// Start starts the service
func (s *Service) Start() {
if !atomic.CompareAndSwapInt32(&s.status, common.DaemonStatusInitialized, common.DaemonStatusStarted) {
return
}

logger := s.logger
logger.Info("history starting")
s.logger.Info("history starting")

s.metricsHandler.Counter(metrics.RestartCount).Record(1)
rand.Seed(time.Now().UnixNano())
Expand All @@ -109,20 +100,16 @@ func (s *Service) Start() {
healthpb.RegisterHealthServer(s.server, s.healthServer)
s.healthServer.SetServingStatus(serviceName, healthpb.HealthCheckResponse_SERVING)

listener := s.grpcListener
logger.Info("Starting to serve on history listener")
if err := s.server.Serve(listener); err != nil {
logger.Fatal("Failed to serve on history listener", tag.Error(err))
}
go func() {
s.logger.Info("Starting to serve on history listener")
if err := s.server.Serve(s.grpcListener); err != nil {
s.logger.Fatal("Failed to serve on history listener", tag.Error(err))
}
}()
}

// Stop stops the service
func (s *Service) Stop() {
logger := s.logger
if !atomic.CompareAndSwapInt32(&s.status, common.DaemonStatusStarted, common.DaemonStatusStopped) {
return
}

// initiate graceful shutdown :
// 1. remove self from the membership ring
// 2. wait for other members to discover we are going down
Expand All @@ -140,19 +127,19 @@ func (s *Service) Stop() {

remainingTime := s.config.ShutdownDrainDuration()

logger.Info("ShutdownHandler: Evicting self from membership ring")
s.logger.Info("ShutdownHandler: Evicting self from membership ring")
_ = s.membershipMonitor.EvictSelf()
s.healthServer.SetServingStatus(serviceName, healthpb.HealthCheckResponse_NOT_SERVING)

logger.Info("ShutdownHandler: Waiting for others to discover I am unhealthy")
s.logger.Info("ShutdownHandler: Waiting for others to discover I am unhealthy")
remainingTime = s.sleep(gossipPropagationDelay, remainingTime)

logger.Info("ShutdownHandler: Initiating shardController shutdown")
s.logger.Info("ShutdownHandler: Initiating shardController shutdown")
s.handler.controller.Stop()
logger.Info("ShutdownHandler: Waiting for traffic to drain")
s.logger.Info("ShutdownHandler: Waiting for traffic to drain")
remainingTime = s.sleep(shardOwnershipTransferDelay, remainingTime)

logger.Info("ShutdownHandler: No longer taking rpc requests")
s.logger.Info("ShutdownHandler: No longer taking rpc requests")
_ = s.sleep(gracePeriod, remainingTime)

// TODO: Change this to GracefulStop when integration tests are refactored.
Expand All @@ -161,7 +148,7 @@ func (s *Service) Stop() {
s.handler.Stop()
s.visibilityManager.Close()

logger.Info("history stopped")
s.logger.Info("history stopped")
}

// sleep sleeps for the minimum of desired and available duration
Expand Down
27 changes: 2 additions & 25 deletions service/matching/fx.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@
package matching

import (
"context"

"go.uber.org/fx"

"go.temporal.io/server/api/historyservice/v1"
Expand Down Expand Up @@ -204,27 +202,6 @@ func HandlerProvider(
)
}

func ServiceLifetimeHooks(
lc fx.Lifecycle,
svcStoppedCh chan struct{},
svc *Service,
) {
lc.Append(
fx.Hook{
OnStart: func(context.Context) error {
go func(svc *Service, svcStoppedCh chan<- struct{}) {
// Start is blocked until Stop() is called.
svc.Start()
close(svcStoppedCh)
}(svc, svcStoppedCh)

return nil
},
OnStop: func(ctx context.Context) error {
svc.Stop()
return nil
},
},
)

func ServiceLifetimeHooks(lc fx.Lifecycle, svc *Service) {
lc.Append(fx.StartStopHook(svc.Start, svc.Stop))
}
22 changes: 6 additions & 16 deletions service/matching/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,13 @@ package matching
import (
"math/rand"
"net"
"sync/atomic"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/health"
healthpb "google.golang.org/grpc/health/grpc_health_v1"

"go.temporal.io/server/api/matchingservice/v1"
"go.temporal.io/server/common"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/membership"
Expand All @@ -46,7 +44,6 @@ import (

// Service represents the matching service
type Service struct {
status int32
handler *Handler
config *Config

Expand Down Expand Up @@ -75,7 +72,6 @@ func NewService(
visibilityManager manager.VisibilityManager,
) *Service {
return &Service{
status: common.DaemonStatusInitialized,
config: serviceConfig,
server: grpc.NewServer(grpcServerOptions...),
handler: handler,
Expand All @@ -92,10 +88,6 @@ func NewService(

// Start starts the service
func (s *Service) Start() {
if !atomic.CompareAndSwapInt32(&s.status, common.DaemonStatusInitialized, common.DaemonStatusStarted) {
return
}

s.logger.Info("matching starting")

// must start base service first
Expand All @@ -108,18 +100,16 @@ func (s *Service) Start() {
healthpb.RegisterHealthServer(s.server, s.healthServer)
s.healthServer.SetServingStatus(serviceName, healthpb.HealthCheckResponse_SERVING)

s.logger.Info("Starting to serve on matching listener")
if err := s.server.Serve(s.grpcListener); err != nil {
s.logger.Fatal("Failed to serve on matching listener", tag.Error(err))
}
go func() {
s.logger.Info("Starting to serve on matching listener")
if err := s.server.Serve(s.grpcListener); err != nil {
s.logger.Fatal("Failed to serve on matching listener", tag.Error(err))
}
}()
}

// Stop stops the service
func (s *Service) Stop() {
if !atomic.CompareAndSwapInt32(&s.status, common.DaemonStatusStarted, common.DaemonStatusStopped) {
return
}

// remove self from membership ring and wait for traffic to drain
s.logger.Info("ShutdownHandler: Evicting self from membership ring")
if err := s.membershipMonitor.EvictSelf(); err != nil {
Expand Down
Loading