Skip to content

Commit

Permalink
cli(cdc): handle error correctly with wrong pd address but with a grp…
Browse files Browse the repository at this point in the history
…c service (#6503)

close #6458
  • Loading branch information
Xuanhe Chen authored Jul 29, 2022
1 parent aa12384 commit 2fb5ac1
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 11 deletions.
21 changes: 14 additions & 7 deletions pkg/cmd/cli/cli_unsafe_reset.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,20 +36,25 @@ func newUnsafeResetOptions() *unsafeResetOptions {
}

// complete adapts from the command line args to the data and client required.
func (o *unsafeResetOptions) complete(f factory.Factory) error {
etcdClient, err := f.EtcdClient()
func (o *unsafeResetOptions) complete(f factory.Factory) (err error) {
pdClient, err := f.PdClient()
if err != nil {
return err
}
etcdClient.ClusterID = o.clusterID
o.etcdClient = etcdClient
defer func() {
if err != nil {
pdClient.Close()
}
}()

pdClient, err := f.PdClient()
o.pdClient = pdClient

etcdClient, err := f.EtcdClient()
if err != nil {
return err
}

o.pdClient = pdClient
etcdClient.ClusterID = o.clusterID
o.etcdClient = etcdClient

return nil
}
Expand All @@ -61,6 +66,8 @@ func (o *unsafeResetOptions) addFlags(cmd *cobra.Command) {
// run runs the `cli unsafe reset` command.
func (o *unsafeResetOptions) run(cmd *cobra.Command) error {
ctx := context.GetDefaultContext()
defer o.pdClient.Close()
defer o.etcdClient.Close()

leases, err := o.etcdClient.GetCaptureLeases(ctx)
if err != nil {
Expand Down
28 changes: 25 additions & 3 deletions pkg/cmd/factory/factory_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,10 +135,16 @@ func (f *factoryImpl) EtcdClient() (*etcd.CDCEtcdClient, error) {
})
if err != nil {
return nil, errors.Annotatef(err,
"fail to open PD client, please check pd address \"%s\"", pdAddr)
"Fail to open PD client. Please check the pd address(es) \"%s\"", pdAddr)
}

client, err := etcd.NewCDCEtcdClient(ctx, etcdClient, etcd.DefaultCDCClusterID)
if err != nil {
return nil, cerror.ErrEtcdAPIError.GenWithStack(
"Etcd operation error. Please check the cluster's status " +
" and the pd address(es) \"%s\"")
}

return &client, err
}

Expand All @@ -155,7 +161,7 @@ func (f factoryImpl) PdClient() (pd.Client, error) {
pdAddr := f.GetPdAddr()
if len(pdAddr) == 0 {
return nil, cerror.ErrInvalidServerOption.
GenWithStack("empty PD address, please use --pd to specify PD cluster addresses")
GenWithStack("Empty PD address. Please use --pd to specify PD cluster addresses")
}
pdEndpoints := strings.Split(pdAddr, ",")
for _, ep := range pdEndpoints {
Expand Down Expand Up @@ -184,7 +190,7 @@ func (f factoryImpl) PdClient() (pd.Client, error) {
))
if err != nil {
return nil, errors.Annotatef(err,
"fail to open PD client, please check pd address \"%s\"", pdAddr)
"Fail to open PD client. Please check the pd address(es) \"%s\"", pdAddr)
}

err = version.CheckClusterVersion(ctx, pdClient, pdEndpoints, credential, true)
Expand Down Expand Up @@ -226,6 +232,15 @@ func (f *factoryImpl) APIV2Client() (apiv2client.APIV2Interface, error) {
return apiv2client.NewAPIClient(serverAddr, f.clientGetter.GetCredential())
}

// findServerAddr find the cdc server address by the following logic
// a) Only the cdc server address is specified: use it;
// b) Only the PD address is specified:
// 1) check the address and create a etcdClient
// 2) check the etcd keys and find cdc cluster addresses:
// If there are multiple CDC clusters exist, report an error; otherwise,
// cache the server address in f.fetchedServerAddr and return it (since
// some cli cmds use both apiV1 and apiV2)
// c) Both PD and cdc server addresses are specified: report an error
func (f *factoryImpl) findServerAddr() (string, error) {
if f.fetchedServerAddr != "" {
return f.fetchedServerAddr, nil
Expand All @@ -244,11 +259,18 @@ func (f *factoryImpl) findServerAddr() (string, error) {
if f.clientGetter.GetServerAddr() != "" {
return f.clientGetter.GetServerAddr(), nil
}
// check pd-address represents a real pd cluster
pdClient, err := f.PdClient()
if err != nil {
return "", errors.Trace(err)
}
defer pdClient.Close()
// use pd to get server addr from etcd
etcdClient, err := f.EtcdClient()
if err != nil {
return "", errors.Trace(err)
}
defer etcdClient.Close()

ctx := cmdconetxt.GetDefaultContext()
err = etcdClient.CheckMultipleCDCClusterExist(ctx)
Expand Down
2 changes: 1 addition & 1 deletion pkg/cmd/factory/factory_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func TestFactoryImplPdClient(t *testing.T) {
c.EXPECT().GetPdAddr().Return(pdAddr).Times(1)
pdClient, err = f.PdClient()
require.Nil(t, pdClient)
require.Contains(t, err.Error(), "fail to open PD client")
require.Contains(t, err.Error(), "Fail to open PD client")
}

type mockAPIClient struct {
Expand Down

0 comments on commit 2fb5ac1

Please sign in to comment.