diff --git a/app-service-template/go.sum b/app-service-template/go.sum index e62d184cd..7bf3470bb 100644 --- a/app-service-template/go.sum +++ b/app-service-template/go.sum @@ -81,8 +81,9 @@ github.com/hashicorp/go-immutable-radix v1.0.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjh github.com/hashicorp/go-msgpack v0.5.3 h1:zKjpN5BK/P5lMYrLmBHdBULWbJ0XpYR+7NGzqkZzoD4= github.com/hashicorp/go-msgpack v0.5.3/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM= github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHhCYQXV3UM06sGGrk= -github.com/hashicorp/go-multierror v1.1.0 h1:B9UzwGQJehnUY1yNrnwREHc3fGbC2xefo8g4TbElacI= github.com/hashicorp/go-multierror v1.1.0/go.mod h1:spPvp8C1qA32ftKqdAHm4hHTbPw+vmowP0z+KUhOZdA= +github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo= +github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM= github.com/hashicorp/go-rootcerts v1.0.2 h1:jzhAVGtqPKbwpyCPELlgNWhE1znq+qwJtW5Oi2viEzc= github.com/hashicorp/go-rootcerts v1.0.2/go.mod h1:pqUvnprVnM5bf7AOirdbb01K4ccR319Vf4pU3K5EGc8= github.com/hashicorp/go-sockaddr v1.0.0 h1:GeH6tui99pF4NJgfnhp+L6+FfobzVW3Ah46sLo0ICXs= diff --git a/app-service-template/main.go b/app-service-template/main.go index 4d327eb9b..ecb45c81e 100644 --- a/app-service-template/main.go +++ b/app-service-template/main.go @@ -100,9 +100,9 @@ func (app *myApp) CreateAndRunAppService(serviceKey string, newServiceFactory fu sample := functions.NewSample() // TODO: Replace below functions with built in and/or your custom functions for your use case - // or remove is using Pipeline By Topic below. + // or remove if using Pipeline By Topics below. // See https://docs.edgexfoundry.org/2.0/microservices/application/BuiltIn/ for list of built-in functions - err = app.service.SetFunctionsPipeline( + err = app.service.SetDefaultFunctionsPipeline( transforms.NewFilterFor(deviceNames).FilterByDeviceName, sample.LogEventDetails, sample.ConvertEventToXML, @@ -120,8 +120,8 @@ func (app *myApp) CreateAndRunAppService(serviceKey string, newServiceFactory fu // Core Data publishes to the 'edgex/events/core///' topic // Note: This example with default above causes Events from Random-Float-Device device to be processed twice // resulting in the XML to be published back to the MessageBus twice. - // See for more details. - err = app.service.AddFunctionsPipelineForTopic("Floats", "edgex/events/#/#/Random-Float-Device/#", + // See https://docs.edgexfoundry.org/2.0/microservices/application/AdvancedTopics/#pipeline-per-topics for more details. + err = app.service.AddFunctionsPipelineForTopics("Floats", []string{"edgex/events/#/#/Random-Float-Device/#"}, transforms.NewFilterFor(deviceNames).FilterByDeviceName, sample.LogEventDetails, sample.ConvertEventToXML, @@ -132,7 +132,7 @@ func (app *myApp) CreateAndRunAppService(serviceKey string, newServiceFactory fu } // Note: This example with default above causes Events from Int32 source to be processed twice // resulting in the XML to be published back to the MessageBus twice. - err = app.service.AddFunctionsPipelineForTopic("Int32s", "edgex/events/#/#/#/Int32", + err = app.service.AddFunctionsPipelineForTopics("Int32s", []string{"edgex/events/#/#/#/Int32"}, transforms.NewFilterFor(deviceNames).FilterByDeviceName, sample.LogEventDetails, sample.ConvertEventToXML, diff --git a/app-service-template/main_test.go b/app-service-template/main_test.go index 933da76f8..e72ca5c9d 100644 --- a/app-service-template/main_test.go +++ b/app-service-template/main_test.go @@ -42,9 +42,9 @@ func TestCreateAndRunService_Success(t *testing.T) { mockAppService.On("LoggingClient").Return(logger.NewMockClient()) mockAppService.On("GetAppSettingStrings", "DeviceNames"). Return([]string{"Random-Boolean-Device, Random-Integer-Device"}, nil) - mockAppService.On("SetFunctionsPipeline", mock.Anything, mock.Anything, mock.Anything, mock.Anything). + mockAppService.On("SetDefaultFunctionsPipeline", mock.Anything, mock.Anything, mock.Anything, mock.Anything). Return(nil) - mockAppService.On("AddFunctionsPipelineForTopic", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). + mockAppService.On("AddFunctionsPipelineForTopics", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). Return(nil) mockAppService.On("LoadCustomConfig", mock.Anything, mock.Anything, mock.Anything). Return(nil).Run(func(args mock.Arguments) { @@ -115,7 +115,7 @@ func TestCreateAndRunService_SetFunctionsPipeline_Failed(t *testing.T) { }) mockAppService.On("ListenForCustomConfigChanges", mock.Anything, mock.Anything, mock.Anything). Return(nil) - mockAppService.On("SetFunctionsPipeline", mock.Anything, mock.Anything, mock.Anything, mock.Anything). + mockAppService.On("SetDefaultFunctionsPipeline", mock.Anything, mock.Anything, mock.Anything, mock.Anything). Return(fmt.Errorf("Failed")).Run(func(args mock.Arguments) { setFunctionsPipelineCalled = true }) @@ -148,9 +148,9 @@ func TestCreateAndRunService_MakeItRun_Failed(t *testing.T) { }) mockAppService.On("ListenForCustomConfigChanges", mock.Anything, mock.Anything, mock.Anything). Return(nil) - mockAppService.On("SetFunctionsPipeline", mock.Anything, mock.Anything, mock.Anything, mock.Anything). + mockAppService.On("SetDefaultFunctionsPipeline", mock.Anything, mock.Anything, mock.Anything, mock.Anything). Return(nil) - mockAppService.On("AddFunctionsPipelineForTopic", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). + mockAppService.On("AddFunctionsPipelineForTopics", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). Return(nil) mockAppService.On("MakeItRun").Return(fmt.Errorf("Failed")).Run(func(args mock.Arguments) { makeItRunCalled = true diff --git a/internal/app/service.go b/internal/app/service.go index a67b28a9c..bc224c4c5 100644 --- a/internal/app/service.go +++ b/internal/app/service.go @@ -270,7 +270,7 @@ func (svc *Service) LoadConfigurableFunctionPipelines() (map[string]interfaces.F pipeline := interfaces.FunctionPipeline{ Id: interfaces.DefaultPipelineId, Transforms: transforms, - Topic: runtime.TopicWildCard, + Topics: []string{runtime.TopicWildCard}, } pipelines[pipeline.Id] = pipeline } @@ -289,7 +289,7 @@ func (svc *Service) LoadConfigurableFunctionPipelines() (map[string]interfaces.F pipeline := interfaces.FunctionPipeline{ Id: perTopicPipeline.Id, Transforms: transforms, - Topic: perTopicPipeline.Topic, + Topics: util.DeleteEmptyAndTrim(strings.FieldsFunc(perTopicPipeline.Topics, util.SplitComma)), } pipelines[pipeline.Id] = pipeline @@ -384,26 +384,27 @@ func (svc *Service) SetDefaultFunctionsPipeline(transforms ...interfaces.AppFunc return nil } -// AddFunctionsPipelineForTopic adds a functions pipeline for the specified for the specified id and topic -func (svc *Service) AddFunctionsPipelineForTopic(id string, topic string, transforms ...interfaces.AppFunction) error { - if strings.ToUpper(svc.config.Trigger.Type) == TriggerTypeHTTP { - return errors.New("pipeline per topic not valid with HTTP trigger") - } - +// AddFunctionsPipelineForTopics adds a functions pipeline for the specified for the specified id and topics +func (svc *Service) AddFunctionsPipelineForTopics(id string, topics []string, transforms ...interfaces.AppFunction) error { if len(transforms) == 0 { return errors.New("no transforms provided to pipeline") } - if len(strings.TrimSpace(topic)) == 0 { - return errors.New("topic for pipeline can not be blank") + if len(topics) == 0 { + return errors.New("topics for pipeline can not be empty") } - err := svc.runtime.AddFunctionsPipeline(id, topic, transforms) + for _, t := range topics { + if strings.TrimSpace(t) == "" { + return errors.New("blank topic not allowed") + } + } + err := svc.runtime.AddFunctionsPipeline(id, topics, transforms) if err != nil { return err } - svc.lc.Debugf("Pipeline '%s' added for topic '%s' with %d transform(s)", id, topic, len(transforms)) + svc.lc.Debugf("Pipeline '%s' added for topics '%v' with %d transform(s)", id, topics, len(transforms)) return nil } diff --git a/internal/app/service_test.go b/internal/app/service_test.go index a8afd7ff9..9f2a04b1e 100644 --- a/internal/app/service_test.go +++ b/internal/app/service_test.go @@ -298,7 +298,7 @@ func TestSetDefaultFunctionsPipelineOneTransform(t *testing.T) { require.NoError(t, err) } -func TestService_AddFunctionsPipelineForTopic(t *testing.T) { +func TestService_AddFunctionsPipelineForTopics(t *testing.T) { service := Service{ lc: lc, runtime: runtime.NewGolangRuntime("", nil, nil), @@ -313,32 +313,33 @@ func TestService_AddFunctionsPipelineForTopic(t *testing.T) { transforms := []interfaces.AppFunction{tags.AddTags} - // This sets the Default Pipeline allowing to test for duplicate iD. err := service.SetDefaultFunctionsPipeline(transforms...) + require.NoError(t, err) + defaultTopics := []string{"#"} + tests := []struct { name string id string trigger string - topic string + topics []string transforms []interfaces.AppFunction expectError bool }{ - {"Happy Path", "123", TriggerTypeMessageBus, "#", transforms, false}, - {"Happy Path Custom", "124", "CUSTOM TRIGGER", "#", transforms, false}, - {"Empty Topic", "125", TriggerTypeMessageBus, " ", transforms, true}, - {"No Transforms", "126", TriggerTypeMessageBus, "#", nil, true}, - {"Default Id", interfaces.DefaultPipelineId, TriggerTypeMessageBus, "#", transforms, true}, - {"Duplicate Id", "123", TriggerTypeMessageBus, "#", transforms, true}, - {"HTTP Trigger Type", "127", TriggerTypeHTTP, "#", transforms, true}, + {"Happy Path", "123", TriggerTypeMessageBus, defaultTopics, transforms, false}, + {"Duplicate Id", interfaces.DefaultPipelineId, TriggerTypeMessageBus, defaultTopics, transforms, true}, + {"Happy Path Custom", "124", "CUSTOM TRIGGER", defaultTopics, transforms, false}, + {"Empty Topic", "125", TriggerTypeMessageBus, []string{" "}, transforms, true}, + {"Empty Topics", "126", TriggerTypeMessageBus, nil, transforms, true}, + {"No Transforms", "127", TriggerTypeMessageBus, defaultTopics, nil, true}, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { service.config.Trigger.Type = test.trigger - err := service.AddFunctionsPipelineForTopic(test.id, test.topic, test.transforms...) + err := service.AddFunctionsPipelineForTopics(test.id, test.topics, test.transforms...) if test.expectError { require.Error(t, err) return @@ -480,7 +481,7 @@ func TestLoadConfigurableFunctionPipelinesDefaultNotFound(t *testing.T) { if len(test.perTopicExecutionOrder) > 0 { service.config.Writable.Pipeline.PerTopicPipelines["bogus"] = common.TopicPipeline{ Id: "bogus", - Topic: "#", + Topics: "#", ExecutionOrder: test.perTopicExecutionOrder, } } @@ -539,7 +540,7 @@ func TestLoadConfigurableFunctionPipelinesNumFunctions(t *testing.T) { PerTopicPipelines: map[string]common.TopicPipeline{ perTopicPipelineId: { Id: perTopicPipelineId, - Topic: "#", + Topics: "#", ExecutionOrder: "FilterByDeviceName, Transform, SetResponseData", }, }, diff --git a/internal/common/config.go b/internal/common/config.go index 3d1062645..b3ae6dc79 100644 --- a/internal/common/config.go +++ b/internal/common/config.go @@ -162,12 +162,12 @@ type PipelineInfo struct { Functions map[string]PipelineFunction } -// TopicPipeline define the data to a Pre Topic functions pipeline +// TopicPipeline define the data to a Per Topics functions pipeline type TopicPipeline struct { // Id is the unique ID of the pipeline instance Id string - // Topic is the topic which must match the incoming topic inorder for the pipeline to execute - Topic string + // Topics is the set of comma separated topics matched against the incoming to determine if pipeline should execute + Topics string // ExecutionOrder is a list of functions, in execution order, for the pipeline instance ExecutionOrder string } diff --git a/internal/runtime/runtime.go b/internal/runtime/runtime.go index 8e13e4896..b8ffb813d 100644 --- a/internal/runtime/runtime.go +++ b/internal/runtime/runtime.go @@ -1,5 +1,6 @@ // // Copyright (c) 2021 Intel Corporation +// Copyright (c) 2021 One Track Consulting // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -47,11 +48,11 @@ const ( TopicLevelSeparator = "/" ) -func NewFunctionPipeline(id string, topic string, transforms []interfaces.AppFunction) interfaces.FunctionPipeline { +func NewFunctionPipeline(id string, topics []string, transforms []interfaces.AppFunction) interfaces.FunctionPipeline { pipeline := interfaces.FunctionPipeline{ Id: id, Transforms: transforms, - Topic: topic, + Topics: topics, Hash: calculatePipelineHash(transforms), } @@ -99,17 +100,17 @@ func (gr *GolangRuntime) SetDefaultFunctionsPipeline(transforms []interfaces.App return nil } - return gr.AddFunctionsPipeline(interfaces.DefaultPipelineId, TopicWildCard, transforms) + return gr.AddFunctionsPipeline(interfaces.DefaultPipelineId, []string{TopicWildCard}, transforms) } // AddFunctionsPipeline is thread safe to set transforms -func (gr *GolangRuntime) AddFunctionsPipeline(id string, topic string, transforms []interfaces.AppFunction) error { +func (gr *GolangRuntime) AddFunctionsPipeline(id string, topics []string, transforms []interfaces.AppFunction) error { _, exists := gr.pipelines[id] if exists { return fmt.Errorf("pipeline with Id='%s' already exists", id) } - pipeline := NewFunctionPipeline(id, topic, transforms) + pipeline := NewFunctionPipeline(id, topics, transforms) gr.isBusyCopying.Lock() gr.pipelines[id] = &pipeline gr.isBusyCopying.Unlock() @@ -206,7 +207,7 @@ func (gr *GolangRuntime) ProcessMessage( execPipeline := &interfaces.FunctionPipeline{ Id: pipeline.Id, Transforms: make([]interfaces.AppFunction, len(pipeline.Transforms)), - Topic: pipeline.Topic, + Topics: pipeline.Topics, Hash: pipeline.Hash, } copy(execPipeline.Transforms, pipeline.Transforms) @@ -381,7 +382,7 @@ func (gr *GolangRuntime) GetMatchingPipelines(incomingTopic string) []*interface } for _, pipeline := range gr.pipelines { - if topicMatches(incomingTopic, pipeline.Topic) { + if topicMatches(incomingTopic, pipeline.Topics) { matches = append(matches, pipeline) } } @@ -393,32 +394,39 @@ func (gr *GolangRuntime) GetPipelineById(id string) *interfaces.FunctionPipeline return gr.pipelines[id] } -func topicMatches(incomingTopic string, pipelineTopic string) bool { - if pipelineTopic == TopicWildCard { - return true - } +func topicMatches(incomingTopic string, pipelineTopics []string) bool { + for _, pipelineTopic := range pipelineTopics { + if pipelineTopic == TopicWildCard { + return true + } - wildcardCount := strings.Count(pipelineTopic, TopicWildCard) - switch wildcardCount { - case 0: - return incomingTopic == pipelineTopic - default: - pipelineLevels := strings.Split(pipelineTopic, TopicLevelSeparator) - incomingLevels := strings.Split(incomingTopic, TopicLevelSeparator) + wildcardCount := strings.Count(pipelineTopic, TopicWildCard) + switch wildcardCount { + case 0: + if incomingTopic == pipelineTopic { + return true + } + default: + pipelineLevels := strings.Split(pipelineTopic, TopicLevelSeparator) + incomingLevels := strings.Split(incomingTopic, TopicLevelSeparator) - if len(pipelineLevels) > len(incomingLevels) { - return false - } + if len(pipelineLevels) > len(incomingLevels) { + continue + } - for index, level := range pipelineLevels { - if level == TopicWildCard { - incomingLevels[index] = TopicWildCard + for index, level := range pipelineLevels { + if level == TopicWildCard { + incomingLevels[index] = TopicWildCard + } } - } - incomingWithWildCards := strings.Join(incomingLevels, "/") - return strings.Index(incomingWithWildCards, pipelineTopic) == 0 + incomingWithWildCards := strings.Join(incomingLevels, "/") + if strings.Index(incomingWithWildCards, pipelineTopic) == 0 { + return true + } + } } + return false } func calculatePipelineHash(transforms []interfaces.AppFunction) string { diff --git a/internal/runtime/runtime_test.go b/internal/runtime/runtime_test.go index bd76707c7..c58cd1107 100644 --- a/internal/runtime/runtime_test.go +++ b/internal/runtime/runtime_test.go @@ -1,5 +1,6 @@ // // Copyright (c) 2021 Intel Corporation +// Copyright (c) 2021 One Track Consulting // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -531,42 +532,46 @@ func TestTopicMatches(t *testing.T) { incomingTopic := "edgex/events/P/D/S" tests := []struct { - name string - incomingTopic string - pipelineTopic string - expected bool + name string + incomingTopic string + pipelineTopics []string + expected bool }{ - {"Match - Default all", incomingTopic, TopicWildCard, true}, - {"Match - Exact", incomingTopic, incomingTopic, true}, - {"Match - Any Profile for Device and Source", incomingTopic, "edgex/events/#/D/S", true}, - {"Match - Any Device for Profile and Source", incomingTopic, "edgex/events/P/#/S", true}, - {"Match - Any Source for Profile and Device", incomingTopic, "edgex/events/P/D/#", true}, - {"Match - All Events ", incomingTopic, "edgex/events/#", true}, - {"Match - All Devices and Sources for Profile ", incomingTopic, "edgex/events/P/#", true}, - {"Match - All Sources for Profile and Device ", incomingTopic, "edgex/events/P/D/#", true}, - {"Match - All Sources for a Device for any Profile ", incomingTopic, "edgex/events/#/D/#", true}, - {"Match - Source for any Profile and any Device ", incomingTopic, "edgex/events/#/#/S", true}, - {"NoMatch - SourceX for any Profile and any Device ", incomingTopic, "edgex/events/#/#/Sx", false}, - {"NoMatch - All Sources for DeviceX and any Profile ", incomingTopic, "edgex/events/#/Dx/#", false}, - {"NoMatch - All Sources for ProfileX and Device ", incomingTopic, "edgex/events/Px/D/#", false}, - {"NoMatch - All Sources for Profile and DeviceX ", incomingTopic, "edgex/events/P/Dx/#", false}, - {"NoMatch - All Sources for ProfileX and DeviceX ", incomingTopic, "edgex/events/Px/Dx/#", false}, - {"NoMatch - All Devices and Sources for ProfileX ", incomingTopic, "edgex/events/Px/#", false}, - {"NoMatch - Any Profile for DeviceX and Source", incomingTopic, "edgex/events/#/Dx/S", false}, - {"NoMatch - Any Profile for DeviceX and Source", incomingTopic, "edgex/events/#/Dx/S", false}, - {"NoMatch - Any Profile for Device and SourceX", incomingTopic, "edgex/events/#/D/Sx", false}, - {"NoMatch - Any Profile for DeviceX and SourceX", incomingTopic, "edgex/events/#/Dx/Sx", false}, - {"NoMatch - Any Device for Profile and SourceX", incomingTopic, "edgex/events/P/#/Sx", false}, - {"NoMatch - Any Device for ProfileX and Source", incomingTopic, "edgex/events/Px/#/S", false}, - {"NoMatch - Any Device for ProfileX and SourceX", incomingTopic, "edgex/events/Px/#/Sx", false}, - {"NoMatch - Any Source for ProfileX and Device", incomingTopic, "edgex/events/Px/D/#", false}, - {"NoMatch - Any Source for Profile and DeviceX", incomingTopic, "edgex/events/P/Dx/#", false}, - {"NoMatch - Any Source for ProfileX and DeviceX", incomingTopic, "edgex/events/Px/Dx/#", false}, + {"Match - Default all", incomingTopic, []string{TopicWildCard}, true}, + {"Match - Not First Topic", incomingTopic, []string{"not-edgex/#", TopicWildCard}, true}, + {"Match - Exact", incomingTopic, []string{incomingTopic}, true}, + {"Match - Any Profile for Device and Source", incomingTopic, []string{"edgex/events/#/D/S"}, true}, + {"Match - Any Profile for Device and Source", incomingTopic, []string{"edgex/events/#/D/S"}, true}, + {"Match - Any Device for Profile and Source", incomingTopic, []string{"edgex/events/P/#/S"}, true}, + {"Match - Any Source for Profile and Device", incomingTopic, []string{"edgex/events/P/D/#"}, true}, + {"Match - All Events ", incomingTopic, []string{"edgex/events/#"}, true}, + {"Match - First Topic Deeper ", incomingTopic, []string{"edgex/events/P/D/S/Z", "edgex/events/#"}, true}, + {"Match - All Devices and Sources for Profile ", incomingTopic, []string{"edgex/events/P/#"}, true}, + {"Match - All Sources for Profile and Device ", incomingTopic, []string{"edgex/events/P/D/#"}, true}, + {"Match - All Sources for a Device for any Profile ", incomingTopic, []string{"edgex/events/#/D/#"}, true}, + {"Match - Source for any Profile and any Device ", incomingTopic, []string{"edgex/events/#/#/S"}, true}, + {"NoMatch - SourceX for any Profile and any Device ", incomingTopic, []string{"edgex/events/#/#/Sx"}, false}, + {"NoMatch - All Sources for DeviceX and any Profile ", incomingTopic, []string{"edgex/events/#/Dx/#"}, false}, + {"NoMatch - All Sources for ProfileX and Device ", incomingTopic, []string{"edgex/events/Px/D/#"}, false}, + {"NoMatch - All Sources for Profile and DeviceX ", incomingTopic, []string{"edgex/events/P/Dx/#"}, false}, + {"NoMatch - All Sources for ProfileX and DeviceX ", incomingTopic, []string{"edgex/events/Px/Dx/#"}, false}, + {"NoMatch - All Devices and Sources for ProfileX ", incomingTopic, []string{"edgex/events/Px/#"}, false}, + {"NoMatch - Any Profile for DeviceX and Source", incomingTopic, []string{"edgex/events/#/Dx/S"}, false}, + {"NoMatch - Any Profile for DeviceX and Source", incomingTopic, []string{"edgex/events/#/Dx/S"}, false}, + {"NoMatch - Any Profile for Device and SourceX", incomingTopic, []string{"edgex/events/#/D/Sx"}, false}, + {"NoMatch - Any Profile for DeviceX and SourceX", incomingTopic, []string{"edgex/events/#/Dx/Sx"}, false}, + {"NoMatch - Any Device for Profile and SourceX", incomingTopic, []string{"edgex/events/P/#/Sx"}, false}, + {"NoMatch - Any Device for ProfileX and Source", incomingTopic, []string{"edgex/events/Px/#/S"}, false}, + {"NoMatch - Any Device for ProfileX and SourceX", incomingTopic, []string{"edgex/events/Px/#/Sx"}, false}, + {"NoMatch - Any Source for ProfileX and Device", incomingTopic, []string{"edgex/events/Px/D/#"}, false}, + {"NoMatch - Any Source for Profile and DeviceX", incomingTopic, []string{"edgex/events/P/Dx/#"}, false}, + {"NoMatch - Any Source for ProfileX and DeviceX", incomingTopic, []string{"edgex/events/Px/Dx/#"}, false}, + {"NoMatch - Pipeline Topic Deeper", incomingTopic, []string{"edgex/events/P/D/S/Z"}, false}, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { - actual := topicMatches(test.incomingTopic, test.pipelineTopic) + actual := topicMatches(test.incomingTopic, test.pipelineTopics) assert.Equal(t, test.expected, actual) }) } @@ -576,7 +581,7 @@ func TestGetPipelineById(t *testing.T) { target := NewGolangRuntime(serviceKey, nil, nil) expectedId := "my-pipeline" - expectedTopic := "edgex/events/#" + expectedTopics := []string{"edgex/events/#"} expectedTransforms := []interfaces.AppFunction{ transforms.NewResponseData().SetResponseData, } @@ -585,20 +590,20 @@ func TestGetPipelineById(t *testing.T) { err := target.SetDefaultFunctionsPipeline(expectedTransforms) require.NoError(t, err) - err = target.AddFunctionsPipeline(expectedId, expectedTopic, expectedTransforms) + err = target.AddFunctionsPipeline(expectedId, expectedTopics, expectedTransforms) require.NoError(t, err) actual := target.GetPipelineById(interfaces.DefaultPipelineId) require.NotNil(t, actual) assert.Equal(t, interfaces.DefaultPipelineId, actual.Id) - assert.Equal(t, TopicWildCard, actual.Topic) + assert.Equal(t, []string{TopicWildCard}, actual.Topics) assert.Equal(t, expectedTransforms, actual.Transforms) assert.NotEmpty(t, actual.Hash) actual = target.GetPipelineById(expectedId) require.NotNil(t, actual) assert.Equal(t, expectedId, actual.Id) - assert.Equal(t, expectedTopic, actual.Topic) + assert.Equal(t, expectedTopics, actual.Topics) assert.Equal(t, expectedTransforms, actual.Transforms) assert.NotEmpty(t, actual.Hash) @@ -613,11 +618,11 @@ func TestGetMatchingPipelines(t *testing.T) { transforms.NewResponseData().SetResponseData, } - err := target.AddFunctionsPipeline("one", "edgex/events/#/D1/#", expectedTransforms) + err := target.AddFunctionsPipeline("one", []string{"edgex/events/#/D1/#"}, expectedTransforms) require.NoError(t, err) - err = target.AddFunctionsPipeline("two", "edgex/events/P1/#", expectedTransforms) + err = target.AddFunctionsPipeline("two", []string{"edgex/events/P1/#"}, expectedTransforms) require.NoError(t, err) - err = target.AddFunctionsPipeline("three", "edgex/events/P1/D1/S1", expectedTransforms) + err = target.AddFunctionsPipeline("three", []string{"edgex/events/P1/D1/S1"}, expectedTransforms) require.NoError(t, err) tests := []struct { @@ -650,7 +655,7 @@ func TestGolangRuntime_GetDefaultPipeline(t *testing.T) { actual := target.GetDefaultPipeline() require.NotNil(t, actual) assert.Equal(t, interfaces.DefaultPipelineId, actual.Id) - assert.Empty(t, actual.Topic) + assert.Empty(t, actual.Topics) assert.Nil(t, actual.Transforms) assert.Empty(t, actual.Hash) @@ -660,7 +665,7 @@ func TestGolangRuntime_GetDefaultPipeline(t *testing.T) { actual = target.GetDefaultPipeline() require.NotNil(t, actual) assert.Equal(t, interfaces.DefaultPipelineId, actual.Id) - assert.Equal(t, TopicWildCard, actual.Topic) + assert.Equal(t, []string{TopicWildCard}, actual.Topics) assert.Equal(t, expectedTransforms, actual.Transforms) assert.NotEmpty(t, actual.Hash) } diff --git a/internal/runtime/storeforward_test.go b/internal/runtime/storeforward_test.go index 477aabbe5..780005d50 100644 --- a/internal/runtime/storeforward_test.go +++ b/internal/runtime/storeforward_test.go @@ -103,10 +103,10 @@ func TestProcessRetryItems(t *testing.T) { {"RetryCount Increased - Default", failureTransform, true, expectedPayload, 4, 5, 0, false, contextData, false}, {"Max Retries - Default", failureTransform, true, expectedPayload, 9, 9, 1, false, contextData, false}, {"Bad Version - Default", successTransform, false, expectedPayload, 0, 0, 1, true, contextData, false}, - {"Happy Path - Per Topic", successTransform, true, expectedPayload, 0, 0, 1, false, contextData, true}, - {"RetryCount Increased - Per Topic", failureTransform, true, expectedPayload, 4, 5, 0, false, contextData, true}, - {"Max Retries - Per Topic", failureTransform, true, expectedPayload, 9, 9, 1, false, contextData, true}, - {"Bad Version - Per Topic", successTransform, false, expectedPayload, 0, 0, 1, true, contextData, true}, + {"Happy Path - Per Topics", successTransform, true, expectedPayload, 0, 0, 1, false, contextData, true}, + {"RetryCount Increased - Per Topics", failureTransform, true, expectedPayload, 4, 5, 0, false, contextData, true}, + {"Max Retries - Per Topics", failureTransform, true, expectedPayload, 9, 9, 1, false, contextData, true}, + {"Bad Version - Per Topics", successTransform, false, expectedPayload, 0, 0, 1, true, contextData, true}, } for _, test := range tests { @@ -117,7 +117,7 @@ func TestProcessRetryItems(t *testing.T) { var pipeline *interfaces.FunctionPipeline if test.UsePerTopic { - err := runtime.AddFunctionsPipeline("per-topic", "#", []interfaces.AppFunction{transformPassthru, transformPassthru, test.TargetTransform}) + err := runtime.AddFunctionsPipeline("per-topic", []string{"#"}, []interfaces.AppFunction{transformPassthru, transformPassthru, test.TargetTransform}) require.NoError(t, err) pipeline = runtime.GetPipelineById("per-topic") require.NotNil(t, pipeline) @@ -169,9 +169,9 @@ func TestDoStoreAndForwardRetry(t *testing.T) { {"RetryCount Increased - Default", httpPost, 1, 2, 1, false}, {"Max Retries - Default", httpPost, 9, 0, 0, false}, {"Retry Success - Default", successTransform, 1, 0, 0, false}, - {"RetryCount Increased - Per Topic", httpPost, 1, 2, 1, true}, - {"Max Retries - Per Topic", httpPost, 9, 0, 0, true}, - {"Retry Success - Per Topic", successTransform, 1, 0, 0, true}, + {"RetryCount Increased - Per Topics", httpPost, 1, 2, 1, true}, + {"Max Retries - Per Topics", httpPost, 9, 0, 0, true}, + {"Retry Success - Per Topics", successTransform, 1, 0, 0, true}, } for _, test := range tests { @@ -181,7 +181,7 @@ func TestDoStoreAndForwardRetry(t *testing.T) { var pipeline *interfaces.FunctionPipeline if test.UsePerTopic { - err := runtime.AddFunctionsPipeline("per-topic", "#", []interfaces.AppFunction{transformPassthru, test.TargetTransform}) + err := runtime.AddFunctionsPipeline("per-topic", []string{"#"}, []interfaces.AppFunction{transformPassthru, test.TargetTransform}) require.NoError(t, err) pipeline = runtime.GetPipelineById("per-topic") require.NotNil(t, pipeline) diff --git a/internal/trigger/messagebus/messaging_test.go b/internal/trigger/messagebus/messaging_test.go index 53ce483e2..54c0a6fca 100644 --- a/internal/trigger/messagebus/messaging_test.go +++ b/internal/trigger/messagebus/messaging_test.go @@ -263,9 +263,9 @@ func TestPipelinePerTopic(t *testing.T) { goRuntime := runtime.NewGolangRuntime("", nil, dic) - err = goRuntime.AddFunctionsPipeline("P1", "edgex/events/device/P1/#", []interfaces.AppFunction{transform1}) + err = goRuntime.AddFunctionsPipeline("P1", []string{"edgex/events/device/P1/#"}, []interfaces.AppFunction{transform1}) require.NoError(t, err) - err = goRuntime.AddFunctionsPipeline("P2", "edgex/events/device/P2/#", []interfaces.AppFunction{transform2}) + err = goRuntime.AddFunctionsPipeline("P2", []string{"edgex/events/device/P2/#"}, []interfaces.AppFunction{transform2}) require.NoError(t, err) trigger := NewTrigger(dic, goRuntime) diff --git a/pkg/interfaces/mocks/ApplicationService.go b/pkg/interfaces/mocks/ApplicationService.go index ddd32a022..a7b9c4423 100644 --- a/pkg/interfaces/mocks/ApplicationService.go +++ b/pkg/interfaces/mocks/ApplicationService.go @@ -67,20 +67,20 @@ func (_m *ApplicationService) AddBackgroundPublisherWithTopic(capacity int, topi return r0, r1 } -// AddFunctionsPipelineForTopic provides a mock function with given fields: id, topic, transforms -func (_m *ApplicationService) AddFunctionsPipelineForTopic(id string, topic string, transforms ...func(interfaces.AppFunctionContext, interface{}) (bool, interface{})) error { +// AddFunctionsPipelineForTopics provides a mock function with given fields: id, topic, transforms +func (_m *ApplicationService) AddFunctionsPipelineForTopics(id string, topics []string, transforms ...func(interfaces.AppFunctionContext, interface{}) (bool, interface{})) error { _va := make([]interface{}, len(transforms)) for _i := range transforms { _va[_i] = transforms[_i] } var _ca []interface{} - _ca = append(_ca, id, topic) + _ca = append(_ca, id, topics) _ca = append(_ca, _va...) ret := _m.Called(_ca...) var r0 error - if rf, ok := ret.Get(0).(func(string, string, ...func(interfaces.AppFunctionContext, interface{}) (bool, interface{})) error); ok { - r0 = rf(id, topic, transforms...) + if rf, ok := ret.Get(0).(func(string, []string, ...func(interfaces.AppFunctionContext, interface{}) (bool, interface{})) error); ok { + r0 = rf(id, topics, transforms...) } else { r0 = ret.Error(0) } diff --git a/pkg/interfaces/service.go b/pkg/interfaces/service.go index 625c61b75..91d7d33ad 100644 --- a/pkg/interfaces/service.go +++ b/pkg/interfaces/service.go @@ -48,8 +48,8 @@ type FunctionPipeline struct { Id string // Collection of App Functions to execute Transforms []AppFunction - // Topic to match against the incoming topic to determine if the pipeline will execute on the incoming message - Topic string + // Topics to match against the incoming topic to determine if the pipeline will execute on the incoming message + Topics []string // Hash of the list of transforms set and used internally for Store and Forward Hash string } @@ -88,11 +88,11 @@ type ApplicationService interface { // Note that the functions are executed in the order provided in the list. // An error is returned if the list is empty. SetDefaultFunctionsPipeline(transforms ...AppFunction) error - // AddFunctionsPipelineForTopic adds a functions pipeline with the specified unique id and list of Application Functions - // to be executed when the incoming topic matches the specified topic. The specified topic may contain the '#' wildcard + // AddFunctionsPipelineForTopics adds a functions pipeline with the specified unique id and list of Application Functions + // to be executed when the incoming topic matches any of the specified topics. The specified topic may contain the '#' wildcard // so that it matches multiple incoming topics. If just "#" is used for the specified topic it will match all incoming // topics and the specified functions pipeline will execute on every message received. - AddFunctionsPipelineForTopic(id string, topic string, transforms ...AppFunction) error + AddFunctionsPipelineForTopics(id string, topic []string, transforms ...AppFunction) error // MakeItRun starts the configured trigger to allow the functions pipeline to execute when the trigger // receives data and starts the internal webserver. This is a long running function which does not return until // the service is stopped or MakeItStop() is called.