Skip to content

Commit

Permalink
feat(sdk): Support Multi-Topic Pipeline Configuration
Browse files Browse the repository at this point in the history
Match incoming topic against an array of topics when determining whether
or not to execute a given pipeline.  Allow Topics configuration as comma
separated list similar to messaging triggers.

Also allow configuration of non-default pipeline(s) for HTTP trigger.

Signed-off-by: Alex Ullrich <[email protected]>
  • Loading branch information
AlexCuse committed Sep 14, 2021
1 parent 29a7969 commit 3030fa0
Show file tree
Hide file tree
Showing 12 changed files with 143 additions and 127 deletions.
3 changes: 2 additions & 1 deletion app-service-template/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,9 @@ github.com/hashicorp/go-immutable-radix v1.0.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjh
github.com/hashicorp/go-msgpack v0.5.3 h1:zKjpN5BK/P5lMYrLmBHdBULWbJ0XpYR+7NGzqkZzoD4=
github.com/hashicorp/go-msgpack v0.5.3/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM=
github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHhCYQXV3UM06sGGrk=
github.com/hashicorp/go-multierror v1.1.0 h1:B9UzwGQJehnUY1yNrnwREHc3fGbC2xefo8g4TbElacI=
github.com/hashicorp/go-multierror v1.1.0/go.mod h1:spPvp8C1qA32ftKqdAHm4hHTbPw+vmowP0z+KUhOZdA=
github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
github.com/hashicorp/go-rootcerts v1.0.2 h1:jzhAVGtqPKbwpyCPELlgNWhE1znq+qwJtW5Oi2viEzc=
github.com/hashicorp/go-rootcerts v1.0.2/go.mod h1:pqUvnprVnM5bf7AOirdbb01K4ccR319Vf4pU3K5EGc8=
github.com/hashicorp/go-sockaddr v1.0.0 h1:GeH6tui99pF4NJgfnhp+L6+FfobzVW3Ah46sLo0ICXs=
Expand Down
10 changes: 5 additions & 5 deletions app-service-template/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,9 @@ func (app *myApp) CreateAndRunAppService(serviceKey string, newServiceFactory fu
sample := functions.NewSample()

// TODO: Replace below functions with built in and/or your custom functions for your use case
// or remove is using Pipeline By Topic below.
// or remove if using Pipeline By Topics below.
// See https://docs.edgexfoundry.org/2.0/microservices/application/BuiltIn/ for list of built-in functions
err = app.service.SetFunctionsPipeline(
err = app.service.SetDefaultFunctionsPipeline(
transforms.NewFilterFor(deviceNames).FilterByDeviceName,
sample.LogEventDetails,
sample.ConvertEventToXML,
Expand All @@ -120,8 +120,8 @@ func (app *myApp) CreateAndRunAppService(serviceKey string, newServiceFactory fu
// Core Data publishes to the 'edgex/events/core/<profile-name>/<device-name>/<source-name>' topic
// Note: This example with default above causes Events from Random-Float-Device device to be processed twice
// resulting in the XML to be published back to the MessageBus twice.
// See <Pipeline By Topic documentation url TBD> for more details.
err = app.service.AddFunctionsPipelineForTopic("Floats", "edgex/events/#/#/Random-Float-Device/#",
// See https://docs.edgexfoundry.org/2.0/microservices/application/AdvancedTopics/#pipeline-per-topics for more details.
err = app.service.AddFunctionsPipelineForTopics("Floats", []string{"edgex/events/#/#/Random-Float-Device/#"},
transforms.NewFilterFor(deviceNames).FilterByDeviceName,
sample.LogEventDetails,
sample.ConvertEventToXML,
Expand All @@ -132,7 +132,7 @@ func (app *myApp) CreateAndRunAppService(serviceKey string, newServiceFactory fu
}
// Note: This example with default above causes Events from Int32 source to be processed twice
// resulting in the XML to be published back to the MessageBus twice.
err = app.service.AddFunctionsPipelineForTopic("Int32s", "edgex/events/#/#/#/Int32",
err = app.service.AddFunctionsPipelineForTopics("Int32s", []string{"edgex/events/#/#/#/Int32"},
transforms.NewFilterFor(deviceNames).FilterByDeviceName,
sample.LogEventDetails,
sample.ConvertEventToXML,
Expand Down
10 changes: 5 additions & 5 deletions app-service-template/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@ func TestCreateAndRunService_Success(t *testing.T) {
mockAppService.On("LoggingClient").Return(logger.NewMockClient())
mockAppService.On("GetAppSettingStrings", "DeviceNames").
Return([]string{"Random-Boolean-Device, Random-Integer-Device"}, nil)
mockAppService.On("SetFunctionsPipeline", mock.Anything, mock.Anything, mock.Anything, mock.Anything).
mockAppService.On("SetDefaultFunctionsPipeline", mock.Anything, mock.Anything, mock.Anything, mock.Anything).
Return(nil)
mockAppService.On("AddFunctionsPipelineForTopic", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).
mockAppService.On("AddFunctionsPipelineForTopics", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).
Return(nil)
mockAppService.On("LoadCustomConfig", mock.Anything, mock.Anything, mock.Anything).
Return(nil).Run(func(args mock.Arguments) {
Expand Down Expand Up @@ -115,7 +115,7 @@ func TestCreateAndRunService_SetFunctionsPipeline_Failed(t *testing.T) {
})
mockAppService.On("ListenForCustomConfigChanges", mock.Anything, mock.Anything, mock.Anything).
Return(nil)
mockAppService.On("SetFunctionsPipeline", mock.Anything, mock.Anything, mock.Anything, mock.Anything).
mockAppService.On("SetDefaultFunctionsPipeline", mock.Anything, mock.Anything, mock.Anything, mock.Anything).
Return(fmt.Errorf("Failed")).Run(func(args mock.Arguments) {
setFunctionsPipelineCalled = true
})
Expand Down Expand Up @@ -148,9 +148,9 @@ func TestCreateAndRunService_MakeItRun_Failed(t *testing.T) {
})
mockAppService.On("ListenForCustomConfigChanges", mock.Anything, mock.Anything, mock.Anything).
Return(nil)
mockAppService.On("SetFunctionsPipeline", mock.Anything, mock.Anything, mock.Anything, mock.Anything).
mockAppService.On("SetDefaultFunctionsPipeline", mock.Anything, mock.Anything, mock.Anything, mock.Anything).
Return(nil)
mockAppService.On("AddFunctionsPipelineForTopic", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).
mockAppService.On("AddFunctionsPipelineForTopics", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).
Return(nil)
mockAppService.On("MakeItRun").Return(fmt.Errorf("Failed")).Run(func(args mock.Arguments) {
makeItRunCalled = true
Expand Down
25 changes: 13 additions & 12 deletions internal/app/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ func (svc *Service) LoadConfigurableFunctionPipelines() (map[string]interfaces.F
pipeline := interfaces.FunctionPipeline{
Id: interfaces.DefaultPipelineId,
Transforms: transforms,
Topic: runtime.TopicWildCard,
Topics: []string{runtime.TopicWildCard},
}
pipelines[pipeline.Id] = pipeline
}
Expand All @@ -289,7 +289,7 @@ func (svc *Service) LoadConfigurableFunctionPipelines() (map[string]interfaces.F
pipeline := interfaces.FunctionPipeline{
Id: perTopicPipeline.Id,
Transforms: transforms,
Topic: perTopicPipeline.Topic,
Topics: util.DeleteEmptyAndTrim(strings.FieldsFunc(perTopicPipeline.Topics, util.SplitComma)),
}

pipelines[pipeline.Id] = pipeline
Expand Down Expand Up @@ -384,26 +384,27 @@ func (svc *Service) SetDefaultFunctionsPipeline(transforms ...interfaces.AppFunc
return nil
}

// AddFunctionsPipelineForTopic adds a functions pipeline for the specified for the specified id and topic
func (svc *Service) AddFunctionsPipelineForTopic(id string, topic string, transforms ...interfaces.AppFunction) error {
if strings.ToUpper(svc.config.Trigger.Type) == TriggerTypeHTTP {
return errors.New("pipeline per topic not valid with HTTP trigger")
}

// AddFunctionsPipelineForTopics adds a functions pipeline for the specified for the specified id and topics
func (svc *Service) AddFunctionsPipelineForTopics(id string, topics []string, transforms ...interfaces.AppFunction) error {
if len(transforms) == 0 {
return errors.New("no transforms provided to pipeline")
}

if len(strings.TrimSpace(topic)) == 0 {
return errors.New("topic for pipeline can not be blank")
if len(topics) == 0 {
return errors.New("topics for pipeline can not be empty")
}

err := svc.runtime.AddFunctionsPipeline(id, topic, transforms)
for _, t := range topics {
if strings.TrimSpace(t) == "" {
return errors.New("blank topic not allowed")
}
}
err := svc.runtime.AddFunctionsPipeline(id, topics, transforms)
if err != nil {
return err
}

svc.lc.Debugf("Pipeline '%s' added for topic '%s' with %d transform(s)", id, topic, len(transforms))
svc.lc.Debugf("Pipeline '%s' added for topics '%v' with %d transform(s)", id, topics, len(transforms))
return nil
}

Expand Down
27 changes: 14 additions & 13 deletions internal/app/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ func TestSetDefaultFunctionsPipelineOneTransform(t *testing.T) {
require.NoError(t, err)
}

func TestService_AddFunctionsPipelineForTopic(t *testing.T) {
func TestService_AddFunctionsPipelineForTopics(t *testing.T) {
service := Service{
lc: lc,
runtime: runtime.NewGolangRuntime("", nil, nil),
Expand All @@ -313,32 +313,33 @@ func TestService_AddFunctionsPipelineForTopic(t *testing.T) {

transforms := []interfaces.AppFunction{tags.AddTags}

// This sets the Default Pipeline allowing to test for duplicate iD.
err := service.SetDefaultFunctionsPipeline(transforms...)

require.NoError(t, err)

defaultTopics := []string{"#"}

tests := []struct {
name string
id string
trigger string
topic string
topics []string
transforms []interfaces.AppFunction
expectError bool
}{
{"Happy Path", "123", TriggerTypeMessageBus, "#", transforms, false},
{"Happy Path Custom", "124", "CUSTOM TRIGGER", "#", transforms, false},
{"Empty Topic", "125", TriggerTypeMessageBus, " ", transforms, true},
{"No Transforms", "126", TriggerTypeMessageBus, "#", nil, true},
{"Default Id", interfaces.DefaultPipelineId, TriggerTypeMessageBus, "#", transforms, true},
{"Duplicate Id", "123", TriggerTypeMessageBus, "#", transforms, true},
{"HTTP Trigger Type", "127", TriggerTypeHTTP, "#", transforms, true},
{"Happy Path", "123", TriggerTypeMessageBus, defaultTopics, transforms, false},
{"Duplicate Id", interfaces.DefaultPipelineId, TriggerTypeMessageBus, defaultTopics, transforms, true},
{"Happy Path Custom", "124", "CUSTOM TRIGGER", defaultTopics, transforms, false},
{"Empty Topic", "125", TriggerTypeMessageBus, []string{" "}, transforms, true},
{"Empty Topics", "126", TriggerTypeMessageBus, nil, transforms, true},
{"No Transforms", "127", TriggerTypeMessageBus, defaultTopics, nil, true},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
service.config.Trigger.Type = test.trigger

err := service.AddFunctionsPipelineForTopic(test.id, test.topic, test.transforms...)
err := service.AddFunctionsPipelineForTopics(test.id, test.topics, test.transforms...)
if test.expectError {
require.Error(t, err)
return
Expand Down Expand Up @@ -480,7 +481,7 @@ func TestLoadConfigurableFunctionPipelinesDefaultNotFound(t *testing.T) {
if len(test.perTopicExecutionOrder) > 0 {
service.config.Writable.Pipeline.PerTopicPipelines["bogus"] = common.TopicPipeline{
Id: "bogus",
Topic: "#",
Topics: "#",
ExecutionOrder: test.perTopicExecutionOrder,
}
}
Expand Down Expand Up @@ -539,7 +540,7 @@ func TestLoadConfigurableFunctionPipelinesNumFunctions(t *testing.T) {
PerTopicPipelines: map[string]common.TopicPipeline{
perTopicPipelineId: {
Id: perTopicPipelineId,
Topic: "#",
Topics: "#",
ExecutionOrder: "FilterByDeviceName, Transform, SetResponseData",
},
},
Expand Down
6 changes: 3 additions & 3 deletions internal/common/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,12 +162,12 @@ type PipelineInfo struct {
Functions map[string]PipelineFunction
}

// TopicPipeline define the data to a Pre Topic functions pipeline
// TopicPipeline define the data to a Per Topics functions pipeline
type TopicPipeline struct {
// Id is the unique ID of the pipeline instance
Id string
// Topic is the topic which must match the incoming topic inorder for the pipeline to execute
Topic string
// Topics is the set of comma separated topics matched against the incoming to determine if pipeline should execute
Topics string
// ExecutionOrder is a list of functions, in execution order, for the pipeline instance
ExecutionOrder string
}
Expand Down
62 changes: 35 additions & 27 deletions internal/runtime/runtime.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//
// Copyright (c) 2021 Intel Corporation
// Copyright (c) 2021 One Track Consulting
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -47,11 +48,11 @@ const (
TopicLevelSeparator = "/"
)

func NewFunctionPipeline(id string, topic string, transforms []interfaces.AppFunction) interfaces.FunctionPipeline {
func NewFunctionPipeline(id string, topics []string, transforms []interfaces.AppFunction) interfaces.FunctionPipeline {
pipeline := interfaces.FunctionPipeline{
Id: id,
Transforms: transforms,
Topic: topic,
Topics: topics,
Hash: calculatePipelineHash(transforms),
}

Expand Down Expand Up @@ -99,17 +100,17 @@ func (gr *GolangRuntime) SetDefaultFunctionsPipeline(transforms []interfaces.App
return nil
}

return gr.AddFunctionsPipeline(interfaces.DefaultPipelineId, TopicWildCard, transforms)
return gr.AddFunctionsPipeline(interfaces.DefaultPipelineId, []string{TopicWildCard}, transforms)
}

// AddFunctionsPipeline is thread safe to set transforms
func (gr *GolangRuntime) AddFunctionsPipeline(id string, topic string, transforms []interfaces.AppFunction) error {
func (gr *GolangRuntime) AddFunctionsPipeline(id string, topics []string, transforms []interfaces.AppFunction) error {
_, exists := gr.pipelines[id]
if exists {
return fmt.Errorf("pipeline with Id='%s' already exists", id)
}

pipeline := NewFunctionPipeline(id, topic, transforms)
pipeline := NewFunctionPipeline(id, topics, transforms)
gr.isBusyCopying.Lock()
gr.pipelines[id] = &pipeline
gr.isBusyCopying.Unlock()
Expand Down Expand Up @@ -206,7 +207,7 @@ func (gr *GolangRuntime) ProcessMessage(
execPipeline := &interfaces.FunctionPipeline{
Id: pipeline.Id,
Transforms: make([]interfaces.AppFunction, len(pipeline.Transforms)),
Topic: pipeline.Topic,
Topics: pipeline.Topics,
Hash: pipeline.Hash,
}
copy(execPipeline.Transforms, pipeline.Transforms)
Expand Down Expand Up @@ -381,7 +382,7 @@ func (gr *GolangRuntime) GetMatchingPipelines(incomingTopic string) []*interface
}

for _, pipeline := range gr.pipelines {
if topicMatches(incomingTopic, pipeline.Topic) {
if topicMatches(incomingTopic, pipeline.Topics) {
matches = append(matches, pipeline)
}
}
Expand All @@ -393,32 +394,39 @@ func (gr *GolangRuntime) GetPipelineById(id string) *interfaces.FunctionPipeline
return gr.pipelines[id]
}

func topicMatches(incomingTopic string, pipelineTopic string) bool {
if pipelineTopic == TopicWildCard {
return true
}
func topicMatches(incomingTopic string, pipelineTopics []string) bool {
for _, pipelineTopic := range pipelineTopics {
if pipelineTopic == TopicWildCard {
return true
}

wildcardCount := strings.Count(pipelineTopic, TopicWildCard)
switch wildcardCount {
case 0:
return incomingTopic == pipelineTopic
default:
pipelineLevels := strings.Split(pipelineTopic, TopicLevelSeparator)
incomingLevels := strings.Split(incomingTopic, TopicLevelSeparator)
wildcardCount := strings.Count(pipelineTopic, TopicWildCard)
switch wildcardCount {
case 0:
if incomingTopic == pipelineTopic {
return true
}
default:
pipelineLevels := strings.Split(pipelineTopic, TopicLevelSeparator)
incomingLevels := strings.Split(incomingTopic, TopicLevelSeparator)

if len(pipelineLevels) > len(incomingLevels) {
return false
}
if len(pipelineLevels) > len(incomingLevels) {
continue
}

for index, level := range pipelineLevels {
if level == TopicWildCard {
incomingLevels[index] = TopicWildCard
for index, level := range pipelineLevels {
if level == TopicWildCard {
incomingLevels[index] = TopicWildCard
}
}
}

incomingWithWildCards := strings.Join(incomingLevels, "/")
return strings.Index(incomingWithWildCards, pipelineTopic) == 0
incomingWithWildCards := strings.Join(incomingLevels, "/")
if strings.Index(incomingWithWildCards, pipelineTopic) == 0 {
return true
}
}
}
return false
}

func calculatePipelineHash(transforms []interfaces.AppFunction) string {
Expand Down
Loading

0 comments on commit 3030fa0

Please sign in to comment.