Skip to content

Commit

Permalink
force to construct CodecPDClient by keyspace name
Browse files Browse the repository at this point in the history
Signed-off-by: iosmanthus <[email protected]>
  • Loading branch information
iosmanthus committed Aug 9, 2022
1 parent da297a5 commit e534e37
Show file tree
Hide file tree
Showing 13 changed files with 90 additions and 73 deletions.
2 changes: 1 addition & 1 deletion integration_tests/2pc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func (s *testCommitterSuite) SetupTest() {
s.Require().Nil(err)
testutils.BootstrapWithMultiRegions(cluster, []byte("a"), []byte("b"), []byte("c"))
s.cluster = cluster
pdCli := tikv.NewCodecPDClient(pdClient, tikv.NewCodecV1(tikv.ModeTxn))
pdCli := tikv.NewCodecPDClient(tikv.ModeTxn, pdClient)
spkv := tikv.NewMockSafePointKV()
store, err := tikv.NewKVStore("mocktikv-store", pdCli, spkv, client)
store.EnableTxnLocalLatches(8096)
Expand Down
2 changes: 1 addition & 1 deletion integration_tests/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
github.com/ninedraft/israce v0.0.3
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
github.com/pingcap/failpoint v0.0.0-20220423142525-ae43b7f4e5c3
github.com/pingcap/kvproto v0.0.0-20220705053936-aa9c2d20cd2a
github.com/pingcap/kvproto v0.0.0-20220808072825-3692dfb0dad7
github.com/pingcap/tidb v1.1.0-beta.0.20220706093502-562b03368993
github.com/pkg/errors v0.9.1
github.com/stretchr/testify v1.7.2-0.20220504104629-106ec21d14df
Expand Down
4 changes: 2 additions & 2 deletions integration_tests/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -306,8 +306,8 @@ github.com/pingcap/fn v0.0.0-20200306044125-d5540d389059 h1:Pe2LbxRmbTfAoKJ65bZL
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN8dIUmo4Be2+pMRb6f55i+UIYrluu2E=
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/kvproto v0.0.0-20220302110454-c696585a961b/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20220705053936-aa9c2d20cd2a h1:nP2wmyw9JTRsk5rm+tZtfAso6c/1FvuaFNbXTaYz3FE=
github.com/pingcap/kvproto v0.0.0-20220705053936-aa9c2d20cd2a/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/kvproto v0.0.0-20220808072825-3692dfb0dad7 h1:ECiYgZszvTI/RHnIDLsHHcgcrdAcxS2PYgMyU6PH43g=
github.com/pingcap/kvproto v0.0.0-20220808072825-3692dfb0dad7/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20211215031037-e024ba4eb0ee/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
Expand Down
6 changes: 3 additions & 3 deletions integration_tests/raw/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,11 @@ func (s *apiTestSuite) newRawKVClient(pdCli pd.Client, addrs []string) *rawkv.Cl
}

func (s *apiTestSuite) wrapPDClient(pdCli pd.Client, addrs []string) pd.Client {
var err error
if s.getApiVersion(pdCli) == kvrpcpb.APIVersion_V2 {
codec, err := tikv.NewCodecV2(tikv.ModeRaw, tikv.DefaultKeyspaceID)
s.NoError(err)
return tikv.NewCodecPDClient(pdCli, codec)
pdCli, err = tikv.NewCodecPDClientWithKeyspace(tikv.ModeRaw, pdCli, tikv.DefaultKeyspaceName)
}
s.Nil(err)
return pdCli
}

