Skip to content

Commit 0e3ffb1

Browse files
committed
Matrix queries (e.g. multi-region) should take matrix quals into account for get calls. #78
1 parent d91df05 commit 0e3ffb1

File tree

1 file changed

+34
-6
lines changed

1 file changed

+34
-6
lines changed

plugin/table_fetch.go

+34-6
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,28 @@ const (
2323
fetchTypeGet = "get"
2424
)
2525

26+
/*
27+
multiregion fetch
28+
Each plugin table can optionally define a function `GetMatrixItem`.
29+
30+
This returns a list of maps, each of which contains the parameters required
31+
to do get/list for a given region (or whatever partitioning is relevant to the plugin)
32+
33+
The plugin would typically get this information from the connection config
34+
35+
If a matrix is returned by the plugin, we execute Get/List calls for each matrix item (e.g. each region)
36+
37+
NOTE: if the quals include the matrix property (or properties),w e check whether each matrix
38+
item meets the quals and if not, do not execute for that item
39+
40+
For example, for the query
41+
select vpc_id, region from aws_vpc where region = 'us-east-1'
42+
we would only execute a List function for the matrix item { region: "us-east-1" },
43+
even if other were defined in the connection config
44+
45+
When executing for each matrix item, the matrix item is put into the context, available for use by the get/list call
46+
*/
47+
2648
// call either 'get' or 'list'.
2749
func (t *Table) fetchItems(ctx context.Context, queryData *QueryData) error {
2850
// if the query contains a single 'equals' constrains for all key columns, then call the 'get' function
@@ -161,7 +183,7 @@ func (t *Table) doGet(ctx context.Context, queryData *QueryData, hydrateItem int
161183
func (t *Table) getForEach(ctx context.Context, queryData *QueryData, rd *RowData) (interface{}, error) {
162184
getCall := t.SafeGet()
163185

164-
log.Printf("[DEBUG] getForEach, matrixItem list: %v\n", queryData.Matrix)
186+
log.Printf("[TRACE] getForEach, matrixItem list: %v\n", queryData.Matrix)
165187

166188
var wg sync.WaitGroup
167189
errorChan := make(chan error, len(queryData.Matrix))
@@ -176,6 +198,12 @@ func (t *Table) getForEach(ctx context.Context, queryData *QueryData, rd *RowDat
176198
var results []*resultWithMetadata
177199

178200
for _, matrixItem := range queryData.Matrix {
201+
// check whether there is a single equals qual for each matrix item property and if so, check whether
202+
// the matrix item property values satisfy the conditions
203+
if !t.matrixItemMeetsQuals(matrixItem, queryData) {
204+
log.Printf("[TRACE] getForEach: matrix item item does not meet quals, %v, %v\n", queryData.equalsQuals, matrixItem)
205+
continue
206+
}
179207
// increment our own wait group
180208
wg.Add(1)
181209

@@ -321,12 +349,12 @@ func (t *Table) doListForQualValues(ctx context.Context, queryData *QueryData, k
321349

322350
func (t *Table) doList(ctx context.Context, queryData *QueryData, listCall HydrateFunc) {
323351
if len(queryData.Matrix) == 0 {
324-
log.Printf("[DEBUG] No matrix item")
352+
log.Printf("[TRACE] doList: no matrix item")
325353
if _, err := listCall(ctx, queryData, &HydrateData{}); err != nil {
326354
queryData.streamError(err)
327355
}
328356
} else if len(queryData.Matrix) == 1 {
329-
log.Printf("[DEBUG] running list for single matrixItem: %v", queryData.Matrix[0])
357+
log.Printf("[TRACE] running list for single matrixItem: %v", queryData.Matrix[0])
330358
// create a context with the matrixItem
331359
fetchContext := context.WithValue(ctx, context_key.MatrixItem, queryData.Matrix[0])
332360
if _, err := listCall(fetchContext, queryData, &HydrateData{}); err != nil {
@@ -340,14 +368,14 @@ func (t *Table) doList(ctx context.Context, queryData *QueryData, listCall Hydra
340368
// ListForEach :: execute the provided list call for each of a set of matrixItem
341369
// enables multi-partition fetching
342370
func (t *Table) listForEach(ctx context.Context, queryData *QueryData, listCall HydrateFunc) {
343-
log.Printf("[DEBUG] listForEach: %v\n", queryData.Matrix)
371+
log.Printf("[TRACE] listForEach: %v\n", queryData.Matrix)
344372
var wg sync.WaitGroup
345373
for _, matrixItem := range queryData.Matrix {
346374

347375
// check whether there is a single equals qual for each matrix item property and if so, check whether
348376
// the matrix item property values satisfy the conditions
349377
if !t.matrixItemMeetsQuals(matrixItem, queryData) {
350-
log.Printf("[INFO] matrix item item does not meet quals, %v, %v\n", queryData.equalsQuals, matrixItem)
378+
log.Printf("[INFO] listForEach: matrix item item does not meet quals, %v, %v\n", queryData.equalsQuals, matrixItem)
351379
continue
352380
}
353381

@@ -414,7 +442,7 @@ func (t *Table) executeLegacyGetCall(ctx context.Context, queryData *QueryData)
414442
queryData.streamError(err)
415443
return err
416444
}
417-
t.Plugin.Logger.Debug("executeLegacyGetCall", "hydrateInput", hydrateInput)
445+
t.Plugin.Logger.Trace("executeLegacyGetCall", "hydrateInput", hydrateInput)
418446
// there may be more than one hydrate item - loop over them
419447
for _, hydrateItem := range hydrateInput {
420448
t.Plugin.Logger.Debug("hydrateItem", "hydrateItem", hydrateItem)

0 commit comments

Comments
 (0)