Skip to content

Commit

Permalink
Add config notifications and increased test coverage.
Browse files Browse the repository at this point in the history
  • Loading branch information
benbjohnson committed Mar 14, 2015
1 parent fc189cd commit 53dbec8
Show file tree
Hide file tree
Showing 11 changed files with 789 additions and 79 deletions.
12 changes: 11 additions & 1 deletion cmd/influxd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,10 +300,20 @@ func openServer(config *Config, b *influxdb.Broker, initServer, initBroker, conf
// Create messaging client to the brokers.
c := influxdb.NewMessagingClient()
c.SetLogOutput(w)
if err := c.Open(filepath.Join(config.Data.Dir, messagingClientFile), clientJoinURLs); err != nil {
if err := c.Open(filepath.Join(config.Data.Dir, messagingClientFile)); err != nil {
log.Fatalf("messaging client error: %s", err)
}

// If join URLs were passed in then use them to override the client's URLs.
if len(clientJoinURLs) > 0 {
c.SetURLs(clientJoinURLs)
}

// If no URLs exist on the client the return an error since we cannot reach a broker.
if len(c.URLs()) == 0 {
log.Fatal("messaging client has no broker URLs")
}

// Create and open the server.
s := influxdb.NewServer()
s.SetLogOutput(w)
Expand Down
8 changes: 8 additions & 0 deletions messaging/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ type Broker struct {
// Log is the distributed raft log that commands are applied to.
Log interface {
URL() url.URL
URLs() []url.URL
Leader() (uint64, url.URL)
IsLeader() bool
ClusterID() uint64
Apply(data []byte) (index uint64, err error)
}
Expand Down Expand Up @@ -68,6 +70,12 @@ func (b *Broker) metaPath() string {
// URL returns the URL of the broker.
func (b *Broker) URL() url.URL { return b.Log.URL() }

// URLs returns a list of all broker URLs in the cluster.
func (b *Broker) URLs() []url.URL { return b.Log.URLs() }

// IsLeader returns true if the broker is the current cluster leader.
func (b *Broker) IsLeader() bool { return b.Log.IsLeader() }

// LeaderURL returns the URL to the leader broker.
func (b *Broker) LeaderURL() url.URL {
_, u := b.Log.Leader()
Expand Down
4 changes: 4 additions & 0 deletions messaging/broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -707,14 +707,18 @@ func (b *Broker) MustReadAllTopic(topicID uint64) (a []*messaging.Message) {
type BrokerLog struct {
ApplyFunc func(data []byte) (uint64, error)
ClusterIDFunc func() uint64
IsLeaderFunc func() bool
LeaderFunc func() (uint64, url.URL)
URLFunc func() url.URL
URLsFunc func() []url.URL
}

func (l *BrokerLog) Apply(data []byte) (uint64, error) { return l.ApplyFunc(data) }
func (l *BrokerLog) ClusterID() uint64 { return l.ClusterIDFunc() }
func (l *BrokerLog) IsLeader() bool { return l.IsLeaderFunc() }
func (l *BrokerLog) Leader() (uint64, url.URL) { return l.LeaderFunc() }
func (l *BrokerLog) URL() url.URL { return l.URLFunc() }
func (l *BrokerLog) URLs() []url.URL { return l.URLsFunc() }

// Messages represents a collection of messages.
// This type provides helper functions.
Expand Down
189 changes: 149 additions & 40 deletions messaging/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,20 @@ import (
"time"
)

// DefaultReconnectTimeout is the default time to wait between when a broker
// stream disconnects and another connection is retried.
const DefaultReconnectTimeout = 1000 * time.Millisecond
const (
// DefaultReconnectTimeout is the default time to wait between when a broker
// stream disconnects and another connection is retried.
DefaultReconnectTimeout = 1000 * time.Millisecond

// DefaultPingInterval is the default time to wait between checks to the broker.
const DefaultPingInterval = 1000 * time.Millisecond
// DefaultPingInterval is the default time to wait between checks to the broker.
DefaultPingInterval = 1000 * time.Millisecond
)

// Client represents a client for the broker's HTTP API.
type Client struct {
mu sync.Mutex
conns []*Conn
path string // config file path
conns []*Conn // all connections opened by client
url url.URL // current known leader URL
urls []url.URL // list of available broker URLs

Expand Down Expand Up @@ -80,10 +83,27 @@ func (c *Client) setURL(u url.URL) {
}
}

// RandomizeURL sets a random URL from the configuration.
func (c *Client) RandomizeURL() {
// URLs returns a list of possible broker URLs to connect to.
func (c *Client) URLs() []url.URL {
c.mu.Lock()
defer c.mu.Unlock()
return c.urls
}

// SetURLs sets a list of possible URLs to connect to for the client and its connections.
func (c *Client) SetURLs(a []url.URL) {
c.mu.Lock()
defer c.mu.Unlock()
c.setURLs(a)
}

func (c *Client) setURLs(a []url.URL) {
// Ignore if the URL list is the same.
if urlsEqual(c.urls, a) {
return
}

c.urls = a
c.randomizeURL()
}

Expand All @@ -102,8 +122,8 @@ func (c *Client) SetLogOutput(w io.Writer) {
c.Logger = log.New(w, "[messaging] ", log.LstdFlags)
}

// Open reads the configuration from the specified path or uses the URLs provided.
func (c *Client) Open(path string, urls []url.URL) error {
// Open opens the client and reads the configuration from the specified path.
func (c *Client) Open(path string) error {
c.mu.Lock()
defer c.mu.Unlock()

Expand All @@ -113,29 +133,12 @@ func (c *Client) Open(path string, urls []url.URL) error {
}

// Read URLs from file if no URLs are provided.
if len(urls) == 0 {
if b, err := ioutil.ReadFile(path); os.IsNotExist(err) {
// nop
} else if err != nil {
return err
} else {
var config ClientConfig
if err := json.Unmarshal(b, &config); err != nil {
return err
}
c.urls = config.Brokers
}
}

// Ensure we have at least one URL.
if len(urls) < 1 {
return ErrBrokerURLRequired
c.path = path
if err := c.loadConfig(); err != nil {
_ = c.close()
return fmt.Errorf("load config: %s", err)
}

// Set the URLs whether they're from the config or passed in.
c.urls = urls
c.randomizeURL()

// Set open flag.
c.opened = true

Expand All @@ -151,7 +154,10 @@ func (c *Client) Open(path string, urls []url.URL) error {
func (c *Client) Close() error {
c.mu.Lock()
defer c.mu.Unlock()
return c.close()
}

func (c *Client) close() error {
// Return error if the client is already closed.
if !c.opened {
return ErrClientClosed
Expand Down Expand Up @@ -180,6 +186,53 @@ func (c *Client) Close() error {
return nil
}

// loadConfig reads the configuration from disk and sets the options on the client.
func (c *Client) loadConfig() error {
// Open config file for reading.
f, err := os.Open(c.path)
if os.IsNotExist(err) {
c.urls = nil
return nil
} else if err != nil {
return fmt.Errorf("open config: %s", err)
}
defer f.Close()

// Decode config from file.
var config ClientConfig
if err := json.NewDecoder(f).Decode(&config); err != nil {
return fmt.Errorf("decode config: %s", err)
}

// Set options.
c.urls = config.URLs

This comment has been minimized.

Copy link
@sitano

sitano Mar 17, 2015

this is a bug. setting urls (in c.setURLs()) into the client contains custom handling of urls list, following election of current leader url for processing http requests (c.randomizeURL() which is skipped here) with c.URL(). you should use c.setURLs(config.URLs) here instead of direct set. Currently the client you've got here can't post anything if loaded from config because of in c.url.


return nil
}

// setConfig writes a new config to disk and updates urls on the client.
func (c *Client) setConfig(config ClientConfig) error {
// Only write to disk if we have a path.
if c.path != "" {
// Open config file for writing.
f, err := os.Create(c.path)
if err != nil {
return fmt.Errorf("create: %s", err)
}
defer f.Close()

// Encode config to file.
if err := json.NewEncoder(f).Encode(&config); err != nil {
return fmt.Errorf("encode config: %s", err)
}
}

// Set options.
c.urls = config.URLs

This comment has been minimized.

Copy link
@sitano

sitano Mar 17, 2015

this is a bug. setting urls (in c.setURLs()) into the client contains custom handling of urls list, following election of current leader url for processing http requests (c.randomizeURL() which is skipped here) with c.URL(). you should use c.setURLs(config.URLs) here instead of direct set. Currently the client you've got here can't post anything if loaded from config because of in c.url.


return nil
}

// Publish sends a message to the broker and returns an index or error.
func (c *Client) Publish(m *Message) (uint64, error) {
// Post message to broker.
Expand All @@ -198,7 +251,7 @@ func (c *Client) Publish(m *Message) (uint64, error) {
if errstr := resp.Header.Get("X-Broker-Error"); errstr != "" {
return 0, errors.New(errstr)
}
return 0, fmt.Errorf("cannot publish(%d)", resp.StatusCode)
return 0, fmt.Errorf("cannot publish: status=%d", resp.StatusCode)
}

// Parse broker index.
Expand All @@ -218,7 +271,26 @@ func (c *Client) Ping() error {
if err != nil {
return err
}
resp.Body.Close()
defer resp.Body.Close()

// Read entire body.
b, err := ioutil.ReadAll(resp.Body)
if err != nil {
return fmt.Errorf("read ping body: %s", err)
}

// Update config if body is passed back.
if len(b) != 0 {
var config ClientConfig
if err := json.Unmarshal(b, &config); err != nil {
return fmt.Errorf("unmarshal config: %s", err)
}

if err := c.setConfig(config); err != nil {
return fmt.Errorf("update config: %s", err)
}
}

return nil
}

Expand Down Expand Up @@ -293,15 +365,39 @@ func (c *Client) pinger(closing chan struct{}) {

// ClientConfig represents the configuration that must be persisted across restarts.
type ClientConfig struct {
Brokers []url.URL `json:"brokers"`
Leader url.URL `json:"leader"`
URLs []url.URL
}

// NewClientConfig returns a new instance of ClientConfig.
func NewClientConfig(u []url.URL) *ClientConfig {
return &ClientConfig{
Brokers: u,
func (c ClientConfig) MarshalJSON() ([]byte, error) {
var other clientConfigJSON
other.URLs = make([]string, len(c.URLs))
for i, u := range c.URLs {
other.URLs[i] = u.String()
}
return json.Marshal(&other)
}

func (c *ClientConfig) UnmarshalJSON(b []byte) error {
var other clientConfigJSON
if err := json.Unmarshal(b, &other); err != nil {
return err
}

c.URLs = make([]url.URL, len(other.URLs))
for i := range other.URLs {
u, err := url.Parse(other.URLs[i])
if err != nil {
return err
}
c.URLs[i] = *u
}

return nil
}

// clientConfigJSON represents the JSON
type clientConfigJSON struct {
URLs []string `json:"urls"`
}

// Conn represents a stream over the client for a single topic.
Expand Down Expand Up @@ -465,7 +561,7 @@ func (c *Conn) Heartbeat() error {
if errstr := resp.Header.Get("X-Broker-Error"); errstr != "" {
return errors.New(errstr)
}
return fmt.Errorf("heartbeat error: %d", resp.StatusCode)
return fmt.Errorf("heartbeat error: status=%d", resp.StatusCode)
}
return nil
}
Expand Down Expand Up @@ -563,3 +659,16 @@ func (c *Conn) stream(req *http.Request, closing <-chan struct{}) error {
}
}
}

// urlsEqual returns true if a and b contain the same URLs in the same order.
func urlsEqual(a, b []url.URL) bool {
if len(a) != len(b) {
return false
}
for i := range a {
if a[i] != b[i] {
return false
}
}
return true
}
Loading

0 comments on commit 53dbec8

Please sign in to comment.