Skip to content

Commit

Permalink
Race condition fix: copy BatchPoints into goroutine
Browse files Browse the repository at this point in the history
  • Loading branch information
sparrc committed Oct 8, 2015
1 parent d9f1a60 commit 7293376
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 1 deletion.
5 changes: 5 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@ build: prepare
"-X main.Version=$(VERSION)" \
./cmd/telegraf/telegraf.go

dev: prepare
$(GOBIN)/godep go build -race -o telegraf -ldflags \
"-X main.Version=$(VERSION)" \
./cmd/telegraf/telegraf.go

build-linux-bins: prepare
GOARCH=amd64 GOOS=linux $(GOBIN)/godep go build -o telegraf_linux_amd64 \
-ldflags "-X main.Version=$(VERSION)" \
Expand Down
41 changes: 41 additions & 0 deletions accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,47 @@ type BatchPoints struct {
Config *ConfiguredPlugin
}

// deepcopy returns a deep copy of the BatchPoints object. This is primarily so
// we can do multithreaded output flushing (see Agent.flush)
func (bp *BatchPoints) deepcopy() *BatchPoints {
bp.mu.Lock()
defer bp.mu.Unlock()

var bpc BatchPoints
bpc.Time = bp.Time
bpc.Precision = bp.Precision

bpc.Tags = make(map[string]string)
for k, v := range bp.Tags {
bpc.Tags[k] = v
}

var pts []client.Point
for _, pt := range bp.Points {
var ptc client.Point

ptc.Measurement = pt.Measurement
ptc.Time = pt.Time
ptc.Precision = pt.Precision
ptc.Raw = pt.Raw

ptc.Tags = make(map[string]string)
ptc.Fields = make(map[string]interface{})

for k, v := range pt.Tags {
ptc.Tags[k] = v
}

for k, v := range pt.Fields {
ptc.Fields[k] = v
}
pts = append(pts, ptc)
}

bpc.Points = pts
return &bpc
}

// Add adds a measurement
func (bp *BatchPoints) Add(
measurement string,
Expand Down
6 changes: 5 additions & 1 deletion agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,10 +303,14 @@ func (a *Agent) flush(bp *BatchPoints) error {

for _, o := range a.outputs {
wg.Add(1)

// Copy BatchPoints
bpc := bp.deepcopy()

go func(ro *runningOutput) {
defer wg.Done()
// Log all output errors:
if err := ro.output.Write(bp.BatchPoints); err != nil {
if err := ro.output.Write(bpc.BatchPoints); err != nil {
log.Printf("Error in output [%s]: %s", ro.name, err)
outerr = errors.New("Error encountered flushing outputs")
}
Expand Down

0 comments on commit 7293376

Please sign in to comment.