the right way to wait for data

corylanou committed Mar 11, 2015
1 parent 5f8b176 commit 98484b6
Showing 1 changed file with 37 additions and 70 deletions.
107 changes: 37 additions & 70 deletions cmd/influxd/server_integration_test.go
@@ -5,14 +5,15 @@ import (
"encoding/json"
"fmt"
"io/ioutil"
"math"
"net"
"net/http"
"net/url"
"os"
"path/filepath"
"strconv"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

@@ -172,15 +173,6 @@ func write(t *testing.T, node *Node, data string) {
body, _ := ioutil.ReadAll(resp.Body)
t.Fatalf("Write to database failed. Unexpected status code. expected: %d, actual %d, %s", http.StatusOK, resp.StatusCode, string(body))
}

// Until races are solved.
time.Sleep(3 * time.Second)
//index, err := strconv.ParseUint(resp.Header.Get("X-InfluxDB-Index"), 10, 64)
//if err != nil {
//t.Fatalf("Couldn't get index. header: %s, err: %s.", resp.Header.Get("X-InfluxDB-Index"), err)
//}
//wait(t, testName, nodes, index)
//t.Log("Finished writing and waiting")
}

// query executes the given query against all nodes in the cluster, and verifies no errors occurred, and
@@ -213,6 +205,38 @@ func query(t *testing.T, nodes Cluster, urlDb, query, expected string) (string,
return "", true
}

// queryAndWait executes the given query against all nodes in the cluster, verifies no errors occurred,
// and retries until the returned data matches the expected result or the timeout expires.
func queryAndWait(t *testing.T, nodes Cluster, urlDb, q, expected string, timeout time.Duration) (string, bool) {
v := url.Values{"q": []string{q}}
if urlDb != "" {
v.Set("db", urlDb)
}

// Arm a timer that flips an atomic flag once the timeout expires. The timer starts
// effectively disabled (MaxInt64) and is only reset when a positive timeout is given.
var (
timedOut int32
timer = time.NewTimer(time.Duration(math.MaxInt64))
)
defer timer.Stop()
if timeout > 0 {
timer.Reset(timeout)
go func() {
<-timer.C
atomic.StoreInt32(&timedOut, 1)
}()
}

for {
if got, ok := query(t, nodes, urlDb, q, expected); ok {
return got, ok
} else if atomic.LoadInt32(&timedOut) == 1 {
return fmt.Sprintf("timed out before the expected result was found: got: %s", got), false
} else {
// Not there yet and not timed out, so back off briefly and poll again.
time.Sleep(100 * time.Millisecond)
}
}
}
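
For reference, a minimal usage sketch of the new polling helper (the database name, query, and expected JSON below are illustrative, not taken from this commit):

	// Hypothetical call site: poll for up to 5 seconds until the written point is queryable.
	expected := `{"results":[{"series":[{"name":"cpu","columns":["time","value"],"values":[["2015-03-11T00:00:00Z",42]]}]}]}`
	got, ok := queryAndWait(t, nodes, "mydb", `SELECT value FROM cpu`, expected, 5*time.Second)
	if !ok {
		t.Errorf("expected: %s, got: %s", expected, got)
	}

With the 100 ms sleep between attempts, a 5-second budget allows up to about 50 query attempts (fewer once per-query latency is counted) before the helper gives up and reports the last result it saw.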

// runTests_Errors tests some basic error cases.
func runTests_Errors(t *testing.T, nodes Cluster) {
t.Logf("Running tests against %d-node cluster", len(nodes))
@@ -755,7 +779,7 @@ func runTestsData(t *testing.T, testName string, nodes Cluster, database, retent
if tt.queryDb != "" {
urlDb = tt.queryDb
}
got, ok := query(t, nodes, rewriteDbRp(urlDb, database, retention), rewriteDbRp(tt.query, database, retention), rewriteDbRp(tt.expected, database, retention))
got, ok := queryAndWait(t, nodes, rewriteDbRp(urlDb, database, retention), rewriteDbRp(tt.query, database, retention), rewriteDbRp(tt.expected, database, retention), 3*time.Second)
if !ok {
t.Errorf("Test \"%s\" failed\n exp: %s\n got: %s\n", name, rewriteDbRp(tt.expected, database, retention), got)
}
@@ -909,9 +933,6 @@ func Test_ServerSingleGraphiteIntegration(t *testing.T) {
return
}

// Get the current index
i := index(t, testName, nodes)

t.Log("Writing data")
data := []byte(`cpu 23.456 `)
data = append(data, []byte(fmt.Sprintf("%d", now.UnixNano()/1000000))...)
@@ -923,69 +944,15 @@
return
}

// Wait for sync
wait(t, testName, nodes, i+4)

expected := fmt.Sprintf(`{"results":[{"series":[{"name":"cpu","columns":["time","cpu"],"values":[["%s",23.456]]}]}]}`, now.Format(time.RFC3339Nano))

got, ok := query(t, nodes, "graphite", `select * from "graphite"."raw".cpu`, expected)
// query and wait for results
got, ok := queryAndWait(t, nodes, "graphite", `select * from "graphite"."raw".cpu`, expected, 2*time.Second)
if !ok {
t.Errorf(`Test "%s" failed, expected: %s, got: %s`, testName, expected, got)
}
}

func index(t *testing.T, testName string, nodes Cluster) uint64 {
// The first node should be the leader so ask him for the index
u := urlFor(nodes[0].url, "", nil)
resp, err := http.Get(u.String())
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
t.Fatal(err)
}
i, err := strconv.ParseUint(string(body), 10, 64)
if err != nil {
t.Fatal(err)
}
return i
}

func wait(t *testing.T, testName string, nodes Cluster, index uint64) {
// Wait for the index to sync up
var wg sync.WaitGroup
wg.Add(len(nodes))
for _, n := range nodes {
go func(t *testing.T, testName string, nodes Cluster, u *url.URL, index uint64) {
u = urlFor(u, fmt.Sprintf("wait/%d", index), nil)
t.Logf("Test name %s: wait on node %s for index %d", testName, u, index)
resp, err := http.Get(u.String())
if err != nil {
t.Fatalf("Couldn't wait: %s", err)
}

if resp.StatusCode != http.StatusOK {
t.Fatalf("query databases failed. Unexpected status code. expected: %d, actual %d", http.StatusOK, resp.StatusCode)
}

defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
t.Fatalf("Couldn't read body of response: %s", err)
}
t.Logf("resp.Body: %s\n", string(body))

i, _ := strconv.Atoi(string(body))
if i == 0 {
t.Fatalf("Unexpected body: %s", string(body))
}

wg.Done()

}(t, testName, nodes, n.url, index)
}
wg.Wait()
}

// helper funcs

func errToString(err error) string {
