diff --git a/app-service-template/go.mod b/app-service-template/go.mod index ada11d10d..04e39201c 100644 --- a/app-service-template/go.mod +++ b/app-service-template/go.mod @@ -11,7 +11,7 @@ require ( github.com/edgexfoundry/go-mod-core-contracts/v2 v2.3.0 github.com/google/uuid v1.3.0 github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 - github.com/stretchr/testify v1.8.0 + github.com/stretchr/testify v1.8.1 ) require ( @@ -21,7 +21,7 @@ require ( github.com/cenkalti/backoff v2.2.1+incompatible // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/diegoholiveira/jsonlogic/v3 v3.2.6 // indirect - github.com/eclipse/paho.mqtt.golang v1.4.1 // indirect + github.com/eclipse/paho.mqtt.golang v1.4.2 // indirect github.com/edgexfoundry/go-mod-bootstrap/v2 v2.3.0 // indirect github.com/edgexfoundry/go-mod-configuration/v2 v2.3.0 // indirect github.com/edgexfoundry/go-mod-messaging/v2 v2.3.0 // indirect @@ -63,7 +63,7 @@ require ( github.com/pelletier/go-toml v1.9.5 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/spiffe/go-spiffe/v2 v2.1.1 // indirect - github.com/stretchr/objx v0.4.0 // indirect + github.com/stretchr/objx v0.5.0 // indirect github.com/x448/float16 v0.8.4 // indirect github.com/zeebo/errs v1.2.2 // indirect golang.org/x/crypto v0.0.0-20221010152910-d6f0a8c073c2 // indirect diff --git a/app-service-template/go.sum b/app-service-template/go.sum index 32c1691f7..012523c5d 100644 --- a/app-service-template/go.sum +++ b/app-service-template/go.sum @@ -43,8 +43,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/diegoholiveira/jsonlogic/v3 v3.2.6 h1:EV607wRY72hT3V90ZOQw+zjXR9KIUV9jnHfT3yS8uks= github.com/diegoholiveira/jsonlogic/v3 v3.2.6/go.mod h1:9oE8z9G+0OMxOoLHF3fhek3KuqD5CBqM0B6XFL08MSg= -github.com/eclipse/paho.mqtt.golang v1.4.1 h1:tUSpviiL5G3P9SZZJPC4ZULZJsxQKXxfENpMvdbAXAI= -github.com/eclipse/paho.mqtt.golang v1.4.1/go.mod h1:JGt0RsEwEX+Xa/agj90YJ9d9DH2b7upDZMK9HRbFvCA= +github.com/eclipse/paho.mqtt.golang v1.4.2 h1:66wOzfUHSSI1zamx7jR6yMEI5EuHnT1G6rNA5PM12m4= +github.com/eclipse/paho.mqtt.golang v1.4.2/go.mod h1:JGt0RsEwEX+Xa/agj90YJ9d9DH2b7upDZMK9HRbFvCA= github.com/edgexfoundry/go-mod-bootstrap/v2 v2.3.0 h1:NCOc5bOz6oX+KuOcv0Rt6BTSDShbg4VmCapXO19IoCM= github.com/edgexfoundry/go-mod-bootstrap/v2 v2.3.0/go.mod h1:jWZjsy0BaO6+S+Tq111mcrkGKpg6bhTyqngyYcl+70E= github.com/edgexfoundry/go-mod-configuration/v2 v2.3.0 h1:VeK7VBiNc/RIR7HXC0ya3fWCmcFQzrSLITPYn87cQsA= @@ -289,8 +289,9 @@ github.com/spiffe/go-spiffe/v2 v2.1.1 h1:RT9kM8MZLZIsPTH+HKQEP5yaAk3yd/VBzlINaRj github.com/spiffe/go-spiffe/v2 v2.1.1/go.mod h1:5qg6rpqlwIub0JAiF1UK9IMD6BpPTmvG6yfSgDBs5lg= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/objx v0.4.0 h1:M2gUjqZET1qApGOWNSnZ49BAIMX4F/1plDv3+l31EJ4= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= @@ -298,8 +299,9 @@ github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5 github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM= github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM= github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg= diff --git a/internal/app/service.go b/internal/app/service.go index cdd64e676..199b37b46 100644 --- a/internal/app/service.go +++ b/internal/app/service.go @@ -84,7 +84,7 @@ type Service struct { config *common.ConfigurationStruct lc logger.LoggingClient usingConfigurablePipeline bool - runtime *runtime.AppServiceRuntime + runtime *runtime.FunctionsPipelineRuntime webserver *webserver.WebServer ctx contextGroup deferredFunctions []bootstrap.Deferred @@ -549,7 +549,7 @@ func (svc *Service) Initialize() error { return fmt.Errorf("unable to parse Service.RequestTimeout configuration as a time duration: %s", err.Error()) } - svc.runtime = runtime.NewAppServiceRuntime(svc.serviceKey, svc.targetType, svc.dic) + svc.runtime = runtime.NewFunctionPipelineRuntime(svc.serviceKey, svc.targetType, svc.dic) // Bootstrapping is complete, so now need to retrieve the needed objects from the containers. svc.lc = bootstrapContainer.LoggingClientFrom(svc.dic.Get) diff --git a/internal/app/service_test.go b/internal/app/service_test.go index 9166f3c9c..48bbb07f3 100644 --- a/internal/app/service_test.go +++ b/internal/app/service_test.go @@ -256,7 +256,7 @@ func TestSetupHTTPTrigger(t *testing.T) { }, } - testRuntime := runtime.NewAppServiceRuntime("", nil, dic) + testRuntime := runtime.NewFunctionPipelineRuntime("", nil, dic) testRuntime.SetDefaultFunctionsPipeline(nil) sdk.runtime = testRuntime @@ -276,7 +276,7 @@ func TestSetupMessageBusTrigger(t *testing.T) { }, }, } - testRuntime := runtime.NewAppServiceRuntime("", nil, dic) + testRuntime := runtime.NewFunctionPipelineRuntime("", nil, dic) testRuntime.SetDefaultFunctionsPipeline(nil) sdk.runtime = testRuntime @@ -305,7 +305,7 @@ func TestSetDefaultFunctionsPipelineOneTransform(t *testing.T) { service := Service{ lc: lc, dic: dic, - runtime: runtime.NewAppServiceRuntime("", nil, dic), + runtime: runtime.NewFunctionPipelineRuntime("", nil, dic), config: &common.ConfigurationStruct{ Trigger: common.TriggerInfo{ Type: TriggerTypeMessageBus, @@ -324,7 +324,7 @@ func TestService_AddFunctionsPipelineForTopics(t *testing.T) { service := Service{ lc: lc, dic: dic, - runtime: runtime.NewAppServiceRuntime("", nil, dic), + runtime: runtime.NewFunctionPipelineRuntime("", nil, dic), config: &common.ConfigurationStruct{ Trigger: common.TriggerInfo{ Type: TriggerTypeMessageBus, @@ -379,7 +379,7 @@ func TestService_RemoveAllFunctionPipelines(t *testing.T) { service := Service{ lc: lc, dic: dic, - runtime: runtime.NewGolangRuntime("", nil, dic), + runtime: runtime.NewFunctionPipelineRuntime("", nil, dic), config: &common.ConfigurationStruct{ Trigger: common.TriggerInfo{ Type: TriggerTypeMessageBus, diff --git a/internal/app/triggermessageprocessor.go b/internal/app/triggermessageprocessor.go index 936007f77..ffcb1efa1 100644 --- a/internal/app/triggermessageprocessor.go +++ b/internal/app/triggermessageprocessor.go @@ -41,7 +41,7 @@ import ( type simpleTriggerServiceBinding struct { *Service - *runtime.AppServiceRuntime + *runtime.FunctionsPipelineRuntime } func (b *simpleTriggerServiceBinding) SecretProvider() messaging.SecretDataProvider { diff --git a/internal/runtime/runtime.go b/internal/runtime/runtime.go index 72f251db7..ce1a41471 100644 --- a/internal/runtime/runtime.go +++ b/internal/runtime/runtime.go @@ -66,8 +66,8 @@ func NewFunctionPipeline(id string, topics []string, transforms []interfaces.App return pipeline } -// AppServiceRuntime represents the golang runtime environment -type AppServiceRuntime struct { +// FunctionsPipelineRuntime represents the golang runtime environment for App Services' Functions Pipelines +type FunctionsPipelineRuntime struct { TargetType interface{} ServiceKey string pipelines map[string]*interfaces.FunctionPipeline @@ -82,135 +82,135 @@ type MessageError struct { ErrorCode int } -// NewAppServiceRuntime creates and initializes the AppServiceRuntime instance -func NewAppServiceRuntime(serviceKey string, targetType interface{}, dic *di.Container) *AppServiceRuntime { - asr := &AppServiceRuntime{ +// NewFunctionPipelineRuntime creates and initializes the AppServiceRuntime instance +func NewFunctionPipelineRuntime(serviceKey string, targetType interface{}, dic *di.Container) *FunctionsPipelineRuntime { + fpr := &FunctionsPipelineRuntime{ ServiceKey: serviceKey, TargetType: targetType, dic: dic, pipelines: make(map[string]*interfaces.FunctionPipeline), } - asr.storeForward.dic = dic - asr.storeForward.runtime = asr - asr.lc = bootstrapContainer.LoggingClientFrom(asr.dic.Get) + fpr.storeForward.dic = dic + fpr.storeForward.runtime = fpr + fpr.lc = bootstrapContainer.LoggingClientFrom(fpr.dic.Get) - return asr + return fpr } // SetDefaultFunctionsPipeline sets the default function pipeline -func (asr *AppServiceRuntime) SetDefaultFunctionsPipeline(transforms []interfaces.AppFunction) { - pipeline := asr.GetDefaultPipeline() // ensures the default pipeline exists - asr.SetFunctionsPipelineTransforms(pipeline.Id, transforms) +func (fpr *FunctionsPipelineRuntime) SetDefaultFunctionsPipeline(transforms []interfaces.AppFunction) { + pipeline := fpr.GetDefaultPipeline() // ensures the default pipeline exists + fpr.SetFunctionsPipelineTransforms(pipeline.Id, transforms) } // SetFunctionsPipelineTransforms sets the transforms for an existing function pipeline. // Non-existent pipelines are ignored -func (asr *AppServiceRuntime) SetFunctionsPipelineTransforms(id string, transforms []interfaces.AppFunction) { - pipeline := asr.pipelines[id] +func (fpr *FunctionsPipelineRuntime) SetFunctionsPipelineTransforms(id string, transforms []interfaces.AppFunction) { + pipeline := fpr.pipelines[id] if pipeline != nil { - asr.isBusyCopying.Lock() + fpr.isBusyCopying.Lock() pipeline.Transforms = transforms pipeline.Hash = calculatePipelineHash(transforms) - asr.isBusyCopying.Unlock() - asr.lc.Infof("Transforms set for `%s` pipeline", id) + fpr.isBusyCopying.Unlock() + fpr.lc.Infof("Transforms set for `%s` pipeline", id) } else { - asr.lc.Warnf("Unable to set transforms for `%s` pipeline: Pipeline not found", id) + fpr.lc.Warnf("Unable to set transforms for `%s` pipeline: Pipeline not found", id) } } // SetFunctionsPipelineTopics sets the topics for an existing function pipeline. // Non-existent pipelines are ignored -func (asr *AppServiceRuntime) SetFunctionsPipelineTopics(id string, topics []string) { - pipeline := asr.pipelines[id] +func (fpr *FunctionsPipelineRuntime) SetFunctionsPipelineTopics(id string, topics []string) { + pipeline := fpr.pipelines[id] if pipeline != nil { - asr.isBusyCopying.Lock() + fpr.isBusyCopying.Lock() pipeline.Topics = topics - asr.isBusyCopying.Unlock() - asr.lc.Infof("Topics set for `%s` pipeline", id) + fpr.isBusyCopying.Unlock() + fpr.lc.Infof("Topics set for `%s` pipeline", id) } else { - asr.lc.Warnf("Unable to set topica for `%s` pipeline: Pipeline not found", id) + fpr.lc.Warnf("Unable to set topica for `%s` pipeline: Pipeline not found", id) } } // ClearAllFunctionsPipelineTransforms clears the transforms for all existing function pipelines. -func (asr *AppServiceRuntime) ClearAllFunctionsPipelineTransforms() { - asr.isBusyCopying.Lock() - for index := range asr.pipelines { - asr.pipelines[index].Transforms = nil - asr.pipelines[index].Hash = "" +func (fpr *FunctionsPipelineRuntime) ClearAllFunctionsPipelineTransforms() { + fpr.isBusyCopying.Lock() + for index := range fpr.pipelines { + fpr.pipelines[index].Transforms = nil + fpr.pipelines[index].Hash = "" } - asr.isBusyCopying.Unlock() + fpr.isBusyCopying.Unlock() } // RemoveAllFunctionPipelines removes all existing function pipelines -func (gr *GolangRuntime) RemoveAllFunctionPipelines() { - metricManager := bootstrapContainer.MetricsManagerFrom(gr.dic.Get) - - gr.isBusyCopying.Lock() - for id := range gr.pipelines { - gr.unregisterPipelineMetric(metricManager, internal.PipelineMessagesProcessedName, id) - gr.unregisterPipelineMetric(metricManager, internal.PipelineMessageProcessingTimeName, id) - gr.unregisterPipelineMetric(metricManager, internal.PipelineProcessingErrorsName, id) - delete(gr.pipelines, id) +func (fpr *FunctionsPipelineRuntime) RemoveAllFunctionPipelines() { + metricManager := bootstrapContainer.MetricsManagerFrom(fpr.dic.Get) + + fpr.isBusyCopying.Lock() + for id := range fpr.pipelines { + fpr.unregisterPipelineMetric(metricManager, internal.PipelineMessagesProcessedName, id) + fpr.unregisterPipelineMetric(metricManager, internal.PipelineMessageProcessingTimeName, id) + fpr.unregisterPipelineMetric(metricManager, internal.PipelineProcessingErrorsName, id) + delete(fpr.pipelines, id) } - gr.isBusyCopying.Unlock() + fpr.isBusyCopying.Unlock() } // AddFunctionsPipeline is thread safe to set transforms -func (asr *AppServiceRuntime) AddFunctionsPipeline(id string, topics []string, transforms []interfaces.AppFunction) error { - _, exists := asr.pipelines[id] +func (fpr *FunctionsPipelineRuntime) AddFunctionsPipeline(id string, topics []string, transforms []interfaces.AppFunction) error { + _, exists := fpr.pipelines[id] if exists { return fmt.Errorf("pipeline with Id='%s' already exists", id) } - _ = asr.addFunctionsPipeline(id, topics, transforms) + _ = fpr.addFunctionsPipeline(id, topics, transforms) return nil } -func (asr *AppServiceRuntime) addFunctionsPipeline(id string, topics []string, transforms []interfaces.AppFunction) *interfaces.FunctionPipeline { +func (fpr *FunctionsPipelineRuntime) addFunctionsPipeline(id string, topics []string, transforms []interfaces.AppFunction) *interfaces.FunctionPipeline { pipeline := NewFunctionPipeline(id, topics, transforms) - asr.isBusyCopying.Lock() - asr.pipelines[id] = &pipeline - asr.isBusyCopying.Unlock() + fpr.isBusyCopying.Lock() + fpr.pipelines[id] = &pipeline + fpr.isBusyCopying.Unlock() - metricManager := bootstrapContainer.MetricsManagerFrom(asr.dic.Get) - asr.registerPipelineMetric(metricManager, internal.PipelineMessagesProcessedName, pipeline.Id, pipeline.MessagesProcessed) - asr.registerPipelineMetric(metricManager, internal.PipelineMessageProcessingTimeName, pipeline.Id, pipeline.MessageProcessingTime) - asr.registerPipelineMetric(metricManager, internal.PipelineProcessingErrorsName, pipeline.Id, pipeline.ProcessingErrors) + metricManager := bootstrapContainer.MetricsManagerFrom(fpr.dic.Get) + fpr.registerPipelineMetric(metricManager, internal.PipelineMessagesProcessedName, pipeline.Id, pipeline.MessagesProcessed) + fpr.registerPipelineMetric(metricManager, internal.PipelineMessageProcessingTimeName, pipeline.Id, pipeline.MessageProcessingTime) + fpr.registerPipelineMetric(metricManager, internal.PipelineProcessingErrorsName, pipeline.Id, pipeline.ProcessingErrors) return &pipeline } -func (asr *AppServiceRuntime) registerPipelineMetric(metricManager bootstrapInterfaces.MetricsManager, metricName string, pipelineId string, metric interface{}) { +func (fpr *FunctionsPipelineRuntime) registerPipelineMetric(metricManager bootstrapInterfaces.MetricsManager, metricName string, pipelineId string, metric interface{}) { registeredName := strings.Replace(metricName, internal.PipelineIdTxt, pipelineId, 1) err := metricManager.Register(registeredName, metric, map[string]string{"pipeline": pipelineId}) if err != nil { - asr.lc.Warnf("Unable to register %s metric. Metric will not be reported : %s", registeredName, err.Error()) + fpr.lc.Warnf("Unable to register %s metric. Metric will not be reported : %s", registeredName, err.Error()) } else { - asr.lc.Infof("%s metric has been registered and will be reported (if enabled)", registeredName) + fpr.lc.Infof("%s metric has been registered and will be reported (if enabled)", registeredName) } } -func (gr *GolangRuntime) unregisterPipelineMetric(metricManager bootstrapInterfaces.MetricsManager, metricName string, pipelineId string) { +func (fpr *FunctionsPipelineRuntime) unregisterPipelineMetric(metricManager bootstrapInterfaces.MetricsManager, metricName string, pipelineId string) { registeredName := strings.Replace(metricName, internal.PipelineIdTxt, pipelineId, 1) metricManager.Unregister(registeredName) } // ProcessMessage sends the contents of the message through the functions pipeline -func (asr *AppServiceRuntime) ProcessMessage(appContext *appfunction.Context, target interface{}, pipeline *interfaces.FunctionPipeline) *MessageError { +func (fpr *FunctionsPipelineRuntime) ProcessMessage(appContext *appfunction.Context, target interface{}, pipeline *interfaces.FunctionPipeline) *MessageError { if len(pipeline.Transforms) == 0 { err := fmt.Errorf("no transforms configured for pipleline Id='%s'. Please check log for earlier errors loading pipeline", pipeline.Id) - asr.logError(err, appContext.CorrelationID()) + fpr.logError(err, appContext.CorrelationID()) return &MessageError{Err: err, ErrorCode: http.StatusInternalServerError} } appContext.AddValue(interfaces.PIPELINEID, pipeline.Id) - asr.lc.Debugf("Pipeline '%s' processing message %d Transforms", pipeline.Id, len(pipeline.Transforms)) + fpr.lc.Debugf("Pipeline '%s' processing message %d Transforms", pipeline.Id, len(pipeline.Transforms)) // Make copy of transform functions to avoid disruption of pipeline when updating the pipeline from registry - asr.isBusyCopying.Lock() + fpr.isBusyCopying.Lock() execPipeline := &interfaces.FunctionPipeline{ Id: pipeline.Id, Transforms: make([]interfaces.AppFunction, len(pipeline.Transforms)), @@ -221,47 +221,47 @@ func (asr *AppServiceRuntime) ProcessMessage(appContext *appfunction.Context, ta ProcessingErrors: pipeline.ProcessingErrors, } copy(execPipeline.Transforms, pipeline.Transforms) - asr.isBusyCopying.Unlock() + fpr.isBusyCopying.Unlock() - return asr.ExecutePipeline(target, appContext, execPipeline, 0, false) + return fpr.ExecutePipeline(target, appContext, execPipeline, 0, false) } // DecodeMessage decode the message wrapped in the MessageEnvelope and return the data to be processed. -func (asr *AppServiceRuntime) DecodeMessage(appContext *appfunction.Context, envelope types.MessageEnvelope) (interface{}, *MessageError, bool) { +func (fpr *FunctionsPipelineRuntime) DecodeMessage(appContext *appfunction.Context, envelope types.MessageEnvelope) (interface{}, *MessageError, bool) { // Default Target Type for the function pipeline is an Event DTO. // The Event DTO can be wrapped in an AddEventRequest DTO or just be the un-wrapped Event DTO, // which is handled dynamically below. - if asr.TargetType == nil { - asr.TargetType = &dtos.Event{} + if fpr.TargetType == nil { + fpr.TargetType = &dtos.Event{} } - if reflect.TypeOf(asr.TargetType).Kind() != reflect.Ptr { + if reflect.TypeOf(fpr.TargetType).Kind() != reflect.Ptr { err := errors.New("TargetType must be a pointer, not a value of the target type") - asr.logError(err, envelope.CorrelationID) + fpr.logError(err, envelope.CorrelationID) return nil, &MessageError{Err: err, ErrorCode: http.StatusInternalServerError}, false } // Must make a copy of the type so that data isn't retained between calls for custom types - target := reflect.New(reflect.ValueOf(asr.TargetType).Elem().Type()).Interface() + target := reflect.New(reflect.ValueOf(fpr.TargetType).Elem().Type()).Interface() switch target.(type) { case *[]byte: - asr.lc.Debug("Expecting raw byte data") + fpr.lc.Debug("Expecting raw byte data") target = &envelope.Payload case *dtos.Event: - asr.lc.Debug("Expecting an AddEventRequest or Event DTO") + fpr.lc.Debug("Expecting an AddEventRequest or Event DTO") // Dynamically process either AddEventRequest or Event DTO - event, err := asr.processEventPayload(envelope) + event, err := fpr.processEventPayload(envelope) if err != nil { err = fmt.Errorf("unable to process payload %s", err.Error()) - asr.logError(err, envelope.CorrelationID) + fpr.logError(err, envelope.CorrelationID) return nil, &MessageError{Err: err, ErrorCode: http.StatusBadRequest}, true } - if asr.lc.LogLevel() == models.DebugLog { - asr.debugLogEvent(event) + if fpr.lc.LogLevel() == models.DebugLog { + fpr.debugLogEvent(event) } appContext.AddValue(interfaces.DEVICENAME, event.DeviceName) @@ -272,12 +272,12 @@ func (asr *AppServiceRuntime) DecodeMessage(appContext *appfunction.Context, env default: customTypeName := di.TypeInstanceToName(target) - asr.lc.Debugf("Expecting a custom type of %s", customTypeName) + fpr.lc.Debugf("Expecting a custom type of %s", customTypeName) // Expecting a custom type so just unmarshal into the target type. - if err := asr.unmarshalPayload(envelope, target); err != nil { + if err := fpr.unmarshalPayload(envelope, target); err != nil { err = fmt.Errorf("unable to process custom object received of type '%s': %s", customTypeName, err.Error()) - asr.logError(err, envelope.CorrelationID) + fpr.logError(err, envelope.CorrelationID) return nil, &MessageError{Err: err, ErrorCode: http.StatusBadRequest}, true } } @@ -293,7 +293,7 @@ func (asr *AppServiceRuntime) DecodeMessage(appContext *appfunction.Context, env return target, nil, false } -func (asr *AppServiceRuntime) ExecutePipeline( +func (fpr *FunctionsPipelineRuntime) ExecutePipeline( target interface{}, appContext *appfunction.Context, pipeline *interfaces.FunctionPipeline, @@ -327,7 +327,7 @@ func (asr *AppServiceRuntime) ExecutePipeline( common.CorrelationHeader, appContext.CorrelationID()) if appContext.RetryData() != nil && !isRetry { - asr.storeForward.storeForLaterRetry(appContext.RetryData(), appContext, pipeline, functionIndex) + fpr.storeForward.storeForLaterRetry(appContext.RetryData(), appContext, pipeline, functionIndex) } pipeline.ProcessingErrors.Inc(1) @@ -341,26 +341,26 @@ func (asr *AppServiceRuntime) ExecutePipeline( return nil } -func (asr *AppServiceRuntime) StartStoreAndForward( +func (fpr *FunctionsPipelineRuntime) StartStoreAndForward( appWg *sync.WaitGroup, appCtx context.Context, enabledWg *sync.WaitGroup, enabledCtx context.Context, serviceKey string) { - asr.storeForward.startStoreAndForwardRetryLoop(appWg, appCtx, enabledWg, enabledCtx, serviceKey) + fpr.storeForward.startStoreAndForwardRetryLoop(appWg, appCtx, enabledWg, enabledCtx, serviceKey) } -func (asr *AppServiceRuntime) processEventPayload(envelope types.MessageEnvelope) (*dtos.Event, error) { +func (fpr *FunctionsPipelineRuntime) processEventPayload(envelope types.MessageEnvelope) (*dtos.Event, error) { - asr.lc.Debug("Attempting to process Payload as an AddEventRequest DTO") + fpr.lc.Debug("Attempting to process Payload as an AddEventRequest DTO") requestDto := requests.AddEventRequest{} // Note that DTO validation is called during the unmarshaling // which results in a KindContractInvalid error - requestDtoErr := asr.unmarshalPayload(envelope, &requestDto) + requestDtoErr := fpr.unmarshalPayload(envelope, &requestDto) if requestDtoErr == nil { - asr.lc.Debug("Using Event DTO from AddEventRequest DTO") + fpr.lc.Debug("Using Event DTO from AddEventRequest DTO") // Determine that we have an AddEventRequest DTO return &requestDto.Event, nil @@ -373,13 +373,13 @@ func (asr *AppServiceRuntime) processEventPayload(envelope types.MessageEnvelope // KindContractInvalid indicates that we likely don't have an AddEventRequest // so try to process as Event - asr.lc.Debug("Attempting to process Payload as an Event DTO") + fpr.lc.Debug("Attempting to process Payload as an Event DTO") event := &dtos.Event{} - err := asr.unmarshalPayload(envelope, event) + err := fpr.unmarshalPayload(envelope, event) if err == nil { err = common.Validate(event) if err == nil { - asr.lc.Debug("Using Event DTO received") + fpr.lc.Debug("Using Event DTO received") return event, nil } } @@ -393,7 +393,7 @@ func (asr *AppServiceRuntime) processEventPayload(envelope types.MessageEnvelope return nil, requestDtoErr } -func (asr *AppServiceRuntime) unmarshalPayload(envelope types.MessageEnvelope, target interface{}) error { +func (fpr *FunctionsPipelineRuntime) unmarshalPayload(envelope types.MessageEnvelope, target interface{}) error { var err error contentType := strings.Split(envelope.ContentType, ";")[0] @@ -412,28 +412,28 @@ func (asr *AppServiceRuntime) unmarshalPayload(envelope types.MessageEnvelope, t return err } -func (asr *AppServiceRuntime) debugLogEvent(event *dtos.Event) { - asr.lc.Debugf("Event Received with ProfileName=%s, DeviceName=%s and ReadingCount=%d", +func (fpr *FunctionsPipelineRuntime) debugLogEvent(event *dtos.Event) { + fpr.lc.Debugf("Event Received with ProfileName=%s, DeviceName=%s and ReadingCount=%d", event.ProfileName, event.DeviceName, len(event.Readings)) if len(event.Tags) > 0 { - asr.lc.Debugf("Event tags are: [%v]", event.Tags) + fpr.lc.Debugf("Event tags are: [%v]", event.Tags) } else { - asr.lc.Debug("Event has no tags") + fpr.lc.Debug("Event has no tags") } for index, reading := range event.Readings { switch strings.ToLower(reading.ValueType) { case strings.ToLower(common.ValueTypeBinary): - asr.lc.Debugf("Reading #%d received with ResourceName=%s, ValueType=%s, MediaType=%s and BinaryValue of size=`%d`", + fpr.lc.Debugf("Reading #%d received with ResourceName=%s, ValueType=%s, MediaType=%s and BinaryValue of size=`%d`", index+1, reading.ResourceName, reading.ValueType, reading.MediaType, len(reading.BinaryValue)) default: - asr.lc.Debugf("Reading #%d received with ResourceName=%s, ValueType=%s, Value=`%s`", + fpr.lc.Debugf("Reading #%d received with ResourceName=%s, ValueType=%s, Value=`%s`", index+1, reading.ResourceName, reading.ValueType, @@ -442,26 +442,26 @@ func (asr *AppServiceRuntime) debugLogEvent(event *dtos.Event) { } } -func (asr *AppServiceRuntime) logError(err error, correlationID string) { - asr.lc.Errorf("%s. %s=%s", err.Error(), common.CorrelationHeader, correlationID) +func (fpr *FunctionsPipelineRuntime) logError(err error, correlationID string) { + fpr.lc.Errorf("%s. %s=%s", err.Error(), common.CorrelationHeader, correlationID) } -func (asr *AppServiceRuntime) GetDefaultPipeline() *interfaces.FunctionPipeline { - pipeline := asr.pipelines[interfaces.DefaultPipelineId] +func (fpr *FunctionsPipelineRuntime) GetDefaultPipeline() *interfaces.FunctionPipeline { + pipeline := fpr.pipelines[interfaces.DefaultPipelineId] if pipeline == nil { - pipeline = asr.addFunctionsPipeline(interfaces.DefaultPipelineId, []string{TopicWildCard}, nil) + pipeline = fpr.addFunctionsPipeline(interfaces.DefaultPipelineId, []string{TopicWildCard}, nil) } return pipeline } -func (asr *AppServiceRuntime) GetMatchingPipelines(incomingTopic string) []*interfaces.FunctionPipeline { +func (fpr *FunctionsPipelineRuntime) GetMatchingPipelines(incomingTopic string) []*interfaces.FunctionPipeline { var matches []*interfaces.FunctionPipeline - if len(asr.pipelines) == 0 { + if len(fpr.pipelines) == 0 { return matches } - for _, pipeline := range asr.pipelines { + for _, pipeline := range fpr.pipelines { if topicMatches(incomingTopic, pipeline.Topics) { matches = append(matches, pipeline) } @@ -470,8 +470,8 @@ func (asr *AppServiceRuntime) GetMatchingPipelines(incomingTopic string) []*inte return matches } -func (asr *AppServiceRuntime) GetPipelineById(id string) *interfaces.FunctionPipeline { - return asr.pipelines[id] +func (fpr *FunctionsPipelineRuntime) GetPipelineById(id string) *interfaces.FunctionPipeline { + return fpr.pipelines[id] } func topicMatches(incomingTopic string, pipelineTopics []string) bool { diff --git a/internal/runtime/runtime_test.go b/internal/runtime/runtime_test.go index c7571e2db..2e5daac6d 100644 --- a/internal/runtime/runtime_test.go +++ b/internal/runtime/runtime_test.go @@ -74,7 +74,7 @@ func TestProcessMessageBusRequest(t *testing.T) { return true, "Hello" } - runtime := NewAppServiceRuntime("", nil, dic) + runtime := NewFunctionPipelineRuntime("", nil, dic) runtime.SetDefaultFunctionsPipeline([]interfaces.AppFunction{dummyTransform}) _, decodeErr, _ := runtime.DecodeMessage(context, envelope) @@ -95,7 +95,7 @@ func TestProcessMessageNoTransforms(t *testing.T) { } context := appfunction.NewContext("testId", dic, "") - runtime := NewAppServiceRuntime("", nil, dic) + runtime := NewFunctionPipelineRuntime("", nil, dic) messageData, decodeError, _ := runtime.DecodeMessage(context, envelope) require.Nil(t, decodeError) @@ -128,7 +128,7 @@ func TestProcessMessageOneCustomTransform(t *testing.T) { transform1WasCalled = true return true, "Hello" } - runtime := NewAppServiceRuntime("", nil, dic) + runtime := NewFunctionPipelineRuntime("", nil, dic) runtime.SetDefaultFunctionsPipeline([]interfaces.AppFunction{transform1}) messageData, decodeError, _ := runtime.DecodeMessage(context, envelope) @@ -173,7 +173,7 @@ func TestProcessMessageTwoCustomTransforms(t *testing.T) { return true, "Hello" } - runtime := NewAppServiceRuntime("", nil, dic) + runtime := NewFunctionPipelineRuntime("", nil, dic) runtime.SetDefaultFunctionsPipeline([]interfaces.AppFunction{transform1, transform2}) messageData, decodeError, _ := runtime.DecodeMessage(context, envelope) @@ -225,7 +225,7 @@ func TestProcessMessageThreeCustomTransformsOneFail(t *testing.T) { require.Equal(t, "Transform1Result", data, "Did not receive result from previous transform") return true, "Hello" } - runtime := NewAppServiceRuntime("", nil, dic) + runtime := NewFunctionPipelineRuntime("", nil, dic) runtime.SetDefaultFunctionsPipeline([]interfaces.AppFunction{transform1, transform2, transform3}) messageData, decodeError, _ := runtime.DecodeMessage(context, envelope) @@ -260,7 +260,7 @@ func TestProcessMessageTransformError(t *testing.T) { context := appfunction.NewContext("testId", dic, "") // Let the Runtime know we are sending a RegistryInfo, so it passes it to the first function - runtime := NewAppServiceRuntime("", &config.RegistryInfo{}, dic) + runtime := NewFunctionPipelineRuntime("", &config.RegistryInfo{}, dic) // FilterByDeviceName with return an error if it doesn't receive and Event runtime.SetDefaultFunctionsPipeline([]interfaces.AppFunction{transforms.NewFilterFor([]string{"SomeDevice"}).FilterByDeviceName}) @@ -330,7 +330,7 @@ func TestProcessMessageJSON(t *testing.T) { return false, nil } - runtime := NewAppServiceRuntime("", nil, dic) + runtime := NewFunctionPipelineRuntime("", nil, dic) runtime.SetDefaultFunctionsPipeline([]interfaces.AppFunction{transform1}) messageData, decodeError, _ := runtime.DecodeMessage(context, envelope) @@ -372,7 +372,7 @@ func TestProcessMessageCBOR(t *testing.T) { return false, nil } - runtime := NewAppServiceRuntime("", nil, dic) + runtime := NewFunctionPipelineRuntime("", nil, dic) runtime.SetDefaultFunctionsPipeline([]interfaces.AppFunction{transform1}) messageData, decodeError, _ := runtime.DecodeMessage(context, envelope) @@ -445,7 +445,7 @@ func TestDecode_Process_MessageTargetType(t *testing.T) { context := appfunction.NewContext("testing", dic, "") - runtime := NewAppServiceRuntime("", currentTest.TargetType, dic) + runtime := NewFunctionPipelineRuntime("", currentTest.TargetType, dic) runtime.SetDefaultFunctionsPipeline([]interfaces.AppFunction{transforms.NewResponseData().SetResponseData}) targetData, err, _ := runtime.DecodeMessage(context, envelope) @@ -484,7 +484,7 @@ func TestExecutePipelinePersist(t *testing.T) { return true, data } - runtime := NewAppServiceRuntime(serviceKey, nil, updateDicWithMockStoreClient()) + runtime := NewFunctionPipelineRuntime(serviceKey, nil, updateDicWithMockStoreClient()) httpPost := transforms.NewHTTPSender("http://nowhere", "", true).HTTPPost runtime.SetDefaultFunctionsPipeline([]interfaces.AppFunction{transformPassThru, httpPost}) @@ -533,7 +533,7 @@ func TestGolangRuntime_processEventPayload(t *testing.T) { {"invalid CBOR", cborInvalidPayload, common.ContentTypeCBOR, nil, true}, } - target := AppServiceRuntime{lc: logger.NewMockClient()} + target := FunctionsPipelineRuntime{lc: logger.NewMockClient()} for _, testCase := range tests { t.Run(testCase.Name, func(t *testing.T) { @@ -603,7 +603,7 @@ func TestTopicMatches(t *testing.T) { } func TestGetPipelineById(t *testing.T) { - target := NewAppServiceRuntime(serviceKey, nil, dic) + target := NewFunctionPipelineRuntime(serviceKey, nil, dic) expectedId := "my-pipeline" expectedTopics := []string{"edgex/events/#"} @@ -636,7 +636,7 @@ func TestGetPipelineById(t *testing.T) { } func TestGetMatchingPipelines(t *testing.T) { - target := NewAppServiceRuntime(serviceKey, nil, dic) + target := NewFunctionPipelineRuntime(serviceKey, nil, dic) expectedTransforms := []interfaces.AppFunction{ transforms.NewResponseData().SetResponseData, @@ -669,7 +669,7 @@ func TestGetMatchingPipelines(t *testing.T) { } func TestGolangRuntime_GetDefaultPipeline(t *testing.T) { - target := NewAppServiceRuntime(serviceKey, nil, dic) + target := NewFunctionPipelineRuntime(serviceKey, nil, dic) expectedNilTransformsHash := "Pipeline-functions: " expectedTransforms := []interfaces.AppFunction{ @@ -697,7 +697,7 @@ func TestGolangRuntime_GetDefaultPipeline(t *testing.T) { } func TestGolangRuntime_SetFunctionsPipelineTransforms(t *testing.T) { - target := NewAppServiceRuntime(serviceKey, nil, dic) + target := NewFunctionPipelineRuntime(serviceKey, nil, dic) id := "my-pipeline" topics := []string{"edgex/events/#"} @@ -723,7 +723,7 @@ func TestGolangRuntime_SetFunctionsPipelineTransforms(t *testing.T) { } func TestGolangRuntime_ClearAllFunctionsPipelineTransforms(t *testing.T) { - target := NewAppServiceRuntime(serviceKey, nil, dic) + target := NewFunctionPipelineRuntime(serviceKey, nil, dic) id1 := "pipeline1" id2 := "pipeline2" @@ -753,8 +753,8 @@ func TestGolangRuntime_ClearAllFunctionsPipelineTransforms(t *testing.T) { assert.Nil(t, pipeline.Transforms) } -func TestGolangRuntime_RemoveAllFunctionPipelines(t *testing.T) { - target := NewGolangRuntime(serviceKey, nil, dic) +func TestFunctionPipelineRuntime_RemoveAllFunctionPipelines(t *testing.T) { + target := NewFunctionPipelineRuntime(serviceKey, nil, dic) id1 := "pipeline1" id2 := "pipeline2" diff --git a/internal/runtime/storeforward.go b/internal/runtime/storeforward.go index 326a86987..49b3e93ee 100644 --- a/internal/runtime/storeforward.go +++ b/internal/runtime/storeforward.go @@ -37,7 +37,7 @@ const ( ) type storeForwardInfo struct { - runtime *AppServiceRuntime + runtime *FunctionsPipelineRuntime dic *di.Container } diff --git a/internal/runtime/storeforward_test.go b/internal/runtime/storeforward_test.go index 5bf7f8207..6d02c21db 100644 --- a/internal/runtime/storeforward_test.go +++ b/internal/runtime/storeforward_test.go @@ -121,7 +121,7 @@ func TestProcessRetryItems(t *testing.T) { for _, test := range tests { t.Run(test.Name, func(t *testing.T) { targetTransformWasCalled = false - runtime := NewAppServiceRuntime(serviceKey, nil, dic) + runtime := NewFunctionPipelineRuntime(serviceKey, nil, dic) var pipeline *interfaces.FunctionPipeline @@ -184,7 +184,7 @@ func TestDoStoreAndForwardRetry(t *testing.T) { for _, test := range tests { t.Run(test.Name, func(t *testing.T) { - runtime := NewAppServiceRuntime(serviceKey, nil, updateDicWithMockStoreClient()) + runtime := NewFunctionPipelineRuntime(serviceKey, nil, updateDicWithMockStoreClient()) var pipeline *interfaces.FunctionPipeline