Skip to content

Commit

Permalink
basic implement TSO server
Browse files Browse the repository at this point in the history
Signed-off-by: Bin Shi <[email protected]>
  • Loading branch information
binshi-bing committed Feb 9, 2023
1 parent 0e08a5e commit dd860c4
Show file tree
Hide file tree
Showing 4 changed files with 193 additions and 59 deletions.
9 changes: 7 additions & 2 deletions pkg/basic_server/basic_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"net/http"

"github.com/tikv/pd/pkg/member"
"go.etcd.io/etcd/clientv3"
)

Expand All @@ -27,14 +28,18 @@ type Server interface {
Name() string
// Context returns the context of server.
Context() context.Context

// Run runs the server.
Run() error
// Close closes the server.
Close()

// GetClient returns builtin etcd client.
GetClient() *clientv3.Client
// GetHTTPClient returns builtin http client.
GetHTTPClient() *http.Client
// AddStartCallback adds a callback in the startServer phase.
AddStartCallback(callbacks ...func())
// GetMember returns the member information.
GetMember() *member.Member
// AddLeaderCallback adds a callback in the leader campaign phase.
AddLeaderCallback(callbacks ...func(context.Context))
}
91 changes: 91 additions & 0 deletions pkg/mcs/tso/server/grpc_service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package server

import (
"context"
"net/http"

"github.com/pingcap/kvproto/pkg/tsopb"
"github.com/tikv/pd/pkg/mcs/registry"
"github.com/tikv/pd/server"
"google.golang.org/grpc"
)

var _ tsopb.TSOServer = (*Service)(nil)

// SetUpRestHandler is a hook to sets up the REST service.
var SetUpRestHandler = func(srv *Service) (http.Handler, server.APIServiceGroup) {
return dummyRestService{}, server.APIServiceGroup{}
}

type dummyRestService struct{}

func (d dummyRestService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNotImplemented)
w.Write([]byte("not implemented"))
}

// Service is the gRPC service for TSO.
type Service struct {
ctx context.Context
server *Server
// settings
}

// NewService creates a new TSO service.
func NewService(svr *Server) registry.RegistrableService {
return &Service{
ctx: svr.Context(),
server: svr,
}
}

// RegisterGRPCService registers the service to gRPC server.
func (s *Service) RegisterGRPCService(g *grpc.Server) {
tsopb.RegisterTSOServer(g, s)
}

// RegisterRESTHandler registers the service to REST server.
func (s *Service) RegisterRESTHandler(userDefineHandlers map[string]http.Handler) {
handler, group := SetUpRestHandler(s)
server.RegisterUserDefinedHandlers(userDefineHandlers, &group, handler)
}

// Tso returns a stream of timestamps
func (s *Service) Tso(stream tsopb.TSO_TsoServer) error {
return nil
}

// SyncMaxTS will check whether MaxTS is the biggest one among all Local TSOs this PD is holding when skipCheck is set,
// and write it into all Local TSO Allocators then if it's indeed the biggest one.
func (s *Service) SyncMaxTS(_ context.Context, request *tsopb.SyncMaxTSRequest) (*tsopb.SyncMaxTSResponse, error) {
return nil, nil
}

// GetDCLocationInfo gets the dc-location info of the given dc-location from PD leader's TSO allocator manager.
func (s *Service) GetDCLocationInfo(ctx context.Context, request *tsopb.GetDCLocationInfoRequest) (*tsopb.GetDCLocationInfoResponse, error) {
return nil, nil
}

// SetExternalTimestamp sets a given external timestamp to perform stale read.
func (s *Service) SetExternalTimestamp(ctx context.Context, request *tsopb.SetExternalTimestampRequest) (*tsopb.SetExternalTimestampResponse, error) {
return nil, nil
}

// GetExternalTimestamp gets the saved external timstamp.
func (s *Service) GetExternalTimestamp(ctx context.Context, request *tsopb.GetExternalTimestampRequest) (*tsopb.GetExternalTimestampResponse, error) {
return nil, nil
}
87 changes: 30 additions & 57 deletions pkg/mcs/tso/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,10 @@ package server

import (
"context"
"flag"
"net/http"
"os"

grpcprometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/pingcap/errors"
"github.com/pingcap/log"
basicsvr "github.com/tikv/pd/pkg/basic_server"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/tso"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/metricutil"
"github.com/tikv/pd/pkg/member"
"go.etcd.io/etcd/clientv3"
)

