diff --git a/helpers/limit/limits.go b/helpers/limit/limits.go new file mode 100644 index 00000000..527d67f9 --- /dev/null +++ b/helpers/limit/limits.go @@ -0,0 +1,34 @@ +package limit + +import ( + "github.com/pbnjay/memory" +) + +const ( + gbInBytes int = 1024 * 1024 * 1024 + goroutinesPerGB float64 = 250000 +) + +func GetMaxGoRoutines() uint64 { + limit := calculateGoRoutines(getMemory()) + ulimit, err := getUlimit() + if err != nil || ulimit == 0 { + return limit + } + if ulimit > limit { + return limit + } + return ulimit +} + +func getMemory() uint64 { + return memory.TotalMemory() +} + +func calculateGoRoutines(totalMemory uint64) uint64 { + if totalMemory == 0 { + // assume we have 2 GB RAM + return uint64(goroutinesPerGB * 2) + } + return uint64(goroutinesPerGB * float64(totalMemory) / float64(gbInBytes)) +} diff --git a/helpers/limits_test.go b/helpers/limit/limits_test.go similarity index 97% rename from helpers/limits_test.go rename to helpers/limit/limits_test.go index be21affa..f14ae701 100644 --- a/helpers/limits_test.go +++ b/helpers/limit/limits_test.go @@ -1,4 +1,4 @@ -package helpers +package limit import ( "testing" diff --git a/helpers/limit/ulimit_unix.go b/helpers/limit/ulimit_unix.go new file mode 100644 index 00000000..441b2872 --- /dev/null +++ b/helpers/limit/ulimit_unix.go @@ -0,0 +1,13 @@ +//go:build darwin || linux + +package limit + +import ( + "syscall" +) + +func getUlimit() (uint64, error) { + var rLimit syscall.Rlimit + err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &rLimit) + return rLimit.Max, err +} diff --git a/helpers/limit/ulimit_win.go b/helpers/limit/ulimit_win.go new file mode 100644 index 00000000..619bffcf --- /dev/null +++ b/helpers/limit/ulimit_win.go @@ -0,0 +1,7 @@ +//go:build windows + +package limit + +func getUlimit() (uint64, error) { + return 0, nil +} diff --git a/helpers/limits.go b/helpers/limits.go deleted file mode 100644 index 49d7c221..00000000 --- a/helpers/limits.go +++ /dev/null @@ -1,25 +0,0 @@ -package helpers - -import ( - "github.com/pbnjay/memory" -) - -const GB_IN_BYTES uint64 = 1024 * 1024 * 1024 -const GO_ROUTINES_PER_GB uint64 = 250000 - -func GetMaxGoRoutines() uint64 { - return calculateGoRoutines(getMemory()) -} - -func getMemory() uint64 { - return memory.TotalMemory() -} - -func calculateGoRoutines(totalMemory uint64) uint64 { - if totalMemory == 0 { - // assume we have 2 GB RAM - return GO_ROUTINES_PER_GB * 2 - } - gb := float64(totalMemory) / float64(GB_IN_BYTES) - return uint64(float64(GO_ROUTINES_PER_GB) * gb) -} diff --git a/provider/execution/execution_test.go b/provider/execution/execution_test.go index 4e96200b..884b81a9 100644 --- a/provider/execution/execution_test.go +++ b/provider/execution/execution_test.go @@ -7,7 +7,8 @@ import ( "testing" "time" - "github.com/cloudquery/cq-provider-sdk/helpers" + "github.com/cloudquery/cq-provider-sdk/helpers/limit" + "github.com/cloudquery/cq-provider-sdk/provider/diag" "github.com/cloudquery/cq-provider-sdk/provider/schema" "github.com/cloudquery/cq-provider-sdk/testlog" @@ -462,10 +463,10 @@ func TestTableExecutor_Resolve(t *testing.T) { ErrorExpected: true, ExpectedDiags: []diag.FlatDiag{ { - Err: `error at github.com/cloudquery/cq-provider-sdk/provider/execution.glob..func4[execution_test.go:58] some error`, + Err: `error at github.com/cloudquery/cq-provider-sdk/provider/execution.glob..func4[execution_test.go:59] some error`, Resource: "return_wrap_error", Severity: diag.ERROR, - Summary: `failed to resolve table "simple": error at github.com/cloudquery/cq-provider-sdk/provider/execution.glob..func4[execution_test.go:58] some error`, + Summary: `failed to resolve table "simple": error at github.com/cloudquery/cq-provider-sdk/provider/execution.glob..func4[execution_test.go:59] some error`, Type: diag.RESOLVING, }, }, @@ -528,7 +529,7 @@ func TestTableExecutor_Resolve(t *testing.T) { if tc.SetupStorage != nil { storage = tc.SetupStorage(t) } - limiter := semaphore.NewWeighted(int64(helpers.GetMaxGoRoutines())) + limiter := semaphore.NewWeighted(int64(limit.GetMaxGoRoutines())) exec := NewTableExecutor(tc.Name, storage, testlog.New(t), tc.Table, tc.ExtraFields, nil, nil, limiter, 10*time.Second) count, diags := exec.Resolve(context.Background(), executionClient) assert.Equal(t, tc.ExpectedResourceCount, count) @@ -647,7 +648,7 @@ func TestTableExecutor_resolveResourceValues(t *testing.T) { if tc.SetupStorage != nil { storage = tc.SetupStorage(t) } - limiter := semaphore.NewWeighted(int64(helpers.GetMaxGoRoutines())) + limiter := semaphore.NewWeighted(int64(limit.GetMaxGoRoutines())) exec := NewTableExecutor(tc.Name, storage, testlog.New(t), tc.Table, nil, nil, nil, limiter, 0) r := schema.NewResourceData(storage.Dialect(), tc.Table, nil, tc.ResourceData, tc.MetaData, exec.executionStart) diff --git a/provider/provider.go b/provider/provider.go index 4864af4b..76be591e 100644 --- a/provider/provider.go +++ b/provider/provider.go @@ -9,6 +9,8 @@ import ( "sync" "sync/atomic" + "github.com/cloudquery/cq-provider-sdk/helpers/limit" + "github.com/cloudquery/cq-provider-sdk/database" "github.com/cloudquery/cq-provider-sdk/migration/migrator" "github.com/cloudquery/cq-provider-sdk/provider/diag" @@ -189,7 +191,7 @@ func (p *Provider) FetchResources(ctx context.Context, request *cqproto.FetchRes var goroutinesSem *semaphore.Weighted maxGoroutines := request.MaxGoroutines if maxGoroutines == 0 { - maxGoroutines = helpers.GetMaxGoRoutines() + maxGoroutines = limit.GetMaxGoRoutines() } goroutinesSem = semaphore.NewWeighted(helpers.Uint64ToInt64(maxGoroutines))