From d635a8e32b67f1ac511b7fcc15d524dedb9f549b Mon Sep 17 00:00:00 2001 From: Alexandre Beslic Date: Mon, 5 Oct 2015 05:57:23 -0700 Subject: [PATCH] Migration to the new Etcd client This commit migrates the old 'go-etcd' client, which is deprecated to the new 'coreos/etcd/client'. One notable change is the ability to specify an 'IsDir' parameter to the 'Put' call. This allows to circumvent the limitations of etcd regarding the key/directory distinction while setting up Watches on a directory. A conservative measure to set up a watch that should be used the same way for etcd/consul/zookeeper is to enforce the 'IsDir' parameter with 'WriteOptions' on 'Put' to avoid the 'NotANode' error thrown by etcd on Watch call. Consul and zookeeper are not using the 'IsDir' parameter. Signed-off-by: Alexandre Beslic --- store/etcd/etcd.go | 389 ++++++++++++++++++++++++--------------------- store/store.go | 5 +- testutils/utils.go | 13 +- 3 files changed, 221 insertions(+), 186 deletions(-) diff --git a/store/etcd/etcd.go b/store/etcd/etcd.go index d27b13a7..ca1ec5c8 100644 --- a/store/etcd/etcd.go +++ b/store/etcd/etcd.go @@ -3,12 +3,15 @@ package etcd import ( "crypto/tls" "errors" + "log" "net" "net/http" "strings" "time" - etcd "github.com/coreos/go-etcd/etcd" + "golang.org/x/net/context" + + etcd "github.com/coreos/etcd/client" "github.com/docker/libkv" "github.com/docker/libkv/store" ) @@ -23,21 +26,21 @@ var ( // Etcd is the receiver type for the // Store interface type Etcd struct { - client *etcd.Client + client etcd.KeysAPI } type etcdLock struct { - client *etcd.Client + client etcd.KeysAPI stopLock chan struct{} stopRenew chan struct{} key string value string last *etcd.Response - ttl uint64 + ttl time.Duration } const ( - periodicSync = 10 * time.Minute + periodicSync = 5 * time.Minute defaultLockTTL = 20 * time.Second defaultUpdateTime = 5 * time.Second ) @@ -57,34 +60,36 @@ func New(addrs []string, options *store.Config) (store.Store, error) { err error ) - // Create the etcd client - if options != nil && options.ClientTLS != nil { - entries = store.CreateEndpoints(addrs, "https") - s.client, err = etcd.NewTLSClient(entries, options.ClientTLS.CertFile, options.ClientTLS.KeyFile, options.ClientTLS.CACertFile) - if err != nil { - return nil, err - } - } else { - entries = store.CreateEndpoints(addrs, "http") - s.client = etcd.NewClient(entries) + entries = store.CreateEndpoints(addrs, "http") + cfg := &etcd.Config{ + Endpoints: entries, + Transport: etcd.DefaultTransport, + HeaderTimeoutPerRequest: 3 * time.Second, } // Set options if options != nil { - // Plain TLS config overrides ClientTLS if specified if options.TLS != nil { - s.setTLS(options.TLS, addrs) + setTLS(cfg, options.TLS, addrs) } if options.ConnectionTimeout != 0 { - s.setTimeout(options.ConnectionTimeout) + setTimeout(cfg, options.ConnectionTimeout) } } - // Periodic SyncCluster + c, err := etcd.New(*cfg) + if err != nil { + log.Fatal(err) + } + + s.client = etcd.NewKeysAPI(c) + + // Periodic Cluster Sync go func() { for { - s.client.SyncCluster() - time.Sleep(periodicSync) + if err := c.AutoSync(context.Background(), periodicSync); err != nil { + return + } } }() @@ -92,9 +97,9 @@ func New(addrs []string, options *store.Config) (store.Store, error) { } // SetTLS sets the tls configuration given a tls.Config scheme -func (s *Etcd) setTLS(tls *tls.Config, addrs []string) { +func setTLS(cfg *etcd.Config, tls *tls.Config, addrs []string) { entries := store.CreateEndpoints(addrs, "https") - s.client.SetCluster(entries) + cfg.Endpoints = entries // Set transport t := http.Transport{ @@ -105,36 +110,46 @@ func (s *Etcd) setTLS(tls *tls.Config, addrs []string) { TLSHandshakeTimeout: 10 * time.Second, TLSClientConfig: tls, } - s.client.SetTransport(&t) + + cfg.Transport = &t } // setTimeout sets the timeout used for connecting to the store -func (s *Etcd) setTimeout(time time.Duration) { - s.client.SetDialTimeout(time) +func setTimeout(cfg *etcd.Config, time time.Duration) { + cfg.HeaderTimeoutPerRequest = time +} + +// Normalize the key for usage in Etcd +func (s *Etcd) normalize(key string) string { + key = store.Normalize(key) + return strings.TrimPrefix(key, "/") } -// createDirectory creates the entire path for a directory -// that does not exist -func (s *Etcd) createDirectory(path string) error { - if _, err := s.client.CreateDir(store.Normalize(path), 10); err != nil { - if etcdError, ok := err.(*etcd.EtcdError); ok { - // Skip key already exists - if etcdError.ErrorCode != 105 { - return err +// keyNotFound checks on the error returned by the KeysAPI +// to verify if the key exists in the store or not +func keyNotFound(err error) bool { + if err != nil { + if etcdError, ok := err.(etcd.Error); ok { + if etcdError.Code == etcd.ErrorCodeKeyNotFound || + etcdError.Code == etcd.ErrorCodeNotFile || + etcdError.Code == etcd.ErrorCodeNotDir { + return true } - } else { - return err } } - return nil + return false } -// Get the value at "key", returns the last modified index -// to use in conjunction to Atomic calls +// Get the value at "key", returns the last modified +// index to use in conjunction to Atomic calls func (s *Etcd) Get(key string) (pair *store.KVPair, err error) { - result, err := s.client.Get(store.Normalize(key), false, false) + getOpts := &etcd.GetOptions{ + Quorum: true, + } + + result, err := s.client.Get(context.Background(), s.normalize(key), getOpts) if err != nil { - if isKeyNotFoundError(err) { + if keyNotFound(err) { return nil, store.ErrKeyNotFound } return nil, err @@ -151,40 +166,26 @@ func (s *Etcd) Get(key string) (pair *store.KVPair, err error) { // Put a value at "key" func (s *Etcd) Put(key string, value []byte, opts *store.WriteOptions) error { + setOpts := &etcd.SetOptions{} - // Default TTL = 0 means no expiration - var ttl uint64 - if opts != nil && opts.TTL > 0 { - ttl = uint64(opts.TTL.Seconds()) + // Set options + if opts != nil { + setOpts.Dir = opts.IsDir + setOpts.TTL = opts.TTL } - if _, err := s.client.Set(key, string(value), ttl); err != nil { - if etcdError, ok := err.(*etcd.EtcdError); ok { - - // Not a directory - if etcdError.ErrorCode == 104 { - // Remove the last element (the actual key) - // and create the full directory path - err = s.createDirectory(store.GetDirectory(key)) - if err != nil { - return err - } - - // Now that the directory is created, set the key - if _, err := s.client.Set(key, string(value), ttl); err != nil { - return err - } - } - } - return err - } - return nil + _, err := s.client.Set(context.Background(), s.normalize(key), string(value), setOpts) + return err } // Delete a value at "key" func (s *Etcd) Delete(key string) error { - _, err := s.client.Delete(store.Normalize(key), false) - if isKeyNotFoundError(err) { + opts := &etcd.DeleteOptions{ + Recursive: false, + } + + _, err := s.client.Delete(context.Background(), s.normalize(key), opts) + if keyNotFound(err) { return store.ErrKeyNotFound } return err @@ -208,130 +209,130 @@ func (s *Etcd) Exists(key string) (bool, error) { // be sent to the channel. Providing a non-nil stopCh can // be used to stop watching. func (s *Etcd) Watch(key string, stopCh <-chan struct{}) (<-chan *store.KVPair, error) { - // Start an etcd watch. - // Note: etcd will send the current value through the channel. - etcdWatchCh := make(chan *etcd.Response) - etcdStopCh := make(chan bool) - go s.client.Watch(store.Normalize(key), 0, false, etcdWatchCh, etcdStopCh) - - // Adapter goroutine: The goal here is to convert whatever - // format etcd is using into our interface. + opts := &etcd.WatcherOptions{Recursive: false} + watcher := s.client.Watcher(s.normalize(key), opts) + + // watchCh is sending back events to the caller watchCh := make(chan *store.KVPair) + go func() { defer close(watchCh) // Get the current value - current, err := s.Get(key) + pair, err := s.Get(key) if err != nil { return } // Push the current value through the channel. - watchCh <- current + watchCh <- pair for { + // Check if the watch was stopped by the caller select { - case result := <-etcdWatchCh: - if result == nil || result.Node == nil { - // Something went wrong, exit - // No need to stop the chan as the watch already ended - return - } - watchCh <- &store.KVPair{ - Key: key, - Value: []byte(result.Node.Value), - LastIndex: result.Node.ModifiedIndex, - } case <-stopCh: - etcdStopCh <- true return + default: + } + + result, err := watcher.Next(context.Background()) + + if err != nil { + return + } + + watchCh <- &store.KVPair{ + Key: key, + Value: []byte(result.Node.Value), + LastIndex: result.Node.ModifiedIndex, } } }() + return watchCh, nil } // WatchTree watches for changes on a "directory" // It returns a channel that will receive changes or pass // on errors. Upon creating a watch, the current childs values -// will be sent to the channel .Providing a non-nil stopCh can +// will be sent to the channel. Providing a non-nil stopCh can // be used to stop watching. func (s *Etcd) WatchTree(directory string, stopCh <-chan struct{}) (<-chan []*store.KVPair, error) { - // Start the watch - etcdWatchCh := make(chan *etcd.Response) - etcdStopCh := make(chan bool) - go s.client.Watch(store.Normalize(directory), 0, true, etcdWatchCh, etcdStopCh) + watchOpts := &etcd.WatcherOptions{Recursive: true} + watcher := s.client.Watcher(s.normalize(directory), watchOpts) - // Adapter goroutine: The goal here is to convert whatever - // format etcd is using into our interface. + // watchCh is sending back events to the caller watchCh := make(chan []*store.KVPair) + go func() { defer close(watchCh) // Get child values - current, err := s.List(directory) + list, err := s.List(directory) if err != nil { return } // Push the current value through the channel. - watchCh <- current + watchCh <- list for { + // Check if the watch was stopped by the caller select { - case event := <-etcdWatchCh: - if event == nil { - // Something went wrong, exit - // No need to stop the chan as the watch already ended - return - } - // FIXME: We should probably use the value pushed by the channel. - // However, Node.Nodes seems to be empty. - if list, err := s.List(directory); err == nil { - watchCh <- list - } case <-stopCh: - etcdStopCh <- true return + default: } + + _, err := watcher.Next(context.Background()) + + if err != nil { + return + } + + list, err = s.List(directory) + if err != nil { + return + } + + watchCh <- list } }() + return watchCh, nil } -// AtomicPut put a value at "key" if the key has not been +// AtomicPut puts a value at "key" if the key has not been // modified in the meantime, throws an error if this is the case -func (s *Etcd) AtomicPut(key string, value []byte, previous *store.KVPair, options *store.WriteOptions) (bool, *store.KVPair, error) { +func (s *Etcd) AtomicPut(key string, value []byte, previous *store.KVPair, opts *store.WriteOptions) (bool, *store.KVPair, error) { + var ( + meta *etcd.Response + err error + ) + + setOpts := &etcd.SetOptions{} - var meta *etcd.Response - var err error if previous != nil { - meta, err = s.client.CompareAndSwap(store.Normalize(key), string(value), 0, "", previous.LastIndex) + setOpts.PrevExist = etcd.PrevExist + setOpts.PrevIndex = previous.LastIndex + if previous.Value != nil { + setOpts.PrevValue = string(previous.Value) + } } else { - // Interpret previous == nil as Atomic Create - meta, err = s.client.Create(store.Normalize(key), string(value), 0) - if etcdError, ok := err.(*etcd.EtcdError); ok { - - // Directory doesn't exist. - if etcdError.ErrorCode == 104 { - // Remove the last element (the actual key) - // and create the full directory path - err = s.createDirectory(store.GetDirectory(key)) - if err != nil { - return false, nil, err - } + setOpts.PrevExist = etcd.PrevNoExist + } - // Now that the directory is created, create the key - if _, err := s.client.Create(key, string(value), 0); err != nil { - return false, nil, err - } - } + if opts != nil { + if opts.TTL > 0 { + setOpts.TTL = opts.TTL } } + + meta, err = s.client.Set(context.Background(), s.normalize(key), string(value), setOpts) if err != nil { - if etcdError, ok := err.(*etcd.EtcdError); ok { - // Compare Failed - if etcdError.ErrorCode == 101 { + if etcdError, ok := err.(etcd.Error); ok { + // Compare failed + if etcdError.Code == etcd.ErrorCodeTestFailed { return false, nil, store.ErrKeyModified } } @@ -355,11 +356,20 @@ func (s *Etcd) AtomicDelete(key string, previous *store.KVPair) (bool, error) { return false, store.ErrPreviousNotSpecified } - _, err := s.client.CompareAndDelete(store.Normalize(key), "", previous.LastIndex) + delOpts := &etcd.DeleteOptions{} + + if previous != nil { + delOpts.PrevIndex = previous.LastIndex + if previous.Value != nil { + delOpts.PrevValue = string(previous.Value) + } + } + + _, err := s.client.Delete(context.Background(), s.normalize(key), delOpts) if err != nil { - if etcdError, ok := err.(*etcd.EtcdError); ok { + if etcdError, ok := err.(etcd.Error); ok { // Compare failed - if etcdError.ErrorCode == 101 { + if etcdError.Code == etcd.ErrorCodeTestFailed { return false, store.ErrKeyModified } } @@ -371,18 +381,24 @@ func (s *Etcd) AtomicDelete(key string, previous *store.KVPair) (bool, error) { // List child nodes of a given directory func (s *Etcd) List(directory string) ([]*store.KVPair, error) { - resp, err := s.client.Get(store.Normalize(directory), true, true) + getOpts := &etcd.GetOptions{ + Quorum: true, + Recursive: true, + Sort: true, + } + + resp, err := s.client.Get(context.Background(), s.normalize(directory), getOpts) if err != nil { - if isKeyNotFoundError(err) { + if keyNotFound(err) { return nil, store.ErrKeyNotFound } return nil, err } + kv := []*store.KVPair{} for _, n := range resp.Node.Nodes { - key := strings.TrimLeft(n.Key, "/") kv = append(kv, &store.KVPair{ - Key: key, + Key: n.Key, Value: []byte(n.Value), LastIndex: n.ModifiedIndex, }) @@ -392,8 +408,12 @@ func (s *Etcd) List(directory string) ([]*store.KVPair, error) { // DeleteTree deletes a range of keys under a given directory func (s *Etcd) DeleteTree(directory string) error { - _, err := s.client.Delete(store.Normalize(directory), true) - if isKeyNotFoundError(err) { + delOpts := &etcd.DeleteOptions{ + Recursive: true, + } + + _, err := s.client.Delete(context.Background(), s.normalize(directory), delOpts) + if keyNotFound(err) { return store.ErrKeyNotFound } return err @@ -403,7 +423,7 @@ func (s *Etcd) DeleteTree(directory string) error { // be used to provide mutual exclusion on a key func (s *Etcd) NewLock(key string, options *store.LockOptions) (lock store.Locker, err error) { var value string - ttl := uint64(time.Duration(defaultLockTTL).Seconds()) + ttl := defaultLockTTL renewCh := make(chan struct{}) // Apply options on Lock @@ -412,7 +432,7 @@ func (s *Etcd) NewLock(key string, options *store.LockOptions) (lock store.Locke value = string(options.Value) } if options.TTL != 0 { - ttl = uint64(options.TTL.Seconds()) + ttl = options.TTL } if options.RenewLock != nil { renewCh = options.RenewLock @@ -423,7 +443,7 @@ func (s *Etcd) NewLock(key string, options *store.LockOptions) (lock store.Locke lock = &etcdLock{ client: s.client, stopRenew: renewCh, - key: key, + key: s.normalize(key), value: value, ttl: ttl, } @@ -436,47 +456,58 @@ func (s *Etcd) NewLock(key string, options *store.LockOptions) (lock store.Locke // lock is lost or if an error occurs func (l *etcdLock) Lock(stopChan chan struct{}) (<-chan struct{}, error) { - key := store.Normalize(l.key) - // Lock holder channel lockHeld := make(chan struct{}) stopLocking := l.stopRenew - var lastIndex uint64 + setOpts := &etcd.SetOptions{ + TTL: l.ttl, + } for { - resp, err := l.client.Create(key, l.value, l.ttl) + setOpts.PrevExist = etcd.PrevNoExist + resp, err := l.client.Set(context.Background(), l.key, l.value, setOpts) if err != nil { - if etcdError, ok := err.(*etcd.EtcdError); ok { - // Key already exists - if etcdError.ErrorCode != 105 { - lastIndex = ^uint64(0) + if etcdError, ok := err.(etcd.Error); ok { + if etcdError.Code != etcd.ErrorCodeNodeExist { + return nil, err } + setOpts.PrevIndex = ^uint64(0) } } else { - lastIndex = resp.Node.ModifiedIndex + setOpts.PrevIndex = resp.Node.ModifiedIndex } - l.last, err = l.client.CompareAndSwap(key, l.value, l.ttl, "", lastIndex) + setOpts.PrevExist = etcd.PrevExist + l.last, err = l.client.Set(context.Background(), l.key, l.value, setOpts) if err == nil { // Leader section l.stopLock = stopLocking - go l.holdLock(key, lockHeld, stopLocking) + go l.holdLock(l.key, lockHeld, stopLocking) break } else { + // If this is a legitimate error, return + if etcdError, ok := err.(etcd.Error); ok { + if etcdError.Code != etcd.ErrorCodeTestFailed { + return nil, err + } + } + // Seeker section - chW := make(chan *etcd.Response) + errorCh := make(chan error) chWStop := make(chan bool) free := make(chan bool) - go l.waitLock(key, chW, chWStop, free) + go l.waitLock(l.key, errorCh, chWStop, free) // Wait for the key to be available or for // a signal to stop trying to lock the key select { case _ = <-free: break + case err := <-errorCh: + return nil, err case _ = <-stopChan: return nil, ErrAbortTryLock } @@ -495,15 +526,17 @@ func (l *etcdLock) Lock(stopChan chan struct{}) (<-chan struct{}, error) { func (l *etcdLock) holdLock(key string, lockHeld chan struct{}, stopLocking <-chan struct{}) { defer close(lockHeld) - update := time.NewTicker(time.Duration(l.ttl) * time.Second / 3) + update := time.NewTicker(l.ttl / 3) defer update.Stop() var err error + setOpts := &etcd.SetOptions{TTL: l.ttl} for { select { case <-update.C: - l.last, err = l.client.CompareAndSwap(key, l.value, l.ttl, "", l.last.Node.ModifiedIndex) + setOpts.PrevIndex = l.last.Node.ModifiedIndex + l.last, err = l.client.Set(context.Background(), key, l.value, setOpts) if err != nil { return } @@ -515,12 +548,19 @@ func (l *etcdLock) holdLock(key string, lockHeld chan struct{}, stopLocking <-ch } // WaitLock simply waits for the key to be available for creation -func (l *etcdLock) waitLock(key string, eventCh chan *etcd.Response, stopWatchCh chan bool, free chan<- bool) { - go l.client.Watch(key, 0, false, eventCh, stopWatchCh) +func (l *etcdLock) waitLock(key string, errorCh chan error, stopWatchCh chan bool, free chan<- bool) { + opts := &etcd.WatcherOptions{Recursive: false} + watcher := l.client.Watcher(key, opts) - for event := range eventCh { + for { + event, err := watcher.Next(context.Background()) + if err != nil { + errorCh <- err + return + } if event.Action == "delete" || event.Action == "expire" { free <- true + return } } } @@ -532,7 +572,10 @@ func (l *etcdLock) Unlock() error { l.stopLock <- struct{}{} } if l.last != nil { - _, err := l.client.CompareAndDelete(store.Normalize(l.key), l.value, l.last.Node.ModifiedIndex) + delOpts := &etcd.DeleteOptions{ + PrevIndex: l.last.Node.ModifiedIndex, + } + _, err := l.client.Delete(context.Background(), l.key, delOpts) if err != nil { return err } @@ -544,15 +587,3 @@ func (l *etcdLock) Unlock() error { func (s *Etcd) Close() { return } - -func isKeyNotFoundError(err error) bool { - if err != nil { - if etcdError, ok := err.(*etcd.EtcdError); ok { - // Not a Directory or Not a file - if etcdError.ErrorCode == 100 || etcdError.ErrorCode == 102 || etcdError.ErrorCode == 104 { - return true - } - } - } - return false -} diff --git a/store/store.go b/store/store.go index 26b9af75..1efeb8e4 100644 --- a/store/store.go +++ b/store/store.go @@ -46,7 +46,7 @@ type Config struct { } // ClientTLSConfig contains data for a Client TLS configuration in the form -// the etcd client wants it. Eventually we'll adapt it for ZK and Consul. +// the etcd client wants it. Eventually we'll adapt it for ZK and Consul. type ClientTLSConfig struct { CertFile string KeyFile string @@ -108,7 +108,8 @@ type KVPair struct { // WriteOptions contains optional request parameters type WriteOptions struct { - TTL time.Duration + IsDir bool + TTL time.Duration } // LockOptions contains optional request parameters diff --git a/testutils/utils.go b/testutils/utils.go index 2c71d7c4..d1b6408d 100644 --- a/testutils/utils.go +++ b/testutils/utils.go @@ -51,7 +51,7 @@ func RunTestTTL(t *testing.T, kv store.Store, backup store.Store) { func testPutGetDeleteExists(t *testing.T, kv store.Store) { // Get a not exist key should return ErrKeyNotFound - pair, err := kv.Get("/testPutGetDelete_not_exist_key") + pair, err := kv.Get("testPutGetDelete_not_exist_key") assert.Equal(t, store.ErrKeyNotFound, err) value := []byte("bar") @@ -62,6 +62,7 @@ func testPutGetDeleteExists(t *testing.T, kv store.Store) { "testPutGetDeleteExists/testbar/testfoobar", } { failMsg := fmt.Sprintf("Fail key %s", key) + // Put the key err = kv.Put(key, value, nil) assert.NoError(t, err, failMsg) @@ -179,7 +180,7 @@ func testWatchTree(t *testing.T, kv store.Store) { // Update loop go func() { - timeout := time.After(250 * time.Millisecond) + timeout := time.After(500 * time.Millisecond) for { select { case <-timeout: @@ -191,15 +192,17 @@ func testWatchTree(t *testing.T, kv store.Store) { }() // Check for updates + eventCount := 1 for { select { case event := <-events: assert.NotNil(t, event) // We received the Delete event on a child node // Exit test successfully - if len(event) == 2 { + if eventCount == 2 { return } + eventCount++ case <-time.After(4 * time.Second): t.Fatal("Timeout reached") return @@ -235,7 +238,7 @@ func testAtomicPut(t *testing.T, kv store.Store) { assert.True(t, success) // This CAS should fail, key exists. - pair.LastIndex = 0 + pair.LastIndex = 6744 success, _, err = kv.AtomicPut(key, []byte("WORLDWORLD"), pair, nil) assert.Error(t, err) assert.False(t, success) @@ -291,7 +294,7 @@ func testAtomicDelete(t *testing.T, kv store.Store) { tempIndex := pair.LastIndex // AtomicDelete should fail - pair.LastIndex = 0 + pair.LastIndex = 6744 success, err := kv.AtomicDelete(key, pair) assert.Error(t, err) assert.False(t, success)