Skip to content

Commit

Permalink
Merge pull request #1935 from influxdb/stateless-broker
Browse files Browse the repository at this point in the history
Stateless broker
  • Loading branch information
toddboom committed Mar 15, 2015
2 parents 11a908f + 7dc465b commit 236ab6f
Show file tree
Hide file tree
Showing 32 changed files with 3,951 additions and 2,704 deletions.
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
src/

config.json
/test
/bin/

/pkg/
Expand Down
48 changes: 25 additions & 23 deletions broker.go
Original file line number Diff line number Diff line change
@@ -1,29 +1,11 @@
package influxdb

import (
"fmt"
"log"
"net/http"
"time"

"github.com/influxdb/influxdb/messaging"
)

// Broker represents an InfluxDB specific messaging broker.
type Broker struct {
*messaging.Broker

done chan struct{}

// send CQ processing requests to the same data node
currentCQProcessingNode *messaging.Replica

// variables to control when to trigger processing and when to timeout
TriggerInterval time.Duration
TriggerTimeout time.Duration
TriggerFailurePause time.Duration
}

const (
// DefaultContinuousQueryCheckTime is how frequently the broker will ask a data node
// in the cluster to run any continuous queries that should be run.
Expand All @@ -38,23 +20,41 @@ const (
DefaultFailureSleep = 100 * time.Millisecond
)

// Broker represents an InfluxDB specific messaging broker.
type Broker struct {
*messaging.Broker

done chan struct{}

// send CQ processing requests to the same data node
// currentCQProcessingNode *messaging.Replica // FIX(benbjohnson)

// variables to control when to trigger processing and when to timeout
TriggerInterval time.Duration
TriggerTimeout time.Duration
TriggerFailurePause time.Duration
}

// NewBroker returns a new instance of a Broker with default values.
func NewBroker() *Broker {
b := &Broker{
return &Broker{
Broker: messaging.NewBroker(),
TriggerInterval: 5 * time.Second,
TriggerTimeout: 20 * time.Second,
TriggerFailurePause: 1 * time.Second,
}
b.Broker = messaging.NewBroker()
return b
}

// RunContinuousQueryLoop starts running continuous queries on a background goroutine.
func (b *Broker) RunContinuousQueryLoop() {
b.done = make(chan struct{})
go b.continuousQueryLoop(b.done)
// FIX(benbjohnson)
// b.done = make(chan struct{})
// go b.continuousQueryLoop(b.done)
}

/*
// Close closes the broker.
func (b *Broker) Close() error {
if b.done != nil {
Expand Down Expand Up @@ -128,3 +128,5 @@ func (b *Broker) requestContinuousQueryProcessing() error {
return nil
}
*/
2 changes: 2 additions & 0 deletions broker_test.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package influxdb_test

/*
import (
"net/http"
"net/http/httptest"
Expand Down Expand Up @@ -93,3 +94,4 @@ func (h *BrokerTestHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
http.NotFound(w, r)
}
}
*/
8 changes: 4 additions & 4 deletions cmd/influxd/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,8 @@ func (c *Config) DataAddrUDP() string {
}

// DataURL returns the URL required to contact the data server.
func (c *Config) DataURL() *url.URL {
return &url.URL{
func (c *Config) DataURL() url.URL {
return url.URL{
Scheme: "http",
Host: net.JoinHostPort(c.Hostname, strconv.Itoa(c.Data.Port)),
}
Expand All @@ -205,8 +205,8 @@ func (c *Config) BrokerAddr() string {
}

// BrokerURL returns the URL required to contact the Broker server.
func (c *Config) BrokerURL() *url.URL {
return &url.URL{
func (c *Config) BrokerURL() url.URL {
return url.URL{
Scheme: "http",
Host: net.JoinHostPort(c.Hostname, strconv.Itoa(c.Broker.Port)),
}
Expand Down
165 changes: 76 additions & 89 deletions cmd/influxd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/influxdb/influxdb/graphite"
"github.com/influxdb/influxdb/httpd"
"github.com/influxdb/influxdb/messaging"
"github.com/influxdb/influxdb/raft"
"github.com/influxdb/influxdb/udp"
)

Expand All @@ -43,23 +44,26 @@ func Run(config *Config, join, version string, logWriter *os.File) (*messaging.B
initServer = initServer || initBroker

// Parse join urls from the --join flag.
var joinURLs []*url.URL
var joinURLs []url.URL
if join == "" {
joinURLs = parseURLs(config.JoinURLs())
} else {
joinURLs = parseURLs(join)
}

// Open broker, initialize or join as necessary.
b := openBroker(config.BrokerDir(), config.BrokerURL(), initBroker, joinURLs, logWriter)

// Configure debug of Raft module.
b.EnableRaftDebug(config.Logging.RaftTracing)
// Open broker & raft log, initialize or join as necessary.
b, l := openBroker(config.BrokerDir(), config.BrokerURL(), initBroker, joinURLs, logWriter, config.Logging.RaftTracing)

// Start the broker handler.
var h *Handler
if b != nil {
h = &Handler{brokerHandler: messaging.NewHandler(b.Broker)}
h = &Handler{
brokerHandler: &messaging.Handler{
Broker: b.Broker,
RaftHandler: &raft.Handler{Log: l},
},
}

// We want to make sure we are spun up before we exit this function, so we manually listen and serve
listener, err := net.Listen("tcp", config.BrokerAddr())
if err != nil {
Expand Down Expand Up @@ -181,8 +185,7 @@ func Run(config *Config, join, version string, logWriter *os.File) (*messaging.B
// unless disabled, start the loop to report anonymous usage stats every 24h
if !config.ReportingDisabled {
// Make sure we have a config object b4 we try to use it.
if configObj := b.Broker.Log().Config(); configObj != nil {
clusterID := configObj.ClusterID
if clusterID := b.Broker.ClusterID(); clusterID != 0 {
go s.StartReportingLoop(version, clusterID)
}
}
Expand Down Expand Up @@ -231,52 +234,86 @@ func parseConfig(path, hostname string) *Config {
}

// creates and initializes a broker.
func openBroker(path string, u *url.URL, initializing bool, joinURLs []*url.URL, w io.Writer) *influxdb.Broker {
func openBroker(path string, u url.URL, initializing bool, joinURLs []url.URL, w io.Writer, raftTracing bool) (*influxdb.Broker, *raft.Log) {
// Create raft log.
l := raft.NewLog()
l.SetURL(u)
l.SetLogOutput(w)
l.DebugEnabled = raftTracing

// Create broker.
b := influxdb.NewBroker()
b.Log = l
b.SetLogOutput(w)

if err := b.Open(path, u); err != nil {
// Open broker so it can feed last index data to the log.
if err := b.Open(path); err != nil {
log.Fatalf("failed to open broker: %s", err)
}

// Attach the broker as the finite state machine of the raft log.
l.FSM = &messaging.RaftFSM{Broker: b}

// Open raft log inside broker directory.
if err := l.Open(filepath.Join(path, "raft")); err != nil {
log.Fatalf("raft: %s", err)
}

// If this is a new broker then we can initialize two ways:
// 1) Start a brand new cluster.
// 2) Join an existing cluster.
if initializing {
if len(joinURLs) == 0 {
initializeBroker(b)
if err := l.Initialize(); err != nil {
log.Fatalf("initialize raft log: %s", err)
}
} else {
joinBroker(b, joinURLs)
joinLog(l, joinURLs)
}
}

return b
}

// initializes a new broker.
func initializeBroker(b *influxdb.Broker) {
if err := b.Initialize(); err != nil {
log.Fatalf("initialize: %s", err)
}
return b, l
}

// joins a broker to an existing cluster.
func joinBroker(b *influxdb.Broker, joinURLs []*url.URL) {
// joins a raft log to an existing cluster.
func joinLog(l *raft.Log, joinURLs []url.URL) {
// Attempts to join each server until successful.
for _, u := range joinURLs {
if err := b.Join(u); err != nil {
log.Printf("join: failed to connect to broker: %s: %s", u, err)
if err := l.Join(u); err != nil {
log.Printf("join: failed to connect to raft cluster: %s: %s", u, err)
} else {
log.Printf("join: connected broker to %s", u)
log.Printf("join: connected raft log to %s", u)
return
}
}
log.Fatalf("join: failed to connect broker to any specified server")
log.Fatalf("join: failed to connect raft log to any specified server")
}

// creates and initializes a server.
func openServer(config *Config, b *influxdb.Broker, initServer, initBroker, configExists bool, joinURLs []*url.URL, w io.Writer) *influxdb.Server {
func openServer(config *Config, b *influxdb.Broker, initServer, initBroker, configExists bool, joinURLs []url.URL, w io.Writer) *influxdb.Server {
// Use broker URL is there is no config and there are no join URLs passed.
clientJoinURLs := joinURLs
if !configExists || len(joinURLs) == 0 {
clientJoinURLs = []url.URL{b.URL()}
}

// Create messaging client to the brokers.
c := influxdb.NewMessagingClient()
c.SetLogOutput(w)
if err := c.Open(filepath.Join(config.Data.Dir, messagingClientFile)); err != nil {
log.Fatalf("messaging client error: %s", err)
}

// If join URLs were passed in then use them to override the client's URLs.
if len(clientJoinURLs) > 0 {
c.SetURLs(clientJoinURLs)
}

// If no URLs exist on the client the return an error since we cannot reach a broker.
if len(c.URLs()) == 0 {
log.Fatal("messaging client has no broker URLs")
}

// Create and open the server.
s := influxdb.NewServer()
s.SetLogOutput(w)
Expand All @@ -287,72 +324,34 @@ func openServer(config *Config, b *influxdb.Broker, initServer, initBroker, conf
s.ComputeRunsPerInterval = config.ContinuousQuery.ComputeRunsPerInterval
s.ComputeNoMoreThan = time.Duration(config.ContinuousQuery.ComputeNoMoreThan)

if err := s.Open(config.Data.Dir); err != nil {
// Open server with data directory and broker client.
if err := s.Open(config.Data.Dir, c); err != nil {
log.Fatalf("failed to open data server: %v", err.Error())
}

// If the server is uninitialized then initialize or join it.
if initServer {
if len(joinURLs) == 0 {
initializeServer(config.DataURL(), s, b, w, initBroker)
if initBroker {
if err := s.Initialize(b.URL()); err != nil {
log.Fatalf("server initialization error: %s", err)
}
}
} else {
joinServer(s, config.DataURL(), joinURLs)
}
}

if !configExists {
// We are spining up a server that has no config,
// but already has an initialized data directory
joinURLs = []*url.URL{b.URL()}
openServerClient(s, joinURLs, w)
} else {
if len(joinURLs) == 0 {
// If a config exists, but no joinUrls are specified, fall back to the broker URL
// TODO: Make sure we have a leader, and then spin up the server
joinURLs = []*url.URL{b.URL()}
}
openServerClient(s, joinURLs, w)
}

return s
}

// initializes a new server that does not yet have an ID.
func initializeServer(u *url.URL, s *influxdb.Server, b *influxdb.Broker, w io.Writer, initBroker bool) {
// TODO: Create replica using the messaging client.

if initBroker {
// Create replica on broker.
if err := b.CreateReplica(1, u); err != nil {
log.Fatalf("replica creation error: %s", err)
}
}

// Create messaging client.
c := messaging.NewClient(1)
c.SetLogOutput(w)
if err := c.Open(filepath.Join(s.Path(), messagingClientFile), []*url.URL{b.URL()}); err != nil {
log.Fatalf("messaging client error: %s", err)
}
if err := s.SetClient(c); err != nil {
log.Fatalf("set client error: %s", err)
}

if initBroker {
// Initialize the server.
if err := s.Initialize(b.URL()); err != nil {
log.Fatalf("server initialization error: %s", err)
}
}
}

// joins a server to an existing cluster.
func joinServer(s *influxdb.Server, u *url.URL, joinURLs []*url.URL) {
func joinServer(s *influxdb.Server, u url.URL, joinURLs []url.URL) {
// TODO: Use separate broker and data join urls.

// Create data node on an existing data node.
for _, joinURL := range joinURLs {
if err := s.Join(u, joinURL); err != nil {
if err := s.Join(&u, &joinURL); err != nil {
log.Printf("join: failed to connect data node: %s: %s", u, err)
} else {
log.Printf("join: connected data node to %s", u)
Expand All @@ -362,20 +361,8 @@ func joinServer(s *influxdb.Server, u *url.URL, joinURLs []*url.URL) {
log.Fatalf("join: failed to connect data node to any specified server")
}

// opens the messaging client and attaches it to the server.
func openServerClient(s *influxdb.Server, joinURLs []*url.URL, w io.Writer) {
c := messaging.NewClient(s.ID())
c.SetLogOutput(w)
if err := c.Open(filepath.Join(s.Path(), messagingClientFile), joinURLs); err != nil {
log.Fatalf("messaging client error: %s", err)
}
if err := s.SetClient(c); err != nil {
log.Fatalf("set client error: %s", err)
}
}

// parses a comma-delimited list of URLs.
func parseURLs(s string) (a []*url.URL) {
func parseURLs(s string) (a []url.URL) {
if s == "" {
return nil
}
Expand All @@ -385,7 +372,7 @@ func parseURLs(s string) (a []*url.URL) {
if err != nil {
log.Fatalf("cannot parse urls: %s", err)
}
a = append(a, u)
a = append(a, *u)
}
return
}
Expand Down
Loading

0 comments on commit 236ab6f

Please sign in to comment.