Make Histogram observations atomic
Fixes #275

Benchmarks haven't changed significantly, despite the additional step
of retrieving the hot counts during Observe and the more involved
Write. (Write is a bit slower now, but note that Write is supposed to
be a relatively rare operation and thus not in the hot path, compared
to Observe.) Allocations haven't changed at all.
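
For intuition, the hot/cold scheme works roughly like the following self-contained sketch (illustrative names, buckets omitted; the real implementation is in the diff below). Observe only ever touches the currently hot counts; a reader atomically flips which half is hot and then waits until the cold half has no observation in flight:

package main

import (
	"fmt"
	"math"
	"runtime"
	"sync/atomic"
)

// pair is a simplified stand-in for histogramCounts: a running sum
// plus started/completed observation counters.
type pair struct {
	sumBits   uint64 // float64 bits of the running sum
	started   uint64 // observations started
	completed uint64 // observations completed
}

type hist struct {
	counts [2]pair
	hot    uint32 // counts[0] if even, counts[1] if odd
}

func (h *hist) Observe(v float64) {
	hot := &h.counts[atomic.LoadUint32(&h.hot)%2]
	atomic.AddUint64(&hot.started, 1)
	for { // CAS loop to add v to the float64 sum atomically
		oldBits := atomic.LoadUint64(&hot.sumBits)
		newBits := math.Float64bits(math.Float64frombits(oldBits) + v)
		if atomic.CompareAndSwapUint64(&hot.sumBits, oldBits, newBits) {
			break
		}
	}
	atomic.AddUint64(&hot.completed, 1)
}

// Snapshot flips the buffers and waits until the cold side is
// quiescent. (The real Write additionally folds the cold counts back
// into the hot side and zeroes them; that part is omitted here.)
func (h *hist) Snapshot() (count uint64, sum float64) {
	newHot := atomic.AddUint32(&h.hot, 1)
	cold := &h.counts[(newHot-1)%2]
	for {
		count = atomic.LoadUint64(&cold.completed)
		sum = math.Float64frombits(atomic.LoadUint64(&cold.sumBits))
		// Consistent only once every started observation completed.
		if count == atomic.LoadUint64(&cold.started) {
			return count, sum
		}
		runtime.Gosched()
	}
}

func main() {
	h := &hist{}
	for i := 0; i < 5; i++ {
		h.Observe(1.5)
	}
	c, s := h.Snapshot()
	fmt.Printf("count=%d sum=%g\n", c, s) // count=5 sum=7.5
}

The started/completed pair is what makes the wait loop safe: an Observe that began before the flip may still be writing to the now-cold half, and the snapshot only becomes valid once both counters agree.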

OLD:

BenchmarkHistogramWithLabelValues-4     10000000               151 ns/op               0 B/op          0 allocs/op
BenchmarkHistogramNoLabels-4            50000000                36.0 ns/op             0 B/op          0 allocs/op
BenchmarkHistogramObserve1-4            50000000                28.1 ns/op             0 B/op          0 allocs/op
BenchmarkHistogramObserve2-4            10000000               160 ns/op               0 B/op          0 allocs/op
BenchmarkHistogramObserve4-4             5000000               378 ns/op               0 B/op          0 allocs/op
BenchmarkHistogramObserve8-4             2000000               768 ns/op               0 B/op          0 allocs/op
BenchmarkHistogramWrite1-4               1000000              1589 ns/op             896 B/op         37 allocs/op
BenchmarkHistogramWrite2-4                500000              2973 ns/op            1792 B/op         74 allocs/op
BenchmarkHistogramWrite4-4                300000              6979 ns/op            3584 B/op        148 allocs/op
BenchmarkHistogramWrite8-4                100000             10701 ns/op            7168 B/op        296 allocs/op

NEW:

BenchmarkHistogramWithLabelValues-4     10000000               152 ns/op               0 B/op          0 allocs/op
BenchmarkHistogramNoLabels-4            30000000                39.2 ns/op             0 B/op          0 allocs/op
BenchmarkHistogramObserve1-4            50000000                29.4 ns/op             0 B/op          0 allocs/op
BenchmarkHistogramObserve2-4            10000000               163 ns/op               0 B/op          0 allocs/op
BenchmarkHistogramObserve4-4             5000000               347 ns/op               0 B/op          0 allocs/op
BenchmarkHistogramObserve8-4             2000000               684 ns/op               0 B/op          0 allocs/op
BenchmarkHistogramWrite1-4               1000000              1807 ns/op             896 B/op         37 allocs/op
BenchmarkHistogramWrite2-4                500000              3321 ns/op            1792 B/op         74 allocs/op
BenchmarkHistogramWrite4-4                200000              7087 ns/op            3584 B/op        148 allocs/op
BenchmarkHistogramWrite8-4                100000             15366 ns/op            7168 B/op        296 allocs/op

Signed-off-by: beorn7 <[email protected]>
beorn7 committed Sep 7, 2018
1 parent 1e08f78 commit 050e30a
Showing 2 changed files with 90 additions and 31 deletions.
prometheus/histogram.go (78 additions, 25 deletions)

@@ -16,7 +16,9 @@ package prometheus
 import (
 	"fmt"
 	"math"
+	"runtime"
 	"sort"
+	"sync"
 	"sync/atomic"
 
 	"github.com/golang/protobuf/proto"
@@ -200,28 +202,34 @@ func newHistogram(desc *Desc, opts HistogramOpts, labelValues ...string) Histogram {
 			}
 		}
 	}
-	// Finally we know the final length of h.upperBounds and can make counts.
-	h.counts = make([]uint64, len(h.upperBounds))
+	// Finally we know the final length of h.upperBounds and can make counts
+	// for both states:
+	h.counts[0].buckets = make([]uint64, len(h.upperBounds))
+	h.counts[1].buckets = make([]uint64, len(h.upperBounds))
 
 	h.init(h) // Init self-collection.
 	return h
 }
 
-type histogram struct {
+type histogramCounts struct {
 	// sumBits contains the bits of the float64 representing the sum of all
-	// observations. sumBits and count have to go first in the struct to
-	// guarantee alignment for atomic operations.
-	// http://golang.org/pkg/sync/atomic/#pkg-note-BUG
-	sumBits uint64
-	count   uint64
+	// observations. sumBits,observationsStarted, observationsCompleted have
+	// to go first in the struct to guarantee alignment for atomic
+	// operations. http://golang.org/pkg/sync/atomic/#pkg-note-BUG
+	sumBits               uint64
+	observationsStarted   uint64
+	observationsCompleted uint64
+	buckets               []uint64
+}
 
+type histogram struct {
 	selfCollector
-	// Note that there is no mutex required.
 
-	desc *Desc
+	desc     *Desc
+	writeMtx sync.Mutex
 
 	upperBounds []float64
-	counts      []uint64
+	counts      [2]histogramCounts
+	hotCounts   uint32 // counts[0] if even, counts[1] if odd.
 
 	labelPairs []*dto.LabelPair
 }
@@ -241,36 +249,81 @@ func (h *histogram) Observe(v float64) {
 	// 100 buckets: 78.1 ns/op linear - binary 54.9 ns/op
 	// 300 buckets: 154 ns/op linear - binary 61.6 ns/op
 	i := sort.SearchFloat64s(h.upperBounds, v)
-	if i < len(h.counts) {
-		atomic.AddUint64(&h.counts[i], 1)
+
+	hotCounts := &h.counts[atomic.LoadUint32(&h.hotCounts)%2]
+
+	atomic.AddUint64(&hotCounts.observationsStarted, 1)
+	if i < len(h.upperBounds) {
+		atomic.AddUint64(&hotCounts.buckets[i], 1)
 	}
-	atomic.AddUint64(&h.count, 1)
 	for {
-		oldBits := atomic.LoadUint64(&h.sumBits)
+		oldBits := atomic.LoadUint64(&hotCounts.sumBits)
 		newBits := math.Float64bits(math.Float64frombits(oldBits) + v)
-		if atomic.CompareAndSwapUint64(&h.sumBits, oldBits, newBits) {
+		if atomic.CompareAndSwapUint64(&hotCounts.sumBits, oldBits, newBits) {
 			break
 		}
 	}
+	atomic.AddUint64(&hotCounts.observationsCompleted, 1)
 }
 
 func (h *histogram) Write(out *dto.Metric) error {
 	his := &dto.Histogram{}
 	buckets := make([]*dto.Bucket, len(h.upperBounds))
 
-	his.SampleSum = proto.Float64(math.Float64frombits(atomic.LoadUint64(&h.sumBits)))
-	his.SampleCount = proto.Uint64(atomic.LoadUint64(&h.count))
-	var count uint64
-	for i, upperBound := range h.upperBounds {
-		count += atomic.LoadUint64(&h.counts[i])
-		buckets[i] = &dto.Bucket{
-			CumulativeCount: proto.Uint64(count),
-			UpperBound:      proto.Float64(upperBound),
-		}
+	// For simplicity, we mutex the rest of this method. It is not in the
+	// hot path, i.e. Observe is called much more often than Write. The
+	// complication of making Write lock-less isn't worth it.
+	h.writeMtx.Lock()
+	defer h.writeMtx.Unlock()
+
+	// Swap the hot and cold counts.
+	newHot := atomic.AddUint32(&h.hotCounts, 1)
+	hotCounts := &h.counts[newHot%2]
+	coldCounts := &h.counts[(newHot-1)%2]
+
+	for {
+		// Take the SampleCount from observationsCompleted so that we
+		// can test if we had running observations further down.
+		his.SampleCount = proto.Uint64(atomic.LoadUint64(&coldCounts.observationsCompleted))
+		his.SampleSum = proto.Float64(math.Float64frombits(atomic.LoadUint64(&coldCounts.sumBits)))
+		var count uint64
+		for i, upperBound := range h.upperBounds {
+			count += atomic.LoadUint64(&coldCounts.buckets[i])
+			buckets[i] = &dto.Bucket{
+				CumulativeCount: proto.Uint64(count),
+				UpperBound:      proto.Float64(upperBound),
+			}
+		}
+		// Now check if the current number of observationsStarted
+		// matches the number of observationsCompleted loaded above. If
+		// so, we can be certain no observations had been ongoing
+		// modifying the cold counts.
+		if his.GetSampleCount() == atomic.LoadUint64(&coldCounts.observationsStarted) {
+			break
+		}
+		runtime.Gosched()
 	}
 	his.Bucket = buckets
 	out.Histogram = his
 	out.Label = h.labelPairs
+
+	// Finally add all the cold counts to the new hot counts and reset the cold counts.
+	atomic.AddUint64(&hotCounts.observationsStarted, his.GetSampleCount())
+	atomic.StoreUint64(&coldCounts.observationsStarted, 0)
+	atomic.AddUint64(&hotCounts.observationsCompleted, his.GetSampleCount())
+	atomic.StoreUint64(&coldCounts.observationsCompleted, 0)
+	for {
+		oldBits := atomic.LoadUint64(&hotCounts.sumBits)
+		newBits := math.Float64bits(math.Float64frombits(oldBits) + his.GetSampleSum())
+		if atomic.CompareAndSwapUint64(&hotCounts.sumBits, oldBits, newBits) {
+			atomic.StoreUint64(&coldCounts.sumBits, 0)
+			break
		}
+	}
+	for i := range h.upperBounds {
+		atomic.AddUint64(&hotCounts.buckets[i], atomic.LoadUint64(&coldCounts.buckets[i]))
+		atomic.StoreUint64(&coldCounts.buckets[i], 0)
+	}
 	return nil
 }
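Seen from user code, the change means a scrape can no longer observe a count/sum pair torn across concurrent observations. A usage sketch against the public client_golang API (the metric name and bucket layout are made up for the example; count equals sum below only because every observed value is 1):

package main

import (
	"fmt"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	dto "github.com/prometheus/client_model/go"
)

func main() {
	his := prometheus.NewHistogram(prometheus.HistogramOpts{
		Name:    "example_value", // hypothetical metric name
		Help:    "Example histogram.",
		Buckets: prometheus.LinearBuckets(0, 1, 3),
	})

	// Hammer the histogram from a few goroutines...
	for g := 0; g < 4; g++ {
		go func() {
			for {
				his.Observe(1)
			}
		}()
	}

	// ...while repeatedly "scraping" it. With atomic observations,
	// count and sum always come from one consistent snapshot.
	for i := 0; i < 10; i++ {
		m := &dto.Metric{}
		if err := his.Write(m); err != nil {
			panic(err)
		}
		h := m.GetHistogram()
		fmt.Println(h.GetSampleCount() == uint64(h.GetSampleSum())) // true
		time.Sleep(time.Millisecond)
	}
}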
prometheus/histogram_test.go (12 additions, 6 deletions)

@@ -358,7 +358,7 @@ func TestHistogramAtomicObserve(t *testing.T) {
 
 	defer func() { close(quit) }()
 
-	go func() {
+	observe := func() {
 		for {
 			select {
 			case <-quit:
@@ -367,19 +367,25 @@ func TestHistogramAtomicObserve(t *testing.T) {
 				his.Observe(1)
 			}
 		}
-	}()
+	}
 
-	for i := 0; i < 100; i++ {
+	go observe()
+	go observe()
+	go observe()
+
+	for i := 0; i < 100000; i++ {
 		m := &dto.Metric{}
 		if err := his.Write(m); err != nil {
 			t.Fatal("unexpected error writing histogram:", err)
 		}
 		h := m.GetHistogram()
 		if h.GetSampleCount() != uint64(h.GetSampleSum()) ||
-			h.GetSampleCount() != h.GetBucket()[1].GetCumulativeCount() {
+			h.GetSampleCount() != h.GetBucket()[1].GetCumulativeCount() ||
+			h.GetSampleCount() != h.GetBucket()[2].GetCumulativeCount() {
 			t.Fatalf(
-				"inconsistent counts in histogram: count=%d sum=%f bucket=%d",
-				h.GetSampleCount(), h.GetSampleSum(), h.GetBucket()[1].GetCumulativeCount(),
+				"inconsistent counts in histogram: count=%d sum=%f buckets=[%d, %d]",
+				h.GetSampleCount(), h.GetSampleSum(),
+				h.GetBucket()[1].GetCumulativeCount(), h.GetBucket()[2].GetCumulativeCount(),
			)
 		}
 		runtime.Gosched()
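
The strengthened test is also a natural race-detector target. From a checkout of client_golang, something along the lines of

go test -race -run TestHistogramAtomicObserve ./prometheus/

should exercise the hot/cold swap under contention (the exact invocation depends on the checkout layout).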
