Skip to content

Commit

Permalink
dual lookup feature
Browse files Browse the repository at this point in the history
Signed-off-by: Manik2708 <[email protected]>
  • Loading branch information
Manik2708 committed Jan 22, 2025
1 parent b689a86 commit 3ba24a9
Show file tree
Hide file tree
Showing 10 changed files with 586 additions and 111 deletions.
4 changes: 0 additions & 4 deletions cmd/jaeger/internal/integration/badger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,6 @@ func TestBadgerStorage(t *testing.T) {
StorageIntegration: integration.StorageIntegration{
SkipArchiveTest: true,
CleanUp: purge,

// TODO: remove this once badger supports returning spanKind from GetOperations
// Cf https://github.com/jaegertracing/jaeger/issues/1922
GetOperationsMissingSpanKind: true,
},
}
s.e2eInitialize(t, "badger")
Expand Down
15 changes: 12 additions & 3 deletions plugin/storage/badger/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/dgraph-io/badger/v4"
"github.com/spf13/viper"
"go.opentelemetry.io/collector/featuregate"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/pkg/distributedlock"
Expand Down Expand Up @@ -46,7 +47,15 @@ var ( // interface conformance checks
// TODO badger could implement archive storage
// _ storage.ArchiveFactory = (*Factory)(nil)

_ storage.SamplingStoreFactory = (*Factory)(nil)
_ storage.SamplingStoreFactory = (*Factory)(nil)
includeDualLookUp = featuregate.GlobalRegistry().MustRegister(
"jaeger.badger.dualLookUp",
featuregate.StageBeta, // enabled by default
featuregate.WithRegisterFromVersion("v2.2.0"),
featuregate.WithRegisterToVersion("v2.5.0"),
featuregate.WithRegisterDescription("Allows reader to look up for traces from old index key"),
featuregate.WithRegisterReferenceURL("https://github.com/jaegertracing/jaeger/pull/6376"),
)
)

// Factory implements storage.Factory for Badger backend.
Expand Down Expand Up @@ -150,7 +159,7 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger)
}
f.store = store

f.cache = badgerStore.NewCacheStore(f.store, f.Config.TTL.Spans)
f.cache = badgerStore.NewCacheStore(f.Config.TTL.Spans)

