Skip to content

Commit

Permalink
Merge pull request #5695 from influxdata/jw-cluster
Browse files Browse the repository at this point in the history
Remove MetaServers from node.json
  • Loading branch information
jwilder committed Feb 16, 2016
2 parents 8e10099 + 6fb00c1 commit 9d478fa
Show file tree
Hide file tree
Showing 6 changed files with 39 additions and 96 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
- [#5557](https://github.com/influxdata/influxdb/issues/5630): Fixes panic when surrounding the select statement arguments in brackets
- [#5628](https://github.com/influxdata/influxdb/issues/5628): Crashed the server with a bad derivative query
- [#5532](https://github.com/influxdata/influxdb/issues/5532): user passwords not changeable in cluster
- [#5695](https://github.com/influxdata/influxdb/pull/5695): Remove meta servers from node.json

## v0.10.0 [2016-02-04]

Expand Down
84 changes: 22 additions & 62 deletions cmd/influxd/run/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@ package run

import (
"fmt"
"io/ioutil"
"log"
"net"
"os"
"path/filepath"
"reflect"
"runtime"
"runtime/pprof"
"strings"
"time"

"github.com/influxdata/influxdb"
Expand Down Expand Up @@ -123,11 +124,6 @@ func NewServer(c *Config, buildInfo *BuildInfo) (*Server, error) {
}
}

nodeAddr, err := meta.DefaultHost(DefaultHostname, c.Meta.HTTPBindAddress)
if err != nil {
return nil, err
}

// Create the root directory if it doesn't already exist.
if err := os.MkdirAll(c.Meta.Dir, 0777); err != nil {
return nil, fmt.Errorf("mkdir all: %s", err)
Expand All @@ -136,21 +132,23 @@ func NewServer(c *Config, buildInfo *BuildInfo) (*Server, error) {
// 0.11 we no longer use peers.json. Remove the file if we have one on disk.
os.RemoveAll(filepath.Join(c.Meta.Dir, "peers.json"))

// load the node information
metaAddresses := []string{nodeAddr}
if !c.Meta.Enabled {
metaAddresses = c.Meta.JoinPeers
}

node, err := influxdb.LoadNode(c.Meta.Dir, metaAddresses)
node, err := influxdb.LoadNode(c.Meta.Dir)
if err != nil {
if !os.IsNotExist(err) {
return nil, err
} else {
node = influxdb.NewNode(c.Meta.Dir, metaAddresses)
node = influxdb.NewNode(c.Meta.Dir)
}
}

// In 0.11 we removed MetaServers from node.json. To avoid confusion for
// existing users, force a re-save of the node.json file to remove that property
// if it happens to exist.
nodeContents, err := ioutil.ReadFile(filepath.Join(c.Meta.Dir, "node.json"))
if err == nil && strings.Contains(string(nodeContents), "MetaServers") {
node.Save()
}

// In 0.10.0 bind-address got moved to the top level. Check
// The old location to keep things backwards compatible
bind := c.BindAddress
Expand Down Expand Up @@ -626,20 +624,6 @@ func (s *Server) monitorErrorChan(ch <-chan error) {

// initializeMetaClient will set the MetaClient and join the node to the cluster if needed
func (s *Server) initializeMetaClient() error {
// if the node ID is > 0 then we just need to initialize the metaclient
if s.Node.ID > 0 {
s.MetaClient = meta.NewClient(s.Node.MetaServers, s.metaUseTLS)
if err := s.MetaClient.Open(); err != nil {
return err
}

go s.updateMetaNodeInformation()

s.MetaClient.WaitForDataChanged()

return nil
}

// It's the first time starting up and we need to either join
// the cluster or initialize this node as the first member
if len(s.joinPeers) == 0 {
Expand All @@ -655,6 +639,13 @@ func (s *Server) initializeMetaClient() error {
if err := s.MetaClient.Open(); err != nil {
return err
}

// if the node ID is > 0 then we just need to initialize the metaclient
if s.Node.ID > 0 {
s.MetaClient.WaitForDataChanged()
return nil
}

n, err := s.MetaClient.CreateDataNode(s.httpAPIAddr, s.tcpAddr)
for err != nil {
log.Printf("Unable to create data node. retry in 1s: %s", err.Error())
Expand All @@ -663,47 +654,16 @@ func (s *Server) initializeMetaClient() error {
}
s.Node.ID = n.ID

metaNodes, err := s.MetaClient.MetaNodes()
if err != nil {
return err
}
for _, n := range metaNodes {
s.Node.AddMetaServers([]string{n.Host})
}

if err := s.Node.Save(); err != nil {
return err
}

go s.updateMetaNodeInformation()

return nil
}

// updateMetaNodeInformation will continuously run and save the node.json file
// if the list of metaservers in the cluster changes
func (s *Server) updateMetaNodeInformation() {
for {
c := s.MetaClient.WaitForDataChanged()
select {
case <-c:
nodes, _ := s.MetaClient.MetaNodes()
var nodeAddrs []string
for _, n := range nodes {
nodeAddrs = append(nodeAddrs, n.Host)
}
if !reflect.DeepEqual(nodeAddrs, s.Node.MetaServers) {
s.Node.MetaServers = nodeAddrs
if err := s.Node.Save(); err != nil {
log.Printf("error saving node information: %s\n", err.Error())
} else {
log.Printf("updated node metaservers with: %v\n", s.Node.MetaServers)
}
}
case <-s.closing:
return
}
}
// MetaServers returns the meta node HTTP addresses used by this server.
func (s *Server) MetaServers() []string {
return s.MetaClient.MetaServers()
}

// Service represents a service attached to the server.
Expand Down
6 changes: 3 additions & 3 deletions cmd/influxd/run/server_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -518,7 +518,7 @@ type Cluster struct {
func NewCluster(size int) (*Cluster, error) {
c := Cluster{}
c.Servers = append(c.Servers, OpenServer(NewConfig(), ""))
metaServiceAddr := c.Servers[0].Node.MetaServers[0]
metaServiceAddr := c.Servers[0].MetaServers()[0]

for i := 1; i < size; i++ {
c.Servers = append(c.Servers, OpenServer(NewConfig(), metaServiceAddr))
Expand Down Expand Up @@ -548,7 +548,7 @@ func verifyCluster(c *Cluster, size int) error {
// grab only the meta nodes series
series := cl.Results[0].Series[0]
for i, value := range series.Values {
addr := c.Servers[i].Node.MetaServers[i]
addr := c.Servers[0].MetaServers()[0]
if value[0].(float64) != float64(i+1) {
return fmt.Errorf("expected nodeID %d, got %v", i, value[0])
}
Expand Down Expand Up @@ -594,7 +594,7 @@ func NewClusterCustom(size int, cb func(index int, config *run.Config)) (*Cluste
cb(0, config)

c.Servers = append(c.Servers, OpenServer(config, ""))
metaServiceAddr := c.Servers[0].Node.MetaServers[0]
metaServiceAddr := c.Servers[0].MetaServers()[0]

for i := 1; i < size; i++ {
config := NewConfig()
Expand Down
38 changes: 8 additions & 30 deletions node.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,14 @@ const (
)

type Node struct {
path string
ID uint64
MetaServers []string
path string
ID uint64
}

// LoadNode will load the node information from disk if present
func LoadNode(path string, addrs []string) (*Node, error) {
func LoadNode(path string) (*Node, error) {
// Always check to see if we are upgrading first
if err := upgradeNodeFile(path, addrs); err != nil {
if err := upgradeNodeFile(path); err != nil {
return nil, err
}

Expand All @@ -46,10 +45,9 @@ func LoadNode(path string, addrs []string) (*Node, error) {
}

// NewNode will return a new node
func NewNode(path string, addrs []string) *Node {
func NewNode(path string) *Node {
return &Node{
path: path,
MetaServers: addrs,
path: path,
}
}

Expand All @@ -75,26 +73,7 @@ func (n *Node) Save() error {
return os.Rename(tmpFile, file)
}

// AddMetaServers adds the addrs to the set of MetaServers known to this node.
// If an addr already exists, it will not be re-added.
func (n *Node) AddMetaServers(addrs []string) {
unique := map[string]struct{}{}
for _, addr := range n.MetaServers {
unique[addr] = struct{}{}
}

for _, addr := range addrs {
unique[addr] = struct{}{}
}

metaServers := []string{}
for addr := range unique {
metaServers = append(metaServers, addr)
}
n.MetaServers = metaServers
}

func upgradeNodeFile(path string, addrs []string) error {
func upgradeNodeFile(path string) error {
oldFile := filepath.Join(path, oldNodeFile)
b, err := ioutil.ReadFile(oldFile)
if err != nil {
Expand Down Expand Up @@ -122,8 +101,7 @@ func upgradeNodeFile(path string, addrs []string) error {
}

n := &Node{
path: path,
MetaServers: addrs,
path: path,
}
if n.ID, err = strconv.ParseUint(string(b), 10, 64); err != nil {
return err
Expand Down
4 changes: 4 additions & 0 deletions services/meta/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1209,6 +1209,10 @@ func (c *Client) updateAuthCache() {
c.authCache = newCache
}

func (c *Client) MetaServers() []string {
return c.metaServers
}

type Peers []string

func (peers Peers) Append(p ...string) Peers {
Expand Down
2 changes: 1 addition & 1 deletion services/meta/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func newHandler(c *Config, s *Service) *handler {
s: s,
config: c,
logger: log.New(os.Stderr, "[meta-http] ", log.LstdFlags),
loggingEnabled: c.LoggingEnabled,
loggingEnabled: c.ClusterTracing,
closing: make(chan struct{}),
leases: NewLeases(time.Duration(c.LeaseDuration)),
}
Expand Down

0 comments on commit 9d478fa

Please sign in to comment.