diff --git a/cqproto/grpc.go b/cqproto/grpc.go index 5afb63d1..0f23bdab 100644 --- a/cqproto/grpc.go +++ b/cqproto/grpc.go @@ -67,6 +67,7 @@ func (g GRPCClient) FetchResources(ctx context.Context, request *FetchResourcesR res, err := g.client.FetchResources(ctx, &internal.FetchResources_Request{ Resources: request.Resources, PartialFetchingEnabled: request.PartialFetchingEnabled, + ParallelFetchingLimit: request.ParallelFetchingLimit, }) if err != nil { return nil, err @@ -156,7 +157,7 @@ func (g *GRPCServer) ConfigureProvider(ctx context.Context, request *internal.Co func (g *GRPCServer) FetchResources(request *internal.FetchResources_Request, server internal.Provider_FetchResourcesServer) error { return g.Impl.FetchResources( server.Context(), - &FetchResourcesRequest{Resources: request.GetResources(), PartialFetchingEnabled: request.PartialFetchingEnabled}, + &FetchResourcesRequest{Resources: request.GetResources(), PartialFetchingEnabled: request.PartialFetchingEnabled, ParallelFetchingLimit: request.ParallelFetchingLimit}, &GRPCFetchResourcesServer{server: server}, ) } diff --git a/cqproto/internal/plugin.pb.go b/cqproto/internal/plugin.pb.go index 7a71291b..474e6a90 100644 --- a/cqproto/internal/plugin.pb.go +++ b/cqproto/internal/plugin.pb.go @@ -1,8 +1,8 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.26.0 -// protoc v3.15.7 -// source: internal/plugin.proto +// protoc-gen-go v1.27.1 +// protoc v3.18.1 +// source: plugin.proto package internal @@ -104,11 +104,11 @@ func (x ColumnType) String() string { } func (ColumnType) Descriptor() protoreflect.EnumDescriptor { - return file_internal_plugin_proto_enumTypes[0].Descriptor() + return file_plugin_proto_enumTypes[0].Descriptor() } func (ColumnType) Type() protoreflect.EnumType { - return &file_internal_plugin_proto_enumTypes[0] + return &file_plugin_proto_enumTypes[0] } func (x ColumnType) Number() protoreflect.EnumNumber { @@ -117,7 +117,7 @@ func (x ColumnType) Number() protoreflect.EnumNumber { // Deprecated: Use ColumnType.Descriptor instead. func (ColumnType) EnumDescriptor() ([]byte, []int) { - return file_internal_plugin_proto_rawDescGZIP(), []int{0} + return file_plugin_proto_rawDescGZIP(), []int{0} } type ConnectionType int32 @@ -147,11 +147,11 @@ func (x ConnectionType) String() string { } func (ConnectionType) Descriptor() protoreflect.EnumDescriptor { - return file_internal_plugin_proto_enumTypes[1].Descriptor() + return file_plugin_proto_enumTypes[1].Descriptor() } func (ConnectionType) Type() protoreflect.EnumType { - return &file_internal_plugin_proto_enumTypes[1] + return &file_plugin_proto_enumTypes[1] } func (x ConnectionType) Number() protoreflect.EnumNumber { @@ -160,7 +160,7 @@ func (x ConnectionType) Number() protoreflect.EnumNumber { // Deprecated: Use ConnectionType.Descriptor instead. func (ConnectionType) EnumDescriptor() ([]byte, []int) { - return file_internal_plugin_proto_rawDescGZIP(), []int{1} + return file_plugin_proto_rawDescGZIP(), []int{1} } // Execution status of the resource fetch execution @@ -204,11 +204,11 @@ func (x ResourceFetchSummary_Status) String() string { } func (ResourceFetchSummary_Status) Descriptor() protoreflect.EnumDescriptor { - return file_internal_plugin_proto_enumTypes[2].Descriptor() + return file_plugin_proto_enumTypes[2].Descriptor() } func (ResourceFetchSummary_Status) Type() protoreflect.EnumType { - return &file_internal_plugin_proto_enumTypes[2] + return &file_plugin_proto_enumTypes[2] } func (x ResourceFetchSummary_Status) Number() protoreflect.EnumNumber { @@ -217,7 +217,7 @@ func (x ResourceFetchSummary_Status) Number() protoreflect.EnumNumber { // Deprecated: Use ResourceFetchSummary_Status.Descriptor instead. func (ResourceFetchSummary_Status) EnumDescriptor() ([]byte, []int) { - return file_internal_plugin_proto_rawDescGZIP(), []int{2, 0} + return file_plugin_proto_rawDescGZIP(), []int{2, 0} } type Diagnostic_Type int32 @@ -259,11 +259,11 @@ func (x Diagnostic_Type) String() string { } func (Diagnostic_Type) Descriptor() protoreflect.EnumDescriptor { - return file_internal_plugin_proto_enumTypes[3].Descriptor() + return file_plugin_proto_enumTypes[3].Descriptor() } func (Diagnostic_Type) Type() protoreflect.EnumType { - return &file_internal_plugin_proto_enumTypes[3] + return &file_plugin_proto_enumTypes[3] } func (x Diagnostic_Type) Number() protoreflect.EnumNumber { @@ -272,7 +272,7 @@ func (x Diagnostic_Type) Number() protoreflect.EnumNumber { // Deprecated: Use Diagnostic_Type.Descriptor instead. func (Diagnostic_Type) EnumDescriptor() ([]byte, []int) { - return file_internal_plugin_proto_rawDescGZIP(), []int{4, 0} + return file_plugin_proto_rawDescGZIP(), []int{4, 0} } type Diagnostic_Severity int32 @@ -308,11 +308,11 @@ func (x Diagnostic_Severity) String() string { } func (Diagnostic_Severity) Descriptor() protoreflect.EnumDescriptor { - return file_internal_plugin_proto_enumTypes[4].Descriptor() + return file_plugin_proto_enumTypes[4].Descriptor() } func (Diagnostic_Severity) Type() protoreflect.EnumType { - return &file_internal_plugin_proto_enumTypes[4] + return &file_plugin_proto_enumTypes[4] } func (x Diagnostic_Severity) Number() protoreflect.EnumNumber { @@ -321,7 +321,7 @@ func (x Diagnostic_Severity) Number() protoreflect.EnumNumber { // Deprecated: Use Diagnostic_Severity.Descriptor instead. func (Diagnostic_Severity) EnumDescriptor() ([]byte, []int) { - return file_internal_plugin_proto_rawDescGZIP(), []int{4, 1} + return file_plugin_proto_rawDescGZIP(), []int{4, 1} } type ConfigureProvider struct { @@ -333,7 +333,7 @@ type ConfigureProvider struct { func (x *ConfigureProvider) Reset() { *x = ConfigureProvider{} if protoimpl.UnsafeEnabled { - mi := &file_internal_plugin_proto_msgTypes[0] + mi := &file_plugin_proto_msgTypes[0] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -346,7 +346,7 @@ func (x *ConfigureProvider) String() string { func (*ConfigureProvider) ProtoMessage() {} func (x *ConfigureProvider) ProtoReflect() protoreflect.Message { - mi := &file_internal_plugin_proto_msgTypes[0] + mi := &file_plugin_proto_msgTypes[0] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -359,7 +359,7 @@ func (x *ConfigureProvider) ProtoReflect() protoreflect.Message { // Deprecated: Use ConfigureProvider.ProtoReflect.Descriptor instead. func (*ConfigureProvider) Descriptor() ([]byte, []int) { - return file_internal_plugin_proto_rawDescGZIP(), []int{0} + return file_plugin_proto_rawDescGZIP(), []int{0} } type FetchResources struct { @@ -371,7 +371,7 @@ type FetchResources struct { func (x *FetchResources) Reset() { *x = FetchResources{} if protoimpl.UnsafeEnabled { - mi := &file_internal_plugin_proto_msgTypes[1] + mi := &file_plugin_proto_msgTypes[1] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -384,7 +384,7 @@ func (x *FetchResources) String() string { func (*FetchResources) ProtoMessage() {} func (x *FetchResources) ProtoReflect() protoreflect.Message { - mi := &file_internal_plugin_proto_msgTypes[1] + mi := &file_plugin_proto_msgTypes[1] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -397,7 +397,7 @@ func (x *FetchResources) ProtoReflect() protoreflect.Message { // Deprecated: Use FetchResources.ProtoReflect.Descriptor instead. func (*FetchResources) Descriptor() ([]byte, []int) { - return file_internal_plugin_proto_rawDescGZIP(), []int{1} + return file_plugin_proto_rawDescGZIP(), []int{1} } // ResourceFetchSummary includes a summarized report of a fetched resource, such as total amount of resources collected, @@ -418,7 +418,7 @@ type ResourceFetchSummary struct { func (x *ResourceFetchSummary) Reset() { *x = ResourceFetchSummary{} if protoimpl.UnsafeEnabled { - mi := &file_internal_plugin_proto_msgTypes[2] + mi := &file_plugin_proto_msgTypes[2] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -431,7 +431,7 @@ func (x *ResourceFetchSummary) String() string { func (*ResourceFetchSummary) ProtoMessage() {} func (x *ResourceFetchSummary) ProtoReflect() protoreflect.Message { - mi := &file_internal_plugin_proto_msgTypes[2] + mi := &file_plugin_proto_msgTypes[2] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -444,7 +444,7 @@ func (x *ResourceFetchSummary) ProtoReflect() protoreflect.Message { // Deprecated: Use ResourceFetchSummary.ProtoReflect.Descriptor instead. func (*ResourceFetchSummary) Descriptor() ([]byte, []int) { - return file_internal_plugin_proto_rawDescGZIP(), []int{2} + return file_plugin_proto_rawDescGZIP(), []int{2} } func (x *ResourceFetchSummary) GetStatus() ResourceFetchSummary_Status { @@ -486,7 +486,7 @@ type PartialFetchFailedResource struct { func (x *PartialFetchFailedResource) Reset() { *x = PartialFetchFailedResource{} if protoimpl.UnsafeEnabled { - mi := &file_internal_plugin_proto_msgTypes[3] + mi := &file_plugin_proto_msgTypes[3] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -499,7 +499,7 @@ func (x *PartialFetchFailedResource) String() string { func (*PartialFetchFailedResource) ProtoMessage() {} func (x *PartialFetchFailedResource) ProtoReflect() protoreflect.Message { - mi := &file_internal_plugin_proto_msgTypes[3] + mi := &file_plugin_proto_msgTypes[3] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -512,7 +512,7 @@ func (x *PartialFetchFailedResource) ProtoReflect() protoreflect.Message { // Deprecated: Use PartialFetchFailedResource.ProtoReflect.Descriptor instead. func (*PartialFetchFailedResource) Descriptor() ([]byte, []int) { - return file_internal_plugin_proto_rawDescGZIP(), []int{3} + return file_plugin_proto_rawDescGZIP(), []int{3} } func (x *PartialFetchFailedResource) GetTableName() string { @@ -558,7 +558,7 @@ type Diagnostic struct { func (x *Diagnostic) Reset() { *x = Diagnostic{} if protoimpl.UnsafeEnabled { - mi := &file_internal_plugin_proto_msgTypes[4] + mi := &file_plugin_proto_msgTypes[4] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -571,7 +571,7 @@ func (x *Diagnostic) String() string { func (*Diagnostic) ProtoMessage() {} func (x *Diagnostic) ProtoReflect() protoreflect.Message { - mi := &file_internal_plugin_proto_msgTypes[4] + mi := &file_plugin_proto_msgTypes[4] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -584,7 +584,7 @@ func (x *Diagnostic) ProtoReflect() protoreflect.Message { // Deprecated: Use Diagnostic.ProtoReflect.Descriptor instead. func (*Diagnostic) Descriptor() ([]byte, []int) { - return file_internal_plugin_proto_rawDescGZIP(), []int{4} + return file_plugin_proto_rawDescGZIP(), []int{4} } func (x *Diagnostic) GetType() Diagnostic_Type { @@ -631,7 +631,7 @@ type GetProviderSchema struct { func (x *GetProviderSchema) Reset() { *x = GetProviderSchema{} if protoimpl.UnsafeEnabled { - mi := &file_internal_plugin_proto_msgTypes[5] + mi := &file_plugin_proto_msgTypes[5] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -644,7 +644,7 @@ func (x *GetProviderSchema) String() string { func (*GetProviderSchema) ProtoMessage() {} func (x *GetProviderSchema) ProtoReflect() protoreflect.Message { - mi := &file_internal_plugin_proto_msgTypes[5] + mi := &file_plugin_proto_msgTypes[5] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -657,7 +657,7 @@ func (x *GetProviderSchema) ProtoReflect() protoreflect.Message { // Deprecated: Use GetProviderSchema.ProtoReflect.Descriptor instead. func (*GetProviderSchema) Descriptor() ([]byte, []int) { - return file_internal_plugin_proto_rawDescGZIP(), []int{5} + return file_plugin_proto_rawDescGZIP(), []int{5} } type GetProviderConfig struct { @@ -669,7 +669,7 @@ type GetProviderConfig struct { func (x *GetProviderConfig) Reset() { *x = GetProviderConfig{} if protoimpl.UnsafeEnabled { - mi := &file_internal_plugin_proto_msgTypes[6] + mi := &file_plugin_proto_msgTypes[6] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -682,7 +682,7 @@ func (x *GetProviderConfig) String() string { func (*GetProviderConfig) ProtoMessage() {} func (x *GetProviderConfig) ProtoReflect() protoreflect.Message { - mi := &file_internal_plugin_proto_msgTypes[6] + mi := &file_plugin_proto_msgTypes[6] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -695,7 +695,7 @@ func (x *GetProviderConfig) ProtoReflect() protoreflect.Message { // Deprecated: Use GetProviderConfig.ProtoReflect.Descriptor instead. func (*GetProviderConfig) Descriptor() ([]byte, []int) { - return file_internal_plugin_proto_rawDescGZIP(), []int{6} + return file_plugin_proto_rawDescGZIP(), []int{6} } // Table is the definition of how a table is defined in a provider @@ -714,7 +714,7 @@ type Table struct { func (x *Table) Reset() { *x = Table{} if protoimpl.UnsafeEnabled { - mi := &file_internal_plugin_proto_msgTypes[7] + mi := &file_plugin_proto_msgTypes[7] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -727,7 +727,7 @@ func (x *Table) String() string { func (*Table) ProtoMessage() {} func (x *Table) ProtoReflect() protoreflect.Message { - mi := &file_internal_plugin_proto_msgTypes[7] + mi := &file_plugin_proto_msgTypes[7] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -740,7 +740,7 @@ func (x *Table) ProtoReflect() protoreflect.Message { // Deprecated: Use Table.ProtoReflect.Descriptor instead. func (*Table) Descriptor() ([]byte, []int) { - return file_internal_plugin_proto_rawDescGZIP(), []int{7} + return file_plugin_proto_rawDescGZIP(), []int{7} } func (x *Table) GetName() string { @@ -789,7 +789,7 @@ type TableCreationOptions struct { func (x *TableCreationOptions) Reset() { *x = TableCreationOptions{} if protoimpl.UnsafeEnabled { - mi := &file_internal_plugin_proto_msgTypes[8] + mi := &file_plugin_proto_msgTypes[8] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -802,7 +802,7 @@ func (x *TableCreationOptions) String() string { func (*TableCreationOptions) ProtoMessage() {} func (x *TableCreationOptions) ProtoReflect() protoreflect.Message { - mi := &file_internal_plugin_proto_msgTypes[8] + mi := &file_plugin_proto_msgTypes[8] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -815,7 +815,7 @@ func (x *TableCreationOptions) ProtoReflect() protoreflect.Message { // Deprecated: Use TableCreationOptions.ProtoReflect.Descriptor instead. func (*TableCreationOptions) Descriptor() ([]byte, []int) { - return file_internal_plugin_proto_rawDescGZIP(), []int{8} + return file_plugin_proto_rawDescGZIP(), []int{8} } func (x *TableCreationOptions) GetPrimaryKeys() []string { @@ -838,7 +838,7 @@ type Column struct { func (x *Column) Reset() { *x = Column{} if protoimpl.UnsafeEnabled { - mi := &file_internal_plugin_proto_msgTypes[9] + mi := &file_plugin_proto_msgTypes[9] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -851,7 +851,7 @@ func (x *Column) String() string { func (*Column) ProtoMessage() {} func (x *Column) ProtoReflect() protoreflect.Message { - mi := &file_internal_plugin_proto_msgTypes[9] + mi := &file_plugin_proto_msgTypes[9] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -864,7 +864,7 @@ func (x *Column) ProtoReflect() protoreflect.Message { // Deprecated: Use Column.ProtoReflect.Descriptor instead. func (*Column) Descriptor() ([]byte, []int) { - return file_internal_plugin_proto_rawDescGZIP(), []int{9} + return file_plugin_proto_rawDescGZIP(), []int{9} } func (x *Column) GetName() string { @@ -900,7 +900,7 @@ type ConnectionDetails struct { func (x *ConnectionDetails) Reset() { *x = ConnectionDetails{} if protoimpl.UnsafeEnabled { - mi := &file_internal_plugin_proto_msgTypes[10] + mi := &file_plugin_proto_msgTypes[10] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -913,7 +913,7 @@ func (x *ConnectionDetails) String() string { func (*ConnectionDetails) ProtoMessage() {} func (x *ConnectionDetails) ProtoReflect() protoreflect.Message { - mi := &file_internal_plugin_proto_msgTypes[10] + mi := &file_plugin_proto_msgTypes[10] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -926,7 +926,7 @@ func (x *ConnectionDetails) ProtoReflect() protoreflect.Message { // Deprecated: Use ConnectionDetails.ProtoReflect.Descriptor instead. func (*ConnectionDetails) Descriptor() ([]byte, []int) { - return file_internal_plugin_proto_rawDescGZIP(), []int{10} + return file_plugin_proto_rawDescGZIP(), []int{10} } func (x *ConnectionDetails) GetType() ConnectionType { @@ -963,7 +963,7 @@ type ConfigureProvider_Request struct { func (x *ConfigureProvider_Request) Reset() { *x = ConfigureProvider_Request{} if protoimpl.UnsafeEnabled { - mi := &file_internal_plugin_proto_msgTypes[11] + mi := &file_plugin_proto_msgTypes[11] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -976,7 +976,7 @@ func (x *ConfigureProvider_Request) String() string { func (*ConfigureProvider_Request) ProtoMessage() {} func (x *ConfigureProvider_Request) ProtoReflect() protoreflect.Message { - mi := &file_internal_plugin_proto_msgTypes[11] + mi := &file_plugin_proto_msgTypes[11] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -989,7 +989,7 @@ func (x *ConfigureProvider_Request) ProtoReflect() protoreflect.Message { // Deprecated: Use ConfigureProvider_Request.ProtoReflect.Descriptor instead. func (*ConfigureProvider_Request) Descriptor() ([]byte, []int) { - return file_internal_plugin_proto_rawDescGZIP(), []int{0, 0} + return file_plugin_proto_rawDescGZIP(), []int{0, 0} } func (x *ConfigureProvider_Request) GetCloudqueryVersion() string { @@ -1038,7 +1038,7 @@ type ConfigureProvider_Response struct { func (x *ConfigureProvider_Response) Reset() { *x = ConfigureProvider_Response{} if protoimpl.UnsafeEnabled { - mi := &file_internal_plugin_proto_msgTypes[12] + mi := &file_plugin_proto_msgTypes[12] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1051,7 +1051,7 @@ func (x *ConfigureProvider_Response) String() string { func (*ConfigureProvider_Response) ProtoMessage() {} func (x *ConfigureProvider_Response) ProtoReflect() protoreflect.Message { - mi := &file_internal_plugin_proto_msgTypes[12] + mi := &file_plugin_proto_msgTypes[12] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1064,7 +1064,7 @@ func (x *ConfigureProvider_Response) ProtoReflect() protoreflect.Message { // Deprecated: Use ConfigureProvider_Response.ProtoReflect.Descriptor instead. func (*ConfigureProvider_Response) Descriptor() ([]byte, []int) { - return file_internal_plugin_proto_rawDescGZIP(), []int{0, 1} + return file_plugin_proto_rawDescGZIP(), []int{0, 1} } func (x *ConfigureProvider_Response) GetError() string { @@ -1083,12 +1083,14 @@ type FetchResources_Request struct { Resources []string `protobuf:"bytes,1,rep,name=resources,proto3" json:"resources,omitempty"` // trigger to enable partial fetching PartialFetchingEnabled bool `protobuf:"varint,2,opt,name=partial_fetching_enabled,json=partialFetchingEnabled,proto3" json:"partial_fetching_enabled,omitempty"` + // if value is more than 0 it enables throttling for concurrent fetch + ParallelFetchingLimit uint64 `protobuf:"varint,3,opt,name=parallel_fetching_limit,json=parallelFetchingLimit,proto3" json:"parallel_fetching_limit,omitempty"` } func (x *FetchResources_Request) Reset() { *x = FetchResources_Request{} if protoimpl.UnsafeEnabled { - mi := &file_internal_plugin_proto_msgTypes[13] + mi := &file_plugin_proto_msgTypes[13] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1101,7 +1103,7 @@ func (x *FetchResources_Request) String() string { func (*FetchResources_Request) ProtoMessage() {} func (x *FetchResources_Request) ProtoReflect() protoreflect.Message { - mi := &file_internal_plugin_proto_msgTypes[13] + mi := &file_plugin_proto_msgTypes[13] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1114,7 +1116,7 @@ func (x *FetchResources_Request) ProtoReflect() protoreflect.Message { // Deprecated: Use FetchResources_Request.ProtoReflect.Descriptor instead. func (*FetchResources_Request) Descriptor() ([]byte, []int) { - return file_internal_plugin_proto_rawDescGZIP(), []int{1, 0} + return file_plugin_proto_rawDescGZIP(), []int{1, 0} } func (x *FetchResources_Request) GetResources() []string { @@ -1131,6 +1133,13 @@ func (x *FetchResources_Request) GetPartialFetchingEnabled() bool { return false } +func (x *FetchResources_Request) GetParallelFetchingLimit() uint64 { + if x != nil { + return x.ParallelFetchingLimit + } + return 0 +} + type FetchResources_Response struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -1153,7 +1162,7 @@ type FetchResources_Response struct { func (x *FetchResources_Response) Reset() { *x = FetchResources_Response{} if protoimpl.UnsafeEnabled { - mi := &file_internal_plugin_proto_msgTypes[14] + mi := &file_plugin_proto_msgTypes[14] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1166,7 +1175,7 @@ func (x *FetchResources_Response) String() string { func (*FetchResources_Response) ProtoMessage() {} func (x *FetchResources_Response) ProtoReflect() protoreflect.Message { - mi := &file_internal_plugin_proto_msgTypes[14] + mi := &file_plugin_proto_msgTypes[14] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1179,7 +1188,7 @@ func (x *FetchResources_Response) ProtoReflect() protoreflect.Message { // Deprecated: Use FetchResources_Response.ProtoReflect.Descriptor instead. func (*FetchResources_Response) Descriptor() ([]byte, []int) { - return file_internal_plugin_proto_rawDescGZIP(), []int{1, 1} + return file_plugin_proto_rawDescGZIP(), []int{1, 1} } func (x *FetchResources_Response) GetFinishedResources() map[string]bool { @@ -1233,7 +1242,7 @@ type GetProviderSchema_Request struct { func (x *GetProviderSchema_Request) Reset() { *x = GetProviderSchema_Request{} if protoimpl.UnsafeEnabled { - mi := &file_internal_plugin_proto_msgTypes[16] + mi := &file_plugin_proto_msgTypes[16] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1246,7 +1255,7 @@ func (x *GetProviderSchema_Request) String() string { func (*GetProviderSchema_Request) ProtoMessage() {} func (x *GetProviderSchema_Request) ProtoReflect() protoreflect.Message { - mi := &file_internal_plugin_proto_msgTypes[16] + mi := &file_plugin_proto_msgTypes[16] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1259,7 +1268,7 @@ func (x *GetProviderSchema_Request) ProtoReflect() protoreflect.Message { // Deprecated: Use GetProviderSchema_Request.ProtoReflect.Descriptor instead. func (*GetProviderSchema_Request) Descriptor() ([]byte, []int) { - return file_internal_plugin_proto_rawDescGZIP(), []int{5, 0} + return file_plugin_proto_rawDescGZIP(), []int{5, 0} } type GetProviderSchema_Response struct { @@ -1276,7 +1285,7 @@ type GetProviderSchema_Response struct { func (x *GetProviderSchema_Response) Reset() { *x = GetProviderSchema_Response{} if protoimpl.UnsafeEnabled { - mi := &file_internal_plugin_proto_msgTypes[17] + mi := &file_plugin_proto_msgTypes[17] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1289,7 +1298,7 @@ func (x *GetProviderSchema_Response) String() string { func (*GetProviderSchema_Response) ProtoMessage() {} func (x *GetProviderSchema_Response) ProtoReflect() protoreflect.Message { - mi := &file_internal_plugin_proto_msgTypes[17] + mi := &file_plugin_proto_msgTypes[17] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1302,7 +1311,7 @@ func (x *GetProviderSchema_Response) ProtoReflect() protoreflect.Message { // Deprecated: Use GetProviderSchema_Response.ProtoReflect.Descriptor instead. func (*GetProviderSchema_Response) Descriptor() ([]byte, []int) { - return file_internal_plugin_proto_rawDescGZIP(), []int{5, 1} + return file_plugin_proto_rawDescGZIP(), []int{5, 1} } func (x *GetProviderSchema_Response) GetName() string { @@ -1342,7 +1351,7 @@ type GetProviderConfig_Request struct { func (x *GetProviderConfig_Request) Reset() { *x = GetProviderConfig_Request{} if protoimpl.UnsafeEnabled { - mi := &file_internal_plugin_proto_msgTypes[20] + mi := &file_plugin_proto_msgTypes[20] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1355,7 +1364,7 @@ func (x *GetProviderConfig_Request) String() string { func (*GetProviderConfig_Request) ProtoMessage() {} func (x *GetProviderConfig_Request) ProtoReflect() protoreflect.Message { - mi := &file_internal_plugin_proto_msgTypes[20] + mi := &file_plugin_proto_msgTypes[20] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1368,7 +1377,7 @@ func (x *GetProviderConfig_Request) ProtoReflect() protoreflect.Message { // Deprecated: Use GetProviderConfig_Request.ProtoReflect.Descriptor instead. func (*GetProviderConfig_Request) Descriptor() ([]byte, []int) { - return file_internal_plugin_proto_rawDescGZIP(), []int{6, 0} + return file_plugin_proto_rawDescGZIP(), []int{6, 0} } type GetProviderConfig_Response struct { @@ -1384,7 +1393,7 @@ type GetProviderConfig_Response struct { func (x *GetProviderConfig_Response) Reset() { *x = GetProviderConfig_Response{} if protoimpl.UnsafeEnabled { - mi := &file_internal_plugin_proto_msgTypes[21] + mi := &file_plugin_proto_msgTypes[21] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1397,7 +1406,7 @@ func (x *GetProviderConfig_Response) String() string { func (*GetProviderConfig_Response) ProtoMessage() {} func (x *GetProviderConfig_Response) ProtoReflect() protoreflect.Message { - mi := &file_internal_plugin_proto_msgTypes[21] + mi := &file_plugin_proto_msgTypes[21] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1410,7 +1419,7 @@ func (x *GetProviderConfig_Response) ProtoReflect() protoreflect.Message { // Deprecated: Use GetProviderConfig_Response.ProtoReflect.Descriptor instead. func (*GetProviderConfig_Response) Descriptor() ([]byte, []int) { - return file_internal_plugin_proto_rawDescGZIP(), []int{6, 1} + return file_plugin_proto_rawDescGZIP(), []int{6, 1} } func (x *GetProviderConfig_Response) GetName() string { @@ -1434,36 +1443,39 @@ func (x *GetProviderConfig_Response) GetConfig() []byte { return nil } -var File_internal_plugin_proto protoreflect.FileDescriptor - -var file_internal_plugin_proto_rawDesc = []byte{ - 0x0a, 0x15, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x70, 0x6c, 0x75, 0x67, 0x69, - 0x6e, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x05, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x8a, - 0x02, 0x0a, 0x11, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x75, 0x72, 0x65, 0x50, 0x72, 0x6f, 0x76, - 0x69, 0x64, 0x65, 0x72, 0x1a, 0xd2, 0x01, 0x0a, 0x07, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, - 0x12, 0x2d, 0x0a, 0x12, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x71, 0x75, 0x65, 0x72, 0x79, 0x5f, 0x76, - 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x11, 0x63, 0x6c, - 0x6f, 0x75, 0x64, 0x71, 0x75, 0x65, 0x72, 0x79, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, - 0x38, 0x0a, 0x0a, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, - 0x01, 0x28, 0x0b, 0x32, 0x18, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x43, 0x6f, 0x6e, 0x6e, - 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x44, 0x65, 0x74, 0x61, 0x69, 0x6c, 0x73, 0x52, 0x0a, 0x63, - 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x16, 0x0a, 0x06, 0x63, 0x6f, 0x6e, - 0x66, 0x69, 0x67, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x63, 0x6f, 0x6e, 0x66, 0x69, - 0x67, 0x12, 0x24, 0x0a, 0x0d, 0x64, 0x69, 0x73, 0x61, 0x62, 0x6c, 0x65, 0x44, 0x65, 0x6c, 0x65, - 0x74, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0d, 0x64, 0x69, 0x73, 0x61, 0x62, 0x6c, - 0x65, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x12, 0x20, 0x0a, 0x0b, 0x65, 0x78, 0x74, 0x72, 0x61, - 0x46, 0x69, 0x65, 0x6c, 0x64, 0x73, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0b, 0x65, 0x78, - 0x74, 0x72, 0x61, 0x46, 0x69, 0x65, 0x6c, 0x64, 0x73, 0x1a, 0x20, 0x0a, 0x08, 0x52, 0x65, 0x73, - 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x01, - 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x22, 0xa4, 0x04, 0x0a, 0x0e, - 0x46, 0x65, 0x74, 0x63, 0x68, 0x52, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x73, 0x1a, 0x61, - 0x0a, 0x07, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1c, 0x0a, 0x09, 0x72, 0x65, 0x73, - 0x6f, 0x75, 0x72, 0x63, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, 0x09, 0x72, 0x65, - 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x73, 0x12, 0x38, 0x0a, 0x18, 0x70, 0x61, 0x72, 0x74, 0x69, - 0x61, 0x6c, 0x5f, 0x66, 0x65, 0x74, 0x63, 0x68, 0x69, 0x6e, 0x67, 0x5f, 0x65, 0x6e, 0x61, 0x62, - 0x6c, 0x65, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x08, 0x52, 0x16, 0x70, 0x61, 0x72, 0x74, 0x69, - 0x61, 0x6c, 0x46, 0x65, 0x74, 0x63, 0x68, 0x69, 0x6e, 0x67, 0x45, 0x6e, 0x61, 0x62, 0x6c, 0x65, - 0x64, 0x1a, 0xae, 0x03, 0x0a, 0x08, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x64, +var File_plugin_proto protoreflect.FileDescriptor + +var file_plugin_proto_rawDesc = []byte{ + 0x0a, 0x0c, 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x05, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x8a, 0x02, 0x0a, 0x11, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, + 0x75, 0x72, 0x65, 0x50, 0x72, 0x6f, 0x76, 0x69, 0x64, 0x65, 0x72, 0x1a, 0xd2, 0x01, 0x0a, 0x07, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x2d, 0x0a, 0x12, 0x63, 0x6c, 0x6f, 0x75, 0x64, + 0x71, 0x75, 0x65, 0x72, 0x79, 0x5f, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x11, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x71, 0x75, 0x65, 0x72, 0x79, 0x56, + 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x38, 0x0a, 0x0a, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, + 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x18, 0x2e, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x2e, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x44, 0x65, 0x74, + 0x61, 0x69, 0x6c, 0x73, 0x52, 0x0a, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, + 0x12, 0x16, 0x0a, 0x06, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, + 0x52, 0x06, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x24, 0x0a, 0x0d, 0x64, 0x69, 0x73, 0x61, + 0x62, 0x6c, 0x65, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x08, 0x52, + 0x0d, 0x64, 0x69, 0x73, 0x61, 0x62, 0x6c, 0x65, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x12, 0x20, + 0x0a, 0x0b, 0x65, 0x78, 0x74, 0x72, 0x61, 0x46, 0x69, 0x65, 0x6c, 0x64, 0x73, 0x18, 0x05, 0x20, + 0x01, 0x28, 0x0c, 0x52, 0x0b, 0x65, 0x78, 0x74, 0x72, 0x61, 0x46, 0x69, 0x65, 0x6c, 0x64, 0x73, + 0x1a, 0x20, 0x0a, 0x08, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, + 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x65, 0x72, 0x72, + 0x6f, 0x72, 0x22, 0xdd, 0x04, 0x0a, 0x0e, 0x46, 0x65, 0x74, 0x63, 0x68, 0x52, 0x65, 0x73, 0x6f, + 0x75, 0x72, 0x63, 0x65, 0x73, 0x1a, 0x99, 0x01, 0x0a, 0x07, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x12, 0x1c, 0x0a, 0x09, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x73, 0x18, 0x01, + 0x20, 0x03, 0x28, 0x09, 0x52, 0x09, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x73, 0x12, + 0x38, 0x0a, 0x18, 0x70, 0x61, 0x72, 0x74, 0x69, 0x61, 0x6c, 0x5f, 0x66, 0x65, 0x74, 0x63, 0x68, + 0x69, 0x6e, 0x67, 0x5f, 0x65, 0x6e, 0x61, 0x62, 0x6c, 0x65, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, + 0x08, 0x52, 0x16, 0x70, 0x61, 0x72, 0x74, 0x69, 0x61, 0x6c, 0x46, 0x65, 0x74, 0x63, 0x68, 0x69, + 0x6e, 0x67, 0x45, 0x6e, 0x61, 0x62, 0x6c, 0x65, 0x64, 0x12, 0x36, 0x0a, 0x17, 0x70, 0x61, 0x72, + 0x61, 0x6c, 0x6c, 0x65, 0x6c, 0x5f, 0x66, 0x65, 0x74, 0x63, 0x68, 0x69, 0x6e, 0x67, 0x5f, 0x6c, + 0x69, 0x6d, 0x69, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x04, 0x52, 0x15, 0x70, 0x61, 0x72, 0x61, + 0x6c, 0x6c, 0x65, 0x6c, 0x46, 0x65, 0x74, 0x63, 0x68, 0x69, 0x6e, 0x67, 0x4c, 0x69, 0x6d, 0x69, + 0x74, 0x1a, 0xae, 0x03, 0x0a, 0x08, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x64, 0x0a, 0x12, 0x66, 0x69, 0x6e, 0x69, 0x73, 0x68, 0x65, 0x64, 0x5f, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x35, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x46, 0x65, 0x74, 0x63, 0x68, 0x52, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, @@ -1646,20 +1658,20 @@ var file_internal_plugin_proto_rawDesc = []byte{ } var ( - file_internal_plugin_proto_rawDescOnce sync.Once - file_internal_plugin_proto_rawDescData = file_internal_plugin_proto_rawDesc + file_plugin_proto_rawDescOnce sync.Once + file_plugin_proto_rawDescData = file_plugin_proto_rawDesc ) -func file_internal_plugin_proto_rawDescGZIP() []byte { - file_internal_plugin_proto_rawDescOnce.Do(func() { - file_internal_plugin_proto_rawDescData = protoimpl.X.CompressGZIP(file_internal_plugin_proto_rawDescData) +func file_plugin_proto_rawDescGZIP() []byte { + file_plugin_proto_rawDescOnce.Do(func() { + file_plugin_proto_rawDescData = protoimpl.X.CompressGZIP(file_plugin_proto_rawDescData) }) - return file_internal_plugin_proto_rawDescData + return file_plugin_proto_rawDescData } -var file_internal_plugin_proto_enumTypes = make([]protoimpl.EnumInfo, 5) -var file_internal_plugin_proto_msgTypes = make([]protoimpl.MessageInfo, 22) -var file_internal_plugin_proto_goTypes = []interface{}{ +var file_plugin_proto_enumTypes = make([]protoimpl.EnumInfo, 5) +var file_plugin_proto_msgTypes = make([]protoimpl.MessageInfo, 22) +var file_plugin_proto_goTypes = []interface{}{ (ColumnType)(0), // 0: proto.ColumnType (ConnectionType)(0), // 1: proto.ConnectionType (ResourceFetchSummary_Status)(0), // 2: proto.ResourceFetchSummary.Status @@ -1688,7 +1700,7 @@ var file_internal_plugin_proto_goTypes = []interface{}{ (*GetProviderConfig_Request)(nil), // 25: proto.GetProviderConfig.Request (*GetProviderConfig_Response)(nil), // 26: proto.GetProviderConfig.Response } -var file_internal_plugin_proto_depIdxs = []int32{ +var file_plugin_proto_depIdxs = []int32{ 2, // 0: proto.ResourceFetchSummary.status:type_name -> proto.ResourceFetchSummary.Status 9, // 1: proto.ResourceFetchSummary.diagnostics:type_name -> proto.Diagnostic 3, // 2: proto.Diagnostic.type:type_name -> proto.Diagnostic.Type @@ -1720,13 +1732,13 @@ var file_internal_plugin_proto_depIdxs = []int32{ 0, // [0:16] is the sub-list for field type_name } -func init() { file_internal_plugin_proto_init() } -func file_internal_plugin_proto_init() { - if File_internal_plugin_proto != nil { +func init() { file_plugin_proto_init() } +func file_plugin_proto_init() { + if File_plugin_proto != nil { return } if !protoimpl.UnsafeEnabled { - file_internal_plugin_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + file_plugin_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*ConfigureProvider); i { case 0: return &v.state @@ -1738,7 +1750,7 @@ func file_internal_plugin_proto_init() { return nil } } - file_internal_plugin_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + file_plugin_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*FetchResources); i { case 0: return &v.state @@ -1750,7 +1762,7 @@ func file_internal_plugin_proto_init() { return nil } } - file_internal_plugin_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + file_plugin_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*ResourceFetchSummary); i { case 0: return &v.state @@ -1762,7 +1774,7 @@ func file_internal_plugin_proto_init() { return nil } } - file_internal_plugin_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { + file_plugin_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*PartialFetchFailedResource); i { case 0: return &v.state @@ -1774,7 +1786,7 @@ func file_internal_plugin_proto_init() { return nil } } - file_internal_plugin_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} { + file_plugin_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*Diagnostic); i { case 0: return &v.state @@ -1786,7 +1798,7 @@ func file_internal_plugin_proto_init() { return nil } } - file_internal_plugin_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} { + file_plugin_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*GetProviderSchema); i { case 0: return &v.state @@ -1798,7 +1810,7 @@ func file_internal_plugin_proto_init() { return nil } } - file_internal_plugin_proto_msgTypes[6].Exporter = func(v interface{}, i int) interface{} { + file_plugin_proto_msgTypes[6].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*GetProviderConfig); i { case 0: return &v.state @@ -1810,7 +1822,7 @@ func file_internal_plugin_proto_init() { return nil } } - file_internal_plugin_proto_msgTypes[7].Exporter = func(v interface{}, i int) interface{} { + file_plugin_proto_msgTypes[7].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*Table); i { case 0: return &v.state @@ -1822,7 +1834,7 @@ func file_internal_plugin_proto_init() { return nil } } - file_internal_plugin_proto_msgTypes[8].Exporter = func(v interface{}, i int) interface{} { + file_plugin_proto_msgTypes[8].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*TableCreationOptions); i { case 0: return &v.state @@ -1834,7 +1846,7 @@ func file_internal_plugin_proto_init() { return nil } } - file_internal_plugin_proto_msgTypes[9].Exporter = func(v interface{}, i int) interface{} { + file_plugin_proto_msgTypes[9].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*Column); i { case 0: return &v.state @@ -1846,7 +1858,7 @@ func file_internal_plugin_proto_init() { return nil } } - file_internal_plugin_proto_msgTypes[10].Exporter = func(v interface{}, i int) interface{} { + file_plugin_proto_msgTypes[10].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*ConnectionDetails); i { case 0: return &v.state @@ -1858,7 +1870,7 @@ func file_internal_plugin_proto_init() { return nil } } - file_internal_plugin_proto_msgTypes[11].Exporter = func(v interface{}, i int) interface{} { + file_plugin_proto_msgTypes[11].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*ConfigureProvider_Request); i { case 0: return &v.state @@ -1870,7 +1882,7 @@ func file_internal_plugin_proto_init() { return nil } } - file_internal_plugin_proto_msgTypes[12].Exporter = func(v interface{}, i int) interface{} { + file_plugin_proto_msgTypes[12].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*ConfigureProvider_Response); i { case 0: return &v.state @@ -1882,7 +1894,7 @@ func file_internal_plugin_proto_init() { return nil } } - file_internal_plugin_proto_msgTypes[13].Exporter = func(v interface{}, i int) interface{} { + file_plugin_proto_msgTypes[13].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*FetchResources_Request); i { case 0: return &v.state @@ -1894,7 +1906,7 @@ func file_internal_plugin_proto_init() { return nil } } - file_internal_plugin_proto_msgTypes[14].Exporter = func(v interface{}, i int) interface{} { + file_plugin_proto_msgTypes[14].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*FetchResources_Response); i { case 0: return &v.state @@ -1906,7 +1918,7 @@ func file_internal_plugin_proto_init() { return nil } } - file_internal_plugin_proto_msgTypes[16].Exporter = func(v interface{}, i int) interface{} { + file_plugin_proto_msgTypes[16].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*GetProviderSchema_Request); i { case 0: return &v.state @@ -1918,7 +1930,7 @@ func file_internal_plugin_proto_init() { return nil } } - file_internal_plugin_proto_msgTypes[17].Exporter = func(v interface{}, i int) interface{} { + file_plugin_proto_msgTypes[17].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*GetProviderSchema_Response); i { case 0: return &v.state @@ -1930,7 +1942,7 @@ func file_internal_plugin_proto_init() { return nil } } - file_internal_plugin_proto_msgTypes[20].Exporter = func(v interface{}, i int) interface{} { + file_plugin_proto_msgTypes[20].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*GetProviderConfig_Request); i { case 0: return &v.state @@ -1942,7 +1954,7 @@ func file_internal_plugin_proto_init() { return nil } } - file_internal_plugin_proto_msgTypes[21].Exporter = func(v interface{}, i int) interface{} { + file_plugin_proto_msgTypes[21].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*GetProviderConfig_Response); i { case 0: return &v.state @@ -1955,24 +1967,24 @@ func file_internal_plugin_proto_init() { } } } - file_internal_plugin_proto_msgTypes[7].OneofWrappers = []interface{}{} + file_plugin_proto_msgTypes[7].OneofWrappers = []interface{}{} type x struct{} out := protoimpl.TypeBuilder{ File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), - RawDescriptor: file_internal_plugin_proto_rawDesc, + RawDescriptor: file_plugin_proto_rawDesc, NumEnums: 5, NumMessages: 22, NumExtensions: 0, NumServices: 1, }, - GoTypes: file_internal_plugin_proto_goTypes, - DependencyIndexes: file_internal_plugin_proto_depIdxs, - EnumInfos: file_internal_plugin_proto_enumTypes, - MessageInfos: file_internal_plugin_proto_msgTypes, + GoTypes: file_plugin_proto_goTypes, + DependencyIndexes: file_plugin_proto_depIdxs, + EnumInfos: file_plugin_proto_enumTypes, + MessageInfos: file_plugin_proto_msgTypes, }.Build() - File_internal_plugin_proto = out.File - file_internal_plugin_proto_rawDesc = nil - file_internal_plugin_proto_goTypes = nil - file_internal_plugin_proto_depIdxs = nil + File_plugin_proto = out.File + file_plugin_proto_rawDesc = nil + file_plugin_proto_goTypes = nil + file_plugin_proto_depIdxs = nil } diff --git a/cqproto/internal/plugin.proto b/cqproto/internal/plugin.proto index 21e74e76..31b6935f 100644 --- a/cqproto/internal/plugin.proto +++ b/cqproto/internal/plugin.proto @@ -39,6 +39,8 @@ message FetchResources { repeated string resources = 1; // trigger to enable partial fetching bool partial_fetching_enabled = 2; + // if value is more than 0 it enables throttling for concurrent fetch + uint64 parallel_fetching_limit = 3; } message Response { // map of resources that have finished fetching diff --git a/cqproto/provider.go b/cqproto/provider.go index a1a31fbd..9b4e06b4 100644 --- a/cqproto/provider.go +++ b/cqproto/provider.go @@ -88,6 +88,9 @@ type FetchResourcesRequest struct { // PartialFetchingEnabled if true enables partial fetching PartialFetchingEnabled bool + + // ParallelFetchingLimit limits parallel resources fetch at a time is more than 0 + ParallelFetchingLimit uint64 } // FetchResourcesStream represents a CloudQuery RPC stream of fetch updates from the provider diff --git a/provider/provider.go b/provider/provider.go index 34ce206f..9213de1a 100644 --- a/provider/provider.go +++ b/provider/provider.go @@ -20,6 +20,7 @@ import ( "github.com/hashicorp/hcl/v2/hclsimple" "github.com/hashicorp/hcl/v2/hclwrite" "golang.org/x/sync/errgroup" + "golang.org/x/sync/semaphore" ) // Config Every provider implements a resources field we only want to extract that in fetch execution @@ -134,7 +135,6 @@ func (p *Provider) ConfigureProvider(_ context.Context, request *cqproto.Configu } func (p *Provider) FetchResources(ctx context.Context, request *cqproto.FetchResourcesRequest, sender cqproto.FetchResourcesSender) error { - if p.meta == nil { return fmt.Errorf("provider client is not configured, call ConfigureProvider first") } @@ -156,6 +156,12 @@ func (p *Provider) FetchResources(ctx context.Context, request *cqproto.FetchRes defer conn.Close() + // limiter used to limit the amount of resources fetched concurently + var limiter *semaphore.Weighted + if request.ParallelFetchingLimit > 0 { + limiter = semaphore.NewWeighted(int64(request.ParallelFetchingLimit)) + } + g, gctx := errgroup.WithContext(ctx) finishedResources := make(map[string]bool, len(resources)) l := sync.Mutex{} @@ -173,6 +179,12 @@ func (p *Provider) FetchResources(ctx context.Context, request *cqproto.FetchRes finishedResources[r] = false l.Unlock() g.Go(func() error { + if limiter != nil { + if err := limiter.Acquire(gctx, 1); err != nil { + return err + } + defer limiter.Release(1) + } resourceCount, err := execData.ResolveTable(gctx, p.meta, nil) l.Lock() finishedResources[r] = true diff --git a/provider/provider_test.go b/provider/provider_test.go index db39b02c..841851d1 100644 --- a/provider/provider_test.go +++ b/provider/provider_test.go @@ -4,6 +4,7 @@ import ( "context" "errors" "testing" + "time" "github.com/cloudquery/cq-provider-sdk/provider/schema/mocks" "github.com/golang/mock/gomock" @@ -23,7 +24,6 @@ type ( Name string } testConfig struct{} - testClient struct{} ) @@ -58,6 +58,7 @@ var ( testResolverFunc = func(ctx context.Context, meta schema.ClientMeta, parent *schema.Resource, res chan interface{}) error { for i := 0; i < 10; i++ { t := testStruct{} + time.Sleep(50 * time.Millisecond) res <- faker.FakeData(&t) } return nil @@ -134,6 +135,85 @@ var ( }, }, } + + parallelCheckProvider = Provider{ + Name: "parallel", + Config: func() Config { + return &testConfig{} + }, + ResourceMap: map[string]*schema.Table{ + "test": { + Name: "test_resource", + Resolver: testResolverFunc, + Columns: []schema.Column{ + { + Name: "id", + Type: schema.TypeBigInt, + }, + { + Name: "name", + Type: schema.TypeString, + }, + }, + }, + "test1": { + Name: "test1_resource", + Resolver: testResolverFunc, + Columns: []schema.Column{ + { + Name: "id", + Type: schema.TypeBigInt, + }, + { + Name: "name", + Type: schema.TypeString, + }, + }, + }, + "test2": { + Name: "test2_resource", + Resolver: testResolverFunc, + Columns: []schema.Column{ + { + Name: "id", + Type: schema.TypeBigInt, + }, + { + Name: "name", + Type: schema.TypeString, + }, + }, + }, + "test3": { + Name: "test3_resource", + Resolver: testResolverFunc, + Columns: []schema.Column{ + { + Name: "id", + Type: schema.TypeBigInt, + }, + { + Name: "name", + Type: schema.TypeString, + }, + }, + }, + "test4": { + Name: "test4_resource", + Resolver: testResolverFunc, + Columns: []schema.Column{ + { + Name: "id", + Type: schema.TypeBigInt, + }, + { + Name: "name", + Type: schema.TypeString, + }, + }, + }, + }, + } ) func TestProviderInterpolate(t *testing.T) { @@ -296,3 +376,35 @@ func (f *testResourceSender) Send(r *cqproto.FetchResourcesResponse) error { } return nil } + +func TestProvider_FetchResourcesParallelLimit(t *testing.T) { + parallelCheckProvider.Configure = func(logger hclog.Logger, i interface{}) (schema.ClientMeta, error) { + return testClient{}, nil + } + parallelCheckProvider.Logger = hclog.Default() + resp, err := parallelCheckProvider.ConfigureProvider(context.Background(), &cqproto.ConfigureProviderRequest{ + CloudQueryVersion: "dev", + Connection: cqproto.ConnectionDetails{ + DSN: "postgres://postgres:pass@localhost:5432/postgres?sslmode=disable", + }, + Config: nil, + DisableDelete: true, + ExtraFields: nil, + }) + assert.Equal(t, "", resp.Error) + assert.Nil(t, err) + + // it runs 5 resources at a time. each resource takes ~500ms + start := time.Now() + err = parallelCheckProvider.FetchResources(context.Background(), &cqproto.FetchResourcesRequest{Resources: []string{"*"}}, &testResourceSender{}) + assert.Nil(t, err) + length := time.Since(start) + assert.Less(t, length, 1000*time.Millisecond) + + // it runs 5 resources one by one. each resource takes ~500ms + start = time.Now() + err = parallelCheckProvider.FetchResources(context.Background(), &cqproto.FetchResourcesRequest{Resources: []string{"*"}, ParallelFetchingLimit: 1}, &testResourceSender{}) + assert.Nil(t, err) + length = time.Since(start) + assert.Greater(t, length, 2500*time.Millisecond) +}