Skip to content

Commit

Permalink
Minor refactor to make hashing sampler publicly accessible (#1662)
Browse files Browse the repository at this point in the history
* Minor refactor to make ShouldDownsample publicly accessible

Signed-off-by: Jude Wang <[email protected]>

* refactor DownsamplingWriter to use SamplingExecutor

Signed-off-by: Jude Wang <[email protected]>

* Update naming

Signed-off-by: Jude Wang <[email protected]>
  • Loading branch information
guanw authored and yurishkuro committed Jul 16, 2019
1 parent e0eb3e6 commit 6891422
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 44 deletions.
87 changes: 49 additions & 38 deletions storage/spanstore/downsampling_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,9 @@ type downsamplingWriterMetrics struct {

// DownsamplingWriter is a span Writer that drops spans with a predefined downsamplingRatio.
// NOTE(review): this listing looks like a diff rendered without +/- markers, so the
// field list below appears to contain both the pre-change fields (threshold,
// lengthOfSalt, hasherPool and the first spanWriter/metrics pair) and the post-change
// fields (second spanWriter/metrics pair and sampler) — confirm against the real file.
type DownsamplingWriter struct {
spanWriter Writer
threshold uint64
lengthOfSalt int
hasherPool *sync.Pool
metrics downsamplingWriterMetrics
// spanWriter is the wrapped Writer that receives the spans which are not dropped.
spanWriter Writer
// metrics holds the writer's counters; WriteSpan increments SpansDropped on a drop.
metrics downsamplingWriterMetrics
// sampler is consulted by WriteSpan (via ShouldSample) to decide whether to keep a span.
sampler Sampler
}

// DownsamplingOptions contains the options for constructing a DownsamplingWriter.
Expand All @@ -61,37 +59,18 @@ type DownsamplingOptions struct {

// NewDownsamplingWriter creates a DownsamplingWriter.
// It initialises the writer metrics with metrics.Init using
// downsamplingOptions.MetricsFactory and wraps the given spanWriter.
// NOTE(review): this listing looks like a diff rendered without +/- markers, so the
// body below appears to interleave the removed inline threshold/salt/pool setup with
// the added NewSampler call — confirm against the real file before relying on it.
func NewDownsamplingWriter(spanWriter Writer, downsamplingOptions DownsamplingOptions) *DownsamplingWriter {
threshold := uint64(downsamplingOptions.Ratio * float64(math.MaxUint64))
writeMetrics := &downsamplingWriterMetrics{}
metrics.Init(writeMetrics, downsamplingOptions.MetricsFactory, nil)
salt := downsamplingOptions.HashSalt
// An empty salt falls back to the package default.
if salt == "" {
salt = defaultHashSalt
}
hashSaltBytes := []byte(salt)
pool := &sync.Pool{
New: func() interface{} {
// The buffer holds the salt followed by traceIDByteSize bytes for the trace ID.
buffer := make([]byte, len(hashSaltBytes)+traceIDByteSize)
copy(buffer, hashSaltBytes)
return &hasher{
hash: fnv.New64a(),
buffer: buffer,
}
},
}

return &DownsamplingWriter{
spanWriter: spanWriter,
threshold: threshold,
hasherPool: pool,
metrics: *writeMetrics,
lengthOfSalt: len(hashSaltBytes),
// Sampling decisions are delegated to a Sampler built from the same ratio and salt.
sampler: NewSampler(downsamplingOptions.Ratio, downsamplingOptions.HashSalt),
spanWriter: spanWriter,
metrics: *writeMetrics,
}
}

// WriteSpan calls WriteSpan on wrapped span writer.
func (ds *DownsamplingWriter) WriteSpan(span *model.Span) error {
if !ds.shouldDownsample(span) {
if !ds.sampler.ShouldSample(span) {
// Drops spans when hashVal falls beyond computed threshold.
ds.metrics.SpansDropped.Inc(1)
return nil
Expand All @@ -100,20 +79,52 @@ func (ds *DownsamplingWriter) WriteSpan(span *model.Span) error {
return ds.spanWriter.WriteSpan(span)
}

// shouldDownsample reports whether the salted hash of the span's trace ID is
// at or below the writer's threshold. WriteSpan drops the span when this
// returns false.
func (ds *DownsamplingWriter) shouldDownsample(span *model.Span) bool {
	pooled := ds.hasherPool.Get().(*hasher)
	// The salt occupies the first lengthOfSalt bytes of the buffer; the trace
	// ID is written into the remainder.
	traceIDSlot := pooled.buffer[ds.lengthOfSalt:]
	// MarshalTo currently only errors when the destination is shorter than 16
	// bytes; the slot is sized for a full trace ID, so the error is ignored.
	_, _ = span.TraceID.MarshalTo(traceIDSlot)
	sum := pooled.hashBytes()
	ds.hasherPool.Put(pooled)
	return sum <= ds.threshold
}

// hashBytes resets the underlying hash, feeds it the hasher's whole buffer,
// and returns the resulting 64-bit sum.
func (h *hasher) hashBytes() uint64 {
	digest := h.hash
	digest.Reset()
	// The current fnv Write implementation never returns an error, so the
	// result is deliberately discarded.
	_, _ = digest.Write(h.buffer)
	return digest.Sum64()
}

// Sampler decides if we should sample a span
// The decision is made by hashing the span's trace ID (prefixed with a salt)
// and comparing the hash against threshold.
type Sampler struct {
// hasherPool hands out reusable *hasher values whose buffers are pre-filled with the salt.
hasherPool *sync.Pool
// lengthOfSalt is the byte length of the salt, i.e. the buffer offset at which
// the trace ID is written.
lengthOfSalt int
// threshold is the largest hash value that is still sampled.
threshold uint64
}

// NewSampler creates a Sampler that samples spans whose salted trace-ID hash
// is at most ratio * math.MaxUint64.
//
// ratio is the fraction of the hash space to keep. It is clamped to [0, 1]:
// values >= 1 keep every span, and values <= 0 (or NaN) yield a threshold of 0.
// hashSalt is prepended to the trace ID bytes before hashing; when it is empty,
// defaultHashSalt is used.
func NewSampler(ratio float64, hashSalt string) Sampler {
	// float64(math.MaxUint64) rounds up to 2^64, so converting ratio*2^64 back
	// to uint64 is out of range for ratio >= 1 (and for negative or NaN
	// ratios). Go leaves the result of such conversions implementation-
	// dependent, so those inputs are clamped explicitly.
	var threshold uint64
	switch {
	case ratio >= 1:
		threshold = math.MaxUint64
	case ratio > 0:
		threshold = uint64(ratio * float64(math.MaxUint64))
	default:
		// ratio <= 0 or NaN.
		threshold = 0
	}

	if hashSalt == "" {
		hashSalt = defaultHashSalt
	}
	hashSaltBytes := []byte(hashSalt)
	pool := &sync.Pool{
		New: func() interface{} {
			// Each pooled buffer is pre-filled with the salt so that only the
			// trailing trace ID bytes need rewriting for every span.
			buffer := make([]byte, len(hashSaltBytes)+traceIDByteSize)
			copy(buffer, hashSaltBytes)
			return &hasher{
				hash:   fnv.New64a(),
				buffer: buffer,
			}
		},
	}
	return Sampler{
		threshold:    threshold,
		hasherPool:   pool,
		lengthOfSalt: len(hashSaltBytes),
	}
}

// ShouldSample reports whether the span should be sampled: it hashes the
// salt followed by the span's trace ID and returns true when the hash does
// not exceed the sampler's threshold.
func (s *Sampler) ShouldSample(span *model.Span) bool {
	pooled := s.hasherPool.Get().(*hasher)
	// The salt occupies the first lengthOfSalt bytes of the buffer; the trace
	// ID is written into the remainder.
	traceIDSlot := pooled.buffer[s.lengthOfSalt:]
	// MarshalTo currently only errors when the destination is shorter than 16
	// bytes; the slot is sized for a full trace ID, so the error is ignored.
	_, _ = span.TraceID.MarshalTo(traceIDSlot)
	sum := pooled.hashBytes()
	s.hasherPool.Put(pooled)
	return sum <= s.threshold
}
8 changes: 4 additions & 4 deletions storage/spanstore/downsampling_writer_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,11 @@ func BenchmarkDownSamplingWriter_HashBytes(b *testing.B) {
}
b.ResetTimer()
b.ReportAllocs()
h := c.hasherPool.Get().(*hasher)
h := c.sampler.hasherPool.Get().(*hasher)
for it := 0; it < b.N; it++ {
h.hashBytes()
}
c.hasherPool.Put(h)
c.sampler.hasherPool.Put(h)
}

func BenchmarkDownsamplingWriter_RandomHash(b *testing.B) {
Expand All @@ -76,7 +76,7 @@ func BenchmarkDownsamplingWriter_RandomHash(b *testing.B) {
MetricsFactory: metrics.NullFactory,
}
c := NewDownsamplingWriter(&noopWriteSpanStore{}, downsamplingOptions)
h := c.hasherPool.Get().(*hasher)
h := c.sampler.hasherPool.Get().(*hasher)
for it := 0; it < b.N; it++ {
countSmallerThanRatio = 0
for i := 0; i < numberActions; i++ {
Expand All @@ -96,5 +96,5 @@ func BenchmarkDownsamplingWriter_RandomHash(b *testing.B) {
}
fmt.Printf("Random hash ratio %f should be close to 0.5, inspect the implementation of hashBytes if not\n", math.Abs(float64(countSmallerThanRatio)/float64(numberActions)))
}
c.hasherPool.Put(h)
c.sampler.hasherPool.Put(h)
}
4 changes: 2 additions & 2 deletions storage/spanstore/downsampling_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,9 @@ func TestDownSamplingWriter_hashBytes(t *testing.T) {
MetricsFactory: nil,
}
c := NewDownsamplingWriter(&noopWriteSpanStore{}, downsamplingOptions)
h := c.hasherPool.Get().(*hasher)
h := c.sampler.hasherPool.Get().(*hasher)
assert.Equal(t, h.hashBytes(), h.hashBytes())
c.hasherPool.Put(h)
c.sampler.hasherPool.Put(h)
trace := model.TraceID{
Low: uint64(0),
High: uint64(1),
Expand Down

0 comments on commit 6891422

Please sign in to comment.