Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

reformat codec and add keyspace support #649

Merged
merged 13 commits into from
Jan 10, 2023
Merged
9 changes: 6 additions & 3 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,8 +179,8 @@ func GetTxnScopeFromConfig() string {
}

// ParsePath parses this path.
// Path example: tikv://etcd-node1:port,etcd-node2:port?cluster=1&disableGC=false
func ParsePath(path string) (etcdAddrs []string, disableGC bool, err error) {
// Path example: tikv://etcd-node1:port,etcd-node2:port?cluster=1&disableGC=false&keyspaceName=SomeKeyspace
func ParsePath(path string) (etcdAddrs []string, disableGC bool, keyspaceName string, err error) {
var u *url.URL
u, err = url.Parse(path)
if err != nil {
Expand All @@ -192,7 +192,10 @@ func ParsePath(path string) (etcdAddrs []string, disableGC bool, err error) {
logutil.BgLogger().Error("parsePath error", zap.Error(err))
return
}
switch strings.ToLower(u.Query().Get("disableGC")) {

query := u.Query()
keyspaceName = query.Get("keyspaceName")
switch strings.ToLower(query.Get("disableGC")) {
case "true":
disableGC = true
case "false", "":
Expand Down
8 changes: 5 additions & 3 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,18 +42,20 @@ import (
)

func TestParsePath(t *testing.T) {
etcdAddrs, disableGC, err := ParsePath("tikv://node1:2379,node2:2379")
etcdAddrs, disableGC, keyspaceName, err := ParsePath("tikv://node1:2379,node2:2379")

assert.Nil(t, err)
assert.Equal(t, []string{"node1:2379", "node2:2379"}, etcdAddrs)
assert.False(t, disableGC)
assert.Empty(t, keyspaceName)

_, _, err = ParsePath("tikv://node1:2379")
_, _, _, err = ParsePath("tikv://node1:2379")
assert.Nil(t, err)

_, disableGC, err = ParsePath("tikv://node1:2379?disableGC=true")
_, disableGC, keyspaceName, err = ParsePath("tikv://node1:2379?disableGC=true&keyspaceName=DEFAULT")
assert.Nil(t, err)
assert.True(t, disableGC)
assert.Equal(t, "DEFAULT", keyspaceName)
}

func TestTxnScopeValue(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ require (
github.com/google/uuid v1.1.2
github.com/grpc-ecosystem/go-grpc-middleware v1.1.0
github.com/opentracing/opentracing-go v1.2.0
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989
github.com/pingcap/kvproto v0.0.0-20221129023506-621ec37aac7a
Expand Down Expand Up @@ -43,7 +44,6 @@ require (
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/onsi/ginkgo v1.16.5 // indirect
github.com/onsi/gomega v1.18.1 // indirect
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/common v0.26.0 // indirect
github.com/prometheus/procfs v0.6.0 // indirect
Expand Down
2 changes: 1 addition & 1 deletion integration_tests/2pc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func (s *testCommitterSuite) SetupTest() {
s.Require().Nil(err)
testutils.BootstrapWithMultiRegions(cluster, []byte("a"), []byte("b"), []byte("c"))
s.cluster = cluster
pdCli := &tikv.CodecPDClient{Client: pdClient}
pdCli := tikv.NewCodecPDClient(tikv.ModeTxn, pdClient)
spkv := tikv.NewMockSafePointKV()
store, err := tikv.NewKVStore("mocktikv-store", pdCli, spkv, client)
store.EnableTxnLocalLatches(8096)
Expand Down
2 changes: 1 addition & 1 deletion integration_tests/lock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -515,7 +515,7 @@ func (s *testLockSuite) TestBatchResolveLocks() {
s.Nil(err)
committer.SetUseAsyncCommit()
committer.SetLockTTL(20000)
committer.PrewriteAllMutations(context.Background())
err = committer.PrewriteAllMutations(context.Background())
s.Nil(err)

var locks []*txnkv.Lock
Expand Down
6 changes: 4 additions & 2 deletions integration_tests/raw/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,11 @@ func (s *apiTestSuite) newRawKVClient(pdCli pd.Client, addrs []string) *rawkv.Cl
}

func (s *apiTestSuite) wrapPDClient(pdCli pd.Client, addrs []string) pd.Client {
if s.apiVersion == kvrpcpb.APIVersion_V2 {
return tikv.NewCodecPDClientV2(pdCli, tikv.ModeRaw)
var err error
if s.getApiVersion(pdCli) == kvrpcpb.APIVersion_V2 {
pdCli, err = tikv.NewCodecPDClientWithKeyspace(tikv.ModeRaw, pdCli, tikv.DefaultKeyspaceName)
}
s.Nil(err)
return pdCli
}

Expand Down
2 changes: 1 addition & 1 deletion integration_tests/split_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ package tikv_test

import (
"context"
"github.com/pingcap/kvproto/pkg/keyspacepb"
"sync"
"testing"

"github.com/pingcap/kvproto/pkg/keyspacepb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pkg/errors"
Expand Down
64 changes: 58 additions & 6 deletions integration_tests/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,18 @@ import (
"context"
"flag"
"fmt"
"io/ioutil"
"net/http"
"strings"
"testing"
"unsafe"

"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/tidb/kv"
txndriver "github.com/pingcap/tidb/store/driver/txn"
"github.com/pingcap/tidb/store/mockstore/unistore"
"github.com/stretchr/testify/require"
"github.com/tidwall/gjson"
"github.com/tikv/client-go/v2/config"
"github.com/tikv/client-go/v2/testutils"
"github.com/tikv/client-go/v2/tikv"
Expand Down Expand Up @@ -95,21 +99,69 @@ func NewTestUniStore(t *testing.T) *tikv.KVStore {
}

func newTiKVStore(t *testing.T) *tikv.KVStore {
re := require.New(t)
addrs := strings.Split(*pdAddrs, ",")
pdClient, err := pd.NewClient(addrs, pd.SecurityOption{})
require.Nil(t, err)
re.Nil(err)
var opt tikv.ClientOpt
switch mustGetApiVersion(re, pdClient) {
case kvrpcpb.APIVersion_V1:
pdClient = tikv.NewCodecPDClient(tikv.ModeTxn, pdClient)
opt = tikv.WithCodec(tikv.NewCodecV1(tikv.ModeTxn))
case kvrpcpb.APIVersion_V2:
codecCli, err := tikv.NewCodecPDClientWithKeyspace(tikv.ModeTxn, pdClient, tikv.DefaultKeyspaceName)
pdClient = codecCli
re.Nil(err)
opt = tikv.WithCodec(codecCli.GetCodec())
default:
re.Fail("unknown api version")
}
var securityConfig config.Security
tlsConfig, err := securityConfig.ToTLSConfig()
require.Nil(t, err)
re.Nil(err)
spKV, err := tikv.NewEtcdSafePointKV(addrs, tlsConfig)
require.Nil(t, err)
store, err := tikv.NewKVStore("test-store", &tikv.CodecPDClient{Client: pdClient}, spKV, tikv.NewRPCClient())
require.Nil(t, err)
re.Nil(err)
store, err := tikv.NewKVStore(
"test-store",
pdClient,
spKV,
tikv.NewRPCClient(opt),
)
re.Nil(err)
err = clearStorage(store)
require.Nil(t, err)
re.Nil(err)
return store
}

func mustGetApiVersion(re *require.Assertions, pdCli pd.Client) kvrpcpb.APIVersion {
stores, err := pdCli.GetAllStores(context.Background())
re.NoError(err)

for _, store := range stores {
resp := mustGetConfig(re, fmt.Sprintf("http://%s/config", store.StatusAddress))
v := gjson.Get(resp, "storage.api-version")
if v.Type == gjson.Null || v.Uint() != 2 {
return kvrpcpb.APIVersion_V1
}
}
return kvrpcpb.APIVersion_V2
}

func mustGetConfig(re *require.Assertions, url string) string {
transport := &http.Transport{}
client := http.Client{
Transport: transport,
}
defer transport.CloseIdleConnections()
resp, err := client.Get(url)
re.NoError(err)
defer resp.Body.Close()

body, err := ioutil.ReadAll(resp.Body)
re.NoError(err)
return string(body)
}

func clearStorage(store *tikv.KVStore) error {
txn, err := store.Begin()
if err != nil {
Expand Down
86 changes: 86 additions & 0 deletions internal/apicodec/codec.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package apicodec

import (
"encoding/binary"

"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/tikv/client-go/v2/tikvrpc"
)

type (
// Mode represents the operation mode of a request.
Mode int
// KeyspaceID denotes the target keyspace of the request.
KeyspaceID uint32
)

const (
// ModeRaw represent a raw operation in TiKV.
ModeRaw = iota
// ModeTxn represent a transaction operation in TiKV.
ModeTxn
)

const (
// NulSpaceID is a special keyspace id that represents no keyspace exist.
NulSpaceID KeyspaceID = 0xffffffff
)

// ParseKeyspaceID retrieves the keyspaceID from the given keyspace-encoded key.
// It returns error if the given key is not in proper api-v2 format.
func ParseKeyspaceID(b []byte) (KeyspaceID, error) {
if len(b) < keyspacePrefixLen || (b[0] != rawModePrefix && b[0] != txnModePrefix) {
return 0, errors.Errorf("unsupported key %s", b)
AmoebaProtozoa marked this conversation as resolved.
Show resolved Hide resolved
}

buf := append([]byte{}, b[:keyspacePrefixLen]...)
buf[0] = 0

return KeyspaceID(binary.BigEndian.Uint32(buf)), nil
}

// Codec is responsible for encode/decode requests.
type Codec interface {
// GetAPIVersion returns the api version of the codec.
GetAPIVersion() kvrpcpb.APIVersion
// GetKeyspace return the keyspace id of the codec in bytes.
GetKeyspace() []byte
// GetKeyspaceID return the keyspace id of the codec.
GetKeyspaceID() KeyspaceID
// EncodeRequest encodes with the given Codec.
// NOTE: req is reused on retry. MUST encode on cloned request, other than overwrite the original.
EncodeRequest(req *tikvrpc.Request) (*tikvrpc.Request, error)
// DecodeResponse decode the resp with the given codec.
DecodeResponse(req *tikvrpc.Request, resp *tikvrpc.Response) (*tikvrpc.Response, error)
// EncodeRegionKey encode region's key.
EncodeRegionKey(key []byte) []byte
// DecodeRegionKey decode region's key
DecodeRegionKey(encodedKey []byte) ([]byte, error)
// EncodeRegionRange encode region's start and end.
EncodeRegionRange(start, end []byte) ([]byte, []byte)
// DecodeRegionRange decode region's start and end.
DecodeRegionRange(encodedStart, encodedEnd []byte) ([]byte, []byte, error)
// EncodeRange encode a key range.
EncodeRange(start, end []byte) ([]byte, []byte)
// DecodeRange decode a key range.
DecodeRange(encodedStart, encodedEnd []byte) ([]byte, []byte, error)
// EncodeKey encode a key.
EncodeKey(key []byte) []byte
// DecodeKey decode a key.
DecodeKey(encoded []byte) ([]byte, error)
}

// DecodeKey split a key to it's keyspace prefix and actual key.
func DecodeKey(encoded []byte, version kvrpcpb.APIVersion) ([]byte, []byte, error) {
switch version {
case kvrpcpb.APIVersion_V1:
return nil, encoded, nil
case kvrpcpb.APIVersion_V2:
if len(encoded) < keyspacePrefixLen {
return nil, nil, errors.Errorf("invalid V2 key: %s", encoded)
}
return encoded[:keyspacePrefixLen], encoded[keyspacePrefixLen:], nil
}
return nil, nil, errors.Errorf("unsupported api version %s", version.String())
}
25 changes: 25 additions & 0 deletions internal/apicodec/codec_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package apicodec

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestParseKeyspaceID(t *testing.T) {
id, err := ParseKeyspaceID([]byte{'x', 1, 2, 3, 1, 2, 3})
assert.Nil(t, err)
assert.Equal(t, KeyspaceID(0x010203), id)

id, err = ParseKeyspaceID([]byte{'r', 1, 2, 3, 1, 2, 3, 4})
assert.Nil(t, err)
assert.Equal(t, KeyspaceID(0x010203), id)

id, err = ParseKeyspaceID([]byte{'t', 0, 0})
assert.NotNil(t, err)
assert.Equal(t, KeyspaceID(0), id)

id, err = ParseKeyspaceID([]byte{'t', 0, 0, 1, 1, 2, 3})
assert.NotNil(t, err)
assert.Equal(t, KeyspaceID(0), id)
}
Loading