Skip to content

Commit

Permalink
feat: add grpc health interface (#1195)
Browse files Browse the repository at this point in the history
* feat: add grpc health interface

Signed-off-by: Gaius <[email protected]>

* feat: add dfdaemon upload server healthy interface

Signed-off-by: Gaius <[email protected]>
  • Loading branch information
gaius-qi authored Mar 25, 2022
1 parent 7e5860e commit f52be01
Show file tree
Hide file tree
Showing 6 changed files with 30 additions and 1 deletion.
6 changes: 6 additions & 0 deletions client/daemon/rpcserver/rpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
"github.com/pkg/errors"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/health"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/status"

"d7y.io/dragonfly/v2/client/clientutil"
Expand Down Expand Up @@ -66,8 +68,12 @@ func New(peerHost *scheduler.PeerHost, peerTaskManager peer.TaskManager, storage
peerTaskManager: peerTaskManager,
storageManager: storageManager,
}

svr.downloadServer = dfdaemonserver.New(svr, downloadOpts...)
healthpb.RegisterHealthServer(svr.downloadServer, health.NewServer())

svr.peerServer = dfdaemonserver.New(svr, peerOpts...)
healthpb.RegisterHealthServer(svr.peerServer, health.NewServer())
return svr, nil
}

Expand Down
9 changes: 9 additions & 0 deletions client/daemon/upload/upload_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ func WithLimiter(limiter *rate.Limiter) func(*uploadManager) {

func (um *uploadManager) initRouter() {
r := mux.NewRouter()
// Health Check
r.HandleFunc("/healthy", um.handleHealth).Methods("GET")

// Peer download task
r.HandleFunc(PeerDownloadHTTPPathPrefix+"{taskPrefix:.*}/"+"{task:.*}", um.handleUpload).Queries("peerId", "{.*}").Methods("GET")
um.Server.Handler = r
}
Expand All @@ -85,6 +89,11 @@ func (um *uploadManager) Stop() error {
return um.Server.Shutdown(context.Background())
}

// handleHealth uses to check server health.
func (um *uploadManager) handleHealth(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
}

// handleUpload uses to upload a task file when other peers download from it.
func (um *uploadManager) handleUpload(w http.ResponseWriter, r *http.Request) {
var (
Expand Down
2 changes: 1 addition & 1 deletion manager/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ func Init(cfg *config.Config, logDir string, service service.Service, enforcer *
pv1.GET(":id", h.GetV1Preheat)

// Health Check
r.GET("/healthy/*action", h.GetHealth)
r.GET("/healthy", h.GetHealth)

// Swagger
apiSeagger := ginSwagger.URL("/swagger/doc.json")
Expand Down
4 changes: 4 additions & 0 deletions manager/rpcserver/rpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ import (
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/health"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/status"
"gorm.io/gorm"

Expand Down Expand Up @@ -76,7 +78,9 @@ func New(database *database.Database, cache *cache.Cache, searcher searcher.Sear
grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(defaultUnaryMiddleWares...)),
}, opts...)...)

// Register servers on grpc server
manager.RegisterManagerServer(grpcServer, server)
healthpb.RegisterHealthServer(grpcServer, health.NewServer())
return grpcServer
}

Expand Down
5 changes: 5 additions & 0 deletions pkg/rpc/cdnsystem/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (

"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/health"
healthpb "google.golang.org/grpc/health/grpc_health_v1"

"d7y.io/dragonfly/v2/cdn/metrics"
"d7y.io/dragonfly/v2/internal/dferrors"
Expand Down Expand Up @@ -52,7 +54,10 @@ type proxy struct {

func New(seederServer SeederServer, opts ...grpc.ServerOption) *grpc.Server {
grpcServer := grpc.NewServer(append(rpc.DefaultServerOptions, opts...)...)

// Register servers on grpc server
cdnsystem.RegisterSeederServer(grpcServer, &proxy{server: seederServer})
healthpb.RegisterHealthServer(grpcServer, health.NewServer())
return grpcServer
}

Expand Down
5 changes: 5 additions & 0 deletions scheduler/rpcserver/rpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"context"

"google.golang.org/grpc"
"google.golang.org/grpc/health"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
empty "google.golang.org/protobuf/types/known/emptypb"

"d7y.io/dragonfly/v2/pkg/rpc"
Expand All @@ -42,7 +44,10 @@ type Server struct {
func New(service *service.Service, opts ...grpc.ServerOption) *grpc.Server {
svr := &Server{service: service}
grpcServer := grpc.NewServer(append(rpc.DefaultServerOptions, opts...)...)

// Register servers on grpc server
scheduler.RegisterSchedulerServer(grpcServer, svr)
healthpb.RegisterHealthServer(grpcServer, health.NewServer())
return grpcServer
}

Expand Down

0 comments on commit f52be01

Please sign in to comment.