diff --git a/prometheus/histogram.go b/prometheus/histogram.go index 331783a75..b138ff2c5 100644 --- a/prometheus/histogram.go +++ b/prometheus/histogram.go @@ -16,7 +16,9 @@ package prometheus import ( "fmt" "math" + "runtime" "sort" + "sync" "sync/atomic" "github.com/golang/protobuf/proto" @@ -200,28 +202,34 @@ func newHistogram(desc *Desc, opts HistogramOpts, labelValues ...string) Histogr } } } - // Finally we know the final length of h.upperBounds and can make counts. - h.counts = make([]uint64, len(h.upperBounds)) + // Finally we know the final length of h.upperBounds and can make counts + // for both states: + h.counts[0].buckets = make([]uint64, len(h.upperBounds)) + h.counts[1].buckets = make([]uint64, len(h.upperBounds)) h.init(h) // Init self-collection. return h } -type histogram struct { +type histogramCounts struct { // sumBits contains the bits of the float64 representing the sum of all - // observations. sumBits and count have to go first in the struct to - // guarantee alignment for atomic operations. - // http://golang.org/pkg/sync/atomic/#pkg-note-BUG - sumBits uint64 - count uint64 + // observations. sumBits, observationsStarted, and observationsCompleted have + // to go first in the struct to guarantee alignment for atomic + // operations. http://golang.org/pkg/sync/atomic/#pkg-note-BUG + sumBits uint64 + observationsStarted uint64 + observationsCompleted uint64 + buckets []uint64 +} +type histogram struct { selfCollector - // Note that there is no mutex required. - - desc *Desc + desc *Desc + writeMtx sync.Mutex upperBounds []float64 - counts []uint64 + counts [2]histogramCounts + hotCounts uint32 // counts[0] if even, counts[1] if odd. 
labelPairs []*dto.LabelPair } @@ -241,36 +249,81 @@ func (h *histogram) Observe(v float64) { // 100 buckets: 78.1 ns/op linear - binary 54.9 ns/op // 300 buckets: 154 ns/op linear - binary 61.6 ns/op i := sort.SearchFloat64s(h.upperBounds, v) - if i < len(h.counts) { - atomic.AddUint64(&h.counts[i], 1) + + hotCounts := &h.counts[atomic.LoadUint32(&h.hotCounts)%2] + + atomic.AddUint64(&hotCounts.observationsStarted, 1) + if i < len(h.upperBounds) { + atomic.AddUint64(&hotCounts.buckets[i], 1) } - atomic.AddUint64(&h.count, 1) for { - oldBits := atomic.LoadUint64(&h.sumBits) + oldBits := atomic.LoadUint64(&hotCounts.sumBits) newBits := math.Float64bits(math.Float64frombits(oldBits) + v) - if atomic.CompareAndSwapUint64(&h.sumBits, oldBits, newBits) { + if atomic.CompareAndSwapUint64(&hotCounts.sumBits, oldBits, newBits) { break } } + atomic.AddUint64(&hotCounts.observationsCompleted, 1) } func (h *histogram) Write(out *dto.Metric) error { his := &dto.Histogram{} buckets := make([]*dto.Bucket, len(h.upperBounds)) - his.SampleSum = proto.Float64(math.Float64frombits(atomic.LoadUint64(&h.sumBits))) - his.SampleCount = proto.Uint64(atomic.LoadUint64(&h.count)) - var count uint64 - for i, upperBound := range h.upperBounds { - count += atomic.LoadUint64(&h.counts[i]) - buckets[i] = &dto.Bucket{ - CumulativeCount: proto.Uint64(count), - UpperBound: proto.Float64(upperBound), + // For simplicity, we mutex the rest of this method. It is not in the + // hot path, i.e. Observe is called much more often than Write. The + // complication of making Write lock-less isn't worth it. + h.writeMtx.Lock() + defer h.writeMtx.Unlock() + + // Swap the hot and cold counts. + newHot := atomic.AddUint32(&h.hotCounts, 1) + hotCounts := &h.counts[newHot%2] + coldCounts := &h.counts[(newHot-1)%2] + + for { + // Take the SampleCount from observationsCompleted so that we + // can test if we had running observations further down. 
+ his.SampleCount = proto.Uint64(atomic.LoadUint64(&coldCounts.observationsCompleted)) + his.SampleSum = proto.Float64(math.Float64frombits(atomic.LoadUint64(&coldCounts.sumBits))) + var count uint64 + for i, upperBound := range h.upperBounds { + count += atomic.LoadUint64(&coldCounts.buckets[i]) + buckets[i] = &dto.Bucket{ + CumulativeCount: proto.Uint64(count), + UpperBound: proto.Float64(upperBound), + } } + // Now check if the current number of observationsStarted + // matches the number of observationsCompleted loaded above. If + // so, we can be certain no observations had been ongoing + // modifying the cold counts. + if his.GetSampleCount() == atomic.LoadUint64(&coldCounts.observationsStarted) { + break + } + runtime.Gosched() } his.Bucket = buckets out.Histogram = his out.Label = h.labelPairs + + // Finally add all the cold counts to the new hot counts and reset the cold counts. + atomic.AddUint64(&hotCounts.observationsStarted, his.GetSampleCount()) + atomic.StoreUint64(&coldCounts.observationsStarted, 0) + atomic.AddUint64(&hotCounts.observationsCompleted, his.GetSampleCount()) + atomic.StoreUint64(&coldCounts.observationsCompleted, 0) + for { + oldBits := atomic.LoadUint64(&hotCounts.sumBits) + newBits := math.Float64bits(math.Float64frombits(oldBits) + his.GetSampleSum()) + if atomic.CompareAndSwapUint64(&hotCounts.sumBits, oldBits, newBits) { + atomic.StoreUint64(&coldCounts.sumBits, 0) + break + } + } + for i := range h.upperBounds { + atomic.AddUint64(&hotCounts.buckets[i], atomic.LoadUint64(&coldCounts.buckets[i])) + atomic.StoreUint64(&coldCounts.buckets[i], 0) + } return nil } diff --git a/prometheus/histogram_test.go b/prometheus/histogram_test.go index 9e5e3a1d4..cf1eab4a2 100644 --- a/prometheus/histogram_test.go +++ b/prometheus/histogram_test.go @@ -358,7 +358,7 @@ func TestHistogramAtomicObserve(t *testing.T) { defer func() { close(quit) }() - go func() { + observe := func() { for { select { case <-quit: @@ -367,19 +367,25 @@ func 
TestHistogramAtomicObserve(t *testing.T) { his.Observe(1) } } - }() + } + + go observe() + go observe() + go observe() - for i := 0; i < 100; i++ { + for i := 0; i < 100000; i++ { m := &dto.Metric{} if err := his.Write(m); err != nil { t.Fatal("unexpected error writing histogram:", err) } h := m.GetHistogram() if h.GetSampleCount() != uint64(h.GetSampleSum()) || - h.GetSampleCount() != h.GetBucket()[1].GetCumulativeCount() { + h.GetSampleCount() != h.GetBucket()[1].GetCumulativeCount() || + h.GetSampleCount() != h.GetBucket()[2].GetCumulativeCount() { t.Fatalf( - "inconsistent counts in histogram: count=%d sum=%f bucket=%d", - h.GetSampleCount(), h.GetSampleSum(), h.GetBucket()[1].GetCumulativeCount(), + "inconsistent counts in histogram: count=%d sum=%f buckets=[%d, %d]", + h.GetSampleCount(), h.GetSampleSum(), + h.GetBucket()[1].GetCumulativeCount(), h.GetBucket()[2].GetCumulativeCount(), ) } runtime.Gosched()