Skip to content

Commit

Permalink
Fix race in test helpers. Fixes #8177
Browse files Browse the repository at this point in the history
  • Loading branch information
e-dard committed Mar 29, 2017
1 parent be9a9f1 commit 7644ab1
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 12 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
- [#8167](https://github.com/influxdata/influxdb/issues/8167): Fix a regression when math was used with selectors.
- [#8175](https://github.com/influxdata/influxdb/issues/8175): Ensure the input for certain functions in the query engine are ordered.
- [#8171](https://github.com/influxdata/influxdb/issues/8171): Significantly improve shutdown speed for high cardinality databases.
- [#8177](https://github.com/influxdata/influxdb/issues/8177): Fix racy integration test.

## v1.2.2 [2017-03-14]

Expand Down
44 changes: 33 additions & 11 deletions tests/server_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"os"
"regexp"
"strings"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -43,13 +44,6 @@ type Server interface {
WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error
}

// LocalServer is a Server that is running in-process and can be accessed directly
type LocalServer struct {
*client
*run.Server
Config *run.Config
}

// RemoteServer is a Server that is accessed remotely via the HTTP API
type RemoteServer struct {
*client
Expand Down Expand Up @@ -247,8 +241,20 @@ func OpenDefaultServer(c *run.Config) Server {
return s
}

// LocalServer is a Server that is running in-process and can be accessed directly
type LocalServer struct {
mu sync.RWMutex
*run.Server

*client
Config *run.Config
}

// Close shuts down the server and removes all temporary paths.
func (s *LocalServer) Close() {
s.mu.Lock()
defer s.mu.Unlock()

if err := s.Server.Close(); err != nil {
panic(err.Error())
}
Expand All @@ -264,11 +270,15 @@ func (s *LocalServer) Close() {
}

func (s *LocalServer) Closed() bool {
s.mu.RLock()
defer s.mu.RUnlock()
return s.Server == nil
}

// URL returns the base URL for the httpd endpoint.
func (s *LocalServer) URL() string {
s.mu.RLock()
defer s.mu.RUnlock()
for _, service := range s.Services {
if service, ok := service.(*httpd.Service); ok {
return "http://" + service.Addr().String()
Expand All @@ -278,11 +288,15 @@ func (s *LocalServer) URL() string {
}

func (s *LocalServer) CreateDatabase(db string) (*meta.DatabaseInfo, error) {
s.mu.RLock()
defer s.mu.RUnlock()
return s.MetaClient.CreateDatabase(db)
}

// CreateDatabaseAndRetentionPolicy will create the database and retention policy.
func (s *LocalServer) CreateDatabaseAndRetentionPolicy(db string, rp *meta.RetentionPolicySpec, makeDefault bool) error {
s.mu.RLock()
defer s.mu.RUnlock()
if _, err := s.MetaClient.CreateDatabase(db); err != nil {
return err
} else if _, err := s.MetaClient.CreateRetentionPolicy(db, rp, makeDefault); err != nil {
Expand All @@ -292,14 +306,20 @@ func (s *LocalServer) CreateDatabaseAndRetentionPolicy(db string, rp *meta.Reten
}

func (s *LocalServer) CreateSubscription(database, rp, name, mode string, destinations []string) error {
s.mu.RLock()
defer s.mu.RUnlock()
return s.MetaClient.CreateSubscription(database, rp, name, mode, destinations)
}

func (s *LocalServer) DropDatabase(db string) error {
s.mu.RLock()
defer s.mu.RUnlock()
return s.MetaClient.DropDatabase(db)
}

func (s *LocalServer) Reset() error {
s.mu.RLock()
defer s.mu.RUnlock()
for _, db := range s.MetaClient.Databases() {
if err := s.DropDatabase(db.Name); err != nil {
return err
Expand All @@ -308,6 +328,12 @@ func (s *LocalServer) Reset() error {
return nil
}

func (s *LocalServer) WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error {
s.mu.RLock()
defer s.mu.RUnlock()
return s.PointsWriter.WritePoints(database, retentionPolicy, consistencyLevel, points)
}

// client abstract querying and writing to a Server using HTTP
type client struct {
URLFn func() string
Expand Down Expand Up @@ -410,10 +436,6 @@ func (wr WriteError) Error() string {
return fmt.Sprintf("invalid status code: code=%d, body=%s", wr.statusCode, wr.body)
}

func (s *LocalServer) WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error {
return s.PointsWriter.WritePoints(database, retentionPolicy, consistencyLevel, points)
}

// Write executes a write against the server and returns the results.
func (s *client) Write(db, rp, body string, params url.Values) (results string, err error) {
if params == nil {
Expand Down
1 change: 0 additions & 1 deletion tests/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7723,7 +7723,6 @@ func TestServer_ConcurrentPointsWriter_Subscriber(t *testing.T) {
time.Sleep(10 * time.Millisecond)

close(done)
// Race occurs on s.Close()
}

// Ensure time in where clause is inclusive
Expand Down

0 comments on commit 7644ab1

Please sign in to comment.