[refactor][storage][badger] Refactored the prefilling of cache to reader (#6575)

## Which problem is this PR solving?
Comment:
#6376 (comment)

## Description of the changes
- The cache previously queried the database directly to prefill itself, which is not a clean separation of concerns. That responsibility now belongs to the reader, which reads from Badger and fills the cache (see the sketch below).
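
A minimal sketch of the resulting wiring, based on the signatures in the diffs below; the in-memory Badger setup, TTL value, and import alias are illustrative assumptions, not part of this PR:

```go
package main

import (
	"log"
	"time"

	"github.com/dgraph-io/badger/v4"

	badgerStore "github.com/jaegertracing/jaeger/plugin/storage/badger/spanstore"
)

func main() {
	// Open a throwaway in-memory Badger instance (illustrative only).
	db, err := badger.Open(badger.DefaultOptions("").WithInMemory(true))
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// After this change the cache is a plain container: it no longer
	// reaches into the database to prefill itself.
	cache := badgerStore.NewCacheStore(db, 72*time.Hour)

	// Prefilling is now owned by the reader; passing true tells it to scan
	// the service and operation indexes and populate the cache via
	// AddService / AddOperation.
	reader := badgerStore.NewTraceReader(db, cache, true)
	_ = reader
}
```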

## How was this change tested?
- Unit and e2e tests

## Checklist
- [x] I have read
https://github.com/jaegertracing/jaeger/blob/master/CONTRIBUTING_GUIDELINES.md
- [x] I have signed all commits
- [x] I have added unit tests for the new functionality
- [x] I have run lint and test steps successfully
  - for `jaeger`: `make lint test`
  - for `jaeger-ui`: `npm run lint` and `npm run test`

---------

Signed-off-by: Manik2708 <[email protected]>
Manik2708 authored Jan 21, 2025
1 parent f6c4be1 commit caccdce
Showing 5 changed files with 124 additions and 115 deletions.
4 changes: 2 additions & 2 deletions plugin/storage/badger/factory.go
@@ -150,7 +150,7 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger)
}
f.store = store

f.cache = badgerStore.NewCacheStore(f.store, f.Config.TTL.Spans, true)
f.cache = badgerStore.NewCacheStore(f.store, f.Config.TTL.Spans)

f.metrics.ValueLogSpaceAvailable = metricsFactory.Gauge(metrics.Options{Name: valueLogSpaceAvailableName})
f.metrics.KeyLogSpaceAvailable = metricsFactory.Gauge(metrics.Options{Name: keyLogSpaceAvailableName})
@@ -176,7 +176,7 @@ func initializeDir(path string) {

// CreateSpanReader implements storage.Factory
func (f *Factory) CreateSpanReader() (spanstore.Reader, error) {
tr := badgerStore.NewTraceReader(f.store, f.cache)
tr := badgerStore.NewTraceReader(f.store, f.cache, true)
return spanstoremetrics.NewReaderDecorator(tr, f.metricsFactory), nil
}

82 changes: 20 additions & 62 deletions plugin/storage/badger/spanstore/cache.go
@@ -25,83 +25,41 @@ type CacheStore struct {
}

// NewCacheStore returns initialized CacheStore for badger use
func NewCacheStore(db *badger.DB, ttl time.Duration, prefill bool) *CacheStore {
func NewCacheStore(db *badger.DB, ttl time.Duration) *CacheStore {
cs := &CacheStore{
services: make(map[string]uint64),
operations: make(map[string]map[string]uint64),
ttl: ttl,
store: db,
}

if prefill {
cs.populateCaches()
}
return cs
}

func (c *CacheStore) populateCaches() {
// AddService fills the services into the cache with the most updated expiration time
func (c *CacheStore) AddService(service string, keyTTL uint64) {
c.cacheLock.Lock()
defer c.cacheLock.Unlock()

c.loadServices()

for k := range c.services {
c.loadOperations(k)
}
}

func (c *CacheStore) loadServices() {
c.store.View(func(txn *badger.Txn) error {
opts := badger.DefaultIteratorOptions
it := txn.NewIterator(opts)
defer it.Close()

serviceKey := []byte{serviceNameIndexKey}

// Seek all the services first
for it.Seek(serviceKey); it.ValidForPrefix(serviceKey); it.Next() {
timestampStartIndex := len(it.Item().Key()) - (sizeOfTraceID + 8) // 8 = sizeof(uint64)
serviceName := string(it.Item().Key()[len(serviceKey):timestampStartIndex])
keyTTL := it.Item().ExpiresAt()
if v, found := c.services[serviceName]; found {
if v > keyTTL {
continue
}
}
c.services[serviceName] = keyTTL
if v, found := c.services[service]; found {
if v > keyTTL {
return
}
return nil
})
}
c.services[service] = keyTTL
}

func (c *CacheStore) loadOperations(service string) {
c.store.View(func(txn *badger.Txn) error {
opts := badger.DefaultIteratorOptions
it := txn.NewIterator(opts)
defer it.Close()

serviceKey := make([]byte, len(service)+1)
serviceKey[0] = operationNameIndexKey
copy(serviceKey[1:], service)

// Seek all the services first
for it.Seek(serviceKey); it.ValidForPrefix(serviceKey); it.Next() {
timestampStartIndex := len(it.Item().Key()) - (sizeOfTraceID + 8) // 8 = sizeof(uint64)
operationName := string(it.Item().Key()[len(serviceKey):timestampStartIndex])
keyTTL := it.Item().ExpiresAt()
if _, found := c.operations[service]; !found {
c.operations[service] = make(map[string]uint64)
}

if v, found := c.operations[service][operationName]; found {
if v > keyTTL {
continue
}
}
c.operations[service][operationName] = keyTTL
// AddOperation adds the cache with operation names with most updated expiration time
func (c *CacheStore) AddOperation(service, operation string, keyTTL uint64) {
c.cacheLock.Lock()
defer c.cacheLock.Unlock()
if _, found := c.operations[service]; !found {
c.operations[service] = make(map[string]uint64)
}
if v, found := c.operations[service][operation]; found {
if v > keyTTL {
return
}
return nil
})
}
c.operations[service][operation] = keyTTL
}

// Update caches the results of service and service + operation indexes and maintains their TTL
43 changes: 1 addition & 42 deletions plugin/storage/badger/spanstore/cache_test.go
@@ -10,8 +10,6 @@ import (
"github.com/dgraph-io/badger/v4"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/jaegertracing/jaeger/model"
)

/*
@@ -20,7 +18,7 @@ import (

func TestExpiredItems(t *testing.T) {
runWithBadger(t, func(store *badger.DB, t *testing.T) {
cache := NewCacheStore(store, time.Duration(-1*time.Hour), false)
cache := NewCacheStore(store, time.Duration(-1*time.Hour))

expireTime := uint64(time.Now().Add(cache.ttl).Unix())

@@ -55,45 +53,6 @@ func TestExpiredItems(t *testing.T) {
})
}

func TestOldReads(t *testing.T) {
runWithBadger(t, func(store *badger.DB, t *testing.T) {
timeNow := model.TimeAsEpochMicroseconds(time.Now())
s1Key := createIndexKey(serviceNameIndexKey, []byte("service1"), timeNow, model.TraceID{High: 0, Low: 0})
s1o1Key := createIndexKey(operationNameIndexKey, []byte("service1operation1"), timeNow, model.TraceID{High: 0, Low: 0})

tid := time.Now().Add(1 * time.Minute)

writer := func() {
store.Update(func(txn *badger.Txn) error {
txn.SetEntry(&badger.Entry{
Key: s1Key,
ExpiresAt: uint64(tid.Unix()),
})
txn.SetEntry(&badger.Entry{
Key: s1o1Key,
ExpiresAt: uint64(tid.Unix()),
})
return nil
})
}

cache := NewCacheStore(store, time.Duration(-1*time.Hour), false)
writer()

nuTid := tid.Add(1 * time.Hour)

cache.Update("service1", "operation1", uint64(tid.Unix()))
cache.services["service1"] = uint64(nuTid.Unix())
cache.operations["service1"]["operation1"] = uint64(nuTid.Unix())

cache.populateCaches()

// Now make sure we didn't use the older timestamps from the DB
assert.Equal(t, uint64(nuTid.Unix()), cache.services["service1"])
assert.Equal(t, uint64(nuTid.Unix()), cache.operations["service1"]["operation1"])
})
}

// func runFactoryTest(tb testing.TB, test func(tb testing.TB, sw spanstore.Writer, sr spanstore.Reader)) {
func runWithBadger(t *testing.T, test func(store *badger.DB, t *testing.T)) {
opts := badger.DefaultOptions("")
56 changes: 54 additions & 2 deletions plugin/storage/badger/spanstore/reader.go
@@ -74,11 +74,18 @@ type executionPlan struct {
}

// NewTraceReader returns a TraceReader with cache
func NewTraceReader(db *badger.DB, c *CacheStore) *TraceReader {
return &TraceReader{
func NewTraceReader(db *badger.DB, c *CacheStore, prefillCache bool) *TraceReader {
reader := &TraceReader{
store: db,
cache: c,
}
if prefillCache {
services := reader.preloadServices()
for _, service := range services {
reader.preloadOperations(service)
}
}
return reader
}

func decodeValue(val []byte, encodeType byte) (*model.Span, error) {
@@ -612,3 +619,48 @@ func scanRangeFunction(it *badger.Iterator, indexEndValue []byte) bool {
}
return false
}

// preloadServices fills the cache with services after extracting from badger
func (r *TraceReader) preloadServices() []string {
var services []string
r.store.View(func(txn *badger.Txn) error {
opts := badger.DefaultIteratorOptions
it := txn.NewIterator(opts)
defer it.Close()

serviceKey := []byte{serviceNameIndexKey}

// Seek all the services first
for it.Seek(serviceKey); it.ValidForPrefix(serviceKey); it.Next() {
timestampStartIndex := len(it.Item().Key()) - (sizeOfTraceID + 8) // 8 = sizeof(uint64)
serviceName := string(it.Item().Key()[len(serviceKey):timestampStartIndex])
keyTTL := it.Item().ExpiresAt()
services = append(services, serviceName)
r.cache.AddService(serviceName, keyTTL)
}
return nil
})
return services
}

// preloadOperations extract all operations for a specified service
func (r *TraceReader) preloadOperations(service string) {
r.store.View(func(txn *badger.Txn) error {
opts := badger.DefaultIteratorOptions
it := txn.NewIterator(opts)
defer it.Close()

serviceKey := make([]byte, len(service)+1)
serviceKey[0] = operationNameIndexKey
copy(serviceKey[1:], service)

// Seek all the services first
for it.Seek(serviceKey); it.ValidForPrefix(serviceKey); it.Next() {
timestampStartIndex := len(it.Item().Key()) - (sizeOfTraceID + 8) // 8 = sizeof(uint64)
operationName := string(it.Item().Key()[len(serviceKey):timestampStartIndex])
keyTTL := it.Item().ExpiresAt()
r.cache.AddOperation(service, operationName, keyTTL)
}
return nil
})
}
54 changes: 47 additions & 7 deletions plugin/storage/badger/spanstore/rw_internal_test.go
@@ -23,9 +23,9 @@ func TestEncodingTypes(t *testing.T) {
runWithBadger(t, func(store *badger.DB, t *testing.T) {
testSpan := createDummySpan()

cache := NewCacheStore(store, time.Duration(1*time.Hour), true)
cache := NewCacheStore(store, time.Duration(1*time.Hour))
sw := NewSpanWriter(store, cache, time.Duration(1*time.Hour))
rw := NewTraceReader(store, cache)
rw := NewTraceReader(store, cache, true)

sw.encodingType = jsonEncoding
err := sw.WriteSpan(context.Background(), &testSpan)
@@ -40,7 +40,7 @@
runWithBadger(t, func(store *badger.DB, t *testing.T) {
testSpan := createDummySpan()

cache := NewCacheStore(store, time.Duration(1*time.Hour), true)
cache := NewCacheStore(store, time.Duration(1*time.Hour))
sw := NewSpanWriter(store, cache, time.Duration(1*time.Hour))
// rw := NewTraceReader(store, cache)

@@ -53,9 +53,9 @@
runWithBadger(t, func(store *badger.DB, t *testing.T) {
testSpan := createDummySpan()

cache := NewCacheStore(store, time.Duration(1*time.Hour), true)
cache := NewCacheStore(store, time.Duration(1*time.Hour))
sw := NewSpanWriter(store, cache, time.Duration(1*time.Hour))
rw := NewTraceReader(store, cache)
rw := NewTraceReader(store, cache, true)

err := sw.WriteSpan(context.Background(), &testSpan)
require.NoError(t, err)
@@ -92,9 +92,9 @@ func TestDecodeErrorReturns(t *testing.T) {
func TestDuplicateTraceIDDetection(t *testing.T) {
runWithBadger(t, func(store *badger.DB, t *testing.T) {
testSpan := createDummySpan()
cache := NewCacheStore(store, time.Duration(1*time.Hour), true)
cache := NewCacheStore(store, time.Duration(1*time.Hour))
sw := NewSpanWriter(store, cache, time.Duration(1*time.Hour))
rw := NewTraceReader(store, cache)
rw := NewTraceReader(store, cache, true)
origStartTime := testSpan.StartTime

traceCount := 128
@@ -189,3 +189,43 @@ func TestMergeJoin(t *testing.T) {
chk.Len(merged, 2)
chk.Equal(uint32(2), binary.BigEndian.Uint32(merged[1]))
}

func TestOldReads(t *testing.T) {
runWithBadger(t, func(store *badger.DB, t *testing.T) {
timeNow := model.TimeAsEpochMicroseconds(time.Now())
s1Key := createIndexKey(serviceNameIndexKey, []byte("service1"), timeNow, model.TraceID{High: 0, Low: 0})
s1o1Key := createIndexKey(operationNameIndexKey, []byte("service1operation1"), timeNow, model.TraceID{High: 0, Low: 0})

tid := time.Now().Add(1 * time.Minute)

writer := func() {
store.Update(func(txn *badger.Txn) error {
txn.SetEntry(&badger.Entry{
Key: s1Key,
ExpiresAt: uint64(tid.Unix()),
})
txn.SetEntry(&badger.Entry{
Key: s1o1Key,
ExpiresAt: uint64(tid.Unix()),
})
return nil
})
}

cache := NewCacheStore(store, time.Duration(-1*time.Hour))
writer()

nuTid := tid.Add(1 * time.Hour)

cache.Update("service1", "operation1", uint64(tid.Unix()))
cache.services["service1"] = uint64(nuTid.Unix())
cache.operations["service1"]["operation1"] = uint64(nuTid.Unix())

// This is equivalent to populate caches of cache
_ = NewTraceReader(store, cache, true)

// Now make sure we didn't use the older timestamps from the DB
assert.Equal(t, uint64(nuTid.Unix()), cache.services["service1"])
assert.Equal(t, uint64(nuTid.Unix()), cache.operations["service1"]["operation1"])
})
}
