Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Query cache should handle subsets when determining cache hit/miss #224

Merged
merged 8 commits into from
Nov 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 38 additions & 3 deletions cache/index_item.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"strings"

"github.com/turbot/go-kit/helpers"
"github.com/turbot/steampipe-plugin-sdk/grpc/proto"
)

// IndexBucket contains index items for all cache results for a given table and qual set
Expand All @@ -22,9 +23,9 @@ func (b *IndexBucket) Append(item *IndexItem) *IndexBucket {
}

// Get finds an index item which satisfies all columns
func (b *IndexBucket) Get(columns []string, limit int64) *IndexItem {
func (b *IndexBucket) Get(qualMap map[string]*proto.Quals, columns []string, limit int64) *IndexItem {
for _, item := range b.Items {
if item.SatisfiesColumns(columns) && item.SatisfiesLimit(limit) {
if item.SatisfiesQuals(qualMap) && item.SatisfiesColumns(columns) && item.SatisfiesLimit(limit) {
return item
}
}
Expand All @@ -37,13 +38,15 @@ type IndexItem struct {
Columns []string
Key string
Limit int64
Quals map[string]*proto.Quals
}

func NewIndexItem(columns []string, key string, limit int64) *IndexItem {
func NewIndexItem(columns []string, key string, limit int64, quals map[string]*proto.Quals) *IndexItem {
return &IndexItem{
Columns: columns,
Key: key,
Limit: limit,
Quals: quals,
}
}

Expand Down Expand Up @@ -78,3 +81,35 @@ func (i IndexItem) SatisfiesLimit(limit int64) bool {
return res

}

// SatisfiesQuals
// does this index item satisfy the check quals
// all data returned by check quals is returned by index quals
// i.e. check quals must be a 'subset' of index quals
// eg
// our quals [], check quals [id="1"] -> SATISFIED
// our quals [id="1"], check quals [id="1"] -> SATISFIED
// our quals [id="1"], check quals [id="1", foo=2] -> SATISFIED
// our quals [id="1", foo=2], check quals [id="1"] -> NOT SATISFIED
func (i IndexItem) SatisfiesQuals(checkQualMap map[string]*proto.Quals) bool {
log.Printf("[TRACE] SatisfiesQuals")
for col, indexQuals := range i.Quals {
log.Printf("[TRACE] col %s", col)
// if we have quals the check quals do not, we DO NOT satisfy
checkQuals, ok := checkQualMap[col]
var isSubset bool
if ok {
log.Printf("[TRACE] SatisfiesQuals index item has quals for %s which check quals also have - check if our quals for this colummn are a subset of the check quals", col)
log.Printf("[TRACE] indexQuals %+v, checkQuals %+v", indexQuals, checkQuals)
// isSubset means all data returned by check quals is returned by index quals
isSubset = checkQuals.IsASubsetOf(indexQuals)
} else {
log.Printf("[TRACE] SatisfiesQuals index item has qual for %s which check quals do not - NOT SATISFIED")
}
log.Printf("[TRACE] get check qual %v, isSubset %v", ok, isSubset)
if !ok || !isSubset {
return false
}
}
return true
}
4 changes: 3 additions & 1 deletion cache/pending_index_item.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ func newPendingIndexBucket() *pendingIndexBucket {
// GetItemWhichSatisfiesColumnsAndLimit finds an index item which satisfies all columns
// used to find an IndexItem to satisfy a cache Get request
func (b *pendingIndexBucket) GetItemWhichSatisfiesColumnsAndLimit(columns []string, limit int64) *pendingIndexItem {
log.Printf("[TRACE] found pending index item to satisfy columns %v and limit %d", columns, limit)
for _, item := range b.Items {

if item.SatisfiesColumns(columns) && item.SatisfiesLimit(limit) {
log.Printf("[TRACE] found pending index item to satisfy columns %s, limit %d", strings.Join(columns, ","), limit)
return item
Expand Down Expand Up @@ -77,7 +79,7 @@ func (p *pendingIndexItem) Wait() {

func NewPendingIndexItem(columns []string, key string, limit int64) *pendingIndexItem {
res := &pendingIndexItem{
item: NewIndexItem(columns, key, limit),
item: NewIndexItem(columns, key, limit, nil),
lock: new(sync.WaitGroup),
}
// increment wait group - indicate this item is pending
Expand Down
80 changes: 42 additions & 38 deletions cache/query_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ func (c *QueryCache) Set(table string, qualMap map[string]*proto.Quals, columns
// (we need to do this even if the cache set fails)
c.pendingItemComplete(table, qualMap, columns, limit)
}()
cacheQualMap := c.buildCacheQualMap(table, qualMap)

// if any data was returned, extract the columns from the first row
if len(result.Rows) > 0 {
Expand All @@ -83,24 +84,22 @@ func (c *QueryCache) Set(table string, qualMap map[string]*proto.Quals, columns
// write to the result cache
// set the insertion time
result.InsertionTime = time.Now()
resultKey := c.buildResultKey(table, qualMap, columns)
resultKey := c.buildResultKey(table, cacheQualMap, columns)
c.cache.SetWithTTL(resultKey, result, 1, ttl)

// now update the index
// get the index bucket for this table and quals
indexBucketKey := c.buildIndexKey(c.connectionName, table, qualMap)
indexBucketKey := c.buildIndexKey(c.connectionName, table)
indexBucket, ok := c.getIndexBucket(indexBucketKey)

log.Printf("[TRACE] index key %s, result key %s", indexBucketKey, resultKey)

if ok {
indexBucket.Append(&IndexItem{columns, resultKey, limit})
} else {
indexItem := NewIndexItem(columns, resultKey, limit, cacheQualMap)
if !ok {
// create new index bucket
indexBucket = newIndexBucket().Append(NewIndexItem(columns, resultKey, limit))
indexBucket = newIndexBucket()
}
indexBucket.Append(indexItem)

if res := c.cache.SetWithTTL(indexBucketKey, indexBucket, 1, ttl); !res {
log.Printf("[TRACE] Set failed")
log.Printf("[WARN] cache Set failed")
return res
}

Expand All @@ -112,43 +111,61 @@ func (c *QueryCache) Set(table string, qualMap map[string]*proto.Quals, columns

// CancelPendingItem cancels a pending item - called when an execute call fails for any reason
func (c *QueryCache) CancelPendingItem(table string, qualMap map[string]*proto.Quals, columns []string, limit int64) {
log.Printf("[Trace] QueryCache CancelPendingItem %s", table)
log.Printf("[TRACE] QueryCache CancelPendingItem table: %s", table)
// clear the corresponding pending item
c.pendingItemComplete(table, qualMap, columns, limit)
}

func (c *QueryCache) Get(ctx context.Context, table string, qualMap map[string]*proto.Quals, columns []string, limit, ttlSeconds int64) *QueryCacheResult {
// get the index bucket for this table and quals
// - this contains cache keys for all cache entries for specified table and quals
indexBucketKey := c.buildIndexKey(c.connectionName, table, qualMap)
indexBucketKey := c.buildIndexKey(c.connectionName, table)

log.Printf("[TRACE] QueryCache Get - indexBucketKey %s, quals", indexBucketKey)

log.Printf("[TRACE] QueryCache Get - indexBucketKey %s", indexBucketKey)
// build a map containing only the quals which we use for building a cache key (i.e. key column quals)
cacheQualMap := c.buildCacheQualMap(table, qualMap)

// do we have a cached result?
res := c.getCachedResult(indexBucketKey, columns, limit, ttlSeconds)
res := c.getCachedResult(indexBucketKey, table, cacheQualMap, columns, limit, ttlSeconds)
if res != nil {
log.Printf("[INFO] CACHE HIT")
// cache hit!
return res
}

// there was no cached result - is there data fetch in progress?
if pendingItem := c.getPendingResultItem(indexBucketKey, table, qualMap, columns, limit); pendingItem != nil {
log.Printf("[INFO] found pending item - waiting for it")
if pendingItem := c.getPendingResultItem(indexBucketKey, table, cacheQualMap, columns, limit); pendingItem != nil {
log.Printf("[TRACE] found pending item - waiting for it")
// so there is a pending result, wait for it
return c.waitForPendingItem(ctx, pendingItem, indexBucketKey, table, qualMap, columns, limit, ttlSeconds)
return c.waitForPendingItem(ctx, pendingItem, indexBucketKey, table, cacheQualMap, columns, limit, ttlSeconds)
}

log.Printf("[INFO] CACHE MISS")
// cache miss
return nil
}

func (c *QueryCache) buildCacheQualMap(table string, qualMap map[string]*proto.Quals) map[string]*proto.Quals {
shouldIncludeQual := c.getShouldIncludeQualInKey(table)
cacheQualMap := make(map[string]*proto.Quals)
for col, quals := range qualMap {
log.Printf("[TRACE] buildCacheQualMap col %s, quals %+v", col, quals)
if shouldIncludeQual(col) {
log.Printf("[TRACE] INCLUDING COLUMN")
cacheQualMap[col] = quals
} else {
log.Printf("[TRACE] EXCLUDING COLUMN")
}
}
return cacheQualMap
}

func (c *QueryCache) Clear() {
c.cache.Clear()
}

func (c *QueryCache) getCachedResult(indexBucketKey string, columns []string, limit int64, ttlSeconds int64) *QueryCacheResult {
func (c *QueryCache) getCachedResult(indexBucketKey, table string, qualMap map[string]*proto.Quals, columns []string, limit int64, ttlSeconds int64) *QueryCacheResult {
log.Printf("[TRACE] QueryCache getCachedResult - index bucket key: %s\n", indexBucketKey)
indexBucket, ok := c.getIndexBucket(indexBucketKey)
if !ok {
Expand All @@ -157,8 +174,8 @@ func (c *QueryCache) getCachedResult(indexBucketKey string, columns []string, li
return nil
}

// now check whether we have a cache entry that covers the required columns - check the index
indexItem := indexBucket.Get(columns, limit)
// now check whether we have a cache entry that covers the required quals and columns - check the index
indexItem := indexBucket.Get(qualMap, columns, limit)
if indexItem == nil {
limitString := "NONE"
if limit != -1 {
Expand Down Expand Up @@ -204,11 +221,10 @@ func (c *QueryCache) getResult(resultKey string) (*QueryCacheResult, bool) {
return result.(*QueryCacheResult), true
}

func (c *QueryCache) buildIndexKey(connectionName, table string, qualMap map[string]*proto.Quals) string {
str := c.sanitiseKey(fmt.Sprintf("index__%s%s%s",
func (c *QueryCache) buildIndexKey(connectionName, table string) string {
str := c.sanitiseKey(fmt.Sprintf("index__%s%s",
connectionName,
table,
c.formatQualMapForKey(table, qualMap)))
table))
return str
}

Expand Down Expand Up @@ -238,29 +254,17 @@ func (c *QueryCache) formatQualMapForKey(table string, qualMap map[string]*proto
log.Printf("[TRACE] formatQualMapForKey sorted keys %v\n", keys)

// now construct cache key from ordered quals

// get a predicate function which tells us whether to include a qual
shouldIncludeQualInKey := c.getShouldIncludeQualInKey(table)

for i, key := range keys {
strs[i] = c.formatQualsForKey(qualMap[key], shouldIncludeQualInKey)
}
return strings.Join(strs, "-")
}

func (c *QueryCache) formatQualsForKey(quals *proto.Quals, shouldIncludeQualInKey func(string) bool) string {
var strs []string
for _, q := range quals.Quals {
if shouldIncludeQualInKey(q.FieldName) {
for _, q := range qualMap[key].Quals {
strs = append(strs, fmt.Sprintf("%s-%s-%v", q.FieldName, q.GetStringValue(), grpc.GetQualValue(q.Value)))
}
strs[i] = strings.Join(strs, "-")
}
return strings.Join(strs, "-")
}

// only include key column quals and optional quals
func (c *QueryCache) getShouldIncludeQualInKey(table string) func(string) bool {

// build a list of all key columns
tableSchema, ok := c.PluginSchema[table]
if !ok {
Expand Down
6 changes: 3 additions & 3 deletions cache/query_cache_pending.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (c *QueryCache) waitForPendingItem(ctx context.Context, pendingItem *pendin

log.Printf("[TRACE] waitForPendingItem indexBucketKey: %s", indexBucketKey)

transferCompleteChan := make(chan (bool), 1)
transferCompleteChan := make(chan bool, 1)
go func() {
pendingItem.Wait()
close(transferCompleteChan)
Expand All @@ -70,7 +70,7 @@ func (c *QueryCache) waitForPendingItem(ctx context.Context, pendingItem *pendin
log.Printf("[TRACE] waitForPendingItem transfer complete - trying cache again, indexBucketKey: %s", indexBucketKey)

// now try to read from the cache again
res = c.getCachedResult(indexBucketKey, columns, limit, ttlSeconds)
res = c.getCachedResult(indexBucketKey, table, qualMap, columns, limit, ttlSeconds)
// if the data is still not in the cache, create a pending item
if res == nil {
log.Printf("[TRACE] waitForPendingItem item still not in the cache - add pending item, indexBucketKey: %s", indexBucketKey)
Expand Down Expand Up @@ -110,7 +110,7 @@ func (c *QueryCache) addPendingResult(indexBucketKey, table string, qualMap map[

// unlock pending result items from the map
func (c *QueryCache) pendingItemComplete(table string, qualMap map[string]*proto.Quals, columns []string, limit int64) {
indexBucketKey := c.buildIndexKey(c.connectionName, table, qualMap)
indexBucketKey := c.buildIndexKey(c.connectionName, table)

log.Printf("[TRACE] pendingItemComplete indexBucketKey %s, columns %v, limit %d", indexBucketKey, columns, limit)
defer log.Printf("[TRACE] pendingItemComplete done")
Expand Down
5 changes: 3 additions & 2 deletions grpc/proto/plugin.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading