Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

client: supports to add gRPC dial options (#2035) #2043

Merged
merged 1 commit into from
Dec 23, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 30 additions & 6 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ package pd

import (
"context"
"reflect"
"sort"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -129,7 +131,8 @@ type client struct {
ctx context.Context
cancel context.CancelFunc

security SecurityOption
security SecurityOption
gRPCDialOptions []grpc.DialOption
}

// SecurityOption records options about tls
Expand All @@ -139,13 +142,23 @@ type SecurityOption struct {
KeyPath string
}

// ClientOption configures client.
type ClientOption func(c *client)

// WithGRPCDialOptions configures the client with gRPC dial options.
func WithGRPCDialOptions(opts ...grpc.DialOption) ClientOption {
return func(c *client) {
c.gRPCDialOptions = append(c.gRPCDialOptions, opts...)
}
}

// NewClient creates a PD client.
func NewClient(pdAddrs []string, security SecurityOption) (Client, error) {
return NewClientWithContext(context.Background(), pdAddrs, security)
func NewClient(pdAddrs []string, security SecurityOption, opts ...ClientOption) (Client, error) {
return NewClientWithContext(context.Background(), pdAddrs, security, opts...)
}

// NewClientWithContext creates a PD client with context.
func NewClientWithContext(ctx context.Context, pdAddrs []string, security SecurityOption) (Client, error) {
func NewClientWithContext(ctx context.Context, pdAddrs []string, security SecurityOption, opts ...ClientOption) (Client, error) {
log.Info("[pd] create pd client with endpoints", zap.Strings("pd-address", pdAddrs))
ctx1, cancel := context.WithCancel(ctx)
c := &client{
Expand All @@ -158,6 +171,9 @@ func NewClientWithContext(ctx context.Context, pdAddrs []string, security Securi
security: security,
}
c.connMu.clientConns = make(map[string]*grpc.ClientConn)
for _, opt := range opts {
opt(c)
}

if err := c.initRetry(c.initClusterID); err != nil {
cancel()
Expand All @@ -182,6 +198,14 @@ func (c *client) updateURLs(members []*pdpb.Member) {
for _, m := range members {
urls = append(urls, m.GetClientUrls()...)
}

sort.Strings(urls)
// the url list is same.
if reflect.DeepEqual(c.urls, urls) {
return
}

log.Info("[pd] update member urls", zap.Strings("old-urls", c.urls), zap.Strings("new-urls", urls))
c.urls = urls
}

Expand Down Expand Up @@ -222,7 +246,7 @@ func (c *client) updateLeader() error {
ctx, cancel := context.WithTimeout(c.ctx, updateLeaderTimeout)
members, err := c.getMembers(ctx, u)
if err != nil {
log.Warn("cannot update leader", zap.String("address", u), zap.Error(err))
log.Warn("[pd] cannot update leader", zap.String("address", u), zap.Error(err))
}
cancel()
if err != nil || members.GetLeader() == nil || len(members.GetLeader().GetClientUrls()) == 0 {
Expand Down Expand Up @@ -283,7 +307,7 @@ func (c *client) getOrCreateGRPCConn(addr string) (*grpc.ClientConn, error) {
return conn, nil
}

cc, err := grpcutil.GetClientConn(addr, c.security.CAPath, c.security.CertPath, c.security.KeyPath)
cc, err := grpcutil.GetClientConn(addr, c.security.CAPath, c.security.CertPath, c.security.KeyPath, c.gRPCDialOptions...)
if err != nil {
return nil, errors.WithStack(err)
}
Expand Down
36 changes: 36 additions & 0 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,28 @@ func (s *testClientSuite) TestScatterRegion(c *C) {
c.Succeed()
}

func (s *testClientSuite) TestUpdateURLs(c *C) {
members := []*pdpb.Member{
{Name: "pd4", ClientUrls: []string{"tmp//pd4"}},
{Name: "pd1", ClientUrls: []string{"tmp//pd1"}},
{Name: "pd3", ClientUrls: []string{"tmp//pd3"}},
{Name: "pd2", ClientUrls: []string{"tmp//pd2"}},
}
getURLs := func(ms []*pdpb.Member) (urls []string) {
for _, m := range ms {
urls = append(urls, m.GetClientUrls()[0])
}
return
}
cli := &client{}
cli.updateURLs(members[1:])
c.Assert(cli.urls, DeepEquals, getURLs([]*pdpb.Member{members[1], members[3], members[2]}))
cli.updateURLs(members[1:])
c.Assert(cli.urls, DeepEquals, getURLs([]*pdpb.Member{members[1], members[3], members[2]}))
cli.updateURLs(members)
c.Assert(cli.urls, DeepEquals, getURLs([]*pdpb.Member{members[1], members[3], members[2], members[0]}))
}

var _ = Suite(&testClientCtxSuite{})

type testClientCtxSuite struct{}
Expand All @@ -481,3 +503,17 @@ func (s *testClientCtxSuite) TestClientCtx(c *C) {
c.Assert(err, NotNil)
c.Assert(time.Since(start), Less, time.Second*4)
}

var _ = Suite(&testClientDialOptionSuite{})

type testClientDialOptionSuite struct{}

func (s *testClientDialOptionSuite) TestGRPCDialOption(c *C) {
start := time.Now()
ctx, cancel := context.WithTimeout(context.TODO(), 100*time.Millisecond)
defer cancel()
// nolint
_, err := NewClientWithContext(ctx, []string{"localhost:8080"}, SecurityOption{}, WithGRPCDialOptions(grpc.WithBlock(), grpc.WithTimeout(time.Second)))
c.Assert(err, NotNil)
c.Assert(time.Since(start), Greater, 800*time.Millisecond)
}