f.metrics.ValueLogSpaceAvailable = metricsFactory.Gauge(metrics.Options{Name: valueLogSpaceAvailableName})
f.metrics.KeyLogSpaceAvailable = metricsFactory.Gauge(metrics.Options{Name: keyLogSpaceAvailableName})
Expand All @@ -176,7 +185,7 @@ func initializeDir(path string) {

// CreateSpanReader implements storage.Factory
func (f *Factory) CreateSpanReader() (spanstore.Reader, error) {
tr := badgerStore.NewTraceReader(f.store, f.cache, true)
tr := badgerStore.NewTraceReader(f.store, f.cache, true, includeDualLookUp.IsEnabled())
return spanstoremetrics.NewReaderDecorator(tr, f.metricsFactory), nil
}

Expand Down
56 changes: 56 additions & 0 deletions plugin/storage/badger/spanstore/backward_compatibility_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright (c) 2025 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package spanstore

import (
"context"
"math/rand"
"testing"
"time"

"github.com/dgraph-io/badger/v4"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/storage/spanstore"
)

// TestBackwardCompatibility checks backward compatibility after the index change:
// a span written through writeSpan must still be returned by a reader that was
// created with dual lookup enabled.
// Once dual lookup is completely removed, this test can be removed.
func TestBackwardCompatibility(t *testing.T) {
	runWithBadger(t, func(store *badger.DB, t *testing.T) {
		startT := time.Now()
		cache := NewCacheStore(1 * time.Hour)
		// The last argument enables dual lookup on the reader.
		reader := NewTraceReader(store, cache, true, true)
		writer := NewSpanWriter(store, cache, 1*time.Hour)
		oldSpan := model.Span{
			TraceID: model.TraceID{
				Low:  0,
				High: 1,
			},
			SpanID:        model.SpanID(rand.Uint64()),
			OperationName: "operation-1",
			Process: &model.Process{
				ServiceName: "service",
			},
			StartTime: startT,
			Duration:  time.Millisecond,
		}
		// NOTE(review): presumably the second argument selects the old index key
		// layout — confirm against writeSpan.
		err := writer.writeSpan(&oldSpan, true)
		require.NoError(t, err)
		traces, err := reader.FindTraces(context.Background(), &spanstore.TraceQueryParameters{
			ServiceName:   "service",
			OperationName: "operation-1",
			StartTimeMin:  startT,
			StartTimeMax:  startT.Add(10 * time.Millisecond),
		})
		require.NoError(t, err)
		// require (not assert) so that an unexpected length stops the test here
		// instead of panicking on the index expressions below.
		require.Len(t, traces, 1)
		require.Len(t, traces[0].Spans, 1)
		assert.Equal(t, oldSpan.TraceID, traces[0].Spans[0].TraceID)
		assert.Equal(t, oldSpan.SpanID, traces[0].Spans[0].SpanID)
	})
}
96 changes: 55 additions & 41 deletions plugin/storage/badger/spanstore/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,29 +8,37 @@ import (
"sync"
"time"

"github.com/dgraph-io/badger/v4"

"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/storage/spanstore"
)

// CacheStore saves expensive calculations from the K/V store.
// It keeps service names and their operations in memory, each with an expiry time.
type CacheStore struct {
// Given the small amount of data these will store, we use the same structure as the memory store
cacheLock sync.Mutex // write heavy - Mutex is faster than RWMutex for writes
services map[string]uint64
operations map[string]map[string]uint64
cacheLock sync.Mutex // write heavy - Mutex is faster than RWMutex for writes
// services maps a service name to its expiry time; GetOperations compares it
// against the current Unix time in seconds.
services map[string]uint64
// This map is for the hierarchy: service name, kind and operation name.
// Each service contains the span kinds, and then operation names belonging to that kind.
// This structure will look like:
/*
"service1":{
SpanKind.unspecified: {
"operation1": uint64
}
}
*/
// The uint64 value is the expiry time of the operation.
operations map[string]map[model.SpanKind]map[string]uint64

store *badger.DB
ttl time.Duration
ttl time.Duration
}

// NewCacheStore returns an initialized CacheStore for badger use.
// The service and operation maps start empty and ttl is stored on the returned value.
func NewCacheStore(db *badger.DB, ttl time.Duration) *CacheStore {
func NewCacheStore(ttl time.Duration) *CacheStore {
cs := &CacheStore{
services: make(map[string]uint64),
operations: make(map[string]map[string]uint64),
operations: make(map[string]map[model.SpanKind]map[string]uint64),
ttl: ttl,
store: db,
}
return cs
}
Expand All @@ -48,67 +56,73 @@ func (c *CacheStore) AddService(service string, keyTTL uint64) {
}

// AddOperation adds an operation name to the cache under its service (and span kind),
// creating the nested maps on first use. An existing entry is kept when its cached
// expiry time is later than keyTTL; otherwise it is set to keyTTL.
func (c *CacheStore) AddOperation(service, operation string, keyTTL uint64) {
func (c *CacheStore) AddOperation(service, operation string, kind model.SpanKind, keyTTL uint64) {
c.cacheLock.Lock()
defer c.cacheLock.Unlock()
if _, found := c.operations[service]; !found {
c.operations[service] = make(map[string]uint64)
c.operations[service] = make(map[model.SpanKind]map[string]uint64)
}
if v, found := c.operations[service][operation]; found {
if _, found := c.operations[service][kind]; !found {
c.operations[service][kind] = make(map[string]uint64)
}
if v, found := c.operations[service][kind][operation]; found {
// Keep the cached entry when it expires later than keyTTL.
if v > keyTTL {
return
}
}
c.operations[service][operation] = keyTTL
c.operations[service][kind][operation] = keyTTL
}

// Update caches the results of service and service + operation indexes and maintains their TTL.
// Unlike AddOperation, it overwrites the expiry time of both the service and the
// operation unconditionally with expireTime.
func (c *CacheStore) Update(service, operation string, expireTime uint64) {
func (c *CacheStore) Update(service, operation string, kind model.SpanKind, expireTime uint64) {
c.cacheLock.Lock()

c.services[service] = expireTime
if _, ok := c.operations[service]; !ok {
c.operations[service] = make(map[string]uint64)
if _, found := c.operations[service]; !found {
c.operations[service] = make(map[model.SpanKind]map[string]uint64)
}
if _, found := c.operations[service][kind]; !found {
c.operations[service][kind] = make(map[string]uint64)
}
c.operations[service][operation] = expireTime
c.operations[service][kind][operation] = expireTime
// Released explicitly rather than with defer; no early return exists above.
c.cacheLock.Unlock()
}

// GetOperations returns all operations for a specific service & spanKind traced by Jaeger.
// An empty kind matches every span kind. A service whose expiry time has passed is
// removed from the cache and yields an empty, non-nil slice; expired operations are
// deleted while iterating. The result is ordered by span kind, then by name.
func (c *CacheStore) GetOperations(service string) ([]spanstore.Operation, error) {
operations := make([]string, 0, len(c.services))
func (c *CacheStore) GetOperations(service string, kind string) ([]spanstore.Operation, error) {
// NOTE(review): len(c.services) is read before cacheLock is acquired below, and the
// capacity is sized by the number of services rather than operations — confirm intent.
operations := make([]spanstore.Operation, 0, len(c.services))
//nolint: gosec // G115
t := uint64(time.Now().Unix())
currentTime := uint64(time.Now().Unix())
c.cacheLock.Lock()
defer c.cacheLock.Unlock()

if v, ok := c.services[service]; ok {
if v < t {
if expiryTimeOfService, ok := c.services[service]; ok {
if expiryTimeOfService < currentTime {
// Expired, remove
delete(c.services, service)
delete(c.operations, service)
return []spanstore.Operation{}, nil // empty slice rather than nil
}
for o, e := range c.operations[service] {
if e > t {
operations = append(operations, o)
} else {
delete(c.operations[service], o)
for sKind := range c.operations[service] {
// Skip kinds that do not match the requested filter; "" means no filter.
if kind != "" && kind != string(sKind) {
continue
}
for o, expiryTimeOfOperation := range c.operations[service][sKind] {
if expiryTimeOfOperation > currentTime {
op := spanstore.Operation{Name: o, SpanKind: string(sKind)}
operations = append(operations, op)
} else {
delete(c.operations[service][sKind], o)
}
// NOTE(review): this sort sits inside the inner loop, so the whole slice is
// re-sorted once per operation; sorting once before the final return would
// give the same ordering.
sort.Slice(operations, func(i, j int) bool {
if operations[i].SpanKind == operations[j].SpanKind {
return operations[i].Name < operations[j].Name
}
return operations[i].SpanKind < operations[j].SpanKind
})
}
}
}

sort.Strings(operations)

// TODO: https://github.com/jaegertracing/jaeger/issues/1922
// - return the operations with actual spanKind
result := make([]spanstore.Operation, 0, len(operations))
for _, op := range operations {
result = append(result, spanstore.Operation{
Name: op,
})
}
return result, nil
return operations, nil
}

// GetServices returns all services traced by Jaeger
Expand Down
Loading

0 comments on commit 3ba24a9

Please sign in to comment.