Implement TSO Client and provide a general client-side service discovery framework and a general gRPC stream handling framework. #6037

Merged 15 commits on Feb 27, 2023
338 changes: 205 additions & 133 deletions client/base_client.go

Large diffs are not rendered by default.

1,045 changes: 133 additions & 912 deletions client/client.go

Large diffs are not rendered by default.

16 changes: 8 additions & 8 deletions client/client_test.go
@@ -23,6 +23,7 @@ import (
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/stretchr/testify/require"
"github.com/tikv/pd/client/testutil"
"github.com/tikv/pd/client/tlsutil"
"go.uber.org/goleak"
"google.golang.org/grpc"
)
@@ -54,7 +55,7 @@ func TestUpdateURLs(t *testing.T) {
}
return
}
cli := &baseClient{option: newOption()}
cli := &pdBaseClient{option: newOption()}
cli.urls.Store([]string{})
cli.updateURLs(members[1:])
re.Equal(getURLs([]*pdpb.Member{members[1], members[3], members[2]}), cli.GetURLs())
@@ -89,13 +90,12 @@ func TestGRPCDialOption(t *testing.T) {
start := time.Now()
ctx, cancel := context.WithTimeout(context.TODO(), 500*time.Millisecond)
defer cancel()
cli := &baseClient{
checkLeaderCh: make(chan struct{}, 1),
checkTSODispatcherCh: make(chan struct{}, 1),
ctx: ctx,
cancel: cancel,
security: SecurityOption{},
option: newOption(),
cli := &pdBaseClient{
checkMembershipCh: make(chan struct{}, 1),
ctx: ctx,
cancel: cancel,
tlsCfg: &tlsutil.TLSConfig{},
option: newOption(),
}
cli.urls.Store([]string{testClientURL})
cli.option.gRPCDialOptions = []grpc.DialOption{grpc.WithBlock()}
2 changes: 1 addition & 1 deletion client/go.mod
@@ -7,7 +7,7 @@ require (
github.com/opentracing/opentracing-go v1.2.0
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00
github.com/pingcap/kvproto v0.0.0-20230201112839-2b853bed8125
github.com/pingcap/kvproto v0.0.0-20230220070831-5cc42e4327e4
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
github.com/prometheus/client_golang v1.11.1
github.com/stretchr/testify v1.8.1
4 changes: 2 additions & 2 deletions client/go.sum
@@ -79,8 +79,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c h1:xpW9bvK+HuuTm
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew=
github.com/pingcap/kvproto v0.0.0-20230201112839-2b853bed8125 h1:ZiCJcEzmmF5xNgt8GIXekd3WQXI/22kzYQnrHi3Fc/4=
github.com/pingcap/kvproto v0.0.0-20230201112839-2b853bed8125/go.mod h1:+on3Lfk/fb1lXkud3XvskJumhSIEEgN2TTbMObUlrxE=
github.com/pingcap/kvproto v0.0.0-20230220070831-5cc42e4327e4 h1:A3F8guzyZoU1vShtTnGwqW+ZK3RxGhfAf5EZJR8+mNQ=
github.com/pingcap/kvproto v0.0.0-20230220070831-5cc42e4327e4/go.mod h1:+on3Lfk/fb1lXkud3XvskJumhSIEEgN2TTbMObUlrxE=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
38 changes: 36 additions & 2 deletions client/grpcutil/grpcutil.go
@@ -18,15 +18,23 @@ import (
"context"
"crypto/tls"
"net/url"
"sync"
"time"

"github.com/pingcap/log"
"github.com/tikv/pd/client/errs"
"github.com/tikv/pd/client/tlsutil"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/metadata"
)

// ForwardMetadataKey is used to record the forwarded host of PD.
const ForwardMetadataKey = "pd-forwarded-host"
const (
dialTimeout = 3 * time.Second
// ForwardMetadataKey is used to record the forwarded host of PD.
ForwardMetadataKey = "pd-forwarded-host"
)

// GetClientConn returns a gRPC client connection.
// creates a client connection to the given target. By default, it's
@@ -64,3 +72,29 @@ func BuildForwardContext(ctx context.Context, addr string) context.Context {
md := metadata.Pairs(ForwardMetadataKey, addr)
return metadata.NewOutgoingContext(ctx, md)
}

// GetOrCreateGRPCConn returns the corresponding gRPC client connection of the given addr.
// Returns the old one if it already exists in clientConns; otherwise creates a new one and returns it.
func GetOrCreateGRPCConn(ctx context.Context, clientConns *sync.Map, addr string, tlsCfg *tlsutil.TLSConfig, opt ...grpc.DialOption) (*grpc.ClientConn, error) {
conn, ok := clientConns.Load(addr)
if ok {
return conn.(*grpc.ClientConn), nil
}
tlsConfig, err := tlsCfg.ToTLSConfig()
if err != nil {
return nil, err
}
dCtx, cancel := context.WithTimeout(ctx, dialTimeout)
defer cancel()
cc, err := GetClientConn(dCtx, addr, tlsConfig, opt...)
if err != nil {
return nil, err
}
if old, ok := clientConns.Load(addr); ok {
cc.Close()
log.Debug("use old connection", zap.String("target", cc.Target()), zap.String("state", cc.GetState().String()))
return old.(*grpc.ClientConn), nil
}
clientConns.Store(addr, cc)
return cc, nil
}
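
For reference, here is a minimal sketch of how a caller might use GetOrCreateGRPCConn to keep a per-address connection cache. The connPool type and its get method are hypothetical illustrations; only the grpcutil and tlsutil APIs are from this change.

package example

import (
	"context"
	"sync"

	"github.com/tikv/pd/client/grpcutil"
	"github.com/tikv/pd/client/tlsutil"
	"google.golang.org/grpc"
)

// connPool is a hypothetical caller-side cache holding one gRPC connection per address.
type connPool struct {
	ctx         context.Context
	clientConns sync.Map // addr (string) -> *grpc.ClientConn
	tlsCfg      *tlsutil.TLSConfig
}

// get returns the cached connection for addr, dialing a new one if needed.
func (p *connPool) get(addr string) (*grpc.ClientConn, error) {
	// Two goroutines may both miss the cache and dial concurrently;
	// GetOrCreateGRPCConn re-checks the map after dialing and closes the
	// redundant connection, so at most one connection per address survives.
	return grpcutil.GetOrCreateGRPCConn(p.ctx, &p.clientConns, addr, p.tlsCfg)
}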
9 changes: 4 additions & 5 deletions client/keyspace_client.go
@@ -24,7 +24,6 @@ import (
"github.com/pingcap/log"
"github.com/tikv/pd/client/grpcutil"
"go.uber.org/zap"
"google.golang.org/grpc"
)

// KeyspaceClient manages keyspace metadata.
@@ -39,8 +38,8 @@ type KeyspaceClient interface {

// keyspaceClient returns the KeyspaceClient from the current PD leader.
func (c *client) keyspaceClient() keyspacepb.KeyspaceClient {
if cc, ok := c.clientConns.Load(c.GetLeaderAddr()); ok {
return keyspacepb.NewKeyspaceClient(cc.(*grpc.ClientConn))
if client := c.bc.GetServingEndpointClientConn(); client != nil {
return keyspacepb.NewKeyspaceClient(client)
}
return nil
}
@@ -64,7 +63,7 @@ func (c *client) LoadKeyspace(ctx context.Context, name string) (*keyspacepb.Key

if err != nil {
cmdFailedDurationLoadKeyspace.Observe(time.Since(start).Seconds())
c.ScheduleCheckLeader()
c.bc.ScheduleCheckMemberChanged()
return nil, err
}

@@ -143,7 +142,7 @@ func (c *client) UpdateKeyspaceState(ctx context.Context, id uint32, state keysp

if err != nil {
cmdFailedDurationUpdateKeyspaceState.Observe(time.Since(start).Seconds())
c.ScheduleCheckLeader()
c.bc.ScheduleCheckMemberChanged()
return nil, err
}

8 changes: 4 additions & 4 deletions client/resource_manager_client.go
@@ -51,7 +51,7 @@ type ResourceManagerClient interface {

// resourceManagerClient gets the ResourceManager client of the current PD leader.
func (c *client) resourceManagerClient() rmpb.ResourceManagerClient {
if cc, err := c.getOrCreateGRPCConn(c.GetLeaderAddr()); err == nil {
if cc, err := c.bc.GetOrCreateGRPCConn(c.GetLeaderAddr()); err == nil {
return rmpb.NewResourceManagerClient(cc)
}
return nil
@@ -60,7 +60,7 @@ func (c *client) resourceManagerClient() rmpb.ResourceManagerClient {
// gRPCErrorHandler is used to handle the gRPC error returned by the resource manager service.
func (c *client) gRPCErrorHandler(err error) {
if strings.Contains(err.Error(), errNotPrimary) {
c.ScheduleCheckLeader()
c.bc.ScheduleCheckMemberChanged()
}
}

@@ -303,7 +303,7 @@ func (c *client) handleResourceTokenDispatcher(dispatcherCtx context.Context, tb
// If the stream is still nil, return an error.
if stream == nil {
firstRequest.done <- errors.Errorf("failed to get the stream connection")
c.ScheduleCheckLeader()
c.bc.ScheduleCheckMemberChanged()
connection.reset()
continue
}
@@ -315,7 +315,7 @@
default:
}
if err = c.processTokenRequests(stream, firstRequest); err != nil {
c.ScheduleCheckLeader()
c.bc.ScheduleCheckMemberChanged()
connection.reset()
log.Info("[resource_manager] token request error", zap.Error(err))
}
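
The call sites above all follow the same pattern introduced by the service discovery framework: when an RPC fails, or the endpoint reports it is not the leader/primary, the client calls ScheduleCheckMemberChanged on the base client to trigger a membership re-check, replacing the old leader-only ScheduleCheckLeader. A schematic sketch of that flow, where callRPC is a hypothetical stand-in for any of the wrappers above:

// Sketch of the fail-then-recheck pattern, assuming a client `c` with the
// base client field `bc` from this PR; callRPC is hypothetical.
func (c *client) doRequest(ctx context.Context) error {
	if err := callRPC(ctx); err != nil {
		// The serving endpoint may have changed (e.g. a leader transfer);
		// wake the member-check loop so later calls target the right endpoint.
		c.bc.ScheduleCheckMemberChanged()
		return err
	}
	return nil
}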
138 changes: 138 additions & 0 deletions client/tso_batch_controller.go
@@ -0,0 +1,138 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package pd

import (
"context"
"time"
)

type tsoBatchController struct {
maxBatchSize int
// bestBatchSize is a dynamic size that changes based on the current batch effect.
bestBatchSize int

tsoRequestCh chan *tsoRequest
collectedRequests []*tsoRequest
collectedRequestCount int

batchStartTime time.Time
}

func newTSOBatchController(tsoRequestCh chan *tsoRequest, maxBatchSize int) *tsoBatchController {
return &tsoBatchController{
maxBatchSize: maxBatchSize,
bestBatchSize: 8, /* Starting from a low value is necessary because we need to make sure it converges to (current_batch_size - 4) */
tsoRequestCh: tsoRequestCh,
collectedRequests: make([]*tsoRequest, maxBatchSize+1),
collectedRequestCount: 0,
}
}

// fetchPendingRequests starts a new round of batch collecting from the channel.
// It returns nil if everything goes well; otherwise it returns a non-nil error, which means the service should stop.
func (tbc *tsoBatchController) fetchPendingRequests(ctx context.Context, maxBatchWaitInterval time.Duration) error {
var firstTSORequest *tsoRequest
select {
case <-ctx.Done():
return ctx.Err()
case firstTSORequest = <-tbc.tsoRequestCh:
}
// Start to batch when the first TSO request arrives.
tbc.batchStartTime = time.Now()
tbc.collectedRequestCount = 0
tbc.pushRequest(firstTSORequest)

// This loop tries its best to collect more requests, so we use `tbc.maxBatchSize` here.
fetchPendingRequestsLoop:
for tbc.collectedRequestCount < tbc.maxBatchSize {
select {
case tsoReq := <-tbc.tsoRequestCh:
tbc.pushRequest(tsoReq)
case <-ctx.Done():
return ctx.Err()
default:
break fetchPendingRequestsLoop
}
}

// Check whether we should fetch more pending TSO requests from the channel.
// TODO: maybe consider the actual load returned through a TSO response from the PD server.
if tbc.collectedRequestCount >= tbc.maxBatchSize || maxBatchWaitInterval <= 0 {
return nil
}

// Fetch more pending TSO requests from the channel.
// Try to collect `tbc.bestBatchSize` requests, waiting up to `maxBatchWaitInterval`
// while `tbc.collectedRequestCount` is less than `tbc.bestBatchSize`.
if tbc.collectedRequestCount < tbc.bestBatchSize {
after := time.NewTimer(maxBatchWaitInterval)
defer after.Stop()
for tbc.collectedRequestCount < tbc.bestBatchSize {
select {
case tsoReq := <-tbc.tsoRequestCh:
tbc.pushRequest(tsoReq)
case <-ctx.Done():
return ctx.Err()
case <-after.C:
return nil
}
}
}

// Do an additional non-blocking try. Here we test the length against `tbc.maxBatchSize` instead
// of `tbc.bestBatchSize` because trying our best to fetch more requests is necessary so that
// we can adjust `tbc.bestBatchSize` dynamically later.
for tbc.collectedRequestCount < tbc.maxBatchSize {
select {
case tsoReq := <-tbc.tsoRequestCh:
tbc.pushRequest(tsoReq)
case <-ctx.Done():
return ctx.Err()
default:
return nil
}
}
return nil
}

func (tbc *tsoBatchController) pushRequest(tsoReq *tsoRequest) {
tbc.collectedRequests[tbc.collectedRequestCount] = tsoReq
tbc.collectedRequestCount++
}

func (tbc *tsoBatchController) getCollectedRequests() []*tsoRequest {
return tbc.collectedRequests[:tbc.collectedRequestCount]
}

// adjustBestBatchSize stabilizes the latency with the AIAD (additive-increase/additive-decrease) algorithm.
func (tbc *tsoBatchController) adjustBestBatchSize() {
tsoBestBatchSize.Observe(float64(tbc.bestBatchSize))
length := tbc.collectedRequestCount
if length < tbc.bestBatchSize && tbc.bestBatchSize > 1 {
// We waited too long to collect requests; reduce the target batch size.
tbc.bestBatchSize--
} else if length > tbc.bestBatchSize+4 /* Hard-coded number, in order to make `tbc.bestBatchSize` stable */ &&
tbc.bestBatchSize < tbc.maxBatchSize {
tbc.bestBatchSize++
}
}

func (tbc *tsoBatchController) revokePendingTSORequest(err error) {
for i := 0; i < len(tbc.tsoRequestCh); i++ {
req := <-tbc.tsoRequestCh
req.done <- err
}
}
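
To show how these pieces fit together, here is a hypothetical dispatcher loop around tsoBatchController. handleBatch stands in for sending one TSO RPC for the whole batch; only the tsoBatchController methods it calls are defined in this file.

// dispatchLoop is a sketch, not code from this PR; it assumes `context` and
// `time` are imported as above and that handleBatch exists.
func dispatchLoop(ctx context.Context, tbc *tsoBatchController, maxBatchWaitInterval time.Duration) {
	for {
		// Block until the first request arrives, then opportunistically batch
		// up to maxBatchSize, waiting at most maxBatchWaitInterval while the
		// batch is still below bestBatchSize.
		if err := tbc.fetchPendingRequests(ctx, maxBatchWaitInterval); err != nil {
			// Context canceled: fail every request still queued in the channel.
			tbc.revokePendingTSORequest(err)
			return
		}
		handleBatch(tbc.getCollectedRequests()) // hypothetical: one TSO RPC per batch
		// AIAD: a batch below the target shrinks bestBatchSize by 1, while a
		// batch exceeding target+4 grows it by 1, keeping latency stable.
		tbc.adjustBestBatchSize()
	}
}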