Stateless broker #1935

Merged
merged 27 commits into from
Mar 15, 2015
Changes from 20 commits
Commits
39ae8e6
Add topic segmentation.
benbjohnson Feb 25, 2015
16dbe8b
Add Broker.Truncate().
benbjohnson Mar 1, 2015
85be4e1
Merge branch 'master' of https://github.com/influxdb/influxdb into br…
benbjohnson Mar 1, 2015
1bbf154
Removing replicas and subscriptions from broker.
benbjohnson Mar 2, 2015
b937f06
Implementing stateless broker.
benbjohnson Mar 6, 2015
ef8658e
Continuing stateless broker refactor.
benbjohnson Mar 8, 2015
9b5aeb1
Refactor messaging client/conn.
benbjohnson Mar 8, 2015
713ca4b
Merge branch 'master' into stateless-broker
benbjohnson Mar 9, 2015
5f5c6ca
Integrate stateless messaging into influxdb package.
benbjohnson Mar 9, 2015
4160d0b
Add continuously streaming topic readers.
benbjohnson Mar 10, 2015
27e9132
Integrate stateless broker into remaining packages.
benbjohnson Mar 10, 2015
66115f9
Merge branch 'master' of https://github.com/influxdb/influxdb into st…
benbjohnson Mar 10, 2015
5f6bcf5
Fix broker integration bugs.
benbjohnson Mar 11, 2015
7ab19b9
Merge branch 'master' of https://github.com/influxdb/influxdb into st…
benbjohnson Mar 12, 2015
7880bc2
Add zero length data checks.
benbjohnson Mar 12, 2015
c7d4920
Update urlgen to end at current time.
benbjohnson Mar 12, 2015
12e8939
Fix messaging client redirection.
benbjohnson Mar 12, 2015
4b9a93d
Merge branch 'master' of https://github.com/influxdb/influxdb into st…
benbjohnson Mar 12, 2015
fc189cd
Remove /test from .gitignore
benbjohnson Mar 12, 2015
8e813ec
Update CHANGELOG.md for v0.9.0-rc11
toddboom Mar 13, 2015
53dbec8
Add config notifications and increased test coverage.
benbjohnson Mar 14, 2015
8cb7be4
Merge branch 'stateless-broker' of https://github.com/influxdb/influx…
benbjohnson Mar 14, 2015
96748cb
Update file permissions.
benbjohnson Mar 14, 2015
b045ad5
Wrap open logic in anonymous functions.
benbjohnson Mar 14, 2015
41d357a
Fixes based on code review comments.
benbjohnson Mar 14, 2015
06d8392
Integration test delay.
benbjohnson Mar 14, 2015
7dc465b
Fix shard close race condition.
benbjohnson Mar 14, 2015
1 change: 0 additions & 1 deletion .gitignore
@@ -2,7 +2,6 @@
src/

config.json
/test
/bin/

/pkg/
4 changes: 3 additions & 1 deletion CHANGELOG.md
@@ -1,4 +1,6 @@
## v0.9.0-rc11 [unreleased]
## v0.9.0-rc12 [unreleased]

## v0.9.0-rc11 [2015-03-12]

### Bugfixes
- [#1917](https://github.com/influxdb/influxdb/pull/1902): Creating Infinite Retention Policy Failed.
48 changes: 25 additions & 23 deletions broker.go
@@ -1,29 +1,11 @@
package influxdb

import (
"fmt"
"log"
"net/http"
"time"

"github.com/influxdb/influxdb/messaging"
)

// Broker represents an InfluxDB specific messaging broker.
type Broker struct {
*messaging.Broker

done chan struct{}

// send CQ processing requests to the same data node
currentCQProcessingNode *messaging.Replica

// variables to control when to trigger processing and when to timeout
TriggerInterval time.Duration
TriggerTimeout time.Duration
TriggerFailurePause time.Duration
}

const (
// DefaultContinuousQueryCheckTime is how frequently the broker will ask a data node
// in the cluster to run any continuous queries that should be run.
@@ -38,23 +20,41 @@ const (
DefaultFailureSleep = 100 * time.Millisecond
)

// Broker represents an InfluxDB specific messaging broker.
type Broker struct {
*messaging.Broker

done chan struct{}

// send CQ processing requests to the same data node
// currentCQProcessingNode *messaging.Replica // FIX(benbjohnson)

// variables to control when to trigger processing and when to timeout
TriggerInterval time.Duration
TriggerTimeout time.Duration
TriggerFailurePause time.Duration
}

// NewBroker returns a new instance of a Broker with default values.
func NewBroker() *Broker {
b := &Broker{
return &Broker{
Broker: messaging.NewBroker(),
TriggerInterval: 5 * time.Second,
TriggerTimeout: 20 * time.Second,
TriggerFailurePause: 1 * time.Second,
}
b.Broker = messaging.NewBroker()
return b
}

// RunContinuousQueryLoop starts running continuous queries on a background goroutine.
func (b *Broker) RunContinuousQueryLoop() {
b.done = make(chan struct{})
go b.continuousQueryLoop(b.done)
// FIX(benbjohnson)
// b.done = make(chan struct{})
Contributor:

Why are CQs disabled?

Member:

I remember Ben saying something about the brokers not knowing about the data nodes in this new branch, but I think that they should. The data nodes have to connect to get replication so they should include a connect string so that the raft leader can send requests down to the data nodes.

The brokers will need to know about the data nodes anyway so that they can redirect a data node that requests a truncated topic to another data node that can give them a copy of the entire thing.

Author:

The brokers now only track the highest index received by any data node so we aren't tracking the actual data node URLs themselves. I realized that was an issue once I got to CQs and remembered that they need cluster info to work.

I talked to Paul about this and we decided to send the data node URLs up to the broker. There's a periodic ping that occurs from the client so I can add it there. I disabled the CQs temporarily because it was more important to get messaging stable.

// go b.continuousQueryLoop(b.done)
}

/*


// Close closes the broker.
func (b *Broker) Close() error {
if b.done != nil {
@@ -128,3 +128,5 @@ func (b *Broker) requestContinuousQueryProcessing() error {

return nil
}

*/
2 changes: 2 additions & 0 deletions broker_test.go
@@ -1,5 +1,6 @@
package influxdb_test

/*
Contributor:

I presume this is just temporary.

Author:

Yes.

import (
"net/http"
"net/http/httptest"
@@ -93,3 +94,4 @@ func (h *BrokerTestHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
http.NotFound(w, r)
}
}
*/
8 changes: 4 additions & 4 deletions cmd/influxd/config.go
@@ -192,8 +192,8 @@ func (c *Config) DataAddrUDP() string {
}

// DataURL returns the URL required to contact the data server.
func (c *Config) DataURL() *url.URL {
return &url.URL{
func (c *Config) DataURL() url.URL {
Contributor:

Simply because there is no need to return a pointer?

Author:

Using non-pointer URLs is much, much simpler in typical use cases where you want to get a base URL and then add a path. If you do pointers then you need to make a copy so you don't alter the original. With non-pointers it'll do the copy for you and then you don't have to worry about that.

return url.URL{
Scheme: "http",
Host: net.JoinHostPort(c.Hostname, strconv.Itoa(c.Data.Port)),
}
@@ -205,8 +205,8 @@ func (c *Config) BrokerAddr() string {
}

// BrokerURL returns the URL required to contact the Broker server.
func (c *Config) BrokerURL() *url.URL {
return &url.URL{
func (c *Config) BrokerURL() url.URL {
return url.URL{
Scheme: "http",
Host: net.JoinHostPort(c.Hostname, strconv.Itoa(c.Broker.Port)),
}
155 changes: 66 additions & 89 deletions cmd/influxd/run.go
@@ -20,6 +20,7 @@ import (
"github.com/influxdb/influxdb/graphite"
"github.com/influxdb/influxdb/httpd"
"github.com/influxdb/influxdb/messaging"
"github.com/influxdb/influxdb/raft"
"github.com/influxdb/influxdb/udp"
)

@@ -43,23 +44,26 @@ func Run(config *Config, join, version string, logWriter *os.File) (*messaging.B
initServer = initServer || initBroker

// Parse join urls from the --join flag.
var joinURLs []*url.URL
var joinURLs []url.URL
if join == "" {
joinURLs = parseURLs(config.JoinURLs())
} else {
joinURLs = parseURLs(join)
}

// Open broker, initialize or join as necessary.
b := openBroker(config.BrokerDir(), config.BrokerURL(), initBroker, joinURLs, logWriter)

// Configure debug of Raft module.
b.EnableRaftDebug(config.Logging.RaftTracing)
// Open broker & raft log, initialize or join as necessary.
b, l := openBroker(config.BrokerDir(), config.BrokerURL(), initBroker, joinURLs, logWriter, config.Logging.RaftTracing)

// Start the broker handler.
var h *Handler
if b != nil {
h = &Handler{brokerHandler: messaging.NewHandler(b.Broker)}
h = &Handler{
brokerHandler: &messaging.Handler{
Broker: b.Broker,
RaftHandler: &raft.Handler{Log: l},
},
}

// We want to make sure we are spun up before we exit this function, so we manually listen and serve
listener, err := net.Listen("tcp", config.BrokerAddr())
if err != nil {
@@ -181,8 +185,7 @@ func Run(config *Config, join, version string, logWriter *os.File) (*messaging.B
// unless disabled, start the loop to report anonymous usage stats every 24h
if !config.ReportingDisabled {
// Make sure we have a config object before we try to use it.
if configObj := b.Broker.Log().Config(); configObj != nil {
clusterID := configObj.ClusterID
if clusterID := b.Broker.ClusterID(); clusterID != 0 {
go s.StartReportingLoop(version, clusterID)
}
}
@@ -231,52 +234,76 @@ func parseConfig(path, hostname string) *Config {
}

// creates and initializes a broker.
func openBroker(path string, u *url.URL, initializing bool, joinURLs []*url.URL, w io.Writer) *influxdb.Broker {
func openBroker(path string, u url.URL, initializing bool, joinURLs []url.URL, w io.Writer, raftTracing bool) (*influxdb.Broker, *raft.Log) {
// Create raft log.
l := raft.NewLog()
Contributor:

So it seems like Raft is no longer hidden inside a broker. Was this required by this change, or something that just made the whole thing cleaner?

Author:

It makes messaging much easier to test when we can control the interface to the Raft log. I'm trying to break pieces out into more testable chunks.
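
As a rough sketch of the testability argument: once the broker depends on a narrow interface rather than on a concrete Raft log, a test can swap in an in-memory fake. Everything here is hypothetical and simplified (`applier`, `broker`, `fakeLog`, and the `Apply` signature are assumptions, not the types in this PR), and the empty-payload check is only there to give the test something to verify.

```go
package main

import (
	"errors"
	"fmt"
)

// applier is a hypothetical, minimal view of a replicated log:
// it accepts a payload and returns the index it was written at.
type applier interface {
	Apply(data []byte) (index uint64, err error)
}

// broker is a simplified stand-in for the messaging broker.
// It depends only on the interface, not on a concrete Raft log.
type broker struct {
	log applier
}

// publish rejects empty payloads and otherwise appends to the log.
func (b *broker) publish(data []byte) (uint64, error) {
	if len(data) == 0 {
		return 0, errors.New("data required")
	}
	return b.log.Apply(data)
}

// fakeLog is an in-memory test double that needs no disk or network.
type fakeLog struct {
	entries [][]byte
}

// Apply stores the payload and returns its 1-based position as the index.
func (l *fakeLog) Apply(data []byte) (uint64, error) {
	l.entries = append(l.entries, data)
	return uint64(len(l.entries)), nil
}

func main() {
	l := &fakeLog{}
	b := &broker{log: l}

	idx, err := b.publish([]byte("hello"))
	fmt.Println(idx, err, len(l.entries))

	_, err = b.publish(nil)
	fmt.Println(err, len(l.entries)) // the empty payload never reaches the log
}
```

With this shape, broker tests can assert on exactly what reached the log without starting a cluster.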

l.SetURL(u)
l.SetLogOutput(w)
l.DebugEnabled = raftTracing

// Create broker.
b := influxdb.NewBroker()
b.Log = l
b.SetLogOutput(w)

if err := b.Open(path, u); err != nil {
// Open broker so it can feed last index data to the log.
if err := b.Open(path); err != nil {
log.Fatalf("failed to open broker: %s", err)
}

// Attach the broker as the finite state machine of the raft log.
l.FSM = &messaging.RaftFSM{Broker: b}

// Open raft log inside broker directory.
if err := l.Open(filepath.Join(path, "raft")); err != nil {
log.Fatalf("raft: %s", err)
}

// If this is a new broker then we can initialize two ways:
// 1) Start a brand new cluster.
// 2) Join an existing cluster.
if initializing {
if len(joinURLs) == 0 {
initializeBroker(b)
if err := l.Initialize(); err != nil {
log.Fatalf("initialize raft log: %s", err)
}
} else {
joinBroker(b, joinURLs)
joinLog(l, joinURLs)
}
}

return b
}

// initializes a new broker.
func initializeBroker(b *influxdb.Broker) {
if err := b.Initialize(); err != nil {
log.Fatalf("initialize: %s", err)
}
return b, l
}

// joins a broker to an existing cluster.
func joinBroker(b *influxdb.Broker, joinURLs []*url.URL) {
// joins a raft log to an existing cluster.
func joinLog(l *raft.Log, joinURLs []url.URL) {
// Attempts to join each server until successful.
for _, u := range joinURLs {
if err := b.Join(u); err != nil {
log.Printf("join: failed to connect to broker: %s: %s", u, err)
if err := l.Join(u); err != nil {
log.Printf("join: failed to connect to raft cluster: %s: %s", u, err)
} else {
log.Printf("join: connected broker to %s", u)
log.Printf("join: connected raft log to %s", u)
return
}
}
log.Fatalf("join: failed to connect broker to any specified server")
log.Fatalf("join: failed to connect raft log to any specified server")
}

// creates and initializes a server.
func openServer(config *Config, b *influxdb.Broker, initServer, initBroker, configExists bool, joinURLs []*url.URL, w io.Writer) *influxdb.Server {
func openServer(config *Config, b *influxdb.Broker, initServer, initBroker, configExists bool, joinURLs []url.URL, w io.Writer) *influxdb.Server {
// Use broker URL if there is no config or there are no join URLs passed.
clientJoinURLs := joinURLs
if !configExists || len(joinURLs) == 0 {
clientJoinURLs = []url.URL{b.URL()}
}

// Create messaging client to the brokers.
c := influxdb.NewMessagingClient()
c.SetLogOutput(w)
if err := c.Open(filepath.Join(config.Data.Dir, messagingClientFile), clientJoinURLs); err != nil {
log.Fatalf("messaging client error: %s", err)
}

// Create and open the server.
s := influxdb.NewServer()
s.SetLogOutput(w)
@@ -287,72 +314,34 @@ func openServer(config *Config, b *influxdb.Broker, initServer, initBroker, conf
s.ComputeRunsPerInterval = config.ContinuousQuery.ComputeRunsPerInterval
s.ComputeNoMoreThan = time.Duration(config.ContinuousQuery.ComputeNoMoreThan)

if err := s.Open(config.Data.Dir); err != nil {
// Open server with data directory and broker client.
if err := s.Open(config.Data.Dir, c); err != nil {
log.Fatalf("failed to open data server: %v", err.Error())
}

// If the server is uninitialized then initialize or join it.
if initServer {
if len(joinURLs) == 0 {
initializeServer(config.DataURL(), s, b, w, initBroker)
if initBroker {
if err := s.Initialize(b.URL()); err != nil {
log.Fatalf("server initialization error: %s", err)
}
}
} else {
joinServer(s, config.DataURL(), joinURLs)
}
}

if !configExists {
// We are spinning up a server that has no config,
// but already has an initialized data directory
joinURLs = []*url.URL{b.URL()}
openServerClient(s, joinURLs, w)
} else {
if len(joinURLs) == 0 {
// If a config exists, but no joinUrls are specified, fall back to the broker URL
// TODO: Make sure we have a leader, and then spin up the server
joinURLs = []*url.URL{b.URL()}
}
openServerClient(s, joinURLs, w)
}

return s
}

// initializes a new server that does not yet have an ID.
func initializeServer(u *url.URL, s *influxdb.Server, b *influxdb.Broker, w io.Writer, initBroker bool) {
// TODO: Create replica using the messaging client.

if initBroker {
// Create replica on broker.
if err := b.CreateReplica(1, u); err != nil {
log.Fatalf("replica creation error: %s", err)
}
}

// Create messaging client.
c := messaging.NewClient(1)
c.SetLogOutput(w)
if err := c.Open(filepath.Join(s.Path(), messagingClientFile), []*url.URL{b.URL()}); err != nil {
log.Fatalf("messaging client error: %s", err)
}
if err := s.SetClient(c); err != nil {
log.Fatalf("set client error: %s", err)
}

if initBroker {
// Initialize the server.
if err := s.Initialize(b.URL()); err != nil {
log.Fatalf("server initialization error: %s", err)
}
}
}

// joins a server to an existing cluster.
func joinServer(s *influxdb.Server, u *url.URL, joinURLs []*url.URL) {
func joinServer(s *influxdb.Server, u url.URL, joinURLs []url.URL) {
// TODO: Use separate broker and data join urls.

// Create data node on an existing data node.
for _, joinURL := range joinURLs {
if err := s.Join(u, joinURL); err != nil {
if err := s.Join(&u, &joinURL); err != nil {
log.Printf("join: failed to connect data node: %s: %s", u, err)
} else {
log.Printf("join: connected data node to %s", u)
@@ -362,20 +351,8 @@ func joinServer(s *influxdb.Server, u *url.URL, joinURLs []*url.URL) {
log.Fatalf("join: failed to connect data node to any specified server")
}

// opens the messaging client and attaches it to the server.
func openServerClient(s *influxdb.Server, joinURLs []*url.URL, w io.Writer) {
c := messaging.NewClient(s.ID())
c.SetLogOutput(w)
if err := c.Open(filepath.Join(s.Path(), messagingClientFile), joinURLs); err != nil {
log.Fatalf("messaging client error: %s", err)
}
if err := s.SetClient(c); err != nil {
log.Fatalf("set client error: %s", err)
}
}

// parses a comma-delimited list of URLs.
func parseURLs(s string) (a []*url.URL) {
func parseURLs(s string) (a []url.URL) {
if s == "" {
return nil
}
@@ -385,7 +362,7 @@ func parseURLs(s string) (a []*url.URL) {
if err != nil {
log.Fatalf("cannot parse urls: %s", err)
}
a = append(a, u)
a = append(a, *u)
}
return
}