Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

client: support to get all keyspaces #6793

Merged
merged 15 commits into from
Jul 13, 2023
2 changes: 2 additions & 0 deletions client/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,5 @@ require (
gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

replace github.com/pingcap/kvproto => github.com/ystaticy/kvproto v0.0.0-20230713011631-25642a4bb0ad
ystaticy marked this conversation as resolved.
Show resolved Hide resolved
4 changes: 2 additions & 2 deletions client/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,6 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c h1:xpW9bvK+HuuTm
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew=
github.com/pingcap/kvproto v0.0.0-20230705080816-a48f4fe282a2 h1:VM6INL8StTPYMKufyHRX2hPUMP7isHnkYvtRMA7Sdsc=
github.com/pingcap/kvproto v0.0.0-20230705080816-a48f4fe282a2/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down Expand Up @@ -130,6 +128,8 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8=
github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/ystaticy/kvproto v0.0.0-20230713011631-25642a4bb0ad h1:/naL5dUFk4CKRj+NQRuist0kqeqO35AiPbVJBDNizis=
github.com/ystaticy/kvproto v0.0.0-20230713011631-25642a4bb0ad/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
Expand Down
76 changes: 34 additions & 42 deletions client/keyspace_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,17 @@ import (
"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/keyspacepb"
"github.com/pingcap/log"
"github.com/tikv/pd/client/grpcutil"
"go.uber.org/zap"
)

// KeyspaceClient manages keyspace metadata.
type KeyspaceClient interface {
// LoadKeyspace load and return target keyspace's metadata.
LoadKeyspace(ctx context.Context, name string) (*keyspacepb.KeyspaceMeta, error)
// WatchKeyspaces watches keyspace meta changes.
WatchKeyspaces(ctx context.Context) (chan []*keyspacepb.KeyspaceMeta, error)
// UpdateKeyspaceState updates target keyspace's state.
UpdateKeyspaceState(ctx context.Context, id uint32, state keyspacepb.KeyspaceState) (*keyspacepb.KeyspaceMeta, error)
// GetAllKeyspaces get all keyspace's metadata.
GetAllKeyspaces(ctx context.Context, startID uint32, limit uint32) ([]*keyspacepb.KeyspaceMeta, error)
}

// keyspaceClient returns the KeyspaceClient from current PD leader.
Expand Down Expand Up @@ -75,44 +73,6 @@ func (c *client) LoadKeyspace(ctx context.Context, name string) (*keyspacepb.Key
return resp.Keyspace, nil
}

// WatchKeyspaces watches keyspace meta changes.
// It returns a stream of slices of keyspace metadata.
// The first message in stream contains all current keyspaceMeta,
// all subsequent messages contains new put events for all keyspaces.
func (c *client) WatchKeyspaces(ctx context.Context) (chan []*keyspacepb.KeyspaceMeta, error) {
ystaticy marked this conversation as resolved.
Show resolved Hide resolved
keyspaceWatcherChan := make(chan []*keyspacepb.KeyspaceMeta)
req := &keyspacepb.WatchKeyspacesRequest{
Header: c.requestHeader(),
}
stream, err := c.keyspaceClient().WatchKeyspaces(ctx, req)
if err != nil {
close(keyspaceWatcherChan)
return nil, err
}
go func() {
defer func() {
close(keyspaceWatcherChan)
if r := recover(); r != nil {
log.Error("[pd] panic in keyspace client `WatchKeyspaces`", zap.Any("error", r))
return
}
}()
for {
select {
case <-ctx.Done():
return
default:
resp, err := stream.Recv()
if err != nil {
return
}
keyspaceWatcherChan <- resp.Keyspaces
}
}
}()
return keyspaceWatcherChan, err
}

// UpdateKeyspaceState attempts to update the keyspace specified by ID to the target state,
// it will also record StateChangedAt for the given keyspace if a state change took place.
// Currently, legal operations includes:
Expand Down Expand Up @@ -153,3 +113,35 @@ func (c *client) UpdateKeyspaceState(ctx context.Context, id uint32, state keysp

return resp.Keyspace, nil
}

// GetAllKeyspaces get all keyspaces metadata.
func (c *client) GetAllKeyspaces(ctx context.Context, startID uint32, limit uint32) ([]*keyspacepb.KeyspaceMeta, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If replace WatchKeyspaces with GetAllKeyspaces, will it cost too much? Do we need to use watch loop in server?

if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("keyspaceClient.GetAllKeyspaces", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
start := time.Now()
defer func() { cmdDurationGetAllKeyspaces.Observe(time.Since(start).Seconds()) }()
ctx, cancel := context.WithTimeout(ctx, c.option.timeout)
req := &keyspacepb.GetAllKeyspacesRequest{
Header: c.requestHeader(),
StartId: startID,
Limit: limit,
}
ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
resp, err := c.keyspaceClient().GetAllKeyspaces(ctx, req)
cancel()

if err != nil {
cmdDurationGetAllKeyspaces.Observe(time.Since(start).Seconds())
c.pdSvcDiscovery.ScheduleCheckMemberChanged()
return nil, err
}

if resp.Header.GetError() != nil {
cmdDurationGetAllKeyspaces.Observe(time.Since(start).Seconds())
return nil, errors.Errorf("Get all keyspaces metadata failed: %s", resp.Header.GetError().String())
}

return resp.Keyspaces, nil
}
2 changes: 2 additions & 0 deletions client/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ var (
cmdDurationSplitAndScatterRegions prometheus.Observer
cmdDurationLoadKeyspace prometheus.Observer
cmdDurationUpdateKeyspaceState prometheus.Observer
cmdDurationGetAllKeyspaces prometheus.Observer
cmdDurationGet prometheus.Observer
cmdDurationPut prometheus.Observer
cmdDurationUpdateGCSafePointV2 prometheus.Observer
Expand Down Expand Up @@ -184,6 +185,7 @@ func initCmdDurations() {
cmdDurationSplitAndScatterRegions = cmdDuration.WithLabelValues("split_and_scatter_regions")
cmdDurationLoadKeyspace = cmdDuration.WithLabelValues("load_keyspace")
cmdDurationUpdateKeyspaceState = cmdDuration.WithLabelValues("update_keyspace_state")
cmdDurationGetAllKeyspaces = cmdDuration.WithLabelValues("get_all_keyspaces")
cmdDurationGet = cmdDuration.WithLabelValues("get")
cmdDurationPut = cmdDuration.WithLabelValues("put")
cmdDurationUpdateGCSafePointV2 = cmdDuration.WithLabelValues("update_gc_safe_point_v2")
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -205,3 +205,4 @@ replace google.golang.org/grpc v1.54.0 => google.golang.org/grpc v1.26.0
// kvproto at the same time. You can run `go mod tidy` to make it replaced with go-mod style specification.
// After the PR to kvproto is merged, remember to comment this out and run `go mod tidy`.
// replace github.com/pingcap/kvproto => github.com/$YourPrivateRepo $YourPrivateBranch
replace github.com/pingcap/kvproto => github.com/ystaticy/kvproto v0.0.0-20230713011631-25642a4bb0ad
Loading