Skip to content

Commit

Permalink
Merge pull request #1758 from influxdb/start-graphite
Browse files Browse the repository at this point in the history
Add Graphite Integration Test
  • Loading branch information
corylanou committed Mar 11, 2015
2 parents 8baf5fa + c46caf8 commit 41607c2
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 11 deletions.
9 changes: 7 additions & 2 deletions cmd/influxd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,14 @@ func Run(config *Config, join, version string, logWriter *os.File) (*messaging.B
// We want to make sure we are spun up before we exit this function, so we manually listen and serve
listener, err := net.Listen("tcp", config.BrokerAddr())
if err != nil {
log.Fatal(err)
log.Fatalf("Broker failed to listen on %s. %s ", config.BrokerAddr(), err)
}
go func() { log.Fatal(http.Serve(listener, h)) }()
go func() {
err := http.Serve(listener, h)
if err != nil {
log.Fatalf("Broker failed to server on %s.: %s", config.BrokerAddr(), err)
}
}()
log.Printf("broker listening on %s", config.BrokerAddr())

// have it occasionally tell a data node in the cluster to run continuous queries
Expand Down
104 changes: 95 additions & 9 deletions cmd/influxd/server_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,15 @@ import (
"encoding/json"
"fmt"
"io/ioutil"
"math"
"net"
"net/http"
"net/url"
"os"
"path/filepath"
"strconv"
"strings"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -66,7 +69,7 @@ type Cluster []*Node
// the testing is marked as failed.
//
// This function returns a slice of nodes, the first of which will be the leader.
func createCombinedNodeCluster(t *testing.T, testName, tmpDir string, nNodes, basePort int) Cluster {
func createCombinedNodeCluster(t *testing.T, testName, tmpDir string, nNodes, basePort int, baseConfig *main.Config) Cluster {
t.Logf("Creating cluster of %d nodes for test %s", nNodes, testName)
if nNodes < 1 {
t.Fatalf("Test %s: asked to create nonsense cluster", testName)
Expand All @@ -85,7 +88,10 @@ func createCombinedNodeCluster(t *testing.T, testName, tmpDir string, nNodes, ba
_ = os.RemoveAll(tmpDataDir)

// Create the first node, special case.
c := main.NewConfig()
c := baseConfig
if c == nil {
c = main.NewConfig()
}
c.Broker.Dir = filepath.Join(tmpBrokerDir, strconv.Itoa(basePort))
c.Data.Dir = filepath.Join(tmpDataDir, strconv.Itoa(basePort))
c.Broker.Port = basePort
Expand Down Expand Up @@ -167,9 +173,6 @@ func write(t *testing.T, node *Node, data string) {
body, _ := ioutil.ReadAll(resp.Body)
t.Fatalf("Write to database failed. Unexpected status code. expected: %d, actual %d, %s", http.StatusOK, resp.StatusCode, string(body))
}

// Until races are solved.
time.Sleep(3 * time.Second)
}

// query executes the given query against all nodes in the cluster, and verifies no errors occured, and
Expand Down Expand Up @@ -202,6 +205,38 @@ func query(t *testing.T, nodes Cluster, urlDb, query, expected string) (string,
return "", true
}

// queryAndWait executes the given query against all nodes in the cluster, and verifies no errors occured, and
// ensures the returned data is as expected until the timeout occurs
func queryAndWait(t *testing.T, nodes Cluster, urlDb, q, expected string, timeout time.Duration) (string, bool) {
v := url.Values{"q": []string{q}}
if urlDb != "" {
v.Set("db", urlDb)
}

var (
timedOut int32
timer = time.NewTimer(time.Duration(math.MaxInt64))
)
defer timer.Stop()
if timeout > 0 {
timer.Reset(time.Duration(timeout) * time.Millisecond)
go func() {
<-timer.C
atomic.StoreInt32(&timedOut, 1)
}()
}

for {
if got, ok := query(t, nodes, urlDb, q, expected); ok {
return got, ok
} else if atomic.LoadInt32(&timedOut) == 1 {
return fmt.Sprintf("timed out before expected result was found: got: %s", got), false
} else {
time.Sleep(10 * time.Millisecond)
}
}
}

// runTests_Errors tests some basic error cases.
func runTests_Errors(t *testing.T, nodes Cluster) {
t.Logf("Running tests against %d-node cluster", len(nodes))
Expand Down Expand Up @@ -754,7 +789,7 @@ func runTestsData(t *testing.T, testName string, nodes Cluster, database, retent
if tt.queryDb != "" {
urlDb = tt.queryDb
}
got, ok := query(t, nodes, rewriteDbRp(urlDb, database, retention), rewriteDbRp(tt.query, database, retention), rewriteDbRp(tt.expected, database, retention))
got, ok := queryAndWait(t, nodes, rewriteDbRp(urlDb, database, retention), rewriteDbRp(tt.query, database, retention), rewriteDbRp(tt.expected, database, retention), 3*time.Second)
if !ok {
t.Errorf("Test \"%s\" failed\n exp: %s\n got: %s\n", name, rewriteDbRp(tt.expected, database, retention), got)
}
Expand All @@ -772,7 +807,7 @@ func TestSingleServer(t *testing.T) {
os.RemoveAll(dir)
}()

nodes := createCombinedNodeCluster(t, testName, dir, 1, 8090)
nodes := createCombinedNodeCluster(t, testName, dir, 1, 8090, nil)

runTestsData(t, testName, nodes, "mydb", "myrp")
}
Expand All @@ -788,7 +823,7 @@ func Test3NodeServer(t *testing.T) {
os.RemoveAll(dir)
}()

nodes := createCombinedNodeCluster(t, testName, dir, 3, 8190)
nodes := createCombinedNodeCluster(t, testName, dir, 3, 8190, nil)

runTestsData(t, testName, nodes, "mydb", "myrp")
}
Expand All @@ -807,7 +842,7 @@ func TestClientLibrary(t *testing.T) {
retentionPolicy := "myrp"
now := time.Now().UTC()

nodes := createCombinedNodeCluster(t, testName, dir, 1, 8290)
nodes := createCombinedNodeCluster(t, testName, dir, 1, 8290, nil)
createDatabase(t, testName, nodes, database)
createRetentionPolicy(t, testName, nodes, database, retentionPolicy)

Expand Down Expand Up @@ -878,6 +913,56 @@ func TestClientLibrary(t *testing.T) {
}
}

func Test_ServerSingleGraphiteIntegration(t *testing.T) {
if testing.Short() {
t.Skip()
}
nNodes := 1
basePort := 8390
testName := "graphite integration"
dir := tempfile()
now := time.Now().UTC().Round(time.Millisecond)
c := main.NewConfig()
g := main.Graphite{
Enabled: true,
Database: "graphite",
Protocol: "TCP",
}
c.Graphites = append(c.Graphites, g)

t.Logf("Graphite Connection String: %s\n", g.ConnectionString(c.BindAddress))
nodes := createCombinedNodeCluster(t, testName, dir, nNodes, basePort, c)

createDatabase(t, testName, nodes, "graphite")
createRetentionPolicy(t, testName, nodes, "graphite", "raw")

// Connect to the graphite endpoint we just spun up
conn, err := net.Dial("tcp", g.ConnectionString(c.BindAddress))
if err != nil {
t.Fatal(err)
return
}

t.Log("Writing data")
data := []byte(`cpu 23.456 `)
data = append(data, []byte(fmt.Sprintf("%d", now.UnixNano()/1000000))...)
data = append(data, '\n')
_, err = conn.Write(data)
conn.Close()
if err != nil {
t.Fatal(err)
return
}

expected := fmt.Sprintf(`{"results":[{"series":[{"name":"cpu","columns":["time","cpu"],"values":[["%s",23.456]]}]}]}`, now.Format(time.RFC3339Nano))

// query and wait for results
got, ok := queryAndWait(t, nodes, "graphite", `select * from "graphite"."raw".cpu`, expected, 2*time.Second)
if !ok {
t.Errorf(`Test "%s" failed, expected: %s, got: %s`, testName, expected, got)
}
}

// helper funcs

func errToString(err error) string {
Expand All @@ -893,4 +978,5 @@ func mustMarshalJSON(v interface{}) string {
panic(e)
}
return string(b)

}

0 comments on commit 41607c2

Please sign in to comment.