Skip to content

Commit

Permalink
Revert "Update handling when server is stopped. (#1535)" (#1542)
Browse files Browse the repository at this point in the history
This reverts commit b430e4c.
  • Loading branch information
simitt authored Nov 13, 2018
1 parent ec55345 commit 9b91fc7
Showing 1 changed file with 12 additions and 7 deletions.
19 changes: 12 additions & 7 deletions publish/pub.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type publisher struct {
tracer *elasticapm.Tracer
client beat.Client
m sync.RWMutex
stopped chan bool
stopped bool
}

type PendingReq struct {
Expand Down Expand Up @@ -79,9 +79,8 @@ func NewPublisher(pipeline beat.Pipeline, N int, shutdownTimeout time.Duration,
}

p := &publisher{
tracer: tracer,
client: client,
stopped: make(chan bool, 1),
tracer: tracer,
client: client,

// Set channel size to N - 1. One request will be actively processed by the
// worker, while the other concurrent requests will be buffered in the queue.
Expand All @@ -103,7 +102,9 @@ func (p *publisher) Client() beat.Client {
// The worker will drain the queue on shutdown, but no more pending requests
// will be published.
func (p *publisher) Stop() {
p.stopped <- true
p.m.Lock()
p.stopped = true
p.m.Unlock()
close(p.pendingRequests)
p.client.Close()
}
Expand All @@ -112,11 +113,15 @@ func (p *publisher) Stop() {
// an error is returned.
// Calling send after Stop will return an error.
func (p *publisher) Send(ctx context.Context, req PendingReq) error {
p.m.RLock()
defer p.m.RUnlock()
if p.stopped {
return ErrChannelClosed
}

select {
case <-ctx.Done():
return ctx.Err()
case <-p.stopped:
return ErrChannelClosed
case p.pendingRequests <- req:
return nil
case <-time.After(time.Second * 1): // this forces the go scheduler to try something else for a while
Expand Down

0 comments on commit 9b91fc7

Please sign in to comment.