From 526341c44af3753bc7cb08eb9e77659221c54c49 Mon Sep 17 00:00:00 2001 From: WangXiangUSTC Date: Mon, 18 Nov 2019 17:38:29 +0800 Subject: [PATCH 1/2] return revision when create key --- pkg/etcd/etcd.go | 13 +++++++++---- pkg/etcd/etcd_test.go | 28 ++++++++++++++++------------ tidb-binlog/node/registry.go | 2 +- 3 files changed, 26 insertions(+), 17 deletions(-) diff --git a/pkg/etcd/etcd.go b/pkg/etcd/etcd.go index 3d7f681ae..63c1f9572 100644 --- a/pkg/etcd/etcd.go +++ b/pkg/etcd/etcd.go @@ -70,7 +70,7 @@ func (e *Client) Close() error { } // Create guarantees to set a key = value with some options(like ttl) -func (e *Client) Create(ctx context.Context, key string, val string, opts []clientv3.OpOption) error { +func (e *Client) Create(ctx context.Context, key string, val string, opts []clientv3.OpOption) (int64, error) { key = keyWithPrefix(e.rootPath, key) txnResp, err := e.client.KV.Txn(ctx).If( clientv3.Compare(clientv3.ModRevision(key), "=", 0), @@ -78,14 +78,19 @@ func (e *Client) Create(ctx context.Context, key string, val string, opts []clie clientv3.OpPut(key, val, opts...), ).Commit() if err != nil { - return errors.Trace(err) + return 0, errors.Trace(err) } if !txnResp.Succeeded { - return errors.AlreadyExistsf("key %s in etcd", key) + return 0, errors.AlreadyExistsf("key %s in etcd", key) } - return nil + if txnResp.Header != nil { + return txnResp.Header.Revision, nil + } + + // impossible to happen + return -1, nil } // Get returns a key/value matchs the given key diff --git a/pkg/etcd/etcd_test.go b/pkg/etcd/etcd_test.go index f9fb01924..faa978e05 100644 --- a/pkg/etcd/etcd_test.go +++ b/pkg/etcd/etcd_test.go @@ -52,7 +52,7 @@ func (t *testEtcdSuite) TestCreate(c *C) { c.Assert(err, IsNil) c.Assert(getResp.Kvs, HasLen, 0) - err = etcdCli.Create(ctx, key, obj, nil) + _, err = etcdCli.Create(ctx, key, obj, nil) c.Assert(err, IsNil) getResp, err = etcdClient.KV.Get(ctx, key) @@ -68,7 +68,7 @@ func (t *testEtcdSuite) TestCreateWithTTL(c *C) { c.Assert(err, IsNil) opts := []clientv3.OpOption{clientv3.WithLease(clientv3.LeaseID(lcr.ID))} - err = etcdCli.Create(ctx, key, obj, opts) + _, err = etcdCli.Create(ctx, key, obj, opts) c.Assert(err, IsNil) time.Sleep(2 * time.Second) @@ -84,7 +84,7 @@ func (t *testEtcdSuite) TestCreateWithKeyExist(c *C) { _, err := etcdClient.KV.Put(ctx, key, obj, nil...) c.Assert(err, IsNil) - err = etcdCli.Create(ctx, key, obj, nil) + _, err = etcdCli.Create(ctx, key, obj, nil) c.Assert(errors.IsAlreadyExists(err), IsTrue) } @@ -97,12 +97,13 @@ func (t *testEtcdSuite) TestUpdate(c *C) { c.Assert(err, IsNil) opts := []clientv3.OpOption{clientv3.WithLease(lcr.ID)} - err = etcdCli.Create(ctx, key, obj1, opts) + revision0, err := etcdCli.Create(ctx, key, obj1, opts) c.Assert(err, IsNil) res, revision1, err := etcdCli.Get(ctx, key) c.Assert(err, IsNil) c.Assert(string(res), Equals, obj1) + c.Assert(revision0, Equals, revision1) time.Sleep(time.Second) @@ -137,19 +138,22 @@ func (t *testEtcdSuite) TestList(c *C) { k3 := key + "/level3" k11 := key + "/level1/level1" - err := etcdCli.Create(ctx, k1, k1, nil) + revision1, err := etcdCli.Create(ctx, k1, k1, nil) c.Assert(err, IsNil) - err = etcdCli.Create(ctx, k2, k2, nil) + revision2, err := etcdCli.Create(ctx, k2, k2, nil) c.Assert(err, IsNil) + c.Assert(revision2 > revision1, IsTrue) - err = etcdCli.Create(ctx, k3, k3, nil) + revision3, err := etcdCli.Create(ctx, k3, k3, nil) c.Assert(err, IsNil) + c.Assert(revision3 > revision2, IsTrue) - err = etcdCli.Create(ctx, k11, k11, nil) + revision4, err := etcdCli.Create(ctx, k11, k11, nil) c.Assert(err, IsNil) + c.Assert(revision4 > revision3, IsTrue) - root, revision1, err := etcdCli.List(ctx, key) + root, revision5, err := etcdCli.List(ctx, key) c.Assert(err, IsNil) c.Assert(string(root.Childs["level1"].Value), Equals, k1) c.Assert(string(root.Childs["level1"].Childs["level1"].Value), Equals, k11) @@ -157,16 +161,16 @@ func (t *testEtcdSuite) TestList(c *C) { c.Assert(string(root.Childs["level3"].Value), Equals, k3) // the revision of list should equal to the latest update's revision - _, revision2, err := etcdCli.Get(ctx, k11) + _, revision6, err := etcdCli.Get(ctx, k11) c.Assert(err, IsNil) - c.Assert(revision1, Equals, revision2) + c.Assert(revision5, Equals, revision6) } func (t *testEtcdSuite) TestDelete(c *C) { key := "binlogdelete/testkey" keys := []string{key + "/level1", key + "/level2", key + "/level1" + "/level1"} for _, k := range keys { - err := etcdCli.Create(ctx, k, k, nil) + _, err := etcdCli.Create(ctx, k, k, nil) c.Assert(err, IsNil) } diff --git a/tidb-binlog/node/registry.go b/tidb-binlog/node/registry.go index c5a4fde9d..12a78ea9c 100644 --- a/tidb-binlog/node/registry.go +++ b/tidb-binlog/node/registry.go @@ -129,7 +129,7 @@ func (r *EtcdRegistry) createNode(ctx context.Context, prefix string, status *St return errors.Annotatef(err, "error marshal NodeStatus(%v)", status) } key := r.prefixed(prefix, status.NodeID) - err = r.client.Create(ctx, key, string(objstr), nil) + _, err = r.client.Create(ctx, key, string(objstr), nil) return errors.Trace(err) } From f78576b16ac91bb7569a9aa097b65b44752c7c3b Mon Sep 17 00:00:00 2001 From: WangXiangUSTC Date: Mon, 18 Nov 2019 17:55:20 +0800 Subject: [PATCH 2/2] address comment --- pkg/etcd/etcd.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/etcd/etcd.go b/pkg/etcd/etcd.go index 63c1f9572..28855a836 100644 --- a/pkg/etcd/etcd.go +++ b/pkg/etcd/etcd.go @@ -90,7 +90,7 @@ func (e *Client) Create(ctx context.Context, key string, val string, opts []clie } // impossible to happen - return -1, nil + return 0, errors.New("revision is unknown") } // Get returns a key/value matchs the given key