Expand All @@ -36,16 +28,32 @@ import (
var _ basicsvr.Server = (*Server)(nil)

// Server is the TSO server, and it implements basicsvr.Server.
// nolint
type Server struct {
ctx context.Context
ctx context.Context
name string
client *clientv3.Client
member *member.Member
// Callback functions for different stages
// startCallbacks will be called after the server is started.
startCallbacks []func()
// leaderCallbacks will be called after the server becomes leader.
leaderCallbacks []func(context.Context)
}

// NewServer creates a new TSO server.
func NewServer(ctx context.Context, client *clientv3.Client) *Server {
return &Server{
ctx: ctx,
name: "TSO",
client: client,
}
}

// TODO: Implement the following methods defined in basicsvr.Server

// Name returns the unique etcd Name for this server in etcd cluster.
func (s *Server) Name() string {
return ""
return s.name
}

// Context returns the context of server.
Expand All @@ -64,60 +72,25 @@ func (s *Server) Close() {

// GetClient returns builtin etcd client.
func (s *Server) GetClient() *clientv3.Client {
return nil
return s.client
}

// GetHTTPClient returns builtin http client.
func (s *Server) GetHTTPClient() *http.Client {
return nil
}

// CreateServerWrapper encapsulates the configuration/log/metrics initialization and create the server
func CreateServerWrapper(args []string) (context.Context, context.CancelFunc, basicsvr.Server) {
cfg := tso.NewConfig()
err := cfg.Parse(os.Args[1:])

if cfg.Version {
printVersionInfo()
exit(0)
}

defer logutil.LogPanic()

switch errors.Cause(err) {
case nil:
case flag.ErrHelp:
exit(0)
default:
log.Fatal("parse cmd flags error", errs.ZapError(err))
}

if cfg.ConfigCheck {
printConfigCheckMsg(cfg)
exit(0)
}

// TODO: Initialize logger

// TODO: Make it configurable if it has big impact on performance.
grpcprometheus.EnableHandlingTimeHistogram()

metricutil.Push(&cfg.Metric)

// TODO: Create the server

return nil, nil, nil
}

// TODO: implement it
func printVersionInfo() {
// AddStartCallback adds a callback in the startServer phase.
func (s *Server) AddStartCallback(callbacks ...func()) {
s.startCallbacks = append(s.startCallbacks, callbacks...)
}

// TODO: implement it
func printConfigCheckMsg(cfg *tso.Config) {
// GetMember returns the member.
func (s *Server) GetMember() *member.Member {
return s.member
}

func exit(code int) {
log.Sync()
os.Exit(code)
// AddLeaderCallback adds the callback function when the server becomes leader.
func (s *Server) AddLeaderCallback(callbacks ...func(context.Context)) {
s.leaderCallbacks = append(s.leaderCallbacks, callbacks...)
}
65 changes: 65 additions & 0 deletions pkg/mode/tso.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package server

import (
"context"
"time"

"github.com/pingcap/log"
basicsvr "github.com/tikv/pd/pkg/basic_server"
_ "github.com/tikv/pd/pkg/mcs/registry"
tsosvr "github.com/tikv/pd/pkg/mcs/tso/server"
"github.com/tikv/pd/server/config"
"go.etcd.io/etcd/clientv3"
"go.uber.org/zap"
_ "google.golang.org/grpc"
)

// TSOStart starts the TSO server.
func TSOStart(ctx context.Context, cfg *config.Config) basicsvr.Server {
// start client
etcdTimeout := time.Second * 3
tlsConfig, err := cfg.Security.ToTLSConfig()
if err != nil {
return nil
}
etcdCfg, err := cfg.GenEmbedEtcdConfig()
if err != nil {
return nil
}

endpoints := []string{etcdCfg.ACUrls[0].String()}
log.Info("create etcd v3 client", zap.Strings("endpoints", endpoints), zap.Reflect("cert", cfg.Security))

lgc := zap.NewProductionConfig()
lgc.Encoding = log.ZapEncodingName
client, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
DialTimeout: etcdTimeout,
TLS: tlsConfig,
LogConfig: &lgc,
})
if err != nil {
return nil
}
// start server
svr := tsosvr.NewServer(ctx, client)
// TODO: wait for #5933 to check-in
//gs := grpc.NewServer()
//registry.ServerServiceRegistry.RegisterService("TSO", tsosvr.NewService)
//registry.ServerServiceRegistry.InstallAllGRPCServices(svr, gs)
return svr
}

0 comments on commit dd860c4

Please sign in to comment.