Skip to content

Commit

Permalink
feat: Add RemoveAllFunctionPipelines in ApplicationService interface (e…
Browse files Browse the repository at this point in the history
…dgexfoundry#1220)

Signed-off-by: ZhangZhe <[email protected]>
  • Loading branch information
Zhang Zhe authored Nov 30, 2022
1 parent 9fc50c4 commit 0ecd0af
Show file tree
Hide file tree
Showing 7 changed files with 99 additions and 6 deletions.
5 changes: 5 additions & 0 deletions internal/app/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,11 @@ func (svc *Service) AddFunctionsPipelineForTopics(id string, topics []string, tr
return nil
}

// RemoveAllFunctionPipelines removes all existing function pipelines
func (svc *Service) RemoveAllFunctionPipelines() {
svc.runtime.RemoveAllFunctionPipelines()
}

// RequestTimeout returns the Request Timeout duration that was parsed from the Service.RequestTimeout configuration
func (svc *Service) RequestTimeout() time.Duration {
return svc.requestTimeout
Expand Down
40 changes: 34 additions & 6 deletions internal/app/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,13 @@ import (
"reflect"
"testing"

bootstrapContainer "github.com/edgexfoundry/go-mod-bootstrap/v2/bootstrap/container"
bootstrapInterfaces "github.com/edgexfoundry/go-mod-bootstrap/v2/bootstrap/interfaces"
"github.com/edgexfoundry/go-mod-bootstrap/v2/bootstrap/interfaces/mocks"
"github.com/edgexfoundry/go-mod-bootstrap/v2/di"
clients "github.com/edgexfoundry/go-mod-core-contracts/v2/clients/http"
"github.com/edgexfoundry/go-mod-core-contracts/v2/clients/logger"

"github.com/edgexfoundry/app-functions-sdk-go/v2/internal/appfunction"
"github.com/edgexfoundry/app-functions-sdk-go/v2/internal/bootstrap/container"
"github.com/edgexfoundry/app-functions-sdk-go/v2/internal/common"
Expand All @@ -32,12 +39,6 @@ import (
"github.com/edgexfoundry/app-functions-sdk-go/v2/internal/webserver"
"github.com/edgexfoundry/app-functions-sdk-go/v2/pkg/interfaces"
builtin "github.com/edgexfoundry/app-functions-sdk-go/v2/pkg/transforms"
bootstrapContainer "github.com/edgexfoundry/go-mod-bootstrap/v2/bootstrap/container"
bootstrapInterfaces "github.com/edgexfoundry/go-mod-bootstrap/v2/bootstrap/interfaces"
"github.com/edgexfoundry/go-mod-bootstrap/v2/bootstrap/interfaces/mocks"
"github.com/edgexfoundry/go-mod-bootstrap/v2/di"
clients "github.com/edgexfoundry/go-mod-core-contracts/v2/clients/http"
"github.com/edgexfoundry/go-mod-core-contracts/v2/clients/logger"

"github.com/google/uuid"
"github.com/gorilla/mux"
Expand All @@ -57,6 +58,7 @@ func TestMain(m *testing.M) {
lc = logger.NewMockClient()
mockMetricsManager := &mocks.MetricsManager{}
mockMetricsManager.On("Register", mock.Anything, mock.Anything, mock.Anything).Return(nil)
mockMetricsManager.On("Unregister", mock.Anything)

mockSecretProvider = &mocks.SecretProvider{}

Expand Down Expand Up @@ -372,6 +374,32 @@ func TestService_AddFunctionsPipelineForTopics(t *testing.T) {
})
}
}

func TestService_RemoveAllFunctionPipelines(t *testing.T) {
service := Service{
lc: lc,
dic: dic,
runtime: runtime.NewGolangRuntime("", nil, dic),
config: &common.ConfigurationStruct{
Trigger: common.TriggerInfo{
Type: TriggerTypeMessageBus,
},
},
}

id := "121"
tags := builtin.NewTags(nil)
transforms := []interfaces.AppFunction{tags.AddTags, tags.AddTags}
defaultTopics := []string{"#"}

err := service.AddFunctionsPipelineForTopics(id, defaultTopics, transforms...)
require.NoError(t, err)

service.RemoveAllFunctionPipelines()
actual := service.runtime.GetPipelineById(id)
require.Nil(t, actual)
}

func TestApplicationSettings(t *testing.T) {
expectedSettingKey := "ApplicationName"
expectedSettingValue := "simple-filter-xml"
Expand Down
19 changes: 19 additions & 0 deletions internal/runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,20 @@ func (gr *GolangRuntime) ClearAllFunctionsPipelineTransforms() {
gr.isBusyCopying.Unlock()
}

// RemoveAllFunctionPipelines removes all existing function pipelines
func (gr *GolangRuntime) RemoveAllFunctionPipelines() {
metricManager := bootstrapContainer.MetricsManagerFrom(gr.dic.Get)

gr.isBusyCopying.Lock()
for id := range gr.pipelines {
gr.unregisterPipelineMetric(metricManager, internal.PipelineMessagesProcessedName, id)
gr.unregisterPipelineMetric(metricManager, internal.PipelineMessageProcessingTimeName, id)
gr.unregisterPipelineMetric(metricManager, internal.PipelineProcessingErrorsName, id)
delete(gr.pipelines, id)
}
gr.isBusyCopying.Unlock()
}

// AddFunctionsPipeline is thread safe to set transforms
func (gr *GolangRuntime) AddFunctionsPipeline(id string, topics []string, transforms []interfaces.AppFunction) error {
_, exists := gr.pipelines[id]
Expand Down Expand Up @@ -178,6 +192,11 @@ func (gr *GolangRuntime) registerPipelineMetric(metricManager bootstrapInterface
}
}

func (gr *GolangRuntime) unregisterPipelineMetric(metricManager bootstrapInterfaces.MetricsManager, metricName string, pipelineId string) {
registeredName := strings.Replace(metricName, internal.PipelineIdTxt, pipelineId, 1)
metricManager.Unregister(registeredName)
}

// ProcessMessage sends the contents of the message through the functions pipeline
func (gr *GolangRuntime) ProcessMessage(appContext *appfunction.Context, target interface{}, pipeline *interfaces.FunctionPipeline) *MessageError {
if len(pipeline.Transforms) == 0 {
Expand Down
33 changes: 33 additions & 0 deletions internal/runtime/runtime_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -752,3 +752,36 @@ func TestGolangRuntime_ClearAllFunctionsPipelineTransforms(t *testing.T) {
pipeline = target.GetPipelineById(id2)
assert.Nil(t, pipeline.Transforms)
}

func TestGolangRuntime_RemoveAllFunctionPipelines(t *testing.T) {
target := NewGolangRuntime(serviceKey, nil, dic)

id1 := "pipeline1"
id2 := "pipeline2"
topics := []string{"edgex/events/#"}
transforms := []interfaces.AppFunction{
transforms.NewResponseData().SetResponseData,
}

target.SetDefaultFunctionsPipeline(transforms)
err := target.AddFunctionsPipeline(id1, topics, transforms)
require.NoError(t, err)
err = target.AddFunctionsPipeline(id2, topics, transforms)
require.NoError(t, err)

pipeline := target.GetDefaultPipeline()
require.NotNil(t, pipeline)
pipeline = target.GetPipelineById(id1)
require.NotNil(t, pipeline)
pipeline = target.GetPipelineById(id2)
require.NotNil(t, pipeline)

target.RemoveAllFunctionPipelines()

pipeline = target.GetDefaultPipeline()
require.Nil(t, pipeline.Transforms)
pipeline = target.GetPipelineById(id1)
assert.Nil(t, pipeline)
pipeline = target.GetPipelineById(id2)
assert.Nil(t, pipeline)
}
1 change: 1 addition & 0 deletions internal/runtime/storeforward_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ func TestMain(m *testing.M) {

mockMetricsManager := &mocks2.MetricsManager{}
mockMetricsManager.On("Register", mock.Anything, mock.Anything, mock.Anything).Return(nil)
mockMetricsManager.On("Unregister", mock.Anything)

dic = di.NewContainer(di.ServiceConstructorMap{
container.ConfigurationName: func(get di.Get) interface{} {
Expand Down
5 changes: 5 additions & 0 deletions pkg/interfaces/mocks/ApplicationService.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions pkg/interfaces/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ type ApplicationService interface {
// so that it matches multiple incoming topics. If just "#" is used for the specified topic it will match all incoming
// topics and the specified functions pipeline will execute on every message received.
AddFunctionsPipelineForTopics(id string, topic []string, transforms ...AppFunction) error
// RemoveAllFunctionPipelines removes all existing function pipelines
RemoveAllFunctionPipelines()
// MakeItRun starts the configured trigger to allow the functions pipeline to execute when the trigger
// receives data and starts the internal webserver. This is a long running function which does not return until
// the service is stopped or MakeItStop() is called.
Expand Down

0 comments on commit 0ecd0af

Please sign in to comment.