Expand Down
10 changes: 5 additions & 5 deletions integration_tests/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,13 +106,13 @@ func newTiKVStore(t *testing.T) *tikv.KVStore {
var opt tikv.ClientOpt
switch mustGetApiVersion(re, pdClient) {
case kvrpcpb.APIVersion_V1:
pdClient = tikv.NewCodecPDClient(pdClient, tikv.NewCodecV1(tikv.ModeTxn))
pdClient = tikv.NewCodecPDClient(tikv.ModeTxn, pdClient)
opt = tikv.WithCodec(tikv.NewCodecV1(tikv.ModeTxn))
case kvrpcpb.APIVersion_V2:
apiCodec, err := tikv.NewCodecV2(tikv.ModeTxn, tikv.DefaultKeyspaceID)
re.NoError(err)
pdClient = tikv.NewCodecPDClient(pdClient, apiCodec)
opt = tikv.WithCodec(apiCodec)
codecCli, err := tikv.NewCodecPDClientWithKeyspace(tikv.ModeTxn, pdClient, tikv.DefaultKeyspaceName)
pdClient = codecCli
re.Nil(err)
opt = tikv.WithCodec(codecCli.GetCodec())
default:
re.Fail("unknown api version")
}
Expand Down
4 changes: 2 additions & 2 deletions internal/apicodec/codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func TestNewCodecV2(t *testing.T) {
mode: ModeRaw,
spaceID: 1<<24 - 1,
expectedPrefix: []byte{'r', 255, 255, 255},
expectedEnd: []byte{'s', 255, 255, 255},
expectedEnd: []byte{'s', 0, 0, 0},
},
}
for _, testCase := range testCases {
Expand All @@ -139,7 +139,7 @@ func TestNewCodecV2(t *testing.T) {
}
codec, err := NewCodecV2(testCase.mode, testCase.spaceID)
re.NoError(err)

v2Codec, ok := codec.(*codecV2)
re.True(ok)
re.Equal(testCase.expectedPrefix, v2Codec.prefix)
Expand Down
11 changes: 11 additions & 0 deletions internal/apicodec/codec_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,21 @@ import (
var (
// DefaultKeyspaceID is the keyspaceID of the default keyspace.
DefaultKeyspaceID uint32 = 0
// DefaultKeyspaceName is the name of the default keyspace.
DefaultKeyspaceName = "DEFAULT"

rawModePrefix byte = 'r'
txnModePrefix byte = 'x'
)

// BuildKeyspaceName builds a keyspace name
func BuildKeyspaceName(name string) string {
if name == "" {
return DefaultKeyspaceName
}
return name
}

// codecV2 is used to encode/decode keys and request into APIv2 format.
type codecV2 struct {
prefix []byte
Expand Down
33 changes: 30 additions & 3 deletions internal/locate/pd_codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ package locate

import (
"context"

"github.com/pingcap/kvproto/pkg/keyspacepb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pkg/errors"
"github.com/tikv/client-go/v2/internal/apicodec"
Expand All @@ -51,11 +51,38 @@ type CodecPDClient struct {
codec apicodec.Codec
}

// NewCodecPDClient creates a CodecPDClient.
func NewCodecPDClient(client pd.Client, codec apicodec.Codec) *CodecPDClient {
// NewCodecPDClient creates a CodecPDClient in API v1.
func NewCodecPDClient(mode apicodec.Mode, client pd.Client) *CodecPDClient {
codec := apicodec.NewCodecV1(mode)
return &CodecPDClient{client, codec}
}

// NewCodecPDClientWithKeyspace creates a CodecPDClient in API v2 with keyspace name.
func NewCodecPDClientWithKeyspace(mode apicodec.Mode, client pd.Client, keyspace string) (*CodecPDClient, error) {
id, err := GetKeyspaceID(client, keyspace)
if err != nil {
return nil, err
}
codec, err := apicodec.NewCodecV2(mode, id)
if err != nil {
return nil, err
}

return &CodecPDClient{client, codec}, nil
}

func GetKeyspaceID(client pd.Client, name string) (uint32, error) {
meta, err := client.LoadKeyspace(context.Background(), apicodec.BuildKeyspaceName(name))
if err != nil {
return 0, err
}
// If keyspace is not enabled, user should not be able to connect.
if meta.State != keyspacepb.KeyspaceState_ENABLED {
return 0, errors.Errorf("keyspace %s not enabled", name)
}
return meta.Id, nil
}

// GetCodec returns CodecPDClient's codec.
func (c *CodecPDClient) GetCodec() apicodec.Codec {
return c.codec
Expand Down
38 changes: 13 additions & 25 deletions rawkv/rawkv.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ package rawkv
import (
"bytes"
"context"
"github.com/tikv/client-go/v2/tikv"
"time"

"github.com/pingcap/kvproto/pkg/keyspacepb"
Expand Down Expand Up @@ -138,7 +139,7 @@ type option struct {
security config.Security
gRPCDialOptions []grpc.DialOption
pdOptions []pd.ClientOption
keyspaceName string
keyspace string
}

// ClientOpt is factory to set the client options.
Expand Down Expand Up @@ -173,9 +174,9 @@ func WithAPIVersion(apiVersion kvrpcpb.APIVersion) ClientOpt {
}

// WithKeyspace is used to set the keyspace Name.
func WithKeyspace(keyspaceName string) ClientOpt {
func WithKeyspace(name string) ClientOpt {
return func(o *option) {
o.keyspaceName = keyspaceName
o.keyspace = name
}
}

Expand Down Expand Up @@ -225,46 +226,33 @@ func NewClientWithOpts(ctx context.Context, pdAddrs []string, opts ...ClientOpt)
return nil, errors.WithStack(err)
}

// Construct codec from options.
var codec apicodec.Codec
// Build a CodecPDClient
var codecCli *tikv.CodecPDClient

switch opt.apiVersion {
case kvrpcpb.APIVersion_V1:
codec = apicodec.NewCodecV1(apicodec.ModeRaw)
case kvrpcpb.APIVersion_V1TTL:
codec = apicodec.NewCodecV1(apicodec.ModeRaw)
case kvrpcpb.APIVersion_V1, kvrpcpb.APIVersion_V1TTL:
codecCli = locate.NewCodecPDClient(tikv.ModeRaw, pdCli)
case kvrpcpb.APIVersion_V2:
var keyspaceID uint32
if len(opt.keyspaceName) != 0 {
// If keyspaceName is set, obtain keyspaceID from PD.
keyspaceID, err = getKeyspaceID(pdCli, opt.keyspaceName)
if err != nil {
return nil, err
}
} else {
// If KeyspaceName is unset, use default keyspaceID.
keyspaceID = apicodec.DefaultKeyspaceID
}
codec, err = apicodec.NewCodecV2(apicodec.ModeRaw, keyspaceID)
codecCli, err = tikv.NewCodecPDClientWithKeyspace(tikv.ModeRaw, pdCli, opt.keyspace)
if err != nil {
return nil, err
}
default:
return nil, errors.Errorf("unknown api version: %d", opt.apiVersion)
}

pdCli = codecCli

rpcCli := client.NewRPCClient(
client.WithSecurity(opt.security),
client.WithGRPCDialOptions(opt.gRPCDialOptions...),
client.WithCodec(codec),
client.WithCodec(codecCli.GetCodec()),
)

pdCli = locate.NewCodecPDClient(pdCli, codec)

return &Client{
apiVersion: opt.apiVersion,
clusterID: pdCli.GetClusterID(ctx),
regionCache: locate.NewRegionCache(pdCli),
codec: codec,
pdClient: pdCli,
rpcClient: rpcCli,
}, nil
Expand Down
9 changes: 2 additions & 7 deletions tikv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ import (
"github.com/pkg/errors"
"github.com/tikv/client-go/v2/config"
tikverr "github.com/tikv/client-go/v2/error"
"github.com/tikv/client-go/v2/internal/apicodec"
"github.com/tikv/client-go/v2/internal/client"
"github.com/tikv/client-go/v2/internal/latch"
"github.com/tikv/client-go/v2/internal/locate"
Expand Down Expand Up @@ -193,11 +192,6 @@ func NewKVStore(uuid string, pdClient pd.Client, spkv SafePointKV, tikvclient Cl
return store, nil
}

// WrapPDClient wrap pd.Client with codec and interceptors.
func WrapPDClient(pdCli pd.Client, codec apicodec.Codec) pd.Client {
return locate.NewCodecPDClient(util.InterceptedPDClient{Client: pdCli}, codec)
}

// NewPDClient returns an unwrapped pd client.
func NewPDClient(pdAddrs []string) (pd.Client, error) {
cfg := config.GetGlobalConfig()
Expand Down Expand Up @@ -589,6 +583,7 @@ var _ = NewLockResolver
// NewLockResolver creates a LockResolver.
// It is exported for other pkg to use. For instance, binlog service needs
// to determine a transaction's commit state.
// TODO(iosmanthus): support api v2
func NewLockResolver(etcdAddrs []string, security config.Security, opts ...pd.ClientOption) (*txnlock.LockResolver, error) {
pdCli, err := pd.NewClient(etcdAddrs, pd.SecurityOption{
CAPath: security.ClusterSSLCA,
Expand All @@ -611,7 +606,7 @@ func NewLockResolver(etcdAddrs []string, security config.Security, opts ...pd.Cl
return nil, err
}

s, err := NewKVStore(uuid, locate.NewCodecPDClient(pdCli, apicodec.NewCodecV1(apicodec.ModeTxn)), spkv, client.NewRPCClient(WithSecurity(security)))
s, err := NewKVStore(uuid, locate.NewCodecPDClient(ModeTxn, pdCli), spkv, client.NewRPCClient(WithSecurity(security)))
if err != nil {
return nil, err
}
Expand Down
9 changes: 9 additions & 0 deletions tikv/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,15 @@ type RegionRequestRuntimeStats = locate.RegionRequestRuntimeStats
// RPCRuntimeStats indicates the RPC request count and consume time.
type RPCRuntimeStats = locate.RPCRuntimeStats

// CodecPDClient wraps a PD Client to decode the encoded keys in region meta.
type CodecPDClient = locate.CodecPDClient

// NewCodecPDClient is a constructor for CodecPDClient
var NewCodecPDClient = locate.NewCodecPDClient

// NewCodecPDClientWithKeyspace creates a CodecPDClient in API v2 with keyspace name.
var NewCodecPDClientWithKeyspace = locate.NewCodecPDClientWithKeyspace

// NewCodecV1 is a constructor for v1 Codec.
var NewCodecV1 = apicodec.NewCodecV1

Expand All @@ -104,6 +110,9 @@ type Codec = apicodec.Codec
// DefaultKeyspaceID is the keyspaceID of the default keyspace.
var DefaultKeyspaceID = apicodec.DefaultKeyspaceID

// DefaultKeyspaceName is the name of the default keyspace.
var DefaultKeyspaceName = apicodec.DefaultKeyspaceName

// Mode represents the operation mode of a request, export client.Mode
type Mode = apicodec.Mode

Expand Down
2 changes: 1 addition & 1 deletion tikv/test_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func NewTestTiKVStore(client Client, pdClient pd.Client, clientHijack func(Clien
Client: client,
codec: codec,
}
pdCli := pd.Client(locate.NewCodecPDClient(pdClient, codec))
pdCli := pd.Client(locate.NewCodecPDClient(ModeTxn, pdClient))

if clientHijack != nil {
client = clientHijack(client)
Expand Down
33 changes: 10 additions & 23 deletions txnkv/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ package txnkv
import (
"context"
"fmt"
"github.com/tikv/client-go/v2/util"

"github.com/pingcap/kvproto/pkg/keyspacepb"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pkg/errors"
"github.com/tikv/client-go/v2/config"
"github.com/tikv/client-go/v2/internal/apicodec"
"github.com/tikv/client-go/v2/internal/retry"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/tikv"
Expand Down Expand Up @@ -81,40 +81,26 @@ func NewClient(pdAddrs []string, opts ...ClientOpt) (*Client, error) {
if err != nil {
return nil, errors.WithStack(err)
}

pdClient = util.InterceptedPDClient{Client: pdClient}

// Construct codec from options.
var codec apicodec.Codec
var codecCli *tikv.CodecPDClient
switch opt.apiVersion {
case kvrpcpb.APIVersion_V1:
codec = apicodec.NewCodecV1(apicodec.ModeTxn)
case kvrpcpb.APIVersion_V1TTL:
codec = apicodec.NewCodecV1(apicodec.ModeTxn)
codecCli = tikv.NewCodecPDClient(tikv.ModeTxn, pdClient)
case kvrpcpb.APIVersion_V2:
var keyspaceID uint32
if len(opt.keyspaceName) != 0 {
// If keyspaceName is set, obtain keyspaceID from PD.
keyspaceID, err = getKeyspaceID(pdClient, opt.keyspaceName)
if err != nil {
return nil, err
}
} else {
// If KeyspaceName is unset, use default keyspaceID.
keyspaceID = apicodec.DefaultKeyspaceID
}
codec, err = apicodec.NewCodecV2(apicodec.ModeTxn, keyspaceID)
codecCli, err = tikv.NewCodecPDClientWithKeyspace(tikv.ModeTxn, pdClient, opt.keyspaceName)
if err != nil {
return nil, err
}
default:
return nil, errors.Errorf("unknown api version: %d", opt.apiVersion)
}

// Wrapping the pd client with the new codec.
pdClient = tikv.WrapPDClient(pdClient, codec)
pdClient = codecCli

cfg := config.GetGlobalConfig()
if err != nil {
return nil, err
}
// init uuid
uuid := fmt.Sprintf("tikv-%v", pdClient.GetClusterID(context.TODO()))
tlsConfig, err := cfg.Security.ToTLSConfig()
Expand All @@ -127,7 +113,8 @@ func NewClient(pdAddrs []string, opts ...ClientOpt) (*Client, error) {
return nil, err
}

rpcClient := tikv.NewRPCClient(tikv.WithSecurity(cfg.Security), tikv.WithCodec(codec))
rpcClient := tikv.NewRPCClient(tikv.WithSecurity(cfg.Security), tikv.WithCodec(codecCli.GetCodec()))

s, err := tikv.NewKVStore(uuid, pdClient, spkv, rpcClient)
if err != nil {
return nil, err
Expand Down

0 comments on commit e534e37

Please sign in to comment.