From 3ad1ce6a2f264c847e568420094a720d4e7ca6ba Mon Sep 17 00:00:00 2001 From: Marc-Philippe Fuller Date: Mon, 28 Nov 2022 15:11:53 -0600 Subject: [PATCH] fix: renames GolangRuntime to AppServiceRuntime Signed-off-by: Marc-Philippe Fuller --- internal/app/service.go | 4 +- internal/app/service_test.go | 8 +- internal/app/triggermessageprocessor.go | 2 +- internal/runtime/runtime.go | 186 ++++++++++++------------ internal/runtime/runtime_test.go | 32 ++-- internal/runtime/storeforward.go | 2 +- internal/runtime/storeforward_test.go | 4 +- 7 files changed, 119 insertions(+), 119 deletions(-) diff --git a/internal/app/service.go b/internal/app/service.go index 2884b9efd..cdd64e676 100644 --- a/internal/app/service.go +++ b/internal/app/service.go @@ -84,7 +84,7 @@ type Service struct { config *common.ConfigurationStruct lc logger.LoggingClient usingConfigurablePipeline bool - runtime *runtime.GolangRuntime + runtime *runtime.AppServiceRuntime webserver *webserver.WebServer ctx contextGroup deferredFunctions []bootstrap.Deferred @@ -549,7 +549,7 @@ func (svc *Service) Initialize() error { return fmt.Errorf("unable to parse Service.RequestTimeout configuration as a time duration: %s", err.Error()) } - svc.runtime = runtime.NewGolangRuntime(svc.serviceKey, svc.targetType, svc.dic) + svc.runtime = runtime.NewAppServiceRuntime(svc.serviceKey, svc.targetType, svc.dic) // Bootstrapping is complete, so now need to retrieve the needed objects from the containers. svc.lc = bootstrapContainer.LoggingClientFrom(svc.dic.Get) diff --git a/internal/app/service_test.go b/internal/app/service_test.go index 0f2b9f598..9166f3c9c 100644 --- a/internal/app/service_test.go +++ b/internal/app/service_test.go @@ -256,7 +256,7 @@ func TestSetupHTTPTrigger(t *testing.T) { }, } - testRuntime := runtime.NewGolangRuntime("", nil, dic) + testRuntime := runtime.NewAppServiceRuntime("", nil, dic) testRuntime.SetDefaultFunctionsPipeline(nil) sdk.runtime = testRuntime @@ -276,7 +276,7 @@ func TestSetupMessageBusTrigger(t *testing.T) { }, }, } - testRuntime := runtime.NewGolangRuntime("", nil, dic) + testRuntime := runtime.NewAppServiceRuntime("", nil, dic) testRuntime.SetDefaultFunctionsPipeline(nil) sdk.runtime = testRuntime @@ -305,7 +305,7 @@ func TestSetDefaultFunctionsPipelineOneTransform(t *testing.T) { service := Service{ lc: lc, dic: dic, - runtime: runtime.NewGolangRuntime("", nil, dic), + runtime: runtime.NewAppServiceRuntime("", nil, dic), config: &common.ConfigurationStruct{ Trigger: common.TriggerInfo{ Type: TriggerTypeMessageBus, @@ -324,7 +324,7 @@ func TestService_AddFunctionsPipelineForTopics(t *testing.T) { service := Service{ lc: lc, dic: dic, - runtime: runtime.NewGolangRuntime("", nil, dic), + runtime: runtime.NewAppServiceRuntime("", nil, dic), config: &common.ConfigurationStruct{ Trigger: common.TriggerInfo{ Type: TriggerTypeMessageBus, diff --git a/internal/app/triggermessageprocessor.go b/internal/app/triggermessageprocessor.go index 54e104e3c..936007f77 100644 --- a/internal/app/triggermessageprocessor.go +++ b/internal/app/triggermessageprocessor.go @@ -41,7 +41,7 @@ import ( type simpleTriggerServiceBinding struct { *Service - *runtime.GolangRuntime + *runtime.AppServiceRuntime } func (b *simpleTriggerServiceBinding) SecretProvider() messaging.SecretDataProvider { diff --git a/internal/runtime/runtime.go b/internal/runtime/runtime.go index ce0f244e9..72f251db7 100644 --- a/internal/runtime/runtime.go +++ b/internal/runtime/runtime.go @@ -66,8 +66,8 @@ func NewFunctionPipeline(id string, topics []string, transforms []interfaces.App return pipeline } -// GolangRuntime represents the golang runtime environment -type GolangRuntime struct { +// AppServiceRuntime represents the golang runtime environment +type AppServiceRuntime struct { TargetType interface{} ServiceKey string pipelines map[string]*interfaces.FunctionPipeline @@ -82,65 +82,65 @@ type MessageError struct { ErrorCode int } -// NewGolangRuntime creates and initializes the GolangRuntime instance -func NewGolangRuntime(serviceKey string, targetType interface{}, dic *di.Container) *GolangRuntime { - gr := &GolangRuntime{ +// NewAppServiceRuntime creates and initializes the AppServiceRuntime instance +func NewAppServiceRuntime(serviceKey string, targetType interface{}, dic *di.Container) *AppServiceRuntime { + asr := &AppServiceRuntime{ ServiceKey: serviceKey, TargetType: targetType, dic: dic, pipelines: make(map[string]*interfaces.FunctionPipeline), } - gr.storeForward.dic = dic - gr.storeForward.runtime = gr - gr.lc = bootstrapContainer.LoggingClientFrom(gr.dic.Get) + asr.storeForward.dic = dic + asr.storeForward.runtime = asr + asr.lc = bootstrapContainer.LoggingClientFrom(asr.dic.Get) - return gr + return asr } // SetDefaultFunctionsPipeline sets the default function pipeline -func (gr *GolangRuntime) SetDefaultFunctionsPipeline(transforms []interfaces.AppFunction) { - pipeline := gr.GetDefaultPipeline() // ensures the default pipeline exists - gr.SetFunctionsPipelineTransforms(pipeline.Id, transforms) +func (asr *AppServiceRuntime) SetDefaultFunctionsPipeline(transforms []interfaces.AppFunction) { + pipeline := asr.GetDefaultPipeline() // ensures the default pipeline exists + asr.SetFunctionsPipelineTransforms(pipeline.Id, transforms) } // SetFunctionsPipelineTransforms sets the transforms for an existing function pipeline. // Non-existent pipelines are ignored -func (gr *GolangRuntime) SetFunctionsPipelineTransforms(id string, transforms []interfaces.AppFunction) { - pipeline := gr.pipelines[id] +func (asr *AppServiceRuntime) SetFunctionsPipelineTransforms(id string, transforms []interfaces.AppFunction) { + pipeline := asr.pipelines[id] if pipeline != nil { - gr.isBusyCopying.Lock() + asr.isBusyCopying.Lock() pipeline.Transforms = transforms pipeline.Hash = calculatePipelineHash(transforms) - gr.isBusyCopying.Unlock() - gr.lc.Infof("Transforms set for `%s` pipeline", id) + asr.isBusyCopying.Unlock() + asr.lc.Infof("Transforms set for `%s` pipeline", id) } else { - gr.lc.Warnf("Unable to set transforms for `%s` pipeline: Pipeline not found", id) + asr.lc.Warnf("Unable to set transforms for `%s` pipeline: Pipeline not found", id) } } // SetFunctionsPipelineTopics sets the topics for an existing function pipeline. // Non-existent pipelines are ignored -func (gr *GolangRuntime) SetFunctionsPipelineTopics(id string, topics []string) { - pipeline := gr.pipelines[id] +func (asr *AppServiceRuntime) SetFunctionsPipelineTopics(id string, topics []string) { + pipeline := asr.pipelines[id] if pipeline != nil { - gr.isBusyCopying.Lock() + asr.isBusyCopying.Lock() pipeline.Topics = topics - gr.isBusyCopying.Unlock() - gr.lc.Infof("Topics set for `%s` pipeline", id) + asr.isBusyCopying.Unlock() + asr.lc.Infof("Topics set for `%s` pipeline", id) } else { - gr.lc.Warnf("Unable to set topica for `%s` pipeline: Pipeline not found", id) + asr.lc.Warnf("Unable to set topica for `%s` pipeline: Pipeline not found", id) } } // ClearAllFunctionsPipelineTransforms clears the transforms for all existing function pipelines. -func (gr *GolangRuntime) ClearAllFunctionsPipelineTransforms() { - gr.isBusyCopying.Lock() - for index := range gr.pipelines { - gr.pipelines[index].Transforms = nil - gr.pipelines[index].Hash = "" +func (asr *AppServiceRuntime) ClearAllFunctionsPipelineTransforms() { + asr.isBusyCopying.Lock() + for index := range asr.pipelines { + asr.pipelines[index].Transforms = nil + asr.pipelines[index].Hash = "" } - gr.isBusyCopying.Unlock() + asr.isBusyCopying.Unlock() } // RemoveAllFunctionPipelines removes all existing function pipelines @@ -158,37 +158,37 @@ func (gr *GolangRuntime) RemoveAllFunctionPipelines() { } // AddFunctionsPipeline is thread safe to set transforms -func (gr *GolangRuntime) AddFunctionsPipeline(id string, topics []string, transforms []interfaces.AppFunction) error { - _, exists := gr.pipelines[id] +func (asr *AppServiceRuntime) AddFunctionsPipeline(id string, topics []string, transforms []interfaces.AppFunction) error { + _, exists := asr.pipelines[id] if exists { return fmt.Errorf("pipeline with Id='%s' already exists", id) } - _ = gr.addFunctionsPipeline(id, topics, transforms) + _ = asr.addFunctionsPipeline(id, topics, transforms) return nil } -func (gr *GolangRuntime) addFunctionsPipeline(id string, topics []string, transforms []interfaces.AppFunction) *interfaces.FunctionPipeline { +func (asr *AppServiceRuntime) addFunctionsPipeline(id string, topics []string, transforms []interfaces.AppFunction) *interfaces.FunctionPipeline { pipeline := NewFunctionPipeline(id, topics, transforms) - gr.isBusyCopying.Lock() - gr.pipelines[id] = &pipeline - gr.isBusyCopying.Unlock() + asr.isBusyCopying.Lock() + asr.pipelines[id] = &pipeline + asr.isBusyCopying.Unlock() - metricManager := bootstrapContainer.MetricsManagerFrom(gr.dic.Get) - gr.registerPipelineMetric(metricManager, internal.PipelineMessagesProcessedName, pipeline.Id, pipeline.MessagesProcessed) - gr.registerPipelineMetric(metricManager, internal.PipelineMessageProcessingTimeName, pipeline.Id, pipeline.MessageProcessingTime) - gr.registerPipelineMetric(metricManager, internal.PipelineProcessingErrorsName, pipeline.Id, pipeline.ProcessingErrors) + metricManager := bootstrapContainer.MetricsManagerFrom(asr.dic.Get) + asr.registerPipelineMetric(metricManager, internal.PipelineMessagesProcessedName, pipeline.Id, pipeline.MessagesProcessed) + asr.registerPipelineMetric(metricManager, internal.PipelineMessageProcessingTimeName, pipeline.Id, pipeline.MessageProcessingTime) + asr.registerPipelineMetric(metricManager, internal.PipelineProcessingErrorsName, pipeline.Id, pipeline.ProcessingErrors) return &pipeline } -func (gr *GolangRuntime) registerPipelineMetric(metricManager bootstrapInterfaces.MetricsManager, metricName string, pipelineId string, metric interface{}) { +func (asr *AppServiceRuntime) registerPipelineMetric(metricManager bootstrapInterfaces.MetricsManager, metricName string, pipelineId string, metric interface{}) { registeredName := strings.Replace(metricName, internal.PipelineIdTxt, pipelineId, 1) err := metricManager.Register(registeredName, metric, map[string]string{"pipeline": pipelineId}) if err != nil { - gr.lc.Warnf("Unable to register %s metric. Metric will not be reported : %s", registeredName, err.Error()) + asr.lc.Warnf("Unable to register %s metric. Metric will not be reported : %s", registeredName, err.Error()) } else { - gr.lc.Infof("%s metric has been registered and will be reported (if enabled)", registeredName) + asr.lc.Infof("%s metric has been registered and will be reported (if enabled)", registeredName) } } @@ -198,19 +198,19 @@ func (gr *GolangRuntime) unregisterPipelineMetric(metricManager bootstrapInterfa } // ProcessMessage sends the contents of the message through the functions pipeline -func (gr *GolangRuntime) ProcessMessage(appContext *appfunction.Context, target interface{}, pipeline *interfaces.FunctionPipeline) *MessageError { +func (asr *AppServiceRuntime) ProcessMessage(appContext *appfunction.Context, target interface{}, pipeline *interfaces.FunctionPipeline) *MessageError { if len(pipeline.Transforms) == 0 { err := fmt.Errorf("no transforms configured for pipleline Id='%s'. Please check log for earlier errors loading pipeline", pipeline.Id) - gr.logError(err, appContext.CorrelationID()) + asr.logError(err, appContext.CorrelationID()) return &MessageError{Err: err, ErrorCode: http.StatusInternalServerError} } appContext.AddValue(interfaces.PIPELINEID, pipeline.Id) - gr.lc.Debugf("Pipeline '%s' processing message %d Transforms", pipeline.Id, len(pipeline.Transforms)) + asr.lc.Debugf("Pipeline '%s' processing message %d Transforms", pipeline.Id, len(pipeline.Transforms)) // Make copy of transform functions to avoid disruption of pipeline when updating the pipeline from registry - gr.isBusyCopying.Lock() + asr.isBusyCopying.Lock() execPipeline := &interfaces.FunctionPipeline{ Id: pipeline.Id, Transforms: make([]interfaces.AppFunction, len(pipeline.Transforms)), @@ -221,47 +221,47 @@ func (gr *GolangRuntime) ProcessMessage(appContext *appfunction.Context, target ProcessingErrors: pipeline.ProcessingErrors, } copy(execPipeline.Transforms, pipeline.Transforms) - gr.isBusyCopying.Unlock() + asr.isBusyCopying.Unlock() - return gr.ExecutePipeline(target, appContext, execPipeline, 0, false) + return asr.ExecutePipeline(target, appContext, execPipeline, 0, false) } // DecodeMessage decode the message wrapped in the MessageEnvelope and return the data to be processed. -func (gr *GolangRuntime) DecodeMessage(appContext *appfunction.Context, envelope types.MessageEnvelope) (interface{}, *MessageError, bool) { +func (asr *AppServiceRuntime) DecodeMessage(appContext *appfunction.Context, envelope types.MessageEnvelope) (interface{}, *MessageError, bool) { // Default Target Type for the function pipeline is an Event DTO. // The Event DTO can be wrapped in an AddEventRequest DTO or just be the un-wrapped Event DTO, // which is handled dynamically below. - if gr.TargetType == nil { - gr.TargetType = &dtos.Event{} + if asr.TargetType == nil { + asr.TargetType = &dtos.Event{} } - if reflect.TypeOf(gr.TargetType).Kind() != reflect.Ptr { + if reflect.TypeOf(asr.TargetType).Kind() != reflect.Ptr { err := errors.New("TargetType must be a pointer, not a value of the target type") - gr.logError(err, envelope.CorrelationID) + asr.logError(err, envelope.CorrelationID) return nil, &MessageError{Err: err, ErrorCode: http.StatusInternalServerError}, false } // Must make a copy of the type so that data isn't retained between calls for custom types - target := reflect.New(reflect.ValueOf(gr.TargetType).Elem().Type()).Interface() + target := reflect.New(reflect.ValueOf(asr.TargetType).Elem().Type()).Interface() switch target.(type) { case *[]byte: - gr.lc.Debug("Expecting raw byte data") + asr.lc.Debug("Expecting raw byte data") target = &envelope.Payload case *dtos.Event: - gr.lc.Debug("Expecting an AddEventRequest or Event DTO") + asr.lc.Debug("Expecting an AddEventRequest or Event DTO") // Dynamically process either AddEventRequest or Event DTO - event, err := gr.processEventPayload(envelope) + event, err := asr.processEventPayload(envelope) if err != nil { err = fmt.Errorf("unable to process payload %s", err.Error()) - gr.logError(err, envelope.CorrelationID) + asr.logError(err, envelope.CorrelationID) return nil, &MessageError{Err: err, ErrorCode: http.StatusBadRequest}, true } - if gr.lc.LogLevel() == models.DebugLog { - gr.debugLogEvent(event) + if asr.lc.LogLevel() == models.DebugLog { + asr.debugLogEvent(event) } appContext.AddValue(interfaces.DEVICENAME, event.DeviceName) @@ -272,12 +272,12 @@ func (gr *GolangRuntime) DecodeMessage(appContext *appfunction.Context, envelope default: customTypeName := di.TypeInstanceToName(target) - gr.lc.Debugf("Expecting a custom type of %s", customTypeName) + asr.lc.Debugf("Expecting a custom type of %s", customTypeName) // Expecting a custom type so just unmarshal into the target type. - if err := gr.unmarshalPayload(envelope, target); err != nil { + if err := asr.unmarshalPayload(envelope, target); err != nil { err = fmt.Errorf("unable to process custom object received of type '%s': %s", customTypeName, err.Error()) - gr.logError(err, envelope.CorrelationID) + asr.logError(err, envelope.CorrelationID) return nil, &MessageError{Err: err, ErrorCode: http.StatusBadRequest}, true } } @@ -293,7 +293,7 @@ func (gr *GolangRuntime) DecodeMessage(appContext *appfunction.Context, envelope return target, nil, false } -func (gr *GolangRuntime) ExecutePipeline( +func (asr *AppServiceRuntime) ExecutePipeline( target interface{}, appContext *appfunction.Context, pipeline *interfaces.FunctionPipeline, @@ -327,7 +327,7 @@ func (gr *GolangRuntime) ExecutePipeline( common.CorrelationHeader, appContext.CorrelationID()) if appContext.RetryData() != nil && !isRetry { - gr.storeForward.storeForLaterRetry(appContext.RetryData(), appContext, pipeline, functionIndex) + asr.storeForward.storeForLaterRetry(appContext.RetryData(), appContext, pipeline, functionIndex) } pipeline.ProcessingErrors.Inc(1) @@ -341,26 +341,26 @@ func (gr *GolangRuntime) ExecutePipeline( return nil } -func (gr *GolangRuntime) StartStoreAndForward( +func (asr *AppServiceRuntime) StartStoreAndForward( appWg *sync.WaitGroup, appCtx context.Context, enabledWg *sync.WaitGroup, enabledCtx context.Context, serviceKey string) { - gr.storeForward.startStoreAndForwardRetryLoop(appWg, appCtx, enabledWg, enabledCtx, serviceKey) + asr.storeForward.startStoreAndForwardRetryLoop(appWg, appCtx, enabledWg, enabledCtx, serviceKey) } -func (gr *GolangRuntime) processEventPayload(envelope types.MessageEnvelope) (*dtos.Event, error) { +func (asr *AppServiceRuntime) processEventPayload(envelope types.MessageEnvelope) (*dtos.Event, error) { - gr.lc.Debug("Attempting to process Payload as an AddEventRequest DTO") + asr.lc.Debug("Attempting to process Payload as an AddEventRequest DTO") requestDto := requests.AddEventRequest{} // Note that DTO validation is called during the unmarshaling // which results in a KindContractInvalid error - requestDtoErr := gr.unmarshalPayload(envelope, &requestDto) + requestDtoErr := asr.unmarshalPayload(envelope, &requestDto) if requestDtoErr == nil { - gr.lc.Debug("Using Event DTO from AddEventRequest DTO") + asr.lc.Debug("Using Event DTO from AddEventRequest DTO") // Determine that we have an AddEventRequest DTO return &requestDto.Event, nil @@ -373,13 +373,13 @@ func (gr *GolangRuntime) processEventPayload(envelope types.MessageEnvelope) (*d // KindContractInvalid indicates that we likely don't have an AddEventRequest // so try to process as Event - gr.lc.Debug("Attempting to process Payload as an Event DTO") + asr.lc.Debug("Attempting to process Payload as an Event DTO") event := &dtos.Event{} - err := gr.unmarshalPayload(envelope, event) + err := asr.unmarshalPayload(envelope, event) if err == nil { err = common.Validate(event) if err == nil { - gr.lc.Debug("Using Event DTO received") + asr.lc.Debug("Using Event DTO received") return event, nil } } @@ -393,7 +393,7 @@ func (gr *GolangRuntime) processEventPayload(envelope types.MessageEnvelope) (*d return nil, requestDtoErr } -func (gr *GolangRuntime) unmarshalPayload(envelope types.MessageEnvelope, target interface{}) error { +func (asr *AppServiceRuntime) unmarshalPayload(envelope types.MessageEnvelope, target interface{}) error { var err error contentType := strings.Split(envelope.ContentType, ";")[0] @@ -412,28 +412,28 @@ func (gr *GolangRuntime) unmarshalPayload(envelope types.MessageEnvelope, target return err } -func (gr *GolangRuntime) debugLogEvent(event *dtos.Event) { - gr.lc.Debugf("Event Received with ProfileName=%s, DeviceName=%s and ReadingCount=%d", +func (asr *AppServiceRuntime) debugLogEvent(event *dtos.Event) { + asr.lc.Debugf("Event Received with ProfileName=%s, DeviceName=%s and ReadingCount=%d", event.ProfileName, event.DeviceName, len(event.Readings)) if len(event.Tags) > 0 { - gr.lc.Debugf("Event tags are: [%v]", event.Tags) + asr.lc.Debugf("Event tags are: [%v]", event.Tags) } else { - gr.lc.Debug("Event has no tags") + asr.lc.Debug("Event has no tags") } for index, reading := range event.Readings { switch strings.ToLower(reading.ValueType) { case strings.ToLower(common.ValueTypeBinary): - gr.lc.Debugf("Reading #%d received with ResourceName=%s, ValueType=%s, MediaType=%s and BinaryValue of size=`%d`", + asr.lc.Debugf("Reading #%d received with ResourceName=%s, ValueType=%s, MediaType=%s and BinaryValue of size=`%d`", index+1, reading.ResourceName, reading.ValueType, reading.MediaType, len(reading.BinaryValue)) default: - gr.lc.Debugf("Reading #%d received with ResourceName=%s, ValueType=%s, Value=`%s`", + asr.lc.Debugf("Reading #%d received with ResourceName=%s, ValueType=%s, Value=`%s`", index+1, reading.ResourceName, reading.ValueType, @@ -442,26 +442,26 @@ func (gr *GolangRuntime) debugLogEvent(event *dtos.Event) { } } -func (gr *GolangRuntime) logError(err error, correlationID string) { - gr.lc.Errorf("%s. %s=%s", err.Error(), common.CorrelationHeader, correlationID) +func (asr *AppServiceRuntime) logError(err error, correlationID string) { + asr.lc.Errorf("%s. %s=%s", err.Error(), common.CorrelationHeader, correlationID) } -func (gr *GolangRuntime) GetDefaultPipeline() *interfaces.FunctionPipeline { - pipeline := gr.pipelines[interfaces.DefaultPipelineId] +func (asr *AppServiceRuntime) GetDefaultPipeline() *interfaces.FunctionPipeline { + pipeline := asr.pipelines[interfaces.DefaultPipelineId] if pipeline == nil { - pipeline = gr.addFunctionsPipeline(interfaces.DefaultPipelineId, []string{TopicWildCard}, nil) + pipeline = asr.addFunctionsPipeline(interfaces.DefaultPipelineId, []string{TopicWildCard}, nil) } return pipeline } -func (gr *GolangRuntime) GetMatchingPipelines(incomingTopic string) []*interfaces.FunctionPipeline { +func (asr *AppServiceRuntime) GetMatchingPipelines(incomingTopic string) []*interfaces.FunctionPipeline { var matches []*interfaces.FunctionPipeline - if len(gr.pipelines) == 0 { + if len(asr.pipelines) == 0 { return matches } - for _, pipeline := range gr.pipelines { + for _, pipeline := range asr.pipelines { if topicMatches(incomingTopic, pipeline.Topics) { matches = append(matches, pipeline) } @@ -470,8 +470,8 @@ func (gr *GolangRuntime) GetMatchingPipelines(incomingTopic string) []*interface return matches } -func (gr *GolangRuntime) GetPipelineById(id string) *interfaces.FunctionPipeline { - return gr.pipelines[id] +func (asr *AppServiceRuntime) GetPipelineById(id string) *interfaces.FunctionPipeline { + return asr.pipelines[id] } func topicMatches(incomingTopic string, pipelineTopics []string) bool { diff --git a/internal/runtime/runtime_test.go b/internal/runtime/runtime_test.go index 186546c90..c7571e2db 100644 --- a/internal/runtime/runtime_test.go +++ b/internal/runtime/runtime_test.go @@ -74,7 +74,7 @@ func TestProcessMessageBusRequest(t *testing.T) { return true, "Hello" } - runtime := NewGolangRuntime("", nil, dic) + runtime := NewAppServiceRuntime("", nil, dic) runtime.SetDefaultFunctionsPipeline([]interfaces.AppFunction{dummyTransform}) _, decodeErr, _ := runtime.DecodeMessage(context, envelope) @@ -95,7 +95,7 @@ func TestProcessMessageNoTransforms(t *testing.T) { } context := appfunction.NewContext("testId", dic, "") - runtime := NewGolangRuntime("", nil, dic) + runtime := NewAppServiceRuntime("", nil, dic) messageData, decodeError, _ := runtime.DecodeMessage(context, envelope) require.Nil(t, decodeError) @@ -128,7 +128,7 @@ func TestProcessMessageOneCustomTransform(t *testing.T) { transform1WasCalled = true return true, "Hello" } - runtime := NewGolangRuntime("", nil, dic) + runtime := NewAppServiceRuntime("", nil, dic) runtime.SetDefaultFunctionsPipeline([]interfaces.AppFunction{transform1}) messageData, decodeError, _ := runtime.DecodeMessage(context, envelope) @@ -173,7 +173,7 @@ func TestProcessMessageTwoCustomTransforms(t *testing.T) { return true, "Hello" } - runtime := NewGolangRuntime("", nil, dic) + runtime := NewAppServiceRuntime("", nil, dic) runtime.SetDefaultFunctionsPipeline([]interfaces.AppFunction{transform1, transform2}) messageData, decodeError, _ := runtime.DecodeMessage(context, envelope) @@ -225,7 +225,7 @@ func TestProcessMessageThreeCustomTransformsOneFail(t *testing.T) { require.Equal(t, "Transform1Result", data, "Did not receive result from previous transform") return true, "Hello" } - runtime := NewGolangRuntime("", nil, dic) + runtime := NewAppServiceRuntime("", nil, dic) runtime.SetDefaultFunctionsPipeline([]interfaces.AppFunction{transform1, transform2, transform3}) messageData, decodeError, _ := runtime.DecodeMessage(context, envelope) @@ -260,7 +260,7 @@ func TestProcessMessageTransformError(t *testing.T) { context := appfunction.NewContext("testId", dic, "") // Let the Runtime know we are sending a RegistryInfo, so it passes it to the first function - runtime := NewGolangRuntime("", &config.RegistryInfo{}, dic) + runtime := NewAppServiceRuntime("", &config.RegistryInfo{}, dic) // FilterByDeviceName with return an error if it doesn't receive and Event runtime.SetDefaultFunctionsPipeline([]interfaces.AppFunction{transforms.NewFilterFor([]string{"SomeDevice"}).FilterByDeviceName}) @@ -330,7 +330,7 @@ func TestProcessMessageJSON(t *testing.T) { return false, nil } - runtime := NewGolangRuntime("", nil, dic) + runtime := NewAppServiceRuntime("", nil, dic) runtime.SetDefaultFunctionsPipeline([]interfaces.AppFunction{transform1}) messageData, decodeError, _ := runtime.DecodeMessage(context, envelope) @@ -372,7 +372,7 @@ func TestProcessMessageCBOR(t *testing.T) { return false, nil } - runtime := NewGolangRuntime("", nil, dic) + runtime := NewAppServiceRuntime("", nil, dic) runtime.SetDefaultFunctionsPipeline([]interfaces.AppFunction{transform1}) messageData, decodeError, _ := runtime.DecodeMessage(context, envelope) @@ -445,7 +445,7 @@ func TestDecode_Process_MessageTargetType(t *testing.T) { context := appfunction.NewContext("testing", dic, "") - runtime := NewGolangRuntime("", currentTest.TargetType, dic) + runtime := NewAppServiceRuntime("", currentTest.TargetType, dic) runtime.SetDefaultFunctionsPipeline([]interfaces.AppFunction{transforms.NewResponseData().SetResponseData}) targetData, err, _ := runtime.DecodeMessage(context, envelope) @@ -484,7 +484,7 @@ func TestExecutePipelinePersist(t *testing.T) { return true, data } - runtime := NewGolangRuntime(serviceKey, nil, updateDicWithMockStoreClient()) + runtime := NewAppServiceRuntime(serviceKey, nil, updateDicWithMockStoreClient()) httpPost := transforms.NewHTTPSender("http://nowhere", "", true).HTTPPost runtime.SetDefaultFunctionsPipeline([]interfaces.AppFunction{transformPassThru, httpPost}) @@ -533,7 +533,7 @@ func TestGolangRuntime_processEventPayload(t *testing.T) { {"invalid CBOR", cborInvalidPayload, common.ContentTypeCBOR, nil, true}, } - target := GolangRuntime{lc: logger.NewMockClient()} + target := AppServiceRuntime{lc: logger.NewMockClient()} for _, testCase := range tests { t.Run(testCase.Name, func(t *testing.T) { @@ -603,7 +603,7 @@ func TestTopicMatches(t *testing.T) { } func TestGetPipelineById(t *testing.T) { - target := NewGolangRuntime(serviceKey, nil, dic) + target := NewAppServiceRuntime(serviceKey, nil, dic) expectedId := "my-pipeline" expectedTopics := []string{"edgex/events/#"} @@ -636,7 +636,7 @@ func TestGetPipelineById(t *testing.T) { } func TestGetMatchingPipelines(t *testing.T) { - target := NewGolangRuntime(serviceKey, nil, dic) + target := NewAppServiceRuntime(serviceKey, nil, dic) expectedTransforms := []interfaces.AppFunction{ transforms.NewResponseData().SetResponseData, @@ -669,7 +669,7 @@ func TestGetMatchingPipelines(t *testing.T) { } func TestGolangRuntime_GetDefaultPipeline(t *testing.T) { - target := NewGolangRuntime(serviceKey, nil, dic) + target := NewAppServiceRuntime(serviceKey, nil, dic) expectedNilTransformsHash := "Pipeline-functions: " expectedTransforms := []interfaces.AppFunction{ @@ -697,7 +697,7 @@ func TestGolangRuntime_GetDefaultPipeline(t *testing.T) { } func TestGolangRuntime_SetFunctionsPipelineTransforms(t *testing.T) { - target := NewGolangRuntime(serviceKey, nil, dic) + target := NewAppServiceRuntime(serviceKey, nil, dic) id := "my-pipeline" topics := []string{"edgex/events/#"} @@ -723,7 +723,7 @@ func TestGolangRuntime_SetFunctionsPipelineTransforms(t *testing.T) { } func TestGolangRuntime_ClearAllFunctionsPipelineTransforms(t *testing.T) { - target := NewGolangRuntime(serviceKey, nil, dic) + target := NewAppServiceRuntime(serviceKey, nil, dic) id1 := "pipeline1" id2 := "pipeline2" diff --git a/internal/runtime/storeforward.go b/internal/runtime/storeforward.go index 77beeb1ef..326a86987 100644 --- a/internal/runtime/storeforward.go +++ b/internal/runtime/storeforward.go @@ -37,7 +37,7 @@ const ( ) type storeForwardInfo struct { - runtime *GolangRuntime + runtime *AppServiceRuntime dic *di.Container } diff --git a/internal/runtime/storeforward_test.go b/internal/runtime/storeforward_test.go index 94770a7a6..5bf7f8207 100644 --- a/internal/runtime/storeforward_test.go +++ b/internal/runtime/storeforward_test.go @@ -121,7 +121,7 @@ func TestProcessRetryItems(t *testing.T) { for _, test := range tests { t.Run(test.Name, func(t *testing.T) { targetTransformWasCalled = false - runtime := NewGolangRuntime(serviceKey, nil, dic) + runtime := NewAppServiceRuntime(serviceKey, nil, dic) var pipeline *interfaces.FunctionPipeline @@ -184,7 +184,7 @@ func TestDoStoreAndForwardRetry(t *testing.T) { for _, test := range tests { t.Run(test.Name, func(t *testing.T) { - runtime := NewGolangRuntime(serviceKey, nil, updateDicWithMockStoreClient()) + runtime := NewAppServiceRuntime(serviceKey, nil, updateDicWithMockStoreClient()) var pipeline *interfaces.FunctionPipeline