From f52be01c8627cace3090a8081609e540eee9d7ba Mon Sep 17 00:00:00 2001 From: Gaius Date: Fri, 25 Mar 2022 17:45:04 +0800 Subject: [PATCH] feat: add grpc health interface (#1195) * feat: add grpc health interface Signed-off-by: Gaius * feat: add dfdaemon upload server healthy interface Signed-off-by: Gaius --- client/daemon/rpcserver/rpcserver.go | 6 ++++++ client/daemon/upload/upload_manager.go | 9 +++++++++ manager/router/router.go | 2 +- manager/rpcserver/rpcserver.go | 4 ++++ pkg/rpc/cdnsystem/server/server.go | 5 +++++ scheduler/rpcserver/rpcserver.go | 5 +++++ 6 files changed, 30 insertions(+), 1 deletion(-) diff --git a/client/daemon/rpcserver/rpcserver.go b/client/daemon/rpcserver/rpcserver.go index 0da2ad97913..dbdaa6497fb 100644 --- a/client/daemon/rpcserver/rpcserver.go +++ b/client/daemon/rpcserver/rpcserver.go @@ -26,6 +26,8 @@ import ( "github.com/pkg/errors" "google.golang.org/grpc" "google.golang.org/grpc/codes" + "google.golang.org/grpc/health" + healthpb "google.golang.org/grpc/health/grpc_health_v1" "google.golang.org/grpc/status" "d7y.io/dragonfly/v2/client/clientutil" @@ -66,8 +68,12 @@ func New(peerHost *scheduler.PeerHost, peerTaskManager peer.TaskManager, storage peerTaskManager: peerTaskManager, storageManager: storageManager, } + svr.downloadServer = dfdaemonserver.New(svr, downloadOpts...) + healthpb.RegisterHealthServer(svr.downloadServer, health.NewServer()) + svr.peerServer = dfdaemonserver.New(svr, peerOpts...) + healthpb.RegisterHealthServer(svr.peerServer, health.NewServer()) return svr, nil } diff --git a/client/daemon/upload/upload_manager.go b/client/daemon/upload/upload_manager.go index a3d035cf030..57b8d6f58aa 100644 --- a/client/daemon/upload/upload_manager.go +++ b/client/daemon/upload/upload_manager.go @@ -73,6 +73,10 @@ func WithLimiter(limiter *rate.Limiter) func(*uploadManager) { func (um *uploadManager) initRouter() { r := mux.NewRouter() + // Health Check + r.HandleFunc("/healthy", um.handleHealth).Methods("GET") + + // Peer download task r.HandleFunc(PeerDownloadHTTPPathPrefix+"{taskPrefix:.*}/"+"{task:.*}", um.handleUpload).Queries("peerId", "{.*}").Methods("GET") um.Server.Handler = r } @@ -85,6 +89,11 @@ func (um *uploadManager) Stop() error { return um.Server.Shutdown(context.Background()) } +// handleHealth uses to check server health. +func (um *uploadManager) handleHealth(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) +} + // handleUpload uses to upload a task file when other peers download from it. func (um *uploadManager) handleUpload(w http.ResponseWriter, r *http.Request) { var ( diff --git a/manager/router/router.go b/manager/router/router.go index c267b2f24a5..d31ed674e52 100644 --- a/manager/router/router.go +++ b/manager/router/router.go @@ -218,7 +218,7 @@ func Init(cfg *config.Config, logDir string, service service.Service, enforcer * pv1.GET(":id", h.GetV1Preheat) // Health Check - r.GET("/healthy/*action", h.GetHealth) + r.GET("/healthy", h.GetHealth) // Swagger apiSeagger := ginSwagger.URL("/swagger/doc.json") diff --git a/manager/rpcserver/rpcserver.go b/manager/rpcserver/rpcserver.go index c5d084c4545..d462d9ce5b2 100644 --- a/manager/rpcserver/rpcserver.go +++ b/manager/rpcserver/rpcserver.go @@ -30,6 +30,8 @@ import ( grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" "google.golang.org/grpc" "google.golang.org/grpc/codes" + "google.golang.org/grpc/health" + healthpb "google.golang.org/grpc/health/grpc_health_v1" "google.golang.org/grpc/status" "gorm.io/gorm" @@ -76,7 +78,9 @@ func New(database *database.Database, cache *cache.Cache, searcher searcher.Sear grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(defaultUnaryMiddleWares...)), }, opts...)...) + // Register servers on grpc server manager.RegisterManagerServer(grpcServer, server) + healthpb.RegisterHealthServer(grpcServer, health.NewServer()) return grpcServer } diff --git a/pkg/rpc/cdnsystem/server/server.go b/pkg/rpc/cdnsystem/server/server.go index a59dddf2a51..c2285c20c26 100644 --- a/pkg/rpc/cdnsystem/server/server.go +++ b/pkg/rpc/cdnsystem/server/server.go @@ -23,6 +23,8 @@ import ( "go.uber.org/zap" "google.golang.org/grpc" + "google.golang.org/grpc/health" + healthpb "google.golang.org/grpc/health/grpc_health_v1" "d7y.io/dragonfly/v2/cdn/metrics" "d7y.io/dragonfly/v2/internal/dferrors" @@ -52,7 +54,10 @@ type proxy struct { func New(seederServer SeederServer, opts ...grpc.ServerOption) *grpc.Server { grpcServer := grpc.NewServer(append(rpc.DefaultServerOptions, opts...)...) + + // Register servers on grpc server cdnsystem.RegisterSeederServer(grpcServer, &proxy{server: seederServer}) + healthpb.RegisterHealthServer(grpcServer, health.NewServer()) return grpcServer } diff --git a/scheduler/rpcserver/rpcserver.go b/scheduler/rpcserver/rpcserver.go index 047a2c8b79e..a437f86402b 100644 --- a/scheduler/rpcserver/rpcserver.go +++ b/scheduler/rpcserver/rpcserver.go @@ -20,6 +20,8 @@ import ( "context" "google.golang.org/grpc" + "google.golang.org/grpc/health" + healthpb "google.golang.org/grpc/health/grpc_health_v1" empty "google.golang.org/protobuf/types/known/emptypb" "d7y.io/dragonfly/v2/pkg/rpc" @@ -42,7 +44,10 @@ type Server struct { func New(service *service.Service, opts ...grpc.ServerOption) *grpc.Server { svr := &Server{service: service} grpcServer := grpc.NewServer(append(rpc.DefaultServerOptions, opts...)...) + + // Register servers on grpc server scheduler.RegisterSchedulerServer(grpcServer, svr) + healthpb.RegisterHealthServer(grpcServer, health.NewServer()) return grpcServer }