Merge pull request #4848 from influxdb/cluster-integration
Added framework for cluster integration testing
corylanou committed Dec 3, 2015
2 parents d85f3c3 + edf8e31 commit 67ea0b7
Showing 16 changed files with 1,120 additions and 423 deletions.
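
The bulk of the new test code calls into a cluster harness added elsewhere in this PR's 16 files; the harness itself is not part of this excerpt. Reconstructed purely from the call sites in server_cluster_test.go (below), its surface looks roughly like the following sketch. The names are taken from the tests, but every signature and body here is an inference, not the committed code:

// Sketch of the cluster harness API inferred from the call sites in
// server_cluster_test.go; Server and Query are the harness's existing
// test types. Bodies are placeholders, not the committed implementation.

// Cluster wraps a set of test servers joined into one InfluxDB cluster.
type Cluster struct {
	Servers []*Server
}

// NewCluster would start `size` nodes and join them together.
func NewCluster(size int) (*Cluster, error) { panic("sketch only") }

// NewClusterWithDefaults would additionally create a default database
// (the write test below targets "db0" with the "default" retention policy).
func NewClusterWithDefaults(size int) (*Cluster, error) { panic("sketch only") }

// NewClusterCustom would let the caller tweak each node's config before
// the node starts, as TestCluster_RetentionPolicyCommands does.
func NewClusterCustom(size int, fn func(index int, config *run.Config)) (*Cluster, error) {
	panic("sketch only")
}

// Close would shut down every node in the cluster.
func (c *Cluster) Close() error { panic("sketch only") }

// Query would run q on a single node; QueryAll would run it on every
// node and report an error unless each returns the expected result.
// (The first return of Query is discarded at every call site; a response
// string is an assumption.)
func (c *Cluster) Query(q *Query) (string, error) { panic("sketch only") }
func (c *Cluster) QueryAll(q *Query) error        { panic("sketch only") }
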
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -6,6 +6,7 @@
- [#4841](https://github.com/influxdb/influxdb/pull/4841): Improve point parsing speed. Lint models package. Thanks @e-dard!
- [#4889](https://github.com/influxdb/influxdb/pull/4889): Implement close notifier and timeout on executors
- [#2676](https://github.com/influxdb/influxdb/issues/2676), [#4866](https://github.com/influxdb/influxdb/pull/4866): Add support for specifying default retention policy in database create. Thanks @pires!
+- [#4848](https://github.com/influxdb/influxdb/pull/4848): Added framework for cluster integration testing.

### Bugfixes
- [#4876](https://github.com/influxdb/influxdb/pull/4876): Complete lint for monitor and services packages. Thanks @e-dard!
2 changes: 1 addition & 1 deletion circle-test.sh
@@ -6,7 +6,7 @@

BUILD_DIR=$HOME/influxdb-build
GO_VERSION=go1.4.2
-PARALLELISM="-parallel 256"
+PARALLELISM="-parallel 1"
TIMEOUT="-timeout 480s"
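
Dropping test parallelism from 256 to 1 serializes the test binary's parallel tests, presumably so that multiple five-node clusters don't compete for ports and file handles on the CI box. These variables are consumed further down circle-test.sh when it invokes go test; the exact line is outside this hunk, but the shape is roughly (an assumption, not a quote from the script):

# Assumed shape of the invocation later in circle-test.sh (not shown in this hunk):
go test $PARALLELISM $TIMEOUT ./...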

# Executes the given statement, and exits if the command returns a non-zero code.
1 change: 1 addition & 0 deletions cmd/influxd/run/server.go
@@ -363,6 +363,7 @@ func (s *Server) Open() error {
	// The port 0 is used; we need to retrieve the port assigned by the kernel
	if strings.HasSuffix(s.BindAddress, ":0") {
		s.MetaStore.Addr = ln.Addr()
+		s.MetaStore.RemoteAddr = ln.Addr()
	}

	// Multiplex listener.
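
For readers unfamiliar with the ":0" idiom the hunk above relies on: binding to port 0 asks the kernel for any free port, and the listener's Addr() reports what was actually assigned, which is presumably why both Addr and RemoteAddr are refreshed from ln.Addr(). A minimal, self-contained illustration using only the standard library (not InfluxDB code):

package main

import (
	"fmt"
	"net"
)

func main() {
	// Ask the kernel for any free port by binding to ":0" ...
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		panic(err)
	}
	defer ln.Close()

	// ... then read back the address actually assigned. This is the
	// address a node would have to advertise to its peers.
	fmt.Println("listening on", ln.Addr())
}
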
355 changes: 355 additions & 0 deletions cmd/influxd/run/server_cluster_test.go
@@ -0,0 +1,355 @@
package run_test

import (
	"fmt"
	"strings"
	"testing"
	"time"

	"github.com/influxdb/influxdb/cmd/influxd/run"
)

func TestCluster_CreateDatabase(t *testing.T) {
	t.Parallel()

	c, err := NewClusterWithDefaults(5)
	if err != nil {
		t.Fatalf("error creating cluster: %s", err)
	}
	defer c.Close()
}

func TestCluster_Write(t *testing.T) {
	t.Parallel()

	c, err := NewClusterWithDefaults(5)
	if err != nil {
		t.Fatalf("error creating cluster: %s", err)
	}
	defer c.Close()

	writes := []string{
		fmt.Sprintf(`cpu,host=serverA,region=uswest val=23.2 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:00Z").UnixNano()),
	}

	_, err = c.Servers[0].Write("db0", "default", strings.Join(writes, "\n"), nil)
	if err != nil {
		t.Fatal(err)
	}

	q := &Query{
		name:    "write",
		command: `SELECT * FROM db0."default".cpu`,
		exp:     `{"results":[{"series":[{"name":"cpu","columns":["time","host","region","val"],"values":[["2000-01-01T00:00:00Z","serverA","uswest",23.2]]}]}]}`,
	}
	err = c.QueryAll(q)
	if err != nil {
		t.Fatal(err)
	}
}
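
TestCluster_Write builds its point timestamp with mustParseTime, a helper from the shared test harness rather than this file. Judging from the call site, it is presumably the usual must-style wrapper around time.Parse; a sketch:

// mustParseTime parses value using layout and panics on failure --
// a sketch of the harness helper assumed by the call site above,
// not the committed definition.
func mustParseTime(layout, value string) time.Time {
	t, err := time.Parse(layout, value)
	if err != nil {
		panic(err)
	}
	return t
}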

func TestCluster_DatabaseCommands(t *testing.T) {
	t.Parallel()
	c, err := NewCluster(5)
	if err != nil {
		t.Fatalf("error creating cluster: %s", err)
	}

	defer c.Close()

	test := tests.load(t, "database_commands")

	for _, query := range test.queries {
		if query.skip {
			t.Logf("SKIP:: %s", query.name)
			continue
		}
		t.Logf("Running %s", query.name)
		if query.once {
			if _, err := c.Query(query); err != nil {
				t.Error(query.Error(err))
			} else if !query.success() {
				t.Error(query.failureMessage())
			}
			continue
		}
		if err := c.QueryAll(query); err != nil {
			t.Error(query.Error(err))
		}
	}
}
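
Every test below walks a fixture's queries the same way: skipped queries are logged and passed over, `once` queries run against a single node via c.Query, and everything else fans out through c.QueryAll. The Query type driving this loop comes from the shared harness; from its uses in this file (name, command, exp, skip, once, Error, success, failureMessage), it plausibly looks like the following sketch. The act field and the method bodies are assumptions:

// Plausible shape of the harness Query type, reconstructed from its
// uses in this file; not the committed definition.
type Query struct {
	name    string // label printed by t.Logf
	command string // InfluxQL statement to execute
	exp     string // expected JSON response body
	act     string // assumed: actual response recorded by Query/QueryAll
	skip    bool   // skip this query entirely
	once    bool   // run on one node only instead of fanning out
}

// success would report whether the recorded response matched exp.
func (q *Query) success() bool { return q.act == q.exp }

// Error would wrap a transport or HTTP error with the query's name.
func (q *Query) Error(err error) string {
	return fmt.Sprintf("%s: %v", q.name, err)
}

// failureMessage would describe an expected/actual mismatch.
func (q *Query) failureMessage() string {
	return fmt.Sprintf("%s: unexpected result\nexp: %s\ngot: %s", q.name, q.exp, q.act)
}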

func TestCluster_Query_DropAndRecreateDatabase(t *testing.T) {
	t.Parallel()
	c, err := NewCluster(5)
	if err != nil {
		t.Fatalf("error creating cluster: %s", err)
	}
	defer c.Close()

	test := tests.load(t, "drop_and_recreate_database")

	s := c.Servers[0]
	if err := s.CreateDatabaseAndRetentionPolicy(test.database(), newRetentionPolicyInfo(test.retentionPolicy(), 1, 0)); err != nil {
		t.Fatal(err)
	}
	if err := s.MetaStore.SetDefaultRetentionPolicy(test.database(), test.retentionPolicy()); err != nil {
		t.Fatal(err)
	}

	_, err = c.Servers[0].Write(test.database(), test.retentionPolicy(), test.write, nil)
	if err != nil {
		t.Fatal(err)
	}

	for _, query := range test.queries {
		if query.skip {
			t.Logf("SKIP:: %s", query.name)
			continue
		}
		t.Logf("Running %s", query.name)
		if query.once {
			if _, err := c.Query(query); err != nil {
				t.Error(query.Error(err))
			} else if !query.success() {
				t.Error(query.failureMessage())
			}
			continue
		}
		if err := c.QueryAll(query); err != nil {
			t.Error(query.Error(err))
		}
	}
}
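
The fixture-driven tests fetch named suites through tests.load and then read back test.database(), test.retentionPolicy(), test.write, and test.queries. The registry itself ships with the harness, outside this excerpt; a plausible reconstruction, with field names beyond those accessors being assumptions:

// Sketch of the shared fixture registry behind tests.load;
// reconstructed from call sites, not the committed definition.
type Test struct {
	db      string   // target database (assumed field name)
	rp      string   // target retention policy (assumed field name)
	write   string   // line-protocol points written before querying
	queries []*Query // queries to run against the cluster
}

func (t Test) database() string        { return t.db }
func (t Test) retentionPolicy() string { return t.rp }

// Tests maps fixture names like "database_commands" to definitions.
type Tests map[string]Test

// load would fail the calling test if the fixture name is unknown.
func (ts Tests) load(t *testing.T, name string) Test {
	test, ok := ts[name]
	if !ok {
		t.Fatalf("no test fixture named %q", name)
	}
	return test
}

var tests Tests // assumed: populated by the harness with the named fixtures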

func TestCluster_Query_DropDatabaseIsolated(t *testing.T) {
	t.Parallel()
	c, err := NewCluster(5)
	if err != nil {
		t.Fatalf("error creating cluster: %s", err)
	}
	defer c.Close()

	test := tests.load(t, "drop_database_isolated")

	s := c.Servers[0]
	if err := s.CreateDatabaseAndRetentionPolicy("db0", newRetentionPolicyInfo("rp0", 1, 0)); err != nil {
		t.Fatal(err)
	}
	if err := s.MetaStore.SetDefaultRetentionPolicy("db0", "rp0"); err != nil {
		t.Fatal(err)
	}
	if err := s.CreateDatabaseAndRetentionPolicy("db1", newRetentionPolicyInfo("rp1", 1, 0)); err != nil {
		t.Fatal(err)
	}

	_, err = s.Write(test.database(), test.retentionPolicy(), test.write, nil)
	if err != nil {
		t.Fatal(err)
	}

	for _, query := range test.queries {
		if query.skip {
			t.Logf("SKIP:: %s", query.name)
			continue
		}
		t.Logf("Running %s", query.name)
		if query.once {
			if _, err := c.Query(query); err != nil {
				t.Error(query.Error(err))
			} else if !query.success() {
				t.Error(query.failureMessage())
			}
			continue
		}
		if err := c.QueryAll(query); err != nil {
			t.Error(query.Error(err))
		}
	}
}

func TestCluster_Query_DropAndRecreateSeries(t *testing.T) {
	t.Parallel()
	t.Skip()
	c, err := NewCluster(5)
	if err != nil {
		t.Fatalf("error creating cluster: %s", err)
	}
	defer c.Close()

	test := tests.load(t, "drop_and_recreate_series")

	s := c.Servers[0]
	if err := s.CreateDatabaseAndRetentionPolicy("db0", newRetentionPolicyInfo("rp0", 1, 0)); err != nil {
		t.Fatal(err)
	}
	if err := s.MetaStore.SetDefaultRetentionPolicy("db0", "rp0"); err != nil {
		t.Fatal(err)
	}

	_, err = s.Write(test.database(), test.retentionPolicy(), test.write, nil)
	if err != nil {
		t.Fatal(err)
	}

	for _, query := range test.queries {
		if query.skip {
			t.Logf("SKIP:: %s", query.name)
			continue
		}
		t.Logf("Running %s", query.name)
		if query.once {
			if _, err := c.Query(query); err != nil {
				t.Error(query.Error(err))
			} else if !query.success() {
				t.Error(query.failureMessage())
			}
			continue
		}
		if err := c.QueryAll(query); err != nil {
			t.Fatal(query.Error(err))
		}
	}

	// Re-write data and test again.
	retest := tests.load(t, "drop_and_recreate_series_retest")

	_, err = s.Write(retest.database(), retest.retentionPolicy(), retest.write, nil)
	if err != nil {
		t.Fatal(err)
	}
	for _, query := range retest.queries {
		if query.skip {
			t.Logf("SKIP:: %s", query.name)
			continue
		}
		t.Logf("Running %s", query.name)
		if query.once {
			if _, err := c.Query(query); err != nil {
				t.Error(query.Error(err))
			} else if !query.success() {
				t.Error(query.failureMessage())
			}
			continue
		}
		if err := c.QueryAll(query); err != nil {
			t.Error(query.Error(err))
		}
	}
}

func TestCluster_Query_DropSeriesFromRegex(t *testing.T) {
	t.Parallel()
	t.Skip()
	c, err := NewCluster(5)
	if err != nil {
		t.Fatalf("error creating cluster: %s", err)
	}
	defer c.Close()

	test := tests.load(t, "drop_series_from_regex")

	s := c.Servers[0]
	if err := s.CreateDatabaseAndRetentionPolicy(test.database(), newRetentionPolicyInfo(test.retentionPolicy(), 1, 0)); err != nil {
		t.Fatal(err)
	}
	if err := s.MetaStore.SetDefaultRetentionPolicy(test.database(), test.retentionPolicy()); err != nil {
		t.Fatal(err)
	}

	_, err = s.Write(test.database(), test.retentionPolicy(), test.write, nil)
	if err != nil {
		t.Fatal(err)
	}

	for _, query := range test.queries {
		if query.skip {
			t.Logf("SKIP:: %s", query.name)
			continue
		}
		t.Logf("Running %s", query.name)
		if query.once {
			if _, err := c.Query(query); err != nil {
				t.Error(query.Error(err))
			} else if !query.success() {
				t.Error(query.failureMessage())
			}
			continue
		}
		if err := c.QueryAll(query); err != nil {
			t.Error(query.Error(err))
		}
	}
}

func TestCluster_RetentionPolicyCommands(t *testing.T) {
	t.Parallel()

	configFunc := func(index int, config *run.Config) {
		config.Meta.RetentionAutoCreate = false
	}

	c, err := NewClusterCustom(5, configFunc)
	if err != nil {
		t.Fatalf("error creating cluster: %s", err)
	}
	defer c.Close()

	test := tests.load(t, "retention_policy_commands")

	s := c.Servers[0]
	if _, err := s.MetaStore.CreateDatabase(test.database()); err != nil {
		t.Fatal(err)
	}

	for _, query := range test.queries {
		if query.skip {
			t.Logf("SKIP:: %s", query.name)
			continue
		}
		t.Logf("Running %s", query.name)
		if query.once {
			if _, err := c.Query(query); err != nil {
				t.Error(query.Error(err))
			} else if !query.success() {
				t.Error(query.failureMessage())
			}
			continue
		}
		if err := c.QueryAll(query); err != nil {
			t.Error(query.Error(err))
		}
	}
}

func TestCluster_DatabaseRetentionPolicyAutoCreate(t *testing.T) {
	t.Parallel()
	t.Skip()
	c, err := NewCluster(5)
	if err != nil {
		t.Fatalf("error creating cluster: %s", err)
	}
	defer c.Close()

	test := tests.load(t, "retention_policy_auto_create")

	for _, query := range test.queries {
		if query.skip {
			t.Logf("SKIP:: %s", query.name)
			continue
		}
		t.Logf("Running %s", query.name)
		if query.once {
			if _, err := c.Query(query); err != nil {
				t.Error(query.Error(err))
			} else if !query.success() {
				t.Error(query.failureMessage())
			}
			continue
		}
		if err := c.QueryAll(query); err != nil {
			t.Error(query.Error(err))
		}
	}
}