Skip to content

Commit 6bf5570

Browse files
author
tyagiparth
authored
Add mechanism to limit max hydrate concurrency. Closes #12
1 parent cdf6f8e commit 6bf5570

8 files changed

+194
-93
lines changed

plugin/concurrency.go

+93
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
package plugin
2+
3+
import (
4+
"log"
5+
"sync"
6+
)
7+
8+
// if no max concurrency is specified in the plugin, use this value
9+
const defaultMaxConcurrency = 1000
10+
11+
// if no max call concurrency is specified for a hydrate function, use this value
12+
const defaultMaxConcurrencyPerCall = 500
13+
14+
// ConcurrencyManager :: struct which ensures hydrate funcitons stay within concurrency limits
15+
type ConcurrencyManager struct {
16+
mut sync.Mutex
17+
// the maximun number of all hydrate calls which can run concurrently
18+
maxConcurrency int
19+
// the maximum concurrency for a single hydrate call
20+
// (this may be overridden by the HydrateConfig for the call)
21+
defaultMaxConcurrencyPerCall int
22+
// total number of hydrate calls in progress
23+
callsInProgress int
24+
// map of the number of instances of each call in progress
25+
callMap map[string]int
26+
}
27+
28+
func newConcurrencyManager(t *Table) *ConcurrencyManager {
29+
// if plugin does not define max concurrency, use default
30+
max := defaultMaxConcurrency
31+
// if hydrate calls do not define max concurrency, use default
32+
maxPerCall := defaultMaxConcurrencyPerCall
33+
if config := t.Plugin.DefaultHydrateConfig; config != nil {
34+
if config.MaxConcurrency != 0 {
35+
max = config.MaxConcurrency
36+
}
37+
if config.DefaultMaxConcurrencyPerCall != 0 {
38+
maxPerCall = config.DefaultMaxConcurrencyPerCall
39+
} else if max < maxPerCall {
40+
// if the default call concurrency is greater than the toal max concurrency, clamp to total
41+
maxPerCall = max
42+
}
43+
}
44+
return &ConcurrencyManager{
45+
maxConcurrency: max,
46+
defaultMaxConcurrencyPerCall: maxPerCall,
47+
callMap: make(map[string]int),
48+
}
49+
}
50+
51+
// StartIfAllowed :: check whether the named hydrate call is permitted to start
52+
// based on the number of running instances of that call, and the total calls in progress
53+
func (c *ConcurrencyManager) StartIfAllowed(name string, maxCallConcurrency int) (res bool) {
54+
c.mut.Lock()
55+
defer c.mut.Unlock()
56+
57+
// is the total call limit exceeded?
58+
if c.callsInProgress == c.maxConcurrency {
59+
return false
60+
}
61+
62+
// if there is no config or empty config, the maxCallConcurrency will be 0
63+
// - use defaultMaxConcurrencyPerCall set on the concurrencyManager
64+
if maxCallConcurrency == 0 {
65+
maxCallConcurrency = c.defaultMaxConcurrencyPerCall
66+
}
67+
68+
// how many concurrent executions of this function ar in progress right now?
69+
currentExecutions := c.callMap[name]
70+
71+
// if we at the call limit return
72+
if currentExecutions == maxCallConcurrency {
73+
return false
74+
}
75+
76+
// to get here we are allowed to execute - increment the call counters
77+
c.callMap[name] = currentExecutions + 1
78+
c.callsInProgress++
79+
return true
80+
}
81+
82+
// Finished :: decrement the counter for the named function
83+
func (c *ConcurrencyManager) Finished(name string) {
84+
defer func() {
85+
if r := recover(); r != nil {
86+
log.Printf("[WARN] %v", r)
87+
}
88+
}()
89+
c.mut.Lock()
90+
defer c.mut.Unlock()
91+
c.callMap[name]--
92+
c.callsInProgress--
93+
}

plugin/hydrate.go

