Skip to content

Commit d7ee6ba

Browse files
authored
Add new Hydrate function wrapper Memoize to replace WithCache. Replace Mutexes with RWMutexes and update locking to use RLock where possible. Closes #498. Closes #499
1 parent e765026 commit d7ee6ba

13 files changed

+296
-233
lines changed

connection/connection_cache.go

+1
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ func (c *ConnectionCache) SetWithTTL(ctx context.Context, key string, value inte
3333
err := c.cache.Set(ctx,
3434
key,
3535
value,
36+
// if ttl is zero there is no expiration
3637
store.WithExpiration(ttl),
3738
// put connection name in tags
3839
store.WithTags([]string{c.connectionName}),

go.mod

+1-1
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ require (
3636
github.com/eko/gocache/v3 v3.1.2
3737
github.com/fsnotify/fsnotify v1.6.0
3838
github.com/hashicorp/go-getter v1.6.2
39+
golang.org/x/exp v0.0.0-20221110155412-d0897a79cd37
3940
golang.org/x/sync v0.1.0
4041
)
4142

@@ -93,7 +94,6 @@ require (
9394
go.opencensus.io v0.22.4 // indirect
9495
go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.7.0 // indirect
9596
go.opentelemetry.io/proto/otlp v0.16.0 // indirect
96-
golang.org/x/exp v0.0.0-20221110155412-d0897a79cd37 // indirect
9797
golang.org/x/lint v0.0.0-20200302205851-738671d3881b // indirect
9898
golang.org/x/mod v0.6.0 // indirect
9999
golang.org/x/net v0.2.0 // indirect

grpc/proto/plugin.pb.go

-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

plugin/concurrency.go

+52-28
Original file line numberDiff line numberDiff line change
@@ -9,20 +9,23 @@ import (
99
DefaultConcurrencyConfig sets the default maximum number of concurrent [HydrateFunc] calls.
1010
1111
Limit total concurrent hydrate calls:
12-
DefaultConcurrency: &plugin.DefaultConcurrencyConfig{
13-
TotalMaxConcurrency: 500,
14-
}
12+
13+
DefaultConcurrency: &plugin.DefaultConcurrencyConfig{
14+
TotalMaxConcurrency: 500,
15+
}
1516
1617
Limit concurrent hydrate calls to any single HydrateFunc which does not have a [HydrateConfig]:
17-
DefaultConcurrency: &plugin.DefaultConcurrencyConfig{
18-
DefaultMaxConcurrency: 100,
19-
}
20-
18+
19+
DefaultConcurrency: &plugin.DefaultConcurrencyConfig{
20+
DefaultMaxConcurrency: 100,
21+
}
22+
2123
Do both:
22-
DefaultConcurrency: &plugin.DefaultConcurrencyConfig{
23-
TotalMaxConcurrency: 500,
24-
DefaultMaxConcurrency: 200,
25-
}
24+
25+
DefaultConcurrency: &plugin.DefaultConcurrencyConfig{
26+
TotalMaxConcurrency: 500,
27+
DefaultMaxConcurrency: 200,
28+
}
2629
2730
Plugin examples:
2831
- [hackernews]
@@ -31,14 +34,14 @@ Plugin examples:
3134
*/
3235
type DefaultConcurrencyConfig struct {
3336
// sets how many HydrateFunc calls can run concurrently in total
34-
TotalMaxConcurrency int
37+
TotalMaxConcurrency int
3538
// sets the default for how many calls to each HydrateFunc can run concurrently
3639
DefaultMaxConcurrency int
3740
}
3841

3942
// concurrencyManager struct ensures that hydrate functions stay within concurrency limits
4043
type concurrencyManager struct {
41-
mut sync.Mutex
44+
mut sync.RWMutex
4245
// the maximum number of all hydrate calls which can run concurrently
4346
maxConcurrency int
4447
// the maximum concurrency for a single hydrate call
@@ -80,25 +83,25 @@ func newConcurrencyManager(t *Table) *concurrencyManager {
8083
// StartIfAllowed checks whether the named hydrate call is permitted to start
8184
// based on the number of running instances of that call, and the total calls in progress
8285
func (c *concurrencyManager) StartIfAllowed(name string, maxCallConcurrency int) (res bool) {
83-
c.mut.Lock()
84-
defer c.mut.Unlock()
86+
// acquire a Read lock
87+
c.mut.RLock()
88+
// how many concurrent executions of this function are in progress right now?
89+
currentExecutions := c.callMap[name]
90+
// ensure we unlock
91+
c.mut.RUnlock()
8592

86-
// is the total call limit exceeded?
87-
if c.maxConcurrency > 0 && c.callsInProgress == c.maxConcurrency {
93+
if !c.canStart(currentExecutions, maxCallConcurrency) {
8894
return false
8995
}
9096

91-
// if there is no config or empty config, the maxCallConcurrency will be 0
92-
// - use defaultMaxConcurrencyPerCall set on the concurrencyManager
93-
if maxCallConcurrency == 0 {
94-
maxCallConcurrency = c.defaultMaxConcurrencyPerCall
95-
}
96-
97-
// how many concurrent executions of this function are in progress right now?
98-
currentExecutions := c.callMap[name]
97+
// upgrade the mutex to a Write lock
98+
c.mut.Lock()
99+
// ensure we unlock
100+
defer c.mut.Unlock()
99101

100-
// if we at the call limit return
101-
if maxCallConcurrency > 0 && currentExecutions == maxCallConcurrency {
102+
// check again in case another thread grabbed the Write lock before us
103+
currentExecutions = c.callMap[name]
104+
if !c.canStart(currentExecutions, maxCallConcurrency) {
102105
return false
103106
}
104107

@@ -113,6 +116,26 @@ func (c *concurrencyManager) StartIfAllowed(name string, maxCallConcurrency int)
113116
if c.callsInProgress > c.maxCallsInProgress {
114117
c.maxCallsInProgress = c.callsInProgress
115118
}
119+
120+
return true
121+
}
122+
123+
func (c *concurrencyManager) canStart(currentExecutions int, maxCallConcurrency int) bool {
124+
// is the total call limit exceeded?
125+
if c.maxConcurrency > 0 && c.callsInProgress == c.maxConcurrency {
126+
return false
127+
}
128+
129+
// if there is no config or empty config, the maxCallConcurrency will be 0
130+
// - use defaultMaxConcurrencyPerCall set on the concurrencyManager
131+
if maxCallConcurrency == 0 {
132+
maxCallConcurrency = c.defaultMaxConcurrencyPerCall
133+
}
134+
135+
// if we at the call limit return
136+
if maxCallConcurrency > 0 && currentExecutions == maxCallConcurrency {
137+
return false
138+
}
116139
return true
117140
}
118141

@@ -123,10 +146,11 @@ func (c *concurrencyManager) Finished(name string) {
123146
log.Printf("[WARN] %v", r)
124147
}
125148
}()
149+
// acquire a Write lock
126150
c.mut.Lock()
127-
defer c.mut.Unlock()
128151
c.callMap[name]--
129152
c.callsInProgress--
153+
c.mut.Unlock()
130154
}
131155

132156
// Close executes when the query is complete and dumps out the concurrency stats

plugin/connection_config_schema_test.go

-75
Original file line numberDiff line numberDiff line change
@@ -137,81 +137,6 @@ type extraPropertyWithAnnotation struct {
137137

138138
var testCasesParseConfig = map[string]parseConfigTest{
139139
// cty tag implementation
140-
"struct property cty": {
141-
source: `
142-
name = { name = "foo"}
143-
`,
144-
connectionConfigSchema: &ConnectionConfigSchema{
145-
NewInstance: func() interface{} { return &structPropertyCty{} },
146-
Schema: map[string]*schema.Attribute{
147-
"name": {
148-
Type: schema.TypeMap,
149-
AttrTypes: map[string]*schema.Attribute{
150-
"name": {
151-
Type: schema.TypeString,
152-
},
153-
},
154-
},
155-
},
156-
},
157-
expected: structPropertyCty{
158-
childStructCty{"foo"}},
159-
},
160-
"struct slice property cty": {
161-
source: `
162-
tables = [
163-
{
164-
name = "test1"
165-
columns = [
166-
{
167-
name = "c1"
168-
type = "string"
169-
},
170-
{
171-
name = "c2"
172-
type = "string"
173-
},
174-
]
175-
}
176-
]
177-
`,
178-
connectionConfigSchema: &ConnectionConfigSchema{
179-
NewInstance: func() interface{} { return &structSlicePropertyCty{} },
180-
Schema: map[string]*schema.Attribute{
181-
"tables": {
182-
Type: schema.TypeList,
183-
Elem: &schema.Attribute{
184-
Type: schema.TypeMap,
185-
AttrTypes: map[string]*schema.Attribute{
186-
"name": {
187-
Type: schema.TypeString,
188-
},
189-
"columns": {
190-
Type: schema.TypeList,
191-
Elem: &schema.Attribute{
192-
193-
Type: schema.TypeMap,
194-
AttrTypes: map[string]*schema.Attribute{
195-
"name": {
196-
Type: schema.TypeString,
197-
},
198-
"type": {
199-
Type: schema.TypeString,
200-
},
201-
},
202-
},
203-
},
204-
},
205-
},
206-
},
207-
},
208-
},
209-
expected: structSlicePropertyCty{
210-
Tables: []configTableCty{{
211-
Name: "test1",
212-
Columns: []configColumnCty{{"c1", "string"}, {"c2", "string"}},
213-
}}},
214-
},
215140
"array property cty": {
216141
source: `
217142
regions = ["us-east-1","us-west-2"]

0 commit comments

Comments
 (0)