Skip to content

Commit

Permalink
Merge pull request #7839 from influxdata/er-subscriber
Browse files Browse the repository at this point in the history
Ensure subscriber service respects config
  • Loading branch information
e-dard authored Jan 13, 2017
2 parents 63e5bae + aa6b85a commit bb029b5
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 3 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ The stress tool `influx_stress` will be removed in a subsequent release. We reco
- [#7585](https://github.com/influxdata/influxdb/pull/7585): Return Error instead of panic when decoding point values.
- [#7812](https://github.com/influxdata/influxdb/issues/7812): Fix slice out of bounds panic when pruning shard groups. Thanks @vladlopes
- [#7822](https://github.com/influxdata/influxdb/issues/7822): Drop database will delete /influxdb/data directory
- [#7838](https://github.com/influxdata/influxdb/issues/7838): Ensure Subscriber service can be disabled.

## v1.1.1 [2016-12-06]

Expand Down
9 changes: 9 additions & 0 deletions services/subscriber/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ func NewService(c Config) *Service {

// Open starts the subscription service.
func (s *Service) Open() error {
if !s.conf.Enabled {
return nil // Service disabled.
}

s.mu.Lock()
defer s.mu.Unlock()
if s.MetaClient == nil {
Expand Down Expand Up @@ -106,6 +110,11 @@ func (s *Service) Open() error {
func (s *Service) Close() error {
s.mu.Lock()
defer s.mu.Unlock()

if s.closed {
return nil // Already closed.
}

s.closed = true

close(s.points)
Expand Down
6 changes: 3 additions & 3 deletions services/subscriber/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ func TestService_Multiple(t *testing.T) {
expURL, _ := url.Parse(expURLStr)
select {
case u = <-urls:
case <-time.After(10 * time.Millisecond):
case <-time.After(100 * time.Millisecond):
t.Fatal("expected urls")
}
if expURL.String() != u.String() {
Expand Down Expand Up @@ -349,7 +349,7 @@ func TestService_Multiple(t *testing.T) {
var pr *coordinator.WritePointsRequest
select {
case pr = <-prs:
case <-time.After(10 * time.Millisecond):
case <-time.After(100 * time.Millisecond):
t.Fatal("expected points request")
}
if pr != expPR {
Expand All @@ -374,7 +374,7 @@ func TestService_Multiple(t *testing.T) {
for i := 0; i < 2; i++ {
select {
case pr = <-prs:
case <-time.After(10 * time.Millisecond):
case <-time.After(100 * time.Millisecond):
t.Fatalf("expected points request: got %d exp 2", i)
}
if pr != expPR {
Expand Down

0 comments on commit bb029b5

Please sign in to comment.