Skip to content

Commit

Permalink
Gather elasticsearch nodes in goroutines, handle errors
Browse files Browse the repository at this point in the history
  • Loading branch information
sparrc committed Jan 22, 2016
1 parent e910a03 commit f2ab5f6
Showing 1 changed file with 36 additions and 13 deletions.
49 changes: 36 additions & 13 deletions plugins/inputs/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@ package elasticsearch

import (
"encoding/json"
"errors"
"fmt"
"net/http"
"strings"
"sync"
"time"

"github.com/influxdata/telegraf/internal"
Expand Down Expand Up @@ -93,21 +96,41 @@ func (e *Elasticsearch) Description() string {
// Gather reads the stats from Elasticsearch and writes it to the
// Accumulator.
func (e *Elasticsearch) Gather(acc inputs.Accumulator) error {
errChan := make(chan error, len(e.Servers))
var wg sync.WaitGroup
wg.Add(len(e.Servers))

for _, serv := range e.Servers {
var url string
if e.Local {
url = serv + statsPathLocal
} else {
url = serv + statsPath
}
if err := e.gatherNodeStats(url, acc); err != nil {
return err
}
if e.ClusterHealth {
e.gatherClusterStats(fmt.Sprintf("%s/_cluster/health?level=indices", serv), acc)
}
go func(s string, acc inputs.Accumulator) {
defer wg.Done()
var url string
if e.Local {
url = s + statsPathLocal
} else {
url = s + statsPath
}
if err := e.gatherNodeStats(url, acc); err != nil {
errChan <- err
return
}
if e.ClusterHealth {
e.gatherClusterStats(fmt.Sprintf("%s/_cluster/health?level=indices", s), acc)
}
}(serv, acc)
}
return nil

wg.Wait()
close(errChan)
// Get all errors and return them as one giant error
errStrings := []string{}
for err := range errChan {
errStrings = append(errStrings, err.Error())
}

if len(errStrings) == 0 {
return nil
}
return errors.New(strings.Join(errStrings, "\n"))
}

func (e *Elasticsearch) gatherNodeStats(url string, acc inputs.Accumulator) error {
Expand Down

0 comments on commit f2ab5f6

Please sign in to comment.