Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added framework for cluster integration testing #4848

Merged
merged 15 commits into from
Dec 3, 2015
Merged
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
- [#4841](https://github.com/influxdb/influxdb/pull/4841): Improve point parsing speed. Lint models pacakge. Thanks @e-dard!
- [#4889](https://github.com/influxdb/influxdb/pull/4889): Implement close notifier and timeout on executors
- [#2676](https://github.com/influxdb/influxdb/issues/2676), [#4866](https://github.com/influxdb/influxdb/pull/4866): Add support for specifying default retention policy in database create. Thanks @pires!
- [#4848](https://github.com/influxdb/influxdb/pull/4848): Added framework for cluster integration testing.

### Bugfixes
- [#4876](https://github.com/influxdb/influxdb/pull/4876): Complete lint for monitor and services packages. Thanks @e-dard!
Expand Down
2 changes: 1 addition & 1 deletion circle-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

BUILD_DIR=$HOME/influxdb-build
GO_VERSION=go1.4.2
PARALLELISM="-parallel 256"
PARALLELISM="-parallel 1"
TIMEOUT="-timeout 480s"

# Executes the given statement, and exits if the command returns a non-zero code.
Expand Down
1 change: 1 addition & 0 deletions cmd/influxd/run/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,7 @@ func (s *Server) Open() error {
// The port 0 is used, we need to retrieve the port assigned by the kernel
if strings.HasSuffix(s.BindAddress, ":0") {
s.MetaStore.Addr = ln.Addr()
s.MetaStore.RemoteAddr = ln.Addr()
}

// Multiplex listener.
Expand Down
355 changes: 355 additions & 0 deletions cmd/influxd/run/server_cluster_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,355 @@
package run_test

import (
"fmt"
"strings"
"testing"
"time"

"github.com/influxdb/influxdb/cmd/influxd/run"
)

func TestCluster_CreateDatabase(t *testing.T) {
t.Parallel()

c, err := NewClusterWithDefaults(5)
defer c.Close()
if err != nil {
t.Fatalf("error creating cluster: %s", err)
}
}

func TestCluster_Write(t *testing.T) {
t.Parallel()

c, err := NewClusterWithDefaults(5)
if err != nil {
t.Fatalf("error creating cluster: %s", err)
}
defer c.Close()

writes := []string{
fmt.Sprintf(`cpu,host=serverA,region=uswest val=23.2 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:00Z").UnixNano()),
}

_, err = c.Servers[0].Write("db0", "default", strings.Join(writes, "\n"), nil)
if err != nil {
t.Fatal(err)
}

q := &Query{
name: "write",
command: `SELECT * FROM db0."default".cpu`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","host","region","val"],"values":[["2000-01-01T00:00:00Z","serverA","uswest",23.2]]}]}]}`,
}
err = c.QueryAll(q)
if err != nil {
t.Fatal(err)
}
}

func TestCluster_DatabaseCommands(t *testing.T) {
t.Parallel()
c, err := NewCluster(5)
if err != nil {
t.Fatalf("error creating cluster: %s", err)
}

defer c.Close()

test := tests.load(t, "database_commands")

for _, query := range test.queries {
if query.skip {
t.Logf("SKIP:: %s", query.name)
continue
}
t.Logf("Running %s", query.name)
if query.once {
if _, err := c.Query(query); err != nil {
t.Error(query.Error(err))
} else if !query.success() {
t.Error(query.failureMessage())
}
continue
}
if err := c.QueryAll(query); err != nil {
t.Error(query.Error(err))
}
}
}

func TestCluster_Query_DropAndRecreateDatabase(t *testing.T) {
t.Parallel()
c, err := NewCluster(5)
if err != nil {
t.Fatalf("error creating cluster: %s", err)
}
defer c.Close()

test := tests.load(t, "drop_and_recreate_database")

s := c.Servers[0]
if err := s.CreateDatabaseAndRetentionPolicy(test.database(), newRetentionPolicyInfo(test.retentionPolicy(), 1, 0)); err != nil {
t.Fatal(err)
}
if err := s.MetaStore.SetDefaultRetentionPolicy(test.database(), test.retentionPolicy()); err != nil {
t.Fatal(err)
}

_, err = c.Servers[0].Write(test.database(), test.retentionPolicy(), test.write, nil)
if err != nil {
t.Fatal(err)
}

for _, query := range test.queries {
if query.skip {
t.Logf("SKIP:: %s", query.name)
continue
}
t.Logf("Running %s", query.name)
if query.once {
if _, err := c.Query(query); err != nil {
t.Error(query.Error(err))
} else if !query.success() {
t.Error(query.failureMessage())
}
continue
}
if err := c.QueryAll(query); err != nil {
t.Error(query.Error(err))
}
}
}

func TestCluster_Query_DropDatabaseIsolated(t *testing.T) {
t.Parallel()
c, err := NewCluster(5)
if err != nil {
t.Fatalf("error creating cluster: %s", err)
}
defer c.Close()

test := tests.load(t, "drop_database_isolated")

s := c.Servers[0]
if err := s.CreateDatabaseAndRetentionPolicy("db0", newRetentionPolicyInfo("rp0", 1, 0)); err != nil {
t.Fatal(err)
}
if err := s.MetaStore.SetDefaultRetentionPolicy("db0", "rp0"); err != nil {
t.Fatal(err)
}
if err := s.CreateDatabaseAndRetentionPolicy("db1", newRetentionPolicyInfo("rp1", 1, 0)); err != nil {
t.Fatal(err)
}

_, err = s.Write(test.database(), test.retentionPolicy(), test.write, nil)
if err != nil {
t.Fatal(err)
}

for _, query := range test.queries {
if query.skip {
t.Logf("SKIP:: %s", query.name)
continue
}
t.Logf("Running %s", query.name)
if query.once {
if _, err := c.Query(query); err != nil {
t.Error(query.Error(err))
} else if !query.success() {
t.Error(query.failureMessage())
}
continue
}
if err := c.QueryAll(query); err != nil {
t.Error(query.Error(err))
}
}
}

func TestCluster_Query_DropAndRecreateSeries(t *testing.T) {
t.Parallel()
t.Skip()
c, err := NewCluster(5)
if err != nil {
t.Fatalf("error creating cluster: %s", err)
}
defer c.Close()

test := tests.load(t, "drop_and_recreate_series")

s := c.Servers[0]
if err := s.CreateDatabaseAndRetentionPolicy("db0", newRetentionPolicyInfo("rp0", 1, 0)); err != nil {
t.Fatal(err)
}
if err := s.MetaStore.SetDefaultRetentionPolicy("db0", "rp0"); err != nil {
t.Fatal(err)
}

_, err = s.Write(test.database(), test.retentionPolicy(), test.write, nil)
if err != nil {
t.Fatal(err)
}

for _, query := range test.queries {
if query.skip {
t.Logf("SKIP:: %s", query.name)
continue
}
t.Logf("Running %s", query.name)
if query.once {
if _, err := c.Query(query); err != nil {
t.Error(query.Error(err))
} else if !query.success() {
t.Error(query.failureMessage())
}
continue
}
if err := c.QueryAll(query); err != nil {
t.Fatal(query.Error(err))
}
}

// Re-write data and test again.
retest := tests.load(t, "drop_and_recreate_series_retest")

_, err = s.Write(retest.database(), retest.retentionPolicy(), retest.write, nil)
if err != nil {
t.Fatal(err)
}
for _, query := range retest.queries {
if query.skip {
t.Logf("SKIP:: %s", query.name)
continue
}
t.Logf("Running %s", query.name)
if query.once {
if _, err := c.Query(query); err != nil {
t.Error(query.Error(err))
} else if !query.success() {
t.Error(query.failureMessage())
}
continue
}
if err := c.QueryAll(query); err != nil {
t.Error(query.Error(err))
}
}
}

func TestCluster_Query_DropSeriesFromRegex(t *testing.T) {
t.Parallel()
t.Skip()
c, err := NewCluster(5)
if err != nil {
t.Fatalf("error creating cluster: %s", err)
}
defer c.Close()

test := tests.load(t, "drop_series_from_regex")

s := c.Servers[0]
if err := s.CreateDatabaseAndRetentionPolicy(test.database(), newRetentionPolicyInfo(test.retentionPolicy(), 1, 0)); err != nil {
t.Fatal(err)
}
if err := s.MetaStore.SetDefaultRetentionPolicy(test.database(), test.retentionPolicy()); err != nil {
t.Fatal(err)
}

_, err = s.Write(test.database(), test.retentionPolicy(), test.write, nil)
if err != nil {
t.Fatal(err)
}

for _, query := range test.queries {
if query.skip {
t.Logf("SKIP:: %s", query.name)
continue
}
t.Logf("Running %s", query.name)
if query.once {
if _, err := c.Query(query); err != nil {
t.Error(query.Error(err))
} else if !query.success() {
t.Error(query.failureMessage())
}
continue
}
if err := c.QueryAll(query); err != nil {
t.Error(query.Error(err))
}
}
}

func TestCluster_RetentionPolicyCommands(t *testing.T) {
t.Parallel()

configFunc := func(index int, config *run.Config) {
config.Meta.RetentionAutoCreate = false
}

c, err := NewClusterCustom(5, configFunc)

if err != nil {
t.Fatalf("error creating cluster: %s", err)
}
defer c.Close()

test := tests.load(t, "retention_policy_commands")

s := c.Servers[0]
if _, err := s.MetaStore.CreateDatabase(test.database()); err != nil {
t.Fatal(err)
}

for _, query := range test.queries {
if query.skip {
t.Logf("SKIP:: %s", query.name)
continue
}
t.Logf("Running %s", query.name)
if query.once {
if _, err := c.Query(query); err != nil {
t.Error(query.Error(err))
} else if !query.success() {
t.Error(query.failureMessage())
}
continue
}
if err := c.QueryAll(query); err != nil {
t.Error(query.Error(err))
}
}
}

func TestCluster_DatabaseRetentionPolicyAutoCreate(t *testing.T) {
t.Parallel()
t.Skip()
c, err := NewCluster(5)
if err != nil {
t.Fatalf("error creating cluster: %s", err)
}
defer c.Close()

test := tests.load(t, "retention_policy_auto_create")

for _, query := range test.queries {
if query.skip {
t.Logf("SKIP:: %s", query.name)
continue
}
t.Logf("Running %s", query.name)
if query.once {
if _, err := c.Query(query); err != nil {
t.Error(query.Error(err))
} else if !query.success() {
t.Error(query.failureMessage())
}
continue
}
if err := c.QueryAll(query); err != nil {
t.Error(query.Error(err))
}
}
}
Loading