Skip to content
This repository has been archived by the owner on Apr 17, 2019. It is now read-only.

Commit

Permalink
Merge pull request #345 from dalanlan/update-etcd-client
Browse files Browse the repository at this point in the history
Use etcd client lib instead of go-etcd
  • Loading branch information
brendandburns committed Jan 7, 2016
2 parents bf643df + 2445e74 commit ccecc17
Showing 1 changed file with 29 additions and 14 deletions.
43 changes: 29 additions & 14 deletions pod-master/podmaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,18 @@ import (
"fmt"
"io/ioutil"
"os"
"strings"
"time"

etcdstorage "k8s.io/kubernetes/pkg/storage/etcd"

"github.com/coreos/go-etcd/etcd"
etcd "github.com/coreos/etcd/client"
"github.com/golang/glog"
"github.com/spf13/pflag"
"golang.org/x/net/context"
)

type config struct {
etcdServers string
etcdServers []string
key string
whoami string
ttl uint64
Expand Down Expand Up @@ -72,11 +72,16 @@ func (c *config) leaseAndUpdateLoop(etcdClient *etcd.Client) {
// returns true if we have the lease, and an error if one occurs.
// TODO: use the master election utility once it is merged in.
func (c *config) acquireOrRenewLease(etcdClient *etcd.Client) (bool, error) {
result, err := etcdClient.Get(c.key, false, false)
keysAPI := etcd.NewKeysAPI(*etcdClient)
resp, err := keysAPI.Get(context.TODO(), c.key, nil)
if err != nil {
if etcdstorage.IsEtcdNotFound(err) {
// there is no current master, try to become master, create will fail if the key already exists
_, err := etcdClient.Create(c.key, c.whoami, c.ttl)
opts := etcd.SetOptions{
TTL: time.Duration(c.ttl) * time.Second,
PrevExist: "",
}
_, err := keysAPI.Set(context.TODO(), c.key, c.whoami, &opts)
if err != nil {
return false, err
}
Expand All @@ -85,19 +90,24 @@ func (c *config) acquireOrRenewLease(etcdClient *etcd.Client) (bool, error) {
}
return false, err
}
if result.Node.Value == c.whoami {
glog.Infof("key already exists, we are the master (%s)", result.Node.Value)
if resp.Node.Value == c.whoami {
glog.Infof("key already exists, we are the master (%s)", resp.Node.Value)
// we extend our lease @ 1/2 of the existing TTL, this ensures the master doesn't flap around
if result.Node.Expiration.Sub(time.Now()) < time.Duration(c.ttl/2)*time.Second {
_, err := etcdClient.CompareAndSwap(c.key, c.whoami, c.ttl, c.whoami, result.Node.ModifiedIndex)
if resp.Node.Expiration.Sub(time.Now()) < time.Duration(c.ttl/2)*time.Second {
opts := etcd.SetOptions{
TTL: time.Duration(c.ttl) * time.Second,
PrevValue: c.whoami,
PrevIndex: resp.Node.ModifiedIndex,
}
_, err := keysAPI.Set(context.TODO(), c.key, c.whoami, &opts)
if err != nil {
return false, err
}
}
c.lastLease = time.Now()
return true, nil
}
glog.Infof("key already exists, the master is %s, sleeping.", result.Node.Value)
glog.Infof("key already exists, the master is %s, sleeping.", resp.Node.Value)
return false, nil
}

Expand Down Expand Up @@ -154,7 +164,7 @@ func copyFile(src, dest string) error {
}

func initFlags(c *config) {
pflag.StringVar(&c.etcdServers, "etcd-servers", "", "The comma-seprated list of etcd servers to use")
pflag.StringSliceVar(&c.etcdServers, "etcd-servers", []string{}, "The comma-seprated list of etcd servers to use")
pflag.StringVar(&c.key, "key", "", "The key to use for the lock")
pflag.StringVar(&c.whoami, "whoami", "", "The name to use for the reservation. If empty use os.Hostname")
pflag.Uint64Var(&c.ttl, "ttl-secs", 30, "The time to live for the lock.")
Expand Down Expand Up @@ -192,8 +202,13 @@ func main() {
pflag.Parse()
validateFlags(&c)

machines := strings.Split(c.etcdServers, ",")
etcdClient := etcd.NewClient(machines)
cfg := etcd.Config{
Endpoints: c.etcdServers,
}
etcdClient, err := etcd.New(cfg)
if err != nil {
glog.Fatalf("misconfigured etcd: %v", err)
}

c.leaseAndUpdateLoop(etcdClient)
c.leaseAndUpdateLoop(&etcdClient)
}

0 comments on commit ccecc17

Please sign in to comment.