Skip to content

Commit

Permalink
Fix shard close race condition.
Browse files Browse the repository at this point in the history
  • Loading branch information
benbjohnson committed Mar 14, 2015
1 parent 06d8392 commit 7dc465b
Showing 1 changed file with 33 additions and 11 deletions.
44 changes: 33 additions & 11 deletions shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@ import (
"encoding/binary"
"errors"
"fmt"
"sync"
"time"

"github.com/boltdb/bolt"
"github.com/influxdb/influxdb/messaging"
)

// ShardGroup represents a group of shards created for a single time range.
Expand Down Expand Up @@ -63,6 +65,9 @@ type Shard struct {
index uint64 // highest replicated index
store *bolt.DB // underlying data store
conn MessagingConn // streaming connection to broker

wg sync.WaitGroup // pending goroutines
closing chan struct{} // close notification
}

// newShard returns a new initialized Shard instance.
Expand Down Expand Up @@ -93,26 +98,35 @@ func (s *Shard) open(path string, conn MessagingConn) error {
s.index = btou64(buf)
}

// Open connection.
if err := conn.Open(s.index, true); err != nil {
return fmt.Errorf("open shard conn: id=%d, idx=%d, err=%s", s.ID, s.index, err)
}

return nil
}); err != nil {
_ = s.close()
return fmt.Errorf("init: %s", err)
}

// Open connection.
if err := conn.Open(s.index, true); err != nil {
_ = s.close()
return fmt.Errorf("open shard conn: id=%d, idx=%d, err=%s", s.ID, s.index, err)
}

// Start importing from connection.
go s.processor(conn)
s.closing = make(chan struct{})
s.wg.Add(1)
go s.processor(conn, s.closing)

return nil
}

// close shuts down the shard's store.
func (s *Shard) close() error {
// Wait for goroutines to stop.
if s.closing != nil {
close(s.closing)
s.closing = nil
}

s.wg.Wait()

if s.store != nil {
_ = s.store.Close()
}
Expand Down Expand Up @@ -201,12 +215,20 @@ func (s *Shard) dropSeries(seriesID uint32) error {
}

// processor runs in a separate goroutine and processes all incoming broker messages.
func (s *Shard) processor(conn MessagingConn) {
func (s *Shard) processor(conn MessagingConn, closing <-chan struct{}) {
defer s.wg.Done()

for {
// Read incoming message.
// Exit if the connection has been closed.
m, ok := <-conn.C()
if !ok {
// Exit if the connection has been closed or if shard is closing.
var ok bool
var m *messaging.Message
select {
case m, ok = <-conn.C():
if !ok {
return
}
case <-closing:
return
}

Expand Down

0 comments on commit 7dc465b

Please sign in to comment.