diff --git a/store/etcd/etcd.go b/store/etcd/etcd.go index d27b13a7..ca1ec5c8 100644 --- a/store/etcd/etcd.go +++ b/store/etcd/etcd.go @@ -3,12 +3,15 @@ package etcd import ( "crypto/tls" "errors" + "log" "net" "net/http" "strings" "time" - etcd "github.com/coreos/go-etcd/etcd" + "golang.org/x/net/context" + + etcd "github.com/coreos/etcd/client" "github.com/docker/libkv" "github.com/docker/libkv/store" ) @@ -23,21 +26,21 @@ var ( // Etcd is the receiver type for the // Store interface type Etcd struct { - client *etcd.Client + client etcd.KeysAPI } type etcdLock struct { - client *etcd.Client + client etcd.KeysAPI stopLock chan struct{} stopRenew chan struct{} key string value string last *etcd.Response - ttl uint64 + ttl time.Duration } const ( - periodicSync = 10 * time.Minute + periodicSync = 5 * time.Minute defaultLockTTL = 20 * time.Second defaultUpdateTime = 5 * time.Second ) @@ -57,34 +60,36 @@ func New(addrs []string, options *store.Config) (store.Store, error) { err error ) - // Create the etcd client - if options != nil && options.ClientTLS != nil { - entries = store.CreateEndpoints(addrs, "https") - s.client, err = etcd.NewTLSClient(entries, options.ClientTLS.CertFile, options.ClientTLS.KeyFile, options.ClientTLS.CACertFile) - if err != nil { - return nil, err - } - } else { - entries = store.CreateEndpoints(addrs, "http") - s.client = etcd.NewClient(entries) + entries = store.CreateEndpoints(addrs, "http") + cfg := &etcd.Config{ + Endpoints: entries, + Transport: etcd.DefaultTransport, + HeaderTimeoutPerRequest: 3 * time.Second, } // Set options if options != nil { - // Plain TLS config overrides ClientTLS if specified if options.TLS != nil { - s.setTLS(options.TLS, addrs) + setTLS(cfg, options.TLS, addrs) } if options.ConnectionTimeout != 0 { - s.setTimeout(options.ConnectionTimeout) + setTimeout(cfg, options.ConnectionTimeout) } } - // Periodic SyncCluster + c, err := etcd.New(*cfg) + if err != nil { + log.Fatal(err) + } + + s.client = etcd.NewKeysAPI(c) + + // Periodic Cluster Sync go func() { for { - s.client.SyncCluster() - time.Sleep(periodicSync) + if err := c.AutoSync(context.Background(), periodicSync); err != nil { + return + } } }() @@ -92,9 +97,9 @@ func New(addrs []string, options *store.Config) (store.Store, error) { } // SetTLS sets the tls configuration given a tls.Config scheme -func (s *Etcd) setTLS(tls *tls.Config, addrs []string) { +func setTLS(cfg *etcd.Config, tls *tls.Config, addrs []string) { entries := store.CreateEndpoints(addrs, "https") - s.client.SetCluster(entries) + cfg.Endpoints = entries // Set transport t := http.Transport{ @@ -105,36 +110,46 @@ func (s *Etcd) setTLS(tls *tls.Config, addrs []string) { TLSHandshakeTimeout: 10 * time.Second, TLSClientConfig: tls, } - s.client.SetTransport(&t) + + cfg.Transport = &t } // setTimeout sets the timeout used for connecting to the store -func (s *Etcd) setTimeout(time time.Duration) { - s.client.SetDialTimeout(time) +func setTimeout(cfg *etcd.Config, time time.Duration) { + cfg.HeaderTimeoutPerRequest = time +} + +// Normalize the key for usage in Etcd +func (s *Etcd) normalize(key string) string { + key = store.Normalize(key) + return strings.TrimPrefix(key, "/") } -// createDirectory creates the entire path for a directory -// that does not exist -func (s *Etcd) createDirectory(path string) error { - if _, err := s.client.CreateDir(store.Normalize(path), 10); err != nil { - if etcdError, ok := err.(*etcd.EtcdError); ok { - // Skip key already exists - if etcdError.ErrorCode != 105 { - return err +// keyNotFound checks on the error returned by the KeysAPI +// to verify if the key exists in the store or not +func keyNotFound(err error) bool { + if err != nil { + if etcdError, ok := err.(etcd.Error); ok { + if etcdError.Code == etcd.ErrorCodeKeyNotFound || + etcdError.Code == etcd.ErrorCodeNotFile || + etcdError.Code == etcd.ErrorCodeNotDir { + return true } - } else { - return err } } - return nil + return false } -// Get the value at "key", returns the last modified index -// to use in conjunction to Atomic calls +// Get the value at "key", returns the last modified +// index to use in conjunction to Atomic calls func (s *Etcd) Get(key string) (pair *store.KVPair, err error) { - result, err := s.client.Get(store.Normalize(key), false, false) + getOpts := &etcd.GetOptions{ + Quorum: true, + } + + result, err := s.client.Get(context.Background(), s.normalize(key), getOpts) if err != nil { - if isKeyNotFoundError(err) { + if keyNotFound(err) { return nil, store.ErrKeyNotFound } return nil, err @@ -151,40 +166,26 @@ func (s *Etcd) Get(key string) (pair *store.KVPair, err error) { // Put a value at "key" func (s *Etcd) Put(key string, value []byte, opts *store.WriteOptions) error { + setOpts := &etcd.SetOptions{} - // Default TTL = 0 means no expiration - var ttl uint64 - if opts != nil && opts.TTL > 0 { - ttl = uint64(opts.TTL.Seconds()) + // Set options + if opts != nil { + setOpts.Dir = opts.IsDir + setOpts.TTL = opts.TTL } - if _, err := s.client.Set(key, string(value), ttl); err != nil { - if etcdError, ok := err.(*etcd.EtcdError); ok { - - // Not a directory - if etcdError.ErrorCode == 104 { - // Remove the last element (the actual key) - // and create the full directory path - err = s.createDirectory(store.GetDirectory(key)) - if err != nil { - return err - } - - // Now that the directory is created, set the key - if _, err := s.client.Set(key, string(value), ttl); err != nil { - return err - } - } - } - return err - } - return nil + _, err := s.client.Set(context.Background(), s.normalize(key), string(value), setOpts) + return err } // Delete a value at "key" func (s *Etcd) Delete(key string) error { - _, err := s.client.Delete(store.Normalize(key), false) - if isKeyNotFoundError(err) { + opts := &etcd.DeleteOptions{ + Recursive: false, + } + + _, err := s.client.Delete(context.Background(), s.normalize(key), opts) + if keyNotFound(err) { return store.ErrKeyNotFound } return err @@ -208,130 +209,130 @@ func (s *Etcd) Exists(key string) (bool, error) { // be sent to the channel. Providing a non-nil stopCh can // be used to stop watching. func (s *Etcd) Watch(key string, stopCh <-chan struct{}) (<-chan *store.KVPair, error) { - // Start an etcd watch. - // Note: etcd will send the current value through the channel. - etcdWatchCh := make(chan *etcd.Response) - etcdStopCh := make(chan bool) - go s.client.Watch(store.Normalize(key), 0, false, etcdWatchCh, etcdStopCh) - - // Adapter goroutine: The goal here is to convert whatever - // format etcd is using into our interface. + opts := &etcd.WatcherOptions{Recursive: false} + watcher := s.client.Watcher(s.normalize(key), opts) + + // watchCh is sending back events to the caller watchCh := make(chan *store.KVPair) + go func() { defer close(watchCh) // Get the current value - current, err := s.Get(key) + pair, err := s.Get(key) if err != nil { return } // Push the current value through the channel. - watchCh <- current + watchCh <- pair for { + // Check if the watch was stopped by the caller select { - case result := <-etcdWatchCh: - if result == nil || result.Node == nil { - // Something went wrong, exit - // No need to stop the chan as the watch already ended - return - } - watchCh <- &store.KVPair{ - Key: key, - Value: []byte(result.Node.Value), - LastIndex: result.Node.ModifiedIndex, - } case <-stopCh: - etcdStopCh <- true return + default: + } + + result, err := watcher.Next(context.Background()) + + if err != nil { + return + } + + watchCh <- &store.KVPair{ + Key: key, + Value: []byte(result.Node.Value), + LastIndex: result.Node.ModifiedIndex, } } }() + return watchCh, nil } // WatchTree watches for changes on a "directory" // It returns a channel that will receive changes or pass // on errors. Upon creating a watch, the current childs values -// will be sent to the channel .Providing a non-nil stopCh can +// will be sent to the channel. Providing a non-nil stopCh can // be used to stop watching. func (s *Etcd) WatchTree(directory string, stopCh <-chan struct{}) (<-chan []*store.KVPair, error) { - // Start the watch - etcdWatchCh := make(chan *etcd.Response) - etcdStopCh := make(chan bool) - go s.client.Watch(store.Normalize(directory), 0, true, etcdWatchCh, etcdStopCh) + watchOpts := &etcd.WatcherOptions{Recursive: true} + watcher := s.client.Watcher(s.normalize(directory), watchOpts) - // Adapter goroutine: The goal here is to convert whatever - // format etcd is using into our interface. + // watchCh is sending back events to the caller watchCh := make(chan []*store.KVPair) + go func() { defer close(watchCh) // Get child values - current, err := s.List(directory) + list, err := s.List(directory) if err != nil { return } // Push the current value through the channel. - watchCh <- current + watchCh <- list for { + // Check if the watch was stopped by the caller select { - case event := <-etcdWatchCh: - if event == nil { - // Something went wrong, exit - // No need to stop the chan as the watch already ended - return - } - // FIXME: We should probably use the value pushed by the channel. - // However, Node.Nodes seems to be empty. - if list, err := s.List(directory); err == nil { - watchCh <- list - } case <-stopCh: - etcdStopCh <- true return + default: } + + _, err := watcher.Next(context.Background()) + + if err != nil { + return + } + + list, err = s.List(directory) + if err != nil { + return + } + + watchCh <- list } }() + return watchCh, nil } -// AtomicPut put a value at "key" if the key has not been +// AtomicPut puts a value at "key" if the key has not been // modified in the meantime, throws an error if this is the case -func (s *Etcd) AtomicPut(key string, value []byte, previous *store.KVPair, options *store.WriteOptions) (bool, *store.KVPair, error) { +func (s *Etcd) AtomicPut(key string, value []byte, previous *store.KVPair, opts *store.WriteOptions) (bool, *store.KVPair, error) { + var ( + meta *etcd.Response + err error + ) + + setOpts := &etcd.SetOptions{} - var meta *etcd.Response - var err error if previous != nil { - meta, err = s.client.CompareAndSwap(store.Normalize(key), string(value), 0, "", previous.LastIndex) + setOpts.PrevExist = etcd.PrevExist + setOpts.PrevIndex = previous.LastIndex + if previous.Value != nil { + setOpts.PrevValue = string(previous.Value) + } } else { - // Interpret previous == nil as Atomic Create - meta, err = s.client.Create(store.Normalize(key), string(value), 0) - if etcdError, ok := err.(*etcd.EtcdError); ok { - - // Directory doesn't exist. - if etcdError.ErrorCode == 104 { - // Remove the last element (the actual key) - // and create the full directory path - err = s.createDirectory(store.GetDirectory(key)) - if err != nil { - return false, nil, err - } + setOpts.PrevExist = etcd.PrevNoExist + } - // Now that the directory is created, create the key - if _, err := s.client.Create(key, string(value), 0); err != nil { - return false, nil, err - } - } + if opts != nil { + if opts.TTL > 0 { + setOpts.TTL = opts.TTL } } + + meta, err = s.client.Set(context.Background(), s.normalize(key), string(value), setOpts) if err != nil { - if etcdError, ok := err.(*etcd.EtcdError); ok { - // Compare Failed - if etcdError.ErrorCode == 101 { + if etcdError, ok := err.(etcd.Error); ok { + // Compare failed + if etcdError.Code == etcd.ErrorCodeTestFailed { return false, nil, store.ErrKeyModified } } @@ -355,11 +356,20 @@ func (s *Etcd) AtomicDelete(key string, previous *store.KVPair) (bool, error) { return false, store.ErrPreviousNotSpecified } - _, err := s.client.CompareAndDelete(store.Normalize(key), "", previous.LastIndex) + delOpts := &etcd.DeleteOptions{} + + if previous != nil { + delOpts.PrevIndex = previous.LastIndex + if previous.Value != nil { + delOpts.PrevValue = string(previous.Value) + } + } + + _, err := s.client.Delete(context.Background(), s.normalize(key), delOpts) if err != nil { - if etcdError, ok := err.(*etcd.EtcdError); ok { + if etcdError, ok := err.(etcd.Error); ok { // Compare failed - if etcdError.ErrorCode == 101 { + if etcdError.Code == etcd.ErrorCodeTestFailed { return false, store.ErrKeyModified } } @@ -371,18 +381,24 @@ func (s *Etcd) AtomicDelete(key string, previous *store.KVPair) (bool, error) { // List child nodes of a given directory func (s *Etcd) List(directory string) ([]*store.KVPair, error) { - resp, err := s.client.Get(store.Normalize(directory), true, true) + getOpts := &etcd.GetOptions{ + Quorum: true, + Recursive: true, + Sort: true, + } + + resp, err := s.client.Get(context.Background(), s.normalize(directory), getOpts) if err != nil { - if isKeyNotFoundError(err) { + if keyNotFound(err) { return nil, store.ErrKeyNotFound } return nil, err } + kv := []*store.KVPair{} for _, n := range resp.Node.Nodes { - key := strings.TrimLeft(n.Key, "/") kv = append(kv, &store.KVPair{ - Key: key, + Key: n.Key, Value: []byte(n.Value), LastIndex: n.ModifiedIndex, }) @@ -392,8 +408,12 @@ func (s *Etcd) List(directory string) ([]*store.KVPair, error) { // DeleteTree deletes a range of keys under a given directory func (s *Etcd) DeleteTree(directory string) error { - _, err := s.client.Delete(store.Normalize(directory), true) - if isKeyNotFoundError(err) { + delOpts := &etcd.DeleteOptions{ + Recursive: true, + } + + _, err := s.client.Delete(context.Background(), s.normalize(directory), delOpts) + if keyNotFound(err) { return store.ErrKeyNotFound } return err @@ -403,7 +423,7 @@ func (s *Etcd) DeleteTree(directory string) error { // be used to provide mutual exclusion on a key func (s *Etcd) NewLock(key string, options *store.LockOptions) (lock store.Locker, err error) { var value string - ttl := uint64(time.Duration(defaultLockTTL).Seconds()) + ttl := defaultLockTTL renewCh := make(chan struct{}) // Apply options on Lock @@ -412,7 +432,7 @@ func (s *Etcd) NewLock(key string, options *store.LockOptions) (lock store.Locke value = string(options.Value) } if options.TTL != 0 { - ttl = uint64(options.TTL.Seconds()) + ttl = options.TTL } if options.RenewLock != nil { renewCh = options.RenewLock @@ -423,7 +443,7 @@ func (s *Etcd) NewLock(key string, options *store.LockOptions) (lock store.Locke lock = &etcdLock{ client: s.client, stopRenew: renewCh, - key: key, + key: s.normalize(key), value: value, ttl: ttl, } @@ -436,47 +456,58 @@ func (s *Etcd) NewLock(key string, options *store.LockOptions) (lock store.Locke // lock is lost or if an error occurs func (l *etcdLock) Lock(stopChan chan struct{}) (<-chan struct{}, error) { - key := store.Normalize(l.key) - // Lock holder channel lockHeld := make(chan struct{}) stopLocking := l.stopRenew - var lastIndex uint64 + setOpts := &etcd.SetOptions{ + TTL: l.ttl, + } for { - resp, err := l.client.Create(key, l.value, l.ttl) + setOpts.PrevExist = etcd.PrevNoExist + resp, err := l.client.Set(context.Background(), l.key, l.value, setOpts) if err != nil { - if etcdError, ok := err.(*etcd.EtcdError); ok { - // Key already exists - if etcdError.ErrorCode != 105 { - lastIndex = ^uint64(0) + if etcdError, ok := err.(etcd.Error); ok { + if etcdError.Code != etcd.ErrorCodeNodeExist { + return nil, err } + setOpts.PrevIndex = ^uint64(0) } } else { - lastIndex = resp.Node.ModifiedIndex + setOpts.PrevIndex = resp.Node.ModifiedIndex } - l.last, err = l.client.CompareAndSwap(key, l.value, l.ttl, "", lastIndex) + setOpts.PrevExist = etcd.PrevExist + l.last, err = l.client.Set(context.Background(), l.key, l.value, setOpts) if err == nil { // Leader section l.stopLock = stopLocking - go l.holdLock(key, lockHeld, stopLocking) + go l.holdLock(l.key, lockHeld, stopLocking) break } else { + // If this is a legitimate error, return + if etcdError, ok := err.(etcd.Error); ok { + if etcdError.Code != etcd.ErrorCodeTestFailed { + return nil, err + } + } + // Seeker section - chW := make(chan *etcd.Response) + errorCh := make(chan error) chWStop := make(chan bool) free := make(chan bool) - go l.waitLock(key, chW, chWStop, free) + go l.waitLock(l.key, errorCh, chWStop, free) // Wait for the key to be available or for // a signal to stop trying to lock the key select { case _ = <-free: break + case err := <-errorCh: + return nil, err case _ = <-stopChan: return nil, ErrAbortTryLock } @@ -495,15 +526,17 @@ func (l *etcdLock) Lock(stopChan chan struct{}) (<-chan struct{}, error) { func (l *etcdLock) holdLock(key string, lockHeld chan struct{}, stopLocking <-chan struct{}) { defer close(lockHeld) - update := time.NewTicker(time.Duration(l.ttl) * time.Second / 3) + update := time.NewTicker(l.ttl / 3) defer update.Stop() var err error + setOpts := &etcd.SetOptions{TTL: l.ttl} for { select { case <-update.C: - l.last, err = l.client.CompareAndSwap(key, l.value, l.ttl, "", l.last.Node.ModifiedIndex) + setOpts.PrevIndex = l.last.Node.ModifiedIndex + l.last, err = l.client.Set(context.Background(), key, l.value, setOpts) if err != nil { return } @@ -515,12 +548,19 @@ func (l *etcdLock) holdLock(key string, lockHeld chan struct{}, stopLocking <-ch } // WaitLock simply waits for the key to be available for creation -func (l *etcdLock) waitLock(key string, eventCh chan *etcd.Response, stopWatchCh chan bool, free chan<- bool) { - go l.client.Watch(key, 0, false, eventCh, stopWatchCh) +func (l *etcdLock) waitLock(key string, errorCh chan error, stopWatchCh chan bool, free chan<- bool) { + opts := &etcd.WatcherOptions{Recursive: false} + watcher := l.client.Watcher(key, opts) - for event := range eventCh { + for { + event, err := watcher.Next(context.Background()) + if err != nil { + errorCh <- err + return + } if event.Action == "delete" || event.Action == "expire" { free <- true + return } } } @@ -532,7 +572,10 @@ func (l *etcdLock) Unlock() error { l.stopLock <- struct{}{} } if l.last != nil { - _, err := l.client.CompareAndDelete(store.Normalize(l.key), l.value, l.last.Node.ModifiedIndex) + delOpts := &etcd.DeleteOptions{ + PrevIndex: l.last.Node.ModifiedIndex, + } + _, err := l.client.Delete(context.Background(), l.key, delOpts) if err != nil { return err } @@ -544,15 +587,3 @@ func (l *etcdLock) Unlock() error { func (s *Etcd) Close() { return } - -func isKeyNotFoundError(err error) bool { - if err != nil { - if etcdError, ok := err.(*etcd.EtcdError); ok { - // Not a Directory or Not a file - if etcdError.ErrorCode == 100 || etcdError.ErrorCode == 102 || etcdError.ErrorCode == 104 { - return true - } - } - } - return false -} diff --git a/store/store.go b/store/store.go index 26b9af75..1efeb8e4 100644 --- a/store/store.go +++ b/store/store.go @@ -46,7 +46,7 @@ type Config struct { } // ClientTLSConfig contains data for a Client TLS configuration in the form -// the etcd client wants it. Eventually we'll adapt it for ZK and Consul. +// the etcd client wants it. Eventually we'll adapt it for ZK and Consul. type ClientTLSConfig struct { CertFile string KeyFile string @@ -108,7 +108,8 @@ type KVPair struct { // WriteOptions contains optional request parameters type WriteOptions struct { - TTL time.Duration + IsDir bool + TTL time.Duration } // LockOptions contains optional request parameters diff --git a/testutils/utils.go b/testutils/utils.go index 2c71d7c4..d1b6408d 100644 --- a/testutils/utils.go +++ b/testutils/utils.go @@ -51,7 +51,7 @@ func RunTestTTL(t *testing.T, kv store.Store, backup store.Store) { func testPutGetDeleteExists(t *testing.T, kv store.Store) { // Get a not exist key should return ErrKeyNotFound - pair, err := kv.Get("/testPutGetDelete_not_exist_key") + pair, err := kv.Get("testPutGetDelete_not_exist_key") assert.Equal(t, store.ErrKeyNotFound, err) value := []byte("bar") @@ -62,6 +62,7 @@ func testPutGetDeleteExists(t *testing.T, kv store.Store) { "testPutGetDeleteExists/testbar/testfoobar", } { failMsg := fmt.Sprintf("Fail key %s", key) + // Put the key err = kv.Put(key, value, nil) assert.NoError(t, err, failMsg) @@ -179,7 +180,7 @@ func testWatchTree(t *testing.T, kv store.Store) { // Update loop go func() { - timeout := time.After(250 * time.Millisecond) + timeout := time.After(500 * time.Millisecond) for { select { case <-timeout: @@ -191,15 +192,17 @@ func testWatchTree(t *testing.T, kv store.Store) { }() // Check for updates + eventCount := 1 for { select { case event := <-events: assert.NotNil(t, event) // We received the Delete event on a child node // Exit test successfully - if len(event) == 2 { + if eventCount == 2 { return } + eventCount++ case <-time.After(4 * time.Second): t.Fatal("Timeout reached") return @@ -235,7 +238,7 @@ func testAtomicPut(t *testing.T, kv store.Store) { assert.True(t, success) // This CAS should fail, key exists. - pair.LastIndex = 0 + pair.LastIndex = 6744 success, _, err = kv.AtomicPut(key, []byte("WORLDWORLD"), pair, nil) assert.Error(t, err) assert.False(t, success) @@ -291,7 +294,7 @@ func testAtomicDelete(t *testing.T, kv store.Store) { tempIndex := pair.LastIndex // AtomicDelete should fail - pair.LastIndex = 0 + pair.LastIndex = 6744 success, err := kv.AtomicDelete(key, pair) assert.Error(t, err) assert.False(t, success)