+50-17
Original file line numberDiff line numberDiff line change
@@ -6,50 +6,83 @@ import (
66
"github.com/turbot/go-kit/helpers"
77
)
88

9+
// HydrateData :: the input data passed to every hydrate function
910
type HydrateData struct {
1011
Item interface{}
11-
Params map[string]string
1212
HydrateResults map[string]interface{}
1313
}
1414

15-
// perform shallow clone
16-
func (h *HydrateData) Clone() *HydrateData {
17-
return &HydrateData{
18-
Item: h.Item,
19-
Params: h.Params,
20-
HydrateResults: h.HydrateResults,
21-
}
22-
}
23-
24-
// HydrateFunc is a function which retrieves some or all row data for a single row item.
15+
// HydrateFunc :: a function which retrieves some or all row data for a single row item.
2516
type HydrateFunc func(context.Context, *QueryData, *HydrateData) (interface{}, error)
2617

2718
// HydrateDependencies :: define the hydrate function dependencies - other hydrate functions which must be run first
19+
// Deprecated: used HydrateConfig
2820
type HydrateDependencies struct {
2921
Func HydrateFunc
3022
Depends []HydrateFunc
3123
}
3224

25+
// HydrateConfig :: define the hydrate function configurations, Name, Maximum number of concurrent calls to be allowed, dependencies
26+
type HydrateConfig struct {
27+
Func HydrateFunc
28+
MaxConcurrency int
29+
// ConcurrencyMapKey ConcurrencyMapKeyFunc
30+
// ShouldRetryError ErrorPredicate
31+
// ShouldIgnoreError ErrorPredicate
32+
Depends []HydrateFunc
33+
}
34+
35+
// DefaultHydrateConfig :: plugin levelk config to define default hydrate concurrency
36+
// - used if no HydrateConfig is specified for a specific call
37+
type DefaultHydrateConfig struct {
38+
// max number of ALL hydrate calls in progress
39+
MaxConcurrency int
40+
DefaultMaxConcurrencyPerCall int
41+
}
42+
43+
// HydrateCall :: struct encapsulating a hydrate call, its config and dependencies
3344
type HydrateCall struct {
3445
Func HydrateFunc
3546
// the dependencies expressed using function name
3647
Depends []string
48+
Config *HydrateConfig
3749
}
3850

39-
func newHydrateCall(hydrateFunc HydrateFunc, dependencies []HydrateFunc) *HydrateCall {
40-
res := &HydrateCall{Func: hydrateFunc}
41-
for _, f := range dependencies {
51+
func newHydrateCall(hydrateFunc HydrateFunc, config *HydrateConfig) *HydrateCall {
52+
res := &HydrateCall{Func: hydrateFunc, Config: config}
53+
for _, f := range config.Depends {
4254
res.Depends = append(res.Depends, helpers.GetFunctionName(f))
4355
}
4456
return res
4557
}
4658

47-
// CanStart :: can this hydrate call - check whether all dependency hydrate functions have been completed
48-
func (h HydrateCall) CanStart(rowData *RowData) bool {
59+
// CanStart :: return whether this hydrate call can execute
60+
// - check whether all dependency hydrate functions have been completed
61+
// - check whether the concurrency limits would be exceeded
62+
63+
func (h HydrateCall) CanStart(rowData *RowData, name string, concurrencyManager *ConcurrencyManager) bool {
4964
for _, dep := range h.Depends {
5065
if !helpers.StringSliceContains(rowData.getHydrateKeys(), dep) {
5166
return false
5267
}
5368
}
54-
return true
69+
// ask the concurrency manager whether the call can start
70+
// NOTE: if the call is allowed to start, the concurrency manager ASSUMES THE CALL WILL START
71+
// and increments the counters
72+
// it may seem more logical to do this in the Start() function below, but we need to check and increment the counters
73+
// within the same mutex lock to ensure another call does not start between checking and starting
74+
return concurrencyManager.StartIfAllowed(name, h.Config.MaxConcurrency)
75+
}
76+
77+
// Start :: start a hydrate call
78+
func (h *HydrateCall) Start(ctx context.Context, r *RowData, hydrateFuncName string, concurrencyManager *ConcurrencyManager) {
79+
// tell the roewdata to wait for this call to complete
80+
r.wg.Add(1)
81+
82+
// call callHydrate async, ignoring return values
83+
go func() {
84+
r.callHydrate(ctx, r.queryData, h.Func, hydrateFuncName)
85+
// decrement number of hydrate functions running
86+
concurrencyManager.Finished(hydrateFuncName)
87+
}()
5588
}

plugin/plugin.go

+2
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ type Plugin struct {
2121
TableMap map[string]*Table
2222
DefaultTransform *transform.ColumnTransforms
2323
DefaultGetConfig *GetConfig
24+
DefaultHydrateConfig *DefaultHydrateConfig
2425
// every table must implement these columns
2526
RequiredColumns []*Column
2627
}
@@ -96,6 +97,7 @@ func (p *Plugin) Execute(req *proto.ExecuteRequest, stream proto.WrapperPlugin_E
9697
ctx := context.WithValue(context.Background(), ContextKeyLogger, p.Logger)
9798
log.Printf("[TRACE] calling fetchItems, table: %s\n", table.Name)
9899

100+
// asyncronously fetch items
99101
table.fetchItems(ctx, d)
100102

101103
log.Println("[TRACE] after fetchItems")

plugin/query_data.go

+20-4
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package plugin
22

33
import (
44
"context"
5+
"fmt"
56
"log"
67

78
"github.com/turbot/go-kit/helpers"
@@ -29,10 +30,11 @@ type QueryData struct {
2930
QueryContext *pb.QueryContext
3031

3132
// internal
32-
hydrateCalls []*HydrateCall
33-
rowDataChan chan *RowData
34-
errorChan chan error
35-
stream pb.WrapperPlugin_ExecuteServer
33+
hydrateCalls []*HydrateCall
34+
concurrencyManager *ConcurrencyManager
35+
rowDataChan chan *RowData
36+
errorChan chan error
37+
stream pb.WrapperPlugin_ExecuteServer
3638
// wait group used to syncronise parent-child list fetches - each child hydrate function increments this wait group
3739
listWg sync.WaitGroup
3840
}
@@ -56,6 +58,7 @@ func newQueryData(queryContext *pb.QueryContext, table *Table, stream pb.Wrapper
5658
ensureColumns(queryContext, table)
5759

5860
d.hydrateCalls = table.requiredHydrateCalls(queryContext.Columns, d.FetchType)
61+
d.concurrencyManager = newConcurrencyManager(table)
5962
return d
6063
}
6164

@@ -232,6 +235,9 @@ func (d *QueryData) buildRows(ctx context.Context) chan *pb.Row {
232235
// execute necessary hydrate calls to populate row data
233236
func (d *QueryData) buildRow(ctx context.Context, rowData *RowData, rowChan chan *pb.Row, wg *sync.WaitGroup) {
234237
defer func() {
238+
if r := recover(); r != nil {
239+
d.streamError(ToError(r))
240+
}
235241
wg.Done()
236242
}()
237243

@@ -268,3 +274,13 @@ func (d *QueryData) singleEqualsQual(column string) (*pb.Qual, bool) {
268274
}
269275
return nil, false
270276
}
277+
278+
// remove once go-kit version 0.2.0 is released
279+
// ToError :: if supplied value is already an error, return it, otherwise format it as an error
280+
func ToError(val interface{}) error {
281+
if e, ok := val.(error); ok {
282+
return e
283+
} else {
284+
return fmt.Errorf("%v", val)
285+
}
286+
}

plugin/required_hydrate_calls.go

+12-5
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
package plugin
22

33
import (
4-
"github.com/turbot/go-kit/helpers"
54
"log"
5+
6+
"github.com/turbot/go-kit/helpers"
67
)
78

89
// helper class to build list of required hydrate calls
@@ -30,12 +31,18 @@ func (c requiredHydrateCallBuilder) Add(hydrateFunc HydrateFunc) {
3031

3132
log.Printf("[TRACE] adding hydration function '%s' to hydrationMap\n", hydrateName)
3233

33-
// get any dependencies for this hydrate function
34-
dependencies := c.table.getHydrateDependencies(hydrateName)
35-
c.requiredHydrateCalls[hydrateName] = newHydrateCall(hydrateFunc, dependencies)
34+
// get the config for this hydrate function
35+
config := c.table.getHydrateConfig(hydrateName)
36+
37+
// get any dependencies for this hydrate function. if no hydrate dependencies are specified in the hydrate config, check the deprecated "HydrateDependencies" property
38+
if config.Depends == nil {
39+
config.Depends = c.table.getHydrateDependencies(hydrateName)
40+
}
41+
42+
c.requiredHydrateCalls[hydrateName] = newHydrateCall(hydrateFunc, config)
3643

3744
// now add dependencies (we have already checked for circular dependencies so recursion is fine
38-
for _, dep := range dependencies {
45+
for _, dep := range config.Depends {
3946
c.Add(dep)
4047
}
4148
}

plugin/row.go

+5-7
Original file line numberDiff line numberDiff line change
@@ -52,21 +52,18 @@ func (r *RowData) getRow(ctx context.Context) (*pb.Row, error) {
5252
// - these populate the row with data entries corresponding to the hydrate function nameSP_LOG=TRACE
5353
// keep looping round hydrate functions until they are all started
5454

55-
// make a map of started hydrate calls
55+
// make a map of started hydrate calls for this row - this is used the determine which calls have not started yet
5656
var callsStarted = map[string]bool{}
5757

5858
for {
5959
var allStarted = true
6060
for _, call := range r.queryData.hydrateCalls {
6161
hydrateFuncName := helpers.GetFunctionName(call.Func)
6262
if !callsStarted[hydrateFuncName] {
63-
64-
if call.CanStart(r) {
65-
r.wg.Add(1)
66-
// call callHydrate async, ignoring return values
67-
go r.callHydrate(ctx, r.queryData, call.Func, hydrateFuncName)
63+
if call.CanStart(r, hydrateFuncName, r.queryData.concurrencyManager) {
64+
// execute the hydrate call asynchronously
65+
call.Start(ctx, r, hydrateFuncName, r.queryData.concurrencyManager)
6866
callsStarted[hydrateFuncName] = true
69-
7067
} else {
7168
allStarted = false
7269
}
@@ -159,6 +156,7 @@ func (r *RowData) callHydrate(ctx context.Context, d *QueryData, hydrateFunc Hyd
159156
}
160157

161158
logging.LogTime(hydrateKey + " end")
159+
162160
// NOTE: also return the error - is this is being called by as 'get' call we can act on the error immediately
163161
return hydrateData, err
164162
}

plugin/table.go

+12
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ type Table struct {
2222
Plugin *Plugin
2323
// definitions of dependencies between hydrate functions
2424
HydrateDependencies []HydrateDependencies
25+
HydrateConfig []HydrateConfig
2526
}
2627

2728
type GetConfig struct {
@@ -91,3 +92,14 @@ func (t *Table) getHydrateDependencies(hydrateFuncName string) []HydrateFunc {
9192
}
9293
return []HydrateFunc{}
9394
}
95+
96+
func (t *Table) getHydrateConfig(hydrateFuncName string) *HydrateConfig {
97+
// if a hydrate config is defined see whether this call exists in it
98+
for _, d := range t.HydrateConfig {
99+
if helpers.GetFunctionName(d.Func) == hydrateFuncName {
100+
return &d
101+
}
102+
}
103+
// fallback to return an empty hydrate config
104+
return &HydrateConfig{}
105+
}

0 commit comments

Comments
